veilid-core 0.5.3

Core library used to create a Veilid node and operate it as part of an application
Documentation
use futures_util::StreamExt as _;
use stop_token::future::FutureExt as _;

impl_veilid_log_facility!("net");

use super::*;

#[derive(Debug)]
pub(super) enum RelayWorkerRequestKind {
    Relay {
        relay_nr: FilteredNodeRef,
        data: Bytes,
        relay_kind: RelayKind,
    },
}

#[derive(Debug)]
pub(super) struct RelayWorkerRequest {
    enqueued_ts: Timestamp,
    span: Span,
    kind: RelayWorkerRequestKind,
}

impl NetworkManager {
    pub(super) fn startup_relay_workers(&self) -> EyreResult<()> {
        let mut inner = self.inner.lock();

        // Relay workers
        let channel = flume::bounded(self.queue_size as usize);
        inner.relay_send_channel = Some(channel.0.clone());
        inner.relay_stop_source = Some(StopSource::new());

        // spin up N workers
        veilid_log!(self debug "Starting {} relay workers", self.concurrency);
        for task_n in 0..self.concurrency {
            let registry = self.registry();
            let receiver = channel.1.clone();
            let stop_token = inner.relay_stop_source.as_ref().unwrap_or_log().token();
            let jh = spawn(&format!("relay worker {}", task_n), async move {
                let this = registry.network_manager();
                Box::pin(this.relay_worker(stop_token, receiver)).await
            });
            inner.relay_worker_join_handles.push(jh);
        }
        Ok(())
    }

    pub(super) async fn shutdown_relay_workers(&self) {
        // Stop the relay workers
        let mut unord = FuturesUnordered::new();
        {
            let mut inner = self.inner.lock();
            // take the join handles out
            for h in inner.relay_worker_join_handles.drain(..) {
                unord.push(h);
            }
            // drop the stop
            drop(inner.relay_stop_source.take());
        }
        veilid_log!(self debug "Stopping {} relay workers", unord.len());

        // Wait for them to complete
        while unord.next().await.is_some() {}
    }

    pub(super) async fn relay_worker(
        &self,
        stop_token: StopToken,
        receiver: flume::Receiver<RelayWorkerRequest>,
    ) {
        while let Ok(Ok(request)) = receiver.recv_async().timeout_at(stop_token.clone()).await {
            let relay_request_span = tracing::trace_span!("relay request");
            relay_request_span.follows_from(request.span);

            // Measure dequeue time
            let dequeue_ts = Timestamp::now_non_decreasing();
            let dequeue_latency = dequeue_ts.duration_since(request.enqueued_ts);

            // Process request kind
            match request.kind {
                RelayWorkerRequestKind::Relay {
                    relay_nr,
                    data,
                    relay_kind,
                } => {
                    match relay_kind {
                        RelayKind::Inbound => {
                            // Inbound relay the packet to the desired destination
                            veilid_log!(self trace "inbound relaying {} bytes to {}", data.len(), relay_nr);
                            if let Err(e) =
                                pin_future!(self.send_inbound_relay_data(relay_nr, data)).await
                            {
                                veilid_log!(self debug "failed to inbound relay envelope: {}" ,e);
                            }
                        }
                        RelayKind::Outbound => {
                            // Outbound relay the packet to the desired destination
                            // Because the source was in the client allowlist, this node has opted in
                            // to making new flows and connections for the purposes of relaying
                            veilid_log!(self trace "outbound relaying {} bytes to {}", data.len(), relay_nr);
                            if let Err(e) = pin_future!(self.send_data(relay_nr, data)).await {
                                veilid_log!(self debug "failed to outbound relay envelope: {}" ,e);
                            }
                        }
                    }
                }
            }

            // Measure process time
            let process_ts = Timestamp::now_non_decreasing();
            let process_latency = process_ts.duration_since(dequeue_ts);

            // Accounting
            self.stats_relay_processed(dequeue_latency, process_latency)
        }
    }

    #[cfg_attr(
        feature = "instrument",
        instrument(level = "trace", target = "rpc", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
    )]
    pub(super) fn enqueue_relay(
        &self,
        relay_nr: FilteredNodeRef,
        data: Bytes,
        relay_kind: RelayKind,
    ) -> EyreResult<()> {
        let _guard = self
            .startup_context
            .startup_lock
            .enter()
            .wrap_err("not started up")?;

        let send_channel = {
            let inner = self.inner.lock();
            let Some(send_channel) = inner.relay_send_channel.as_ref().cloned() else {
                bail!("send channel is closed");
            };
            send_channel
        };
        send_channel
            .try_send(RelayWorkerRequest {
                enqueued_ts: Timestamp::now_non_decreasing(),
                span: Span::current(),
                kind: RelayWorkerRequestKind::Relay {
                    relay_nr,
                    data,
                    relay_kind,
                },
            })
            .map_err(|e| eyre!("failed to enqueue relay: {}", e))?;
        Ok(())
    }
}