use crate::{
log,
platform::{PlatformRef, SubstreamDirection},
};
use alloc::{boxed::Box, string::String};
use core::{pin, time::Duration};
use futures_lite::FutureExt as _;
use futures_util::{StreamExt as _, future, stream::FuturesUnordered};
use smoldot::{libp2p::collection::SubstreamFate, network::service};
/// Drives one single-stream connection until it shuts down.
///
/// Each loop iteration does two things:
///
/// 1. If no message to the coordinator is currently being sent and the connection task still
///    exists, the task is given access to the socket (`platform.read_write_access`), then at
///    most one message is pulled from the task and its sending to the coordinator is started.
/// 2. The function then waits for whichever happens first: a message from the coordinator, the
///    socket signalling that it should be processed again, or the in-flight send finishing.
///
/// The function returns when either:
///
/// - `pull_message_to_coordinator` has returned `None` for the task and no send is in flight, or
/// - the coordinator is gone (`coordinator_to_connection` yields `None`, or the send on
///   `connection_to_coordinator` fails).
///
/// # Parameters
///
/// - `connection`: the platform stream that the connection task reads from and writes to.
/// - `address_string`: only used as the `address` field of log entries.
/// - `platform`: provides socket access, readiness waiting, the current time and logging.
/// - `connection_id`: attached to every message sent to the coordinator.
/// - `connection_task`: the state machine that is driven by this function.
/// - `coordinator_to_connection`: messages from the coordinator, injected into the task.
/// - `connection_to_coordinator`: channel on which `(connection_id, message)` pairs are sent.
///
/// # Panics
///
/// Hits `unreachable!()` if a coordinator message is received after the connection task has
/// been consumed (presumably the coordinator guarantees this never happens; verify against the
/// coordinator implementation).
pub(super) async fn single_stream_connection_task<TPlat: PlatformRef>(
connection: TPlat::Stream,
address_string: String,
platform: TPlat,
connection_id: service::ConnectionId,
connection_task: service::SingleStreamConnectionTask<TPlat::Instant>,
coordinator_to_connection: async_channel::Receiver<service::CoordinatorToConnection>,
connection_to_coordinator: async_channel::Sender<(
service::ConnectionId,
service::ConnectionToCoordinator,
)>,
) {
// The receiver and the socket are pinned on the stack so that they can be polled in place.
let mut coordinator_to_connection = pin::pin!(coordinator_to_connection);
let mut socket = pin::pin!(connection);
// Future of the message currently being sent to the coordinator, or `None` if no send is in
// flight. At most one send is in flight at any time.
let mut message_sending = pin::pin!(None);
// Wrapped in an `Option` because `pull_message_to_coordinator` consumes the task and may not
// give it back; `None` means that the task is gone.
let mut connection_task = Some(connection_task);
loop {
// Give other tasks of the executor a chance to run between two iterations.
futures_lite::future::yield_now().await;
// Processing the socket can produce a message for the coordinator, so the socket is only
// processed when the previous message has finished being sent.
if message_sending.is_none() && connection_task.is_some() {
let mut task = connection_task.take().unwrap();
match platform.read_write_access(socket.as_mut()) {
Ok(mut socket_read_write) => {
// Snapshot the counters before `read_write` so that the log entry below can report
// what this call read/wrote, and is only emitted if something actually changed.
let read_bytes_before = socket_read_write.read_bytes;
let written_bytes_before = socket_read_write.write_bytes_queued;
let write_closed = socket_read_write.write_bytes_queueable.is_none();
task.read_write(&mut *socket_read_write);
// Log only if bytes were read, bytes were queued for writing, or the writing side went
// from open to closed during this call.
if socket_read_write.read_bytes != read_bytes_before
|| socket_read_write.write_bytes_queued != written_bytes_before
|| (!write_closed && socket_read_write.write_bytes_queueable.is_none())
{
log!(
&platform,
Trace,
"connections",
"connection-activity",
address = address_string,
read = socket_read_write.read_bytes - read_bytes_before,
written = socket_read_write.write_bytes_queued - written_bytes_before,
wake_up_after = ?socket_read_write.wake_up_after.as_ref().map(|w| {
if *w > socket_read_write.now {
w.clone() - socket_read_write.now.clone()
} else {
Duration::new(0, 0)
}
}),
write_closed = socket_read_write.write_bytes_queueable.is_none(),
);
}
}
Err(err) => {
// The socket is in an error state: reset the task, unless this was already done.
if !task.is_reset_called() {
log!(
&platform,
Trace,
"connections",
"reset",
address = address_string,
reason = ?err
);
task.reset();
}
}
}
// `pull_message_to_coordinator` takes the task by value. `task_update` is `None` when the
// task doesn't come back, in which case `connection_task` stays `None` from now on.
let (task_update, message) = task.pull_message_to_coordinator();
connection_task = task_update;
debug_assert!(message_sending.is_none());
if let Some(message) = message {
message_sending.set(Some(
connection_to_coordinator.send((connection_id, message)),
));
}
}
// Everything below waits for the next event before looping again.
enum WakeUpReason {
CoordinatorMessage(service::CoordinatorToConnection),
CoordinatorDead,
SocketEvent,
MessageSent,
}
let wake_up_reason: WakeUpReason = {
// The task is gone and nothing is left to send: nothing more can happen, stop here.
if connection_task.is_none() && message_sending.is_none() {
log!(
&platform,
Trace,
"connections",
"shutdown",
address = address_string
);
return;
}
let coordinator_message = async {
match coordinator_to_connection.next().await {
Some(msg) => WakeUpReason::CoordinatorMessage(msg),
None => WakeUpReason::CoordinatorDead,
}
};
let socket_event = {
// The socket is only processed when no send is in flight (see the top of the loop), so
// waiting for socket readiness is likewise only done when no send is in flight.
let fut = if message_sending.as_ref().as_pin_ref().is_none() {
Some(platform.wait_read_write_again(socket.as_mut()))
} else {
None
};
async {
if let Some(fut) = fut {
fut.await;
WakeUpReason::SocketEvent
} else {
// Never resolves, so that this branch can't win the race below.
future::pending().await
}
}
};
let message_sent = async {
let result = if let Some(message_sending) = message_sending.as_mut().as_pin_mut() {
message_sending.await
} else {
future::pending().await
};
// Only reached once the send has completed: clear the slot for the next message.
message_sending.set(None);
// A failed send is treated the same way as the coordinator channel being closed.
if result.is_ok() {
WakeUpReason::MessageSent
} else {
WakeUpReason::CoordinatorDead
}
};
// Race the three futures. With `FutureExt::or`, the earlier future in the chain is preferred
// if several are ready at the same time.
coordinator_message.or(socket_event).or(message_sent).await
};
match wake_up_reason {
WakeUpReason::CoordinatorMessage(message) => {
// NOTE(review): `unreachable!()` assumes that the coordinator never sends a message once
// the task has been consumed — confirm against the coordinator.
let connection_task = connection_task.as_mut().unwrap_or_else(|| unreachable!());
connection_task.inject_coordinator_message(&platform.now(), message);
}
WakeUpReason::CoordinatorDead => {
log!(
&platform,
Trace,
"connections",
"shutdown",
address = address_string
);
return;
}
// Nothing to do for these two: looping again processes the socket at the top of the loop.
WakeUpReason::SocketEvent => {}
WakeUpReason::MessageSent => {}
}
}
}
/// Drives one multi-stream (WebRTC) connection until it shuts down.
///
/// Each loop iteration first asks the platform to open as many outbound substreams as the
/// connection task desires (minus the ones already being opened), then waits for whichever
/// happens first: a message from the coordinator, a substream being ready to be processed, the
/// in-flight message to the coordinator finishing being sent, or the platform reporting a new
/// substream (or the end of the connection).
///
/// The function returns when either:
///
/// - the coordinator is gone (`coordinator_to_connection` yields `None`, or the send on
///   `connection_to_coordinator` fails), or
/// - `pull_message_to_coordinator` doesn't give the connection task back.
///
/// # Parameters
///
/// - `connection`: the platform multi-stream connection on which substreams are opened/accepted.
/// - `address_string`: only used as the `address` field of log entries.
/// - `platform`: provides substream management, socket access, the current time and logging.
/// - `connection_id`: attached to every message sent to the coordinator.
/// - `connection_task`: the state machine that is driven; substreams are identified by `usize`.
/// - `coordinator_to_connection`: messages from the coordinator, injected into the task.
/// - `connection_to_coordinator`: channel on which `(connection_id, message)` pairs are sent.
pub(super) async fn webrtc_multi_stream_connection_task<TPlat: PlatformRef>(
mut connection: TPlat::MultiStream,
address_string: String,
platform: TPlat,
connection_id: service::ConnectionId,
mut connection_task: service::MultiStreamConnectionTask<TPlat::Instant, usize>,
coordinator_to_connection: async_channel::Receiver<service::CoordinatorToConnection>,
connection_to_coordinator: async_channel::Sender<(
service::ConnectionId,
service::ConnectionToCoordinator,
)>,
) {
// Future of the message currently being sent to the coordinator, or `None` if no send is in
// flight. At most one send is in flight at any time.
let mut message_sending = pin::pin!(None);
// Number of `open_out_substream` calls that haven't yet been matched by an outbound
// `NewSubstream` event.
let mut pending_opening_out_substreams = 0;
// One future per live substream. Each future owns its substream and resolves to
// `(substream, substream_id)` when that substream should be processed.
let mut when_substreams_rw_ready = FuturesUnordered::<
pin::Pin<Box<dyn Future<Output = (pin::Pin<Box<TPlat::Stream>>, usize)> + Send>>,
>::new();
// Identifier given to the next substream reported by the platform; incremented on each use.
let mut next_substream_id = 0;
let mut coordinator_to_connection = pin::pin!(coordinator_to_connection);
loop {
// Request the opening of the outbound substreams that the task desires and that aren't
// already in the process of being opened.
for _ in 0..connection_task
.desired_outbound_substreams()
.saturating_sub(pending_opening_out_substreams)
{
log!(
&platform,
Trace,
"connections",
"substream-open-start",
address = address_string
);
platform.open_out_substream(&mut connection);
pending_opening_out_substreams += 1;
}
// Everything below waits for the next event before looping again.
enum WakeUpReason<TPlat: PlatformRef> {
CoordinatorMessage(service::CoordinatorToConnection),
CoordinatorDead,
SocketEvent(pin::Pin<Box<TPlat::Stream>>, usize),
MessageSent,
NewSubstream(TPlat::Stream, SubstreamDirection),
ConnectionReset,
}
let wake_up_reason: WakeUpReason<TPlat> = {
let coordinator_message = async {
match coordinator_to_connection.next().await {
Some(msg) => WakeUpReason::CoordinatorMessage(msg),
None => WakeUpReason::CoordinatorDead,
}
};
let socket_event = {
// Processing a substream can produce a message for the coordinator, so substreams are only
// waited upon when no send is in flight. The emptiness check skips the wait when there is
// no substream to wait for.
let fut = if message_sending.as_ref().as_pin_ref().is_none()
&& !when_substreams_rw_ready.is_empty()
{
Some(when_substreams_rw_ready.select_next_some())
} else {
None
};
async move {
if let Some(fut) = fut {
let (stream, substream_id) = fut.await;
WakeUpReason::SocketEvent(stream, substream_id)
} else {
// Never resolves, so that this branch can't win the race below.
future::pending().await
}
}
};
let message_sent = async {
let result: Result<(), _> =
if let Some(message_sending) = message_sending.as_mut().as_pin_mut() {
message_sending.await
} else {
future::pending().await
};
// Only reached once the send has completed: clear the slot for the next message.
message_sending.set(None);
// A failed send is treated the same way as the coordinator channel being closed.
if result.is_ok() {
WakeUpReason::MessageSent
} else {
WakeUpReason::CoordinatorDead
}
};
let next_substream = async {
// Once the task has been reset, the platform is no longer asked for new substreams.
if connection_task.is_reset_called() {
future::pending().await
} else {
match platform.next_substream(&mut connection).await {
Some((stream, direction)) => WakeUpReason::NewSubstream(stream, direction),
None => WakeUpReason::ConnectionReset,
}
}
};
// Race the four futures. With `FutureExt::or`, the earlier future in the chain is preferred
// if several are ready at the same time.
coordinator_message
.or(socket_event)
.or(message_sent)
.or(next_substream)
.await
};
match wake_up_reason {
WakeUpReason::CoordinatorMessage(message) => {
connection_task.inject_coordinator_message(&platform.now(), message);
}
WakeUpReason::CoordinatorDead => {
log!(
&platform,
Trace,
"connections",
"shutdown",
address = address_string
);
return;
}
WakeUpReason::SocketEvent(mut socket, substream_id) => {
// `SocketEvent` is only produced when no send is in flight (see `socket_event` above).
debug_assert!(message_sending.is_none());
// Let the task process the substream; the resulting fate decides further down whether the
// substream is kept or dropped.
let substream_fate = match platform.read_write_access(socket.as_mut()) {
Ok(mut socket_read_write) => {
// Snapshot the counters before `substream_read_write` so that the log entry below can
// report what this call read/wrote, and is only emitted if something actually changed.
let read_bytes_before = socket_read_write.read_bytes;
let written_bytes_before = socket_read_write.write_bytes_queued;
let write_closed = socket_read_write.write_bytes_queueable.is_none();
let substream_fate = connection_task
.substream_read_write(&substream_id, &mut *socket_read_write);
// NOTE(review): the last key of the log entry below is `write_close` (with `?`), whereas
// `single_stream_connection_task` logs `write_closed` — possibly unintentional; confirm
// before relying on either key.
if socket_read_write.read_bytes != read_bytes_before
|| socket_read_write.write_bytes_queued != written_bytes_before
|| (!write_closed && socket_read_write.write_bytes_queueable.is_none())
{
log!(
&platform,
Trace,
"connections",
"connection-activity",
address = address_string,
read = socket_read_write.read_bytes - read_bytes_before,
written = socket_read_write.write_bytes_queued - written_bytes_before,
wake_up_after = ?socket_read_write.wake_up_after.as_ref().map(|w| {
if *w > socket_read_write.now {
w.clone() - socket_read_write.now.clone()
} else {
Duration::new(0, 0)
}
}),
write_close = ?socket_read_write.write_bytes_queueable.is_none(),
);
}
if let SubstreamFate::Reset = substream_fate {
log!(
&platform,
Trace,
"connections",
"reset-substream",
address = address_string,
substream_id
);
}
substream_fate
}
Err(err) => {
// The substream is in an error state: tell the task and treat the substream as reset.
log!(
&platform,
Trace,
"connections",
"substream-reset-by-remote",
address = address_string,
substream_id,
error = ?err
);
connection_task.reset_substream(&substream_id);
SubstreamFate::Reset
}
};
// `pull_message_to_coordinator` takes the task by value. If it doesn't give the task back,
// this function returns, which drops `connection` and all the substreams.
let (task_update, message) = connection_task.pull_message_to_coordinator();
if let Some(task_update) = task_update {
connection_task = task_update;
debug_assert!(message_sending.is_none());
if let Some(message) = message {
message_sending.set(Some(
connection_to_coordinator.send((connection_id, message)),
));
}
} else {
log!(
&platform,
Trace,
"connections",
"shutdown",
address = address_string
);
return;
}
// Put the substream back in the set, behind a future that resolves once the platform
// reports that it should be processed again. If the fate isn't `Continue`, `socket` is
// dropped at the end of this arm instead.
if let SubstreamFate::Continue = substream_fate {
when_substreams_rw_ready.push({
let platform = platform.clone();
Box::pin(async move {
platform.wait_read_write_again(socket.as_mut()).await;
(socket, substream_id)
})
});
}
}
WakeUpReason::MessageSent => {}
WakeUpReason::ConnectionReset => {
// `next_substream` is only polled when the task hasn't been reset yet, hence the assert.
debug_assert!(!connection_task.is_reset_called());
log!(
&platform,
Trace,
"connections",
"reset",
address = address_string
);
connection_task.reset();
}
WakeUpReason::NewSubstream(substream, direction) => {
let outbound = match direction {
SubstreamDirection::Outbound => true,
SubstreamDirection::Inbound => false,
};
let substream_id = next_substream_id;
next_substream_id += 1;
log!(
&platform,
Trace,
"connections",
"substream-opened",
address = address_string,
substream_id,
?direction
);
connection_task.add_substream(substream_id, outbound);
// An outbound substream matches one earlier `open_out_substream` call.
// NOTE(review): assumes the platform only reports outbound substreams that were requested
// through `open_out_substream`; otherwise this subtraction would underflow — confirm.
if outbound {
pending_opening_out_substreams -= 1;
}
// The pushed future contains no `.await`, so it resolves the first time it is polled and
// the new substream gets processed as soon as `socket_event` is next allowed to run.
when_substreams_rw_ready
.push(Box::pin(async move { (Box::pin(substream), substream_id) }));
}
}
}
}