rsigma_runtime/tap.rs
1//! Live event-tap capture for the daemon.
2//!
3//! A [`TapRegistry`] holds the set of active capture sessions. The engine hot
4//! path ([`LogProcessor::process_batch_with_format`]) loads the registry once
5//! per batch through an `ArcSwap` and, while at least one session is active,
6//! offers each event to the matching sessions with a non-blocking `try_send`.
7//! Delivery never blocks the engine: a full session channel drops the event
8//! and bumps a per-session counter, so a slow streaming client can never apply
9//! backpressure to detection.
10//!
11//! This module only captures and fans events out to bounded per-session
12//! channels. Redaction, serialization, and HTTP streaming live in the CLI, so
13//! the runtime stays free of the hashing and transport dependencies.
14//!
15//! [`LogProcessor::process_batch_with_format`]: crate::LogProcessor::process_batch_with_format
16
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::Duration;
20
21use arc_swap::ArcSwap;
22use parking_lot::Mutex;
23use tokio::sync::mpsc;
24
/// Selects the point on the decode path that a capture session taps.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TapStage {
    /// Tap the input line exactly as it arrived, prior to any parsing. Every
    /// non-empty line is captured, even one that subsequently fails to parse.
    Raw,
    /// Tap the event the engine actually evaluated (after parsing and after
    /// the event filter), serialized to JSON.
    Decoded,
}
35
36/// One captured item handed to a session's streaming task.
37#[derive(Debug)]
38pub enum TapPayload {
39 /// A raw input line (`raw` stage).
40 Raw(String),
41 /// A decoded event serialized to JSON (`decoded` stage).
42 Decoded(Box<serde_json::Value>),
43}
44
45/// A single active capture session: a stage filter plus a bounded channel into
46/// the streaming task, with capture / drop counters shared with the handle
47/// that owns the receiving end.
48pub(crate) struct TapSession {
49 id: u64,
50 pub(crate) stage: TapStage,
51 tx: mpsc::Sender<TapPayload>,
52 captured: Arc<AtomicU64>,
53 dropped: Arc<AtomicU64>,
54}
55
56impl TapSession {
57 /// Offer one payload without ever blocking. On a full or closed channel
58 /// the payload is dropped and the drop counter is bumped, so the engine
59 /// never waits on a slow consumer.
60 pub(crate) fn offer(&self, payload: TapPayload) {
61 match self.tx.try_send(payload) {
62 Ok(()) => {
63 self.captured.fetch_add(1, Ordering::Relaxed);
64 }
65 Err(_) => {
66 self.dropped.fetch_add(1, Ordering::Relaxed);
67 }
68 }
69 }
70}
71
72/// Handle returned to the streaming task on a successful
73/// [`TapRegistry::register`]. Owns the receiving end of the session channel
74/// and deregisters the session from the registry on drop, so a dropped client
75/// connection tears the session down automatically and stops the hot path from
76/// offering to a dead session.
77pub struct TapSessionHandle {
78 id: u64,
79 registry: Arc<TapRegistry>,
80 /// Receiving end of the session channel; the streaming task drains it.
81 pub rx: mpsc::Receiver<TapPayload>,
82 /// Events successfully queued for this session (delivered to the channel).
83 pub captured: Arc<AtomicU64>,
84 /// Events dropped because the session channel was full.
85 pub dropped: Arc<AtomicU64>,
86}
87
88impl Drop for TapSessionHandle {
89 fn drop(&mut self) {
90 self.registry.deregister(self.id);
91 }
92}
93
94/// The set of active capture sessions plus the daemon-wide tap limits.
95///
96/// Created once at daemon startup when the tap is enabled and installed on the
97/// [`LogProcessor`](crate::LogProcessor) through
98/// [`set_event_tap`](crate::LogProcessor::set_event_tap). Shared via `Arc`:
99/// the engine hot path reads the active-session snapshot, while the CLI
100/// session manager registers and deregisters sessions.
101pub struct TapRegistry {
102 buffer_events: usize,
103 max_sessions: usize,
104 max_duration: Duration,
105 next_id: AtomicU64,
106 /// Authoritative session list, guarded for add/remove.
107 sessions: Mutex<Vec<Arc<TapSession>>>,
108 /// Wait-free snapshot the hot path loads once per batch.
109 snapshot: ArcSwap<Vec<Arc<TapSession>>>,
110}
111
112impl TapRegistry {
113 /// Build an empty registry with the given per-session channel capacity,
114 /// concurrent-session cap, and maximum capture window.
115 pub fn new(buffer_events: usize, max_sessions: usize, max_duration: Duration) -> Arc<Self> {
116 Arc::new(Self {
117 buffer_events: buffer_events.max(1),
118 max_sessions,
119 max_duration,
120 next_id: AtomicU64::new(0),
121 sessions: Mutex::new(Vec::new()),
122 snapshot: ArcSwap::from_pointee(Vec::new()),
123 })
124 }
125
126 /// Largest capture window the daemon will honor (the `duration` query
127 /// param is rejected above this).
128 pub fn max_duration(&self) -> Duration {
129 self.max_duration
130 }
131
132 /// Number of currently-active sessions.
133 pub fn active_sessions(&self) -> usize {
134 self.snapshot.load().len()
135 }
136
137 /// Register a new session for `stage`. Returns `None` when the active
138 /// session count is already at the configured cap (the caller maps this to
139 /// a `409`).
140 pub fn register(self: &Arc<Self>, stage: TapStage) -> Option<TapSessionHandle> {
141 let mut sessions = self.sessions.lock();
142 if sessions.len() >= self.max_sessions {
143 return None;
144 }
145 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
146 let (tx, rx) = mpsc::channel(self.buffer_events);
147 let captured = Arc::new(AtomicU64::new(0));
148 let dropped = Arc::new(AtomicU64::new(0));
149 sessions.push(Arc::new(TapSession {
150 id,
151 stage,
152 tx,
153 captured: captured.clone(),
154 dropped: dropped.clone(),
155 }));
156 self.publish(&sessions);
157 Some(TapSessionHandle {
158 id,
159 registry: self.clone(),
160 rx,
161 captured,
162 dropped,
163 })
164 }
165
166 fn deregister(&self, id: u64) {
167 let mut sessions = self.sessions.lock();
168 let before = sessions.len();
169 sessions.retain(|s| s.id != id);
170 if sessions.len() != before {
171 self.publish(&sessions);
172 }
173 }
174
175 fn publish(&self, sessions: &[Arc<TapSession>]) {
176 self.snapshot.store(Arc::new(sessions.to_vec()));
177 }
178
179 /// Wait-free snapshot of the active sessions for one batch. The hot path
180 /// loads this once and reuses it for every line and event in the batch.
181 pub(crate) fn sessions_snapshot(&self) -> arc_swap::Guard<Arc<Vec<Arc<TapSession>>>> {
182 self.snapshot.load()
183 }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Registration succeeds up to the session cap and is refused past it.
    #[tokio::test]
    async fn register_respects_session_cap() {
        let registry = TapRegistry::new(8, 2, Duration::from_secs(30));

        // Both handles stay bound until the end of the test so the two
        // sessions remain registered while the third attempt is made.
        let first = registry.register(TapStage::Decoded);
        let second = registry.register(TapStage::Raw);
        assert!(first.is_some());
        assert!(second.is_some());
        assert_eq!(registry.active_sessions(), 2);

        let third = registry.register(TapStage::Decoded);
        assert!(third.is_none(), "third over cap");
    }

    /// Dropping a handle removes its session and frees the slot.
    #[tokio::test]
    async fn dropping_handle_deregisters_session() {
        let registry = TapRegistry::new(8, 2, Duration::from_secs(30));

        let session_handle = registry.register(TapStage::Decoded).expect("registered");
        assert_eq!(registry.active_sessions(), 1);

        drop(session_handle);
        assert_eq!(registry.active_sessions(), 0);

        // With the slot released, another registration goes through.
        let replacement = registry.register(TapStage::Raw);
        assert!(replacement.is_some());
    }

    /// Offers are queued until the channel is full; the overflow is counted
    /// as dropped rather than blocking.
    #[tokio::test]
    async fn offer_delivers_until_full_then_drops() {
        let registry = TapRegistry::new(2, 1, Duration::from_secs(30));
        let session_handle = registry.register(TapStage::Raw).expect("registered");
        let active = registry.sessions_snapshot();
        let session = &active[0];

        // The channel holds 2 payloads, so the third offer is dropped.
        for line in ["a", "b", "c"] {
            session.offer(TapPayload::Raw(line.into()));
        }

        assert_eq!(session_handle.captured.load(Ordering::Relaxed), 2);
        assert_eq!(session_handle.dropped.load(Ordering::Relaxed), 1);
    }
}
227}