#![cfg(feature = "std")]
#![cfg_attr(docsrs, doc(cfg(feature = "std")))]
use super::{
with_buffers, Address, ConnectionType, IpAddr, MultiStreamAddress, MultiStreamWebRtcConnection,
PlatformRef, SubstreamDirection,
};
use alloc::{borrow::Cow, sync::Arc};
use core::{pin::Pin, str, time::Duration};
use futures_util::{future, FutureExt as _};
use smoldot::libp2p::websocket;
use std::{
io,
net::SocketAddr,
thread,
time::{Instant, UNIX_EPOCH},
};
pub struct DefaultPlatform {
client_name: String,
client_version: String,
tasks_executor: Arc<smol::Executor<'static>>,
shutdown_notify: event_listener::Event,
}
impl DefaultPlatform {
pub fn new(client_name: String, client_version: String) -> Arc<Self> {
let tasks_executor = Arc::new(smol::Executor::new());
let shutdown_notify = event_listener::Event::new();
for n in 0..thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4)
{
let on_shutdown = shutdown_notify.listen();
let tasks_executor = tasks_executor.clone();
let spawn_result = thread::Builder::new()
.name(format!("smoldot-light-{}", n))
.spawn(move || smol::block_on(tasks_executor.run(on_shutdown)));
if let Err(err) = spawn_result {
panic!("Failed to spawn execution thread: {err}");
}
}
Arc::new(DefaultPlatform {
client_name,
client_version,
tasks_executor,
shutdown_notify,
})
}
}
impl PlatformRef for Arc<DefaultPlatform> {
type Delay = futures_util::future::Map<smol::Timer, fn(Instant) -> ()>;
type Instant = Instant;
type MultiStream = std::convert::Infallible; type Stream = Stream;
type StreamConnectFuture = future::Ready<Self::Stream>;
type MultiStreamConnectFuture = future::Pending<MultiStreamWebRtcConnection<Self::MultiStream>>;
type ReadWriteAccess<'a> = with_buffers::ReadWriteAccess<'a, Instant>;
type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>;
type StreamErrorRef<'a> = &'a io::Error;
type NextSubstreamFuture<'a> = future::Pending<Option<(Self::Stream, SubstreamDirection)>>;
fn now_from_unix_epoch(&self) -> Duration {
UNIX_EPOCH.elapsed().unwrap()
}
fn now(&self) -> Self::Instant {
Instant::now()
}
fn fill_random_bytes(&self, buffer: &mut [u8]) {
rand::RngCore::fill_bytes(&mut rand::thread_rng(), buffer);
}
fn sleep(&self, duration: Duration) -> Self::Delay {
smol::Timer::after(duration).map(|_| ())
}
fn sleep_until(&self, when: Self::Instant) -> Self::Delay {
smol::Timer::at(when).map(|_| ())
}
fn spawn_task(
&self,
_task_name: Cow<str>,
task: impl future::Future<Output = ()> + Send + 'static,
) {
self.tasks_executor.spawn(task).detach();
}
fn client_name(&self) -> Cow<str> {
Cow::Borrowed(&self.client_name)
}
fn client_version(&self) -> Cow<str> {
Cow::Borrowed(&self.client_version)
}
fn supports_connection_type(&self, connection_type: ConnectionType) -> bool {
matches!(
connection_type,
ConnectionType::TcpIpv4
| ConnectionType::TcpIpv6
| ConnectionType::TcpDns
| ConnectionType::WebSocketIpv4 { .. }
| ConnectionType::WebSocketIpv6 { .. }
| ConnectionType::WebSocketDns { secure: false, .. }
)
}
fn connect_stream(&self, multiaddr: Address) -> Self::StreamConnectFuture {
let (tcp_socket_addr, host_if_websocket): (
either::Either<SocketAddr, (String, u16)>,
Option<String>,
) = match multiaddr {
Address::TcpDns { hostname, port } => {
(either::Right((hostname.to_string(), port)), None)
}
Address::TcpIp {
ip: IpAddr::V4(ip),
port,
} => (either::Left(SocketAddr::from((ip, port))), None),
Address::TcpIp {
ip: IpAddr::V6(ip),
port,
} => (either::Left(SocketAddr::from((ip, port))), None),
Address::WebSocketDns {
hostname,
port,
secure: false,
} => (
either::Right((hostname.to_string(), port)),
Some(format!("{}:{}", hostname, port)),
),
Address::WebSocketIp {
ip: IpAddr::V4(ip),
port,
} => {
let addr = SocketAddr::from((ip, port));
(either::Left(addr), Some(addr.to_string()))
}
Address::WebSocketIp {
ip: IpAddr::V6(ip),
port,
} => {
let addr = SocketAddr::from((ip, port));
(either::Left(addr), Some(addr.to_string()))
}
_ => unreachable!(),
};
let socket_future = async {
let tcp_socket = match tcp_socket_addr {
either::Left(socket_addr) => smol::net::TcpStream::connect(socket_addr).await,
either::Right((dns, port)) => smol::net::TcpStream::connect((&dns[..], port)).await,
};
if let Ok(tcp_socket) = &tcp_socket {
let _ = tcp_socket.set_nodelay(true);
}
match (tcp_socket, host_if_websocket) {
(Ok(tcp_socket), Some(host)) => {
websocket::websocket_client_handshake(websocket::Config {
tcp_socket,
host: &host,
url: "/",
})
.await
.map(TcpOrWs::Right)
}
(Ok(tcp_socket), None) => Ok(TcpOrWs::Left(tcp_socket)),
(Err(err), _) => Err(err),
}
};
future::ready(Stream(with_buffers::WithBuffers::new(Box::pin(
socket_future,
))))
}
fn connect_multistream(&self, _address: MultiStreamAddress) -> Self::MultiStreamConnectFuture {
panic!()
}
fn open_out_substream(&self, c: &mut Self::MultiStream) {
match *c {}
}
fn next_substream(&self, c: &'_ mut Self::MultiStream) -> Self::NextSubstreamFuture<'_> {
match *c {}
}
fn read_write_access<'a>(
&self,
stream: Pin<&'a mut Self::Stream>,
) -> Result<Self::ReadWriteAccess<'a>, &'a io::Error> {
let stream = stream.project();
stream.0.read_write_access(Instant::now())
}
fn wait_read_write_again<'a>(
&self,
stream: Pin<&'a mut Self::Stream>,
) -> Self::StreamUpdateFuture<'a> {
let stream = stream.project();
Box::pin(stream.0.wait_read_write_again(|when| async move {
smol::Timer::at(when).await;
}))
}
}
impl Drop for DefaultPlatform {
fn drop(&mut self) {
self.shutdown_notify.notify(usize::max_value());
}
}
#[pin_project::pin_project]
pub struct Stream(
#[pin]
with_buffers::WithBuffers<
future::BoxFuture<'static, Result<TcpOrWs, io::Error>>,
TcpOrWs,
Instant,
>,
);
type TcpOrWs = future::Either<smol::net::TcpStream, websocket::Connection<smol::net::TcpStream>>;