use async_trait::async_trait;
use asynchronous_codec::Framed;
use futures::future::{FutureExt, RemoteHandle};
use futures::stream;
use futures::task::{Spawn, SpawnError};
use futures::{AsyncRead, AsyncWrite, Future};
use std::borrow::Cow;
use std::fmt::Debug;
use std::io::{self, Result as IoResult};
use std::net;
use tor_general_addr::unix;
use web_time_compat::{Duration, Instant, InstantExt, SystemTime, SystemTimeExt};
#[cfg(feature = "tls-server")]
use tor_cert_x509::TlsKeyAndCert;
/// Umbrella trait for a complete asynchronous runtime.
///
/// `Runtime` declares no methods of its own; it only bundles the capability
/// traits listed as supertraits: task spawning ([`Spawn`]), blocking-work
/// support ([`Blocking`]), timers ([`SleepProvider`], [`CoarseTimeProvider`]),
/// stream networking over both `net::SocketAddr` and `unix::SocketAddr`
/// ([`NetStreamProvider`]), TLS layered on the `net::SocketAddr` stream type
/// ([`TlsProvider`]), and UDP ([`UdpProvider`]), together with
/// `Sync + Send + Clone + Debug + 'static`.
///
/// A blanket implementation in this file makes every type that satisfies
/// these bounds a `Runtime`, so this trait is never implemented by hand.
pub trait Runtime:
Sync
+ Send
+ Spawn
+ Blocking
+ Clone
+ SleepProvider
+ CoarseTimeProvider
+ NetStreamProvider<net::SocketAddr>
+ NetStreamProvider<unix::SocketAddr>
+ TlsProvider<<Self as NetStreamProvider<net::SocketAddr>>::Stream>
+ UdpProvider
+ Debug
+ 'static
{
}
// Blanket implementation: any type meeting every bound below is a `Runtime`.
//
// This bound list mirrors the supertrait list on the `Runtime` trait; the two
// must be kept in step, or some types would satisfy one but not the other.
impl<T> Runtime for T where
T: Sync
+ Send
+ Spawn
+ Blocking
+ Clone
+ SleepProvider
+ CoarseTimeProvider
+ NetStreamProvider<net::SocketAddr>
+ NetStreamProvider<unix::SocketAddr>
+ TlsProvider<<Self as NetStreamProvider<net::SocketAddr>>::Stream>
+ UdpProvider
+ Debug
+ 'static
{
}
/// A [`Runtime`] that can additionally be entered from synchronous code via
/// [`ToplevelBlockOn::block_on`].
///
/// Declares no methods of its own; it is purely the combination of its two
/// supertraits.
pub trait ToplevelRuntime: Runtime + ToplevelBlockOn {}
// Blanket implementation: every `Runtime + ToplevelBlockOn` type qualifies.
impl<T: Runtime + ToplevelBlockOn> ToplevelRuntime for T {}
/// Time-related services of a runtime: creating sleep futures and reading
/// the current `Instant` and `SystemTime`.
pub trait SleepProvider: Clone + Send + Sync + 'static {
/// Future type returned by [`SleepProvider::sleep`]; it resolves to `()`.
type SleepFuture: Future<Output = ()> + Send + 'static;
/// Return a future intended to become ready once `duration` has elapsed.
///
/// Nothing happens until the returned future is polled or awaited, which
/// is why the result is marked `#[must_use]`.
#[must_use = "sleep() returns a future, which does nothing unless used"]
fn sleep(&self, duration: Duration) -> Self::SleepFuture;
/// Return this provider's view of the current `Instant`.
///
/// The default implementation returns `Instant::get()`.
fn now(&self) -> Instant {
Instant::get()
}
/// Return this provider's view of the current wall-clock time.
///
/// The default implementation returns `SystemTime::get()`.
fn wallclock(&self) -> SystemTime {
SystemTime::get()
}
/// Hook whose default implementation does nothing and ignores `_reason`.
///
/// NOTE(review): the name suggests this asks a mock-time provider not to
/// advance time until a matching `release_advance` — confirm against
/// the implementors that override it.
fn block_advance<T: Into<String>>(&self, _reason: T) {}
/// Hook whose default implementation does nothing and ignores `_reason`.
///
/// NOTE(review): presumably the counterpart of `block_advance`, lifting
/// the block registered under the same reason — confirm.
fn release_advance<T: Into<String>>(&self, _reason: T) {}
/// Hook whose default implementation does nothing and ignores `_dur`.
///
/// NOTE(review): presumably permits a mock-time provider to advance by up
/// to `_dur` once, even while advances are blocked — confirm.
fn allow_one_advance(&self, _dur: Duration) {}
}
/// Source of coarse timestamps, expressed as
/// `crate::coarse_time::CoarseInstant`.
pub trait CoarseTimeProvider: Clone + Send + Sync + 'static {
/// Return the current time as a `CoarseInstant`.
///
/// NOTE(review): presumably cheaper but less precise than
/// `SleepProvider::now` — confirm against `coarse_time`'s documentation.
fn now_coarse(&self) -> crate::coarse_time::CoarseInstant;
}
/// Ability to run a future to completion from synchronous code.
pub trait ToplevelBlockOn: Clone + Send + Sync + 'static {
/// Drive `future` to completion and return its output.
///
/// Note that `future` carries no `Send` or `'static` bound here.
///
/// NOTE(review): as the trait name suggests, this is presumably meant to
/// be called only at the top level (not from inside a task already
/// running on this runtime) — confirm with the implementors.
fn block_on<F: Future>(&self, future: F) -> F::Output;
}
/// Support for running blocking (non-async) work alongside async tasks.
pub trait Blocking: Clone + Send + Sync + 'static {
/// Run the closure `f` and return a handle future that yields its result.
///
/// NOTE(review): judging by the `ThreadHandle` name, `f` presumably runs
/// on a separate thread where blocking is permitted — confirm with the
/// implementors.
fn spawn_blocking<F, T>(&self, f: F) -> Self::ThreadHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static;
/// Handle returned by [`Blocking::spawn_blocking`]: a future resolving to
/// the closure's return value.
type ThreadHandle<T: Send + 'static>: Future<Output = T>;
/// Synchronously drive `future` to completion and return its output.
///
/// NOTE(review): presumably only valid when called from within a closure
/// started by `spawn_blocking` — confirm before calling it elsewhere.
fn reenter_block_on<F>(&self, future: F) -> F::Output
where
F: Future,
F::Output: Send + 'static;
/// Run the closure `f` and return a future for its result.
///
/// The default implementation simply forwards to
/// [`Blocking::spawn_blocking`]; implementors may override it.
fn blocking_io<F, T>(&self, f: F) -> impl Future<Output = T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
self.spawn_blocking(f)
}
}
pub trait SpawnExt: Spawn {
#[track_caller]
fn spawn<Fut>(&self, future: Fut) -> Result<(), SpawnError>
where
Fut: Future<Output = ()> + Send + 'static,
{
use tracing::Instrument as _;
self.spawn_obj(Box::new(future.in_current_span()).into())
}
#[track_caller]
fn spawn_with_handle<Fut>(
&self,
future: Fut,
) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
where
Fut: Future + Send + 'static,
<Fut as Future>::Output: Send,
{
let (future, handle) = future.remote_handle();
self.spawn(future)?;
Ok(handle)
}
}
impl<T: Spawn> SpawnExt for T {}
pub trait StreamOps {
fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
Err(UnsupportedStreamOp {
op: "set_tcp_notsent_lowat",
reason: "unsupported object type",
}
.into())
}
fn new_handle(&self) -> Box<dyn StreamOps + Send + Unpin> {
Box::new(NoOpStreamOpsHandle)
}
}
/// A [`StreamOps`] handle with no behavior of its own.
///
/// It keeps the trait's default `set_tcp_notsent_lowat`, so that operation
/// reports "unsupported" on this handle.
#[derive(Copy, Clone, Debug, Default)]
#[non_exhaustive]
pub struct NoOpStreamOpsHandle;

