use core::time::Duration;
use netcore::smoltcp;
use tracing::Instrument;
#[tracing::instrument(skip_all, fields(?poll_max_delay))]
pub fn run_blocking(
netstack: &mut netcore::Netstack,
dev: &mut impl smoltcp::phy::Device,
mut now: impl FnMut() -> smoltcp::time::Instant,
poll_max_delay: Duration,
) {
loop {
let now = now();
let span = tracing::trace_span!("loop", %now, delay = tracing::field::Empty).entered();
netstack.poll_device_io(now, &mut *dev);
let delay = netstack
.poll_delay(now)
.unwrap_or(Duration::MAX)
.min(poll_max_delay);
span.record("delay", tracing::field::debug(delay));
tracing::trace!("wait");
match netstack.wait_for_cmd_blocking(Some(delay)) {
Ok(cmd) => {
netstack.process_one_cmd(cmd);
netstack.process_cmds();
}
Err(netcore::flume::RecvTimeoutError::Timeout) => {
}
Err(netcore::flume::RecvTimeoutError::Disconnected) => {
unreachable!("internal command channel closed")
}
}
}
}
#[tracing::instrument(skip_all)]
pub async fn run(
netstack: &mut netcore::Netstack,
dev: &mut (impl netcore::AsyncWakeDevice + smoltcp::phy::Device + Unpin),
mut now: impl FnMut() -> smoltcp::time::Instant,
sleep: impl AsyncFn(Duration) + Clone,
) {
use futures_util::FutureExt;
loop {
let now = now();
let delay = netstack.poll_delay(now);
let span = tracing::trace_span!("loop", %now, ?delay);
let cmd_fut = netstack.wait_for_cmd();
tracing::trace!(parent: &span, "select");
futures_util::select![
_ = netstack.wait_io_async(now, dev)
.instrument(span.clone())
.fuse() =>
{
tracing::trace!(parent: &span, "device wakeup");
},
_ = option_timeout(sleep.clone(), delay).instrument(span.clone()).fuse() => {
tracing::trace!(parent: &span, "timeout wakeup");
},
cmd = cmd_fut.instrument(span.clone()).fuse() => {
span.in_scope(|| {
tracing::trace!("command wakeup");
if let Some(cmd) = cmd {
netstack.process_one_cmd(cmd);
} else {
unreachable!("internal command channel closed");
}
netstack.process_cmds();
});
}
]
}
}
async fn option_timeout(sleep: impl AsyncFn(Duration), dur: Option<Duration>) {
match dur {
Some(dur) => sleep(dur).await,
_ => core::future::pending().await,
}
}