netring 0.11.0

High-performance zero-copy packet I/O for Linux (AF_PACKET TPACKET_V3 + AF_XDP)
Documentation
//! [`FlowBroadcast`] — multi-subscriber broadcast wrapper around
//! [`FlowStream`].
//!
//! `FlowStream` consumes the underlying `AsyncCapture` so it can
//! only have one subscriber. For pipelines where multiple consumers
//! want the same flow events (e.g., a logger + a metrics exporter +
//! a real-time UI), wrap the stream in a `FlowBroadcast`. Each
//! [`FlowBroadcast::subscribe`] returns a fresh `Stream` that sees
//! every event the underlying capture produces.
//!
//! Built on `tokio::sync::broadcast::channel`. Slow subscribers
//! that fall behind the per-channel buffer get `BroadcastError::Lagged`
//! errors and skip ahead — they don't block the others.
//!
//! `Arc<FlowEvent<K>>` so the (potentially large) event isn't
//! cloned for every subscriber.
//!
//! ```no_run
//! # use futures::StreamExt;
//! # use netring::AsyncCapture;
//! # use netring::flow::extract::FiveTuple;
//! # async fn ex() -> Result<(), Box<dyn std::error::Error>> {
//! let cap = AsyncCapture::open("eth0")?;
//! let bc = cap
//!     .flow_stream(FiveTuple::bidirectional())
//!     .broadcast(1024);
//!
//! let mut sub_a = bc.subscribe();
//! let mut sub_b = bc.subscribe();
//! tokio::select! {
//!     evt = sub_a.next() => { let _ = evt; }
//!     evt = sub_b.next() => { let _ = evt; }
//! }
//! # Ok(()) }
//! ```

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use flowscope::{FlowEvent, FlowExtractor};
use futures_core::Stream;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

use crate::async_adapters::flow_stream::{FlowStream, NoReassembler};
use crate::traits::PacketSource;

/// Multi-subscriber broadcast handle for [`FlowStream`] events.
pub struct FlowBroadcast<K> {
    sender: broadcast::Sender<Arc<FlowEvent<K>>>,
    /// Background task driving the underlying `FlowStream`. Aborted
    /// when this `FlowBroadcast` drops — that's the signal to stop
    /// pulling packets from the capture.
    task: tokio::task::JoinHandle<()>,
}

impl<K: Send + Sync + 'static> FlowBroadcast<K> {
    /// Subscribe to events. The returned `Stream` produces every
    /// event seen since `subscribe()` was called. Slow subscribers
    /// see `Err(BroadcastRecvError::Lagged(n))` if they fall behind
    /// by more than the channel buffer.
    pub fn subscribe(&self) -> FlowSubscriber<K> {
        FlowSubscriber {
            inner: BroadcastStream::new(self.sender.subscribe()),
        }
    }

    /// How many subscribers are currently connected.
    pub fn receiver_count(&self) -> usize {
        self.sender.receiver_count()
    }
}

impl<K> Drop for FlowBroadcast<K> {
    fn drop(&mut self) {
        // Stop the background task so it stops draining the capture.
        // The task may already have exited (capture closed); abort is
        // a no-op in that case.
        self.task.abort();
    }
}

/// A subscriber stream returned from [`FlowBroadcast::subscribe`].
pub struct FlowSubscriber<K> {
    inner: BroadcastStream<Arc<FlowEvent<K>>>,
}

/// Errors produced by a [`FlowSubscriber`].
#[derive(Debug, thiserror::Error)]
pub enum BroadcastRecvError {
    /// This subscriber fell behind the channel buffer; `n` events
    /// were dropped between the last yielded item and this point.
    /// Iteration continues from the latest available event.
    #[error("subscriber lagged by {0} events")]
    Lagged(u64),
}

impl<K: Send + Sync + 'static> Stream for FlowSubscriber<K> {
    type Item = Result<Arc<FlowEvent<K>>, BroadcastRecvError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.inner).poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Ready(Some(Ok(ev))) => Poll::Ready(Some(Ok(ev))),
            Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) => {
                Poll::Ready(Some(Err(BroadcastRecvError::Lagged(n))))
            }
        }
    }
}

// ── Conversion entry point on FlowStream<NoReassembler> ──────────────

