use futures_util::StreamExt as _;
use stop_token::future::FutureExt as _;
impl_veilid_log_facility!("net");
use super::*;
/// The kinds of work a relay worker can be asked to perform.
#[derive(Debug)]
pub(super) enum RelayWorkerRequestKind {
/// Forward a block of data to another node.
Relay {
/// Node reference handed to the send routine as the destination.
relay_nr: FilteredNodeRef,
/// The bytes to forward.
data: Bytes,
/// Selects which send path the worker uses (inbound or outbound).
relay_kind: RelayKind,
},
}
/// A unit of work queued for the relay workers.
#[derive(Debug)]
pub(super) struct RelayWorkerRequest {
/// Time at which the request was placed on the queue; the worker uses it
/// to compute how long the request waited before being dequeued.
enqueued_ts: Timestamp,
/// The tracing span that was current when the request was enqueued; the
/// worker's per-request span is marked as following from it.
span: Span,
/// What the worker should do with this request.
kind: RelayWorkerRequestKind,
}
impl NetworkManager {
/// Create the relay request queue and launch the relay worker tasks.
///
/// A bounded channel with capacity `self.queue_size` is created; its sending
/// half and a fresh `StopSource` are stored in `inner`. Then
/// `self.concurrency` tasks are spawned, each running [`Self::relay_worker`]
/// against a clone of the receiving half, and their join handles are kept in
/// `inner.relay_worker_join_handles`.
///
/// As written this function always returns `Ok(())`.
pub(super) fn startup_relay_workers(&self) -> EyreResult<()> {
    let mut inner = self.inner.lock();

    // One bounded queue is shared by every worker.
    let (request_tx, request_rx) = flume::bounded(self.queue_size as usize);
    inner.relay_send_channel = Some(request_tx);

    // Every worker gets a token derived from the same stop source.
    let stop_source = StopSource::new();
    let shared_stop_token = stop_source.token();
    inner.relay_stop_source = Some(stop_source);

    veilid_log!(self debug "Starting {} relay workers", self.concurrency);

    for worker_index in 0..self.concurrency {
        let registry = self.registry();
        let worker_rx = request_rx.clone();
        let worker_stop_token = shared_stop_token.clone();
        let task_name = format!("relay worker {}", worker_index);

        let handle = spawn(&task_name, async move {
            // Resolve the network manager inside the task so the future owns
            // everything it needs.
            let this = registry.network_manager();
            Box::pin(this.relay_worker(worker_stop_token, worker_rx)).await
        });
        inner.relay_worker_join_handles.push(handle);
    }

    Ok(())
}
pub(super) async fn shutdown_relay_workers(&self) {
let mut unord = FuturesUnordered::new();
{
let mut inner = self.inner.lock();
for h in inner.relay_worker_join_handles.drain(..) {
unord.push(h);
}
drop(inner.relay_stop_source.take());
}
veilid_log!(self debug "Stopping {} relay workers", unord.len());
while unord.next().await.is_some() {}
}
pub(super) async fn relay_worker(
&self,
stop_token: StopToken,
receiver: flume::Receiver<RelayWorkerRequest>,
) {
while let Ok(Ok(request)) = receiver.recv_async().timeout_at(stop_token.clone()).await {
let relay_request_span = tracing::trace_span!("relay request");
relay_request_span.follows_from(request.span);
let dequeue_ts = Timestamp::now_non_decreasing();
let dequeue_latency = dequeue_ts.duration_since(request.enqueued_ts);
match request.kind {
RelayWorkerRequestKind::Relay {
relay_nr,
data,
relay_kind,
} => {
match relay_kind {
RelayKind::Inbound => {
veilid_log!(self trace "inbound relaying {} bytes to {}", data.len(), relay_nr);
if let Err(e) =
pin_future!(self.send_inbound_relay_data(relay_nr, data)).await
{
veilid_log!(self debug "failed to inbound relay envelope: {}" ,e);
}
}
RelayKind::Outbound => {
veilid_log!(self trace "outbound relaying {} bytes to {}", data.len(), relay_nr);
if let Err(e) = pin_future!(self.send_data(relay_nr, data)).await {
veilid_log!(self debug "failed to outbound relay envelope: {}" ,e);
}
}
}
}
}
let process_ts = Timestamp::now_non_decreasing();
let process_latency = process_ts.duration_since(dequeue_ts);
self.stats_relay_processed(dequeue_latency, process_latency)
}
}
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "rpc", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
)]
pub(super) fn enqueue_relay(
&self,
relay_nr: FilteredNodeRef,
data: Bytes,
relay_kind: RelayKind,
) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.relay_send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send(RelayWorkerRequest {
enqueued_ts: Timestamp::now_non_decreasing(),
span: Span::current(),
kind: RelayWorkerRequestKind::Relay {
relay_nr,
data,
relay_kind,
},
})
.map_err(|e| eyre!("failed to enqueue relay: {}", e))?;
Ok(())
}
}