Skip to main content

rsigma_runtime/
tap.rs

1//! Live event-tap capture for the daemon.
2//!
3//! A [`TapRegistry`] holds the set of active capture sessions. The engine hot
4//! path ([`LogProcessor::process_batch_with_format`]) loads the registry once
5//! per batch through an `ArcSwap` and, while at least one session is active,
6//! offers each event to the matching sessions with a non-blocking `try_send`.
7//! Delivery never blocks the engine: a full session channel drops the event
8//! and bumps a per-session counter, so a slow streaming client can never apply
9//! backpressure to detection.
10//!
11//! This module only captures and fans events out to bounded per-session
12//! channels. Redaction, serialization, and HTTP streaming live in the CLI, so
13//! the runtime stays free of the hashing and transport dependencies.
14//!
15//! [`LogProcessor::process_batch_with_format`]: crate::LogProcessor::process_batch_with_format
16
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::Duration;
20
21use arc_swap::ArcSwap;
22use parking_lot::Mutex;
23use tokio::sync::mpsc;
24
/// Selects where on the decode path a capture session taps events.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TapStage {
    /// Raw input line exactly as it arrived, before any parsing. Every
    /// non-empty line is captured, including lines that fail to parse.
    Raw,
    /// The decoded event the engine actually evaluated (after parsing and
    /// after the event filter), serialized to JSON.
    Decoded,
}
35
36/// One captured item handed to a session's streaming task.
37#[derive(Debug)]
38pub enum TapPayload {
39    /// A raw input line (`raw` stage).
40    Raw(String),
41    /// A decoded event serialized to JSON (`decoded` stage).
42    Decoded(Box<serde_json::Value>),
43}
44
45/// A single active capture session: a stage filter plus a bounded channel into
46/// the streaming task, with capture / drop counters shared with the handle
47/// that owns the receiving end.
48pub(crate) struct TapSession {
49    id: u64,
50    pub(crate) stage: TapStage,
51    tx: mpsc::Sender<TapPayload>,
52    captured: Arc<AtomicU64>,
53    dropped: Arc<AtomicU64>,
54}
55
56impl TapSession {
57    /// Offer one payload without ever blocking. On a full or closed channel
58    /// the payload is dropped and the drop counter is bumped, so the engine
59    /// never waits on a slow consumer.
60    pub(crate) fn offer(&self, payload: TapPayload) {
61        match self.tx.try_send(payload) {
62            Ok(()) => {
63                self.captured.fetch_add(1, Ordering::Relaxed);
64            }
65            Err(_) => {
66                self.dropped.fetch_add(1, Ordering::Relaxed);
67            }
68        }
69    }
70}
71
72/// Handle returned to the streaming task on a successful
73/// [`TapRegistry::register`]. Owns the receiving end of the session channel
74/// and deregisters the session from the registry on drop, so a dropped client
75/// connection tears the session down automatically and stops the hot path from
76/// offering to a dead session.
77pub struct TapSessionHandle {
78    id: u64,
79    registry: Arc<TapRegistry>,
80    /// Receiving end of the session channel; the streaming task drains it.
81    pub rx: mpsc::Receiver<TapPayload>,
82    /// Events successfully queued for this session (delivered to the channel).
83    pub captured: Arc<AtomicU64>,
84    /// Events dropped because the session channel was full.
85    pub dropped: Arc<AtomicU64>,
86}
87
88impl Drop for TapSessionHandle {
89    fn drop(&mut self) {
90        self.registry.deregister(self.id);
91    }
92}
93
94/// The set of active capture sessions plus the daemon-wide tap limits.
95///
96/// Created once at daemon startup when the tap is enabled and installed on the
97/// [`LogProcessor`](crate::LogProcessor) through
98/// [`set_event_tap`](crate::LogProcessor::set_event_tap). Shared via `Arc`:
99/// the engine hot path reads the active-session snapshot, while the CLI
100/// session manager registers and deregisters sessions.
101pub struct TapRegistry {
102    buffer_events: usize,
103    max_sessions: usize,
104    max_duration: Duration,
105    next_id: AtomicU64,
106    /// Authoritative session list, guarded for add/remove.
107    sessions: Mutex<Vec<Arc<TapSession>>>,
108    /// Wait-free snapshot the hot path loads once per batch.
109    snapshot: ArcSwap<Vec<Arc<TapSession>>>,
110}
111
112impl TapRegistry {
113    /// Build an empty registry with the given per-session channel capacity,
114    /// concurrent-session cap, and maximum capture window.
115    pub fn new(buffer_events: usize, max_sessions: usize, max_duration: Duration) -> Arc<Self> {
116        Arc::new(Self {
117            buffer_events: buffer_events.max(1),
118            max_sessions,
119            max_duration,
120            next_id: AtomicU64::new(0),
121            sessions: Mutex::new(Vec::new()),
122            snapshot: ArcSwap::from_pointee(Vec::new()),
123        })
124    }
125
126    /// Largest capture window the daemon will honor (the `duration` query
127    /// param is rejected above this).
128    pub fn max_duration(&self) -> Duration {
129        self.max_duration
130    }
131
132    /// Number of currently-active sessions.
133    pub fn active_sessions(&self) -> usize {
134        self.snapshot.load().len()
135    }
136
137    /// Register a new session for `stage`. Returns `None` when the active
138    /// session count is already at the configured cap (the caller maps this to
139    /// a `409`).
140    pub fn register(self: &Arc<Self>, stage: TapStage) -> Option<TapSessionHandle> {
141        let mut sessions = self.sessions.lock();
142        if sessions.len() >= self.max_sessions {
143            return None;
144        }
145        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
146        let (tx, rx) = mpsc::channel(self.buffer_events);
147        let captured = Arc::new(AtomicU64::new(0));
148        let dropped = Arc::new(AtomicU64::new(0));
149        sessions.push(Arc::new(TapSession {
150            id,
151            stage,
152            tx,
153            captured: captured.clone(),
154            dropped: dropped.clone(),
155        }));
156        self.publish(&sessions);
157        Some(TapSessionHandle {
158            id,
159            registry: self.clone(),
160            rx,
161            captured,
162            dropped,
163        })
164    }
165
166    fn deregister(&self, id: u64) {
167        let mut sessions = self.sessions.lock();
168        let before = sessions.len();
169        sessions.retain(|s| s.id != id);
170        if sessions.len() != before {
171            self.publish(&sessions);
172        }
173    }
174
175    fn publish(&self, sessions: &[Arc<TapSession>]) {
176        self.snapshot.store(Arc::new(sessions.to_vec()));
177    }
178
179    /// Wait-free snapshot of the active sessions for one batch. The hot path
180    /// loads this once and reuses it for every line and event in the batch.
181    pub(crate) fn sessions_snapshot(&self) -> arc_swap::Guard<Arc<Vec<Arc<TapSession>>>> {
182        self.snapshot.load()
183    }
184}
185
186#[cfg(test)]
187mod tests {
188    use super::*;
189
190    #[tokio::test]
191    async fn register_respects_session_cap() {
192        let reg = TapRegistry::new(8, 2, Duration::from_secs(30));
193        let a = reg.register(TapStage::Decoded);
194        let b = reg.register(TapStage::Raw);
195        assert!(a.is_some());
196        assert!(b.is_some());
197        assert_eq!(reg.active_sessions(), 2);
198        assert!(reg.register(TapStage::Decoded).is_none(), "third over cap");
199    }
200
201    #[tokio::test]
202    async fn dropping_handle_deregisters_session() {
203        let reg = TapRegistry::new(8, 2, Duration::from_secs(30));
204        let handle = reg.register(TapStage::Decoded).expect("registered");
205        assert_eq!(reg.active_sessions(), 1);
206        drop(handle);
207        assert_eq!(reg.active_sessions(), 0);
208        // A slot freed up, so a new session registers.
209        assert!(reg.register(TapStage::Raw).is_some());
210    }
211
212    #[tokio::test]
213    async fn offer_delivers_until_full_then_drops() {
214        let reg = TapRegistry::new(2, 1, Duration::from_secs(30));
215        let handle = reg.register(TapStage::Raw).expect("registered");
216        let snapshot = reg.sessions_snapshot();
217        let session = &snapshot[0];
218
219        session.offer(TapPayload::Raw("a".into()));
220        session.offer(TapPayload::Raw("b".into()));
221        // Channel capacity is 2; the third is dropped, not blocked.
222        session.offer(TapPayload::Raw("c".into()));
223
224        assert_eq!(handle.captured.load(Ordering::Relaxed), 2);
225        assert_eq!(handle.dropped.load(Ordering::Relaxed), 1);
226    }
227}