impl StreamOps for NoOpStreamOpsHandle {
    /// Return another boxed no-op handle.
    fn new_handle(&self) -> Box<dyn StreamOps + Send + Unpin> {
        // The type is a field-less unit struct, so a fresh value equals a copy.
        Box::new(Self)
    }
}
/// `Framed` forwards every [`StreamOps`] call to the stream it wraps.
///
/// The calls are written as `T::method(self, ..)` so that `self` is
/// deref-coerced to the inner `&T`; calling `self.method(..)` would pick this
/// very impl again and recurse.
impl<T: StreamOps, C> StreamOps for Framed<T, C> {
    /// Delegate to the wrapped stream's `set_tcp_notsent_lowat`.
    fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
        T::set_tcp_notsent_lowat(self, notsent_lowat)
    }

    /// Delegate to the wrapped stream's `new_handle`.
    fn new_handle(&self) -> Box<dyn StreamOps + Send + Unpin> {
        T::new_handle(self)
    }
}
#[derive(Clone, Debug, thiserror::Error)]
#[error("Operation {op} not supported: {reason}")]
pub struct UnsupportedStreamOp {
op: &'static str,
reason: &'static str,
}
impl UnsupportedStreamOp {
pub fn new(op: &'static str, reason: &'static str) -> Self {
Self { op, reason }
}
}
impl From<UnsupportedStreamOp> for io::Error {
fn from(value: UnsupportedStreamOp) -> Self {
io::Error::new(io::ErrorKind::Unsupported, value)
}
}
/// Ability to open and accept stream-oriented network connections for the
/// address type `ADDR` (which defaults to `net::SocketAddr`).
#[async_trait]
pub trait NetStreamProvider<ADDR = net::SocketAddr>: Clone + Send + Sync + 'static {
/// Connection type produced by [`NetStreamProvider::connect`] and yielded
/// by the listener: an async byte stream that also supports [`StreamOps`].
type Stream: AsyncRead + AsyncWrite + StreamOps + Send + Sync + Unpin + 'static;
/// Listener type produced by [`NetStreamProvider::listen`]; its accepted
/// connections have the same `Stream` type as outgoing ones.
type Listener: NetStreamListener<ADDR, Stream = Self::Stream> + Send + Sync + Unpin + 'static;
/// Open a connection to `addr`.
///
/// # Errors
///
/// Returns an `io::Error` if the connection cannot be established.
async fn connect(&self, addr: &ADDR) -> IoResult<Self::Stream>;
/// Create a listener bound to `addr`.
///
/// # Errors
///
/// Returns an `io::Error` if the listener cannot be created.
async fn listen(&self, addr: &ADDR) -> IoResult<Self::Listener>;
}
/// A listening endpoint that accepts incoming stream connections for the
/// address type `ADDR` (which defaults to `net::SocketAddr`).
pub trait NetStreamListener<ADDR = net::SocketAddr> {
/// Type of each accepted connection.
type Stream: AsyncRead + AsyncWrite + StreamOps + Send + Sync + Unpin + 'static;
/// Asynchronous stream of accept results: each item is either an accepted
/// connection paired with an `ADDR` (presumably the peer's address —
/// confirm with implementors) or an `io::Error`.
type Incoming: stream::Stream<Item = IoResult<(Self::Stream, ADDR)>>
+ Send
+ Sync
+ Unpin
+ 'static;
/// Consume the listener and return its stream of incoming connections.
fn incoming(self) -> Self::Incoming;
/// Return the address this listener reports as its local address.
///
/// # Errors
///
/// Returns an `io::Error` if the address cannot be determined.
fn local_addr(&self) -> IoResult<ADDR>;
}
/// Ability to create UDP sockets.
#[async_trait]
pub trait UdpProvider: Clone + Send + Sync + 'static {
/// Socket type produced by [`UdpProvider::bind`].
type UdpSocket: UdpSocket + Send + Sync + Unpin + 'static;
/// Create a UDP socket bound to the local address `addr`.
///
/// # Errors
///
/// Returns an `io::Error` if the socket cannot be bound.
async fn bind(&self, addr: &net::SocketAddr) -> IoResult<Self::UdpSocket>;
}
/// Asynchronous datagram operations on a bound UDP socket.
#[async_trait]
pub trait UdpSocket {
/// Receive a datagram into `buf`.
///
/// On success returns a `usize` and a socket address — presumably the
/// number of bytes written into `buf` and the sender's address; confirm
/// with implementors.
async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, net::SocketAddr)>;
/// Send the bytes in `buf` to `target`.
///
/// On success returns a `usize`, presumably the number of bytes sent —
/// confirm with implementors.
async fn send(&self, buf: &[u8], target: &net::SocketAddr) -> IoResult<usize>;
/// Return the address this socket reports as its local address.
fn local_addr(&self) -> IoResult<net::SocketAddr>;
}
/// Access to the certificates and exported keying material of a connection.
pub trait CertifiedConn {
/// Derive `len` bytes of keying material from this connection, bound to
/// `label` and, if given, `context`.
///
/// NOTE(review): presumably the TLS keying-material exporter (RFC 5705 /
/// RFC 8446 section 7.5) — confirm with implementors.
///
/// # Errors
///
/// Returns an `io::Error` if the material cannot be exported.
fn export_keying_material(
&self,
len: usize,
label: &[u8],
context: Option<&[u8]>,
) -> IoResult<Vec<u8>>;
/// Return the certificate presented by the peer, or `None` if there is
/// none.
///
/// NOTE(review): the bytes are presumably DER-encoded — confirm.
fn peer_certificate(&self) -> IoResult<Option<Cow<'_, [u8]>>>;
/// Return the certificate this side presented, or `None` if there is
/// none.
///
/// NOTE(review): the bytes are presumably DER-encoded — confirm.
fn own_certificate(&self) -> IoResult<Option<Cow<'_, [u8]>>>;
}
/// An object that can run a TLS handshake on top of a stream of type `S`.
///
/// Within this file it is used as the bound for both `TlsProvider::Connector`
/// and `TlsProvider::Acceptor`.
#[async_trait]
pub trait TlsConnector<S> {
/// Type of the resulting TLS connection.
type Conn: AsyncRead + AsyncWrite + CertifiedConn + Unpin + Send + 'static;
/// Negotiate TLS over `stream`, passing `sni_hostname` to the handshake.
///
/// NOTE(review): as the name indicates, the peer's certificate is
/// presumably NOT validated here, leaving that to the caller (e.g. via
/// `CertifiedConn::peer_certificate`) — confirm with implementors.
///
/// # Errors
///
/// Returns an `io::Error` if the handshake fails.
async fn negotiate_unvalidated(&self, stream: S, sni_hostname: &str) -> IoResult<Self::Conn>;
}
/// Factory for TLS connectors and acceptors that operate over streams of
/// type `S`.
pub trait TlsProvider<S: StreamOps>: Clone + Send + Sync + 'static {
/// Connector type returned by [`TlsProvider::tls_connector`]; it yields
/// `Self::TlsStream` connections.
type Connector: TlsConnector<S, Conn = Self::TlsStream> + Send + Sync + Unpin;
/// TLS stream type produced by `Self::Connector`.
type TlsStream: AsyncRead + AsyncWrite + StreamOps + CertifiedConn + Unpin + Send + 'static;
/// Acceptor type returned by [`TlsProvider::tls_acceptor`]; it yields
/// `Self::TlsServerStream` connections. Note that it is expressed through
/// the same [`TlsConnector`] trait as the connector.
type Acceptor: TlsConnector<S, Conn = Self::TlsServerStream> + Send + Sync + Unpin;
/// TLS stream type produced by `Self::Acceptor`.
type TlsServerStream: AsyncRead
+ AsyncWrite
+ StreamOps
+ CertifiedConn
+ Unpin
+ Send
+ 'static;
/// Return a connector for initiating TLS sessions.
fn tls_connector(&self) -> Self::Connector;
/// Return an acceptor configured from `settings`.
///
/// # Errors
///
/// Returns an `io::Error` if the acceptor cannot be built.
/// NOTE(review): providers without server support presumably fail here
/// with `TlsServerUnsupported` — confirm with implementors.
fn tls_acceptor(&self, settings: TlsAcceptorSettings) -> IoResult<Self::Acceptor>;
/// Report whether this provider's connections support
/// `CertifiedConn::export_keying_material`.
fn supports_keying_material_export(&self) -> bool;
}
/// Settings consumed by `TlsProvider::tls_acceptor` to build a TLS acceptor.
///
/// With the `tls-server` feature enabled, this holds the server's key and
/// certificate. Without that feature, the struct contains an uninhabited
/// `void::Void` field and can therefore never be constructed.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct TlsAcceptorSettings {
/// Key and certificate(s) the acceptor presents.
#[cfg(feature = "tls-server")]
pub(crate) identity: TlsKeyAndCert,
/// Uninhabited placeholder that makes this type impossible to construct
/// when `tls-server` is disabled.
#[cfg(not(feature = "tls-server"))]
unconstructable: void::Void,
}
impl TlsAcceptorSettings {
/// Create settings that use `identity` as the key and certificate.
///
/// Only available with the `tls-server` feature. Currently always returns
/// `Ok`.
// The `io::Result` wrapper is not needed today, hence the lint allowance.
// NOTE(review): presumably kept so validation can be added later without an
// API break — confirm, and record the reason next to the `allow`.
#[allow(clippy::unnecessary_wraps)]
#[cfg(feature = "tls-server")]
pub fn new(identity: TlsKeyAndCert) -> std::io::Result<Self> {
Ok(Self { identity })
}
/// Return the first certificate from `identity.certificates_der()`.
///
/// NOTE(review): presumably this is the DER encoding of the primary
/// certificate — confirm against `TlsKeyAndCert`.
///
/// # Panics
///
/// The `[0]` index panics if `certificates_der()` returns no certificates.
/// NOTE(review): presumably `TlsKeyAndCert` guarantees at least one —
/// confirm.
pub fn cert_der(&self) -> &[u8] {
// Without `tls-server` no value of this type can exist, so this branch can
// never run; `void::unreachable` consumes the uninhabited field to satisfy
// the return type.
#[cfg(not(feature = "tls-server"))]
{
void::unreachable(self.unconstructable);
}
#[cfg(feature = "tls-server")]
self.identity.certificates_der()[0]
}
}
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
#[error("This TlsProvider does not support running as a server")]
pub struct TlsServerUnsupported {}
impl From<TlsServerUnsupported> for io::Error {
fn from(value: TlsServerUnsupported) -> Self {
io::Error::new(io::ErrorKind::Unsupported, value)
}
}