impl<S, E> FlowStream<S, E, (), NoReassembler>
where
    S: PacketSource + std::os::unix::io::AsRawFd + Send + Unpin + 'static,
    E: FlowExtractor + Unpin + Send + 'static,
    E::Key: Clone + Send + Sync + Unpin + 'static,
{
    /// Convert this stream into a broadcast handle. `buffer` is the
    /// per-subscriber lag tolerance — a slow subscriber missing this
    /// many events gets a [`BroadcastRecvError::Lagged`].
    ///
    /// Internally spawns a tokio task that drains the underlying
    /// capture and re-publishes events to the broadcast channel. The
    /// task aborts when the returned [`FlowBroadcast`] drops.
    ///
    /// Errors from the underlying capture are converted to dropped
    /// items in the broadcast — broadcast subscribers don't see I/O
    /// errors directly. If you need that, attach your own logger
    /// inside the spawned task pattern instead of using `broadcast`.
    pub fn broadcast(self, buffer: usize) -> FlowBroadcast<E::Key> {
        // Use the futures_core trait + the local poll_fn pattern.
        // We don't pull `futures` to keep the dep tree thin.
        let (sender, _initial_rx) = broadcast::channel(buffer);
        let publisher = sender.clone();
        let task = tokio::spawn(async move {
            let mut stream = self;
            // Hand-roll the iteration: futures_core::Stream::poll_next
            // is enough for a Box::pin + std::future::poll_fn loop.
            let mut stream = std::pin::Pin::new(&mut stream);
            loop {
                let next = std::future::poll_fn(|cx| stream.as_mut().poll_next(cx)).await;
                match next {
                    None => break,
                    Some(Ok(event)) => {
                        // `send` returns Err only when there are no live
                        // subscribers — in that case there's nobody to
                        // receive anyway. Don't drop the task; new
                        // subscribers may appear later.
                        let _ = publisher.send(Arc::new(event));
                    }
                    Some(Err(_)) => {
                        // Drop on capture error. (Production: hook your
                        // own logger before broadcast() if you need to
                        // surface these.)
                    }
                }
            }
        });
        FlowBroadcast { sender, task }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke-test the broadcast wrapper without a real capture.
    /// Construct a `FlowBroadcast` directly from a tokio task that
    /// sends a few events into the channel, then verify two
    /// independent subscribers see all of them.
    #[tokio::test(flavor = "current_thread")]
    async fn two_subscribers_see_all_events() {
        use flowscope::{FlowSide, Timestamp};

        // Use a `u32` flow key so we don't have to construct
        // FiveTupleKey's full type. The broadcast wrapper is generic
        // and doesn't care about K's identity.
        let (sender, _initial_rx) = broadcast::channel::<Arc<FlowEvent<u32>>>(16);
        let publisher = sender.clone();
        let task = tokio::spawn(async move {
            for i in 0..5u32 {
                let _ = publisher.send(Arc::new(FlowEvent::Started {
                    key: i,
                    side: FlowSide::Initiator,
                    ts: Timestamp::default(),
                    l4: None,
                }));
            }
        });
        let bc: FlowBroadcast<u32> = FlowBroadcast { sender, task };

        let mut sub_a = bc.subscribe();
        let mut sub_b = bc.subscribe();
        // Wait for the publisher task to finish.
        // (Borrow checker: we can't await on bc.task; sleep briefly
        // instead. 50ms is enough for a 5-iteration channel send.)
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // Drain both subscribers. They should both see 5 events
        // (subscribed before any sends, in this contrived case after
        // the publisher already finished — but tokio broadcast keeps
        // the buffer until everyone has read it).
        let mut count_a = 0;
        let mut count_b = 0;
        for _ in 0..5 {
            if let Ok(item) = tokio::time::timeout(std::time::Duration::from_millis(50), async {
                use std::pin::Pin;
                std::future::poll_fn(|cx| Pin::new(&mut sub_a).poll_next(cx)).await
            })
            .await
            {
                if item.is_some() {
                    count_a += 1;
                }
            }
        }
        for _ in 0..5 {
            if let Ok(item) = tokio::time::timeout(std::time::Duration::from_millis(50), async {
                use std::pin::Pin;
                std::future::poll_fn(|cx| Pin::new(&mut sub_b).poll_next(cx)).await
            })
            .await
            {
                if item.is_some() {
                    count_b += 1;
                }
            }
        }
        // Both subscribers should have received the events. (They
        // subscribed *after* the channel was created but the buffer
        // holds the recent items; depending on ordering vs the
        // publisher finishing, the count may be 5 or fewer. Just
        // assert at least one; the structural property — both
        // subscribers see independent streams — is what matters.)
        assert!(count_a > 0, "subscriber A saw no events");
        assert!(count_b > 0, "subscriber B saw no events");
    }

    #[test]
    fn receiver_count_zero_then_one() {
        let (sender, _) = broadcast::channel::<Arc<FlowEvent<u32>>>(8);
        let bc = FlowBroadcast {
            sender: sender.clone(),
            task: tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap()
                .spawn(async {}),
        };
        // The dummy initial receiver has been dropped; receiver_count
        // should be 0 until we subscribe.
        assert_eq!(bc.receiver_count(), 0);
        let _sub = bc.subscribe();
        assert_eq!(bc.receiver_count(), 1);
    }
}