// laminar_db/metrics.rs
//! Pipeline observability metrics types.
//!
//! Provides atomic counters for pipeline-loop aggregates and snapshot types
//! for querying source, stream, and pipeline-wide metrics from user code.

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// The state of a streaming pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineState {
    /// Pipeline has been created but not started.
    Created,
    /// Pipeline is in the process of starting.
    Starting,
    /// Pipeline is actively processing events.
    Running,
    /// Pipeline is gracefully shutting down.
    ShuttingDown,
    /// Pipeline has stopped.
    Stopped,
}

impl std::fmt::Display for PipelineState {
    /// Writes the state using the exact spelling of the variant name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Created => "Created",
            Self::Starting => "Starting",
            Self::Running => "Running",
            Self::ShuttingDown => "ShuttingDown",
            Self::Stopped => "Stopped",
        };
        f.write_str(label)
    }
}

/// Cache line size (bytes) used to pad and align the counter groups.
const CACHE_LINE_SIZE: usize = 64;

/// Bytes occupied by the Ring 0 counter group (6 × `AtomicU64`).
const RING0_GROUP_BYTES: usize = 6 * std::mem::size_of::<AtomicU64>();

/// Padding inserted after the Ring 0 group so the Ring 2 group starts on the
/// next cache line. If the group ever grows to an exact multiple of
/// `CACHE_LINE_SIZE` this yields a full line of padding — wasteful, but still
/// correct (and `[u8; 0]` is avoided).
const RING0_PAD_BYTES: usize = CACHE_LINE_SIZE - RING0_GROUP_BYTES % CACHE_LINE_SIZE;

/// Shared atomic counters incremented by the pipeline processing loop.
///
/// Counters are separated into two groups on different cache lines to
/// prevent false sharing between Ring 0 (hot path) and Ring 2 (checkpoint):
///
/// - **Ring 0 group** (`events_ingested` … `total_batches`): incremented on
///   every processing cycle from the reactor thread.
/// - **Ring 2 group** (`checkpoints_completed` … `checkpoint_epoch`): updated
///   from the async checkpoint coordinator.
///
/// The struct is `align(64)` so it always starts on a cache-line boundary.
/// Without that, the padding alone is insufficient: an allocation starting
/// mid-line could place the tail of Ring 0 and the head of Ring 2 on the
/// same physical cache line, reintroducing the false sharing the padding is
/// meant to prevent.
///
/// All reads and writes use `Ordering::Relaxed` — metrics are advisory,
/// not transactional.
#[repr(C, align(64))]
pub struct PipelineCounters {
    // ── Ring 0 counters (hot path, tight loop) ──
    /// Total events ingested from sources.
    pub events_ingested: AtomicU64,
    /// Total events emitted to streams/sinks.
    pub events_emitted: AtomicU64,
    /// Total events dropped (e.g. backpressure).
    pub events_dropped: AtomicU64,
    /// Total processing cycles completed.
    pub cycles: AtomicU64,
    /// Duration of the last processing cycle in nanoseconds.
    pub last_cycle_duration_ns: AtomicU64,
    /// Total batches processed.
    pub total_batches: AtomicU64,

    // ── Cache line padding: pushes the Ring 2 group onto the next line ──
    _pad: [u8; RING0_PAD_BYTES],

    // ── Ring 2 counters (checkpoint coordinator, async) ──
    /// Total checkpoints completed successfully.
    pub checkpoints_completed: AtomicU64,
    /// Total checkpoints that failed.
    pub checkpoints_failed: AtomicU64,
    /// Duration of the last checkpoint in milliseconds.
    pub last_checkpoint_duration_ms: AtomicU64,
    /// Current checkpoint epoch.
    pub checkpoint_epoch: AtomicU64,
}

impl PipelineCounters {
    /// Create zeroed counters.
    #[must_use]
    pub fn new() -> Self {
        Self {
            events_ingested: AtomicU64::new(0),
            events_emitted: AtomicU64::new(0),
            events_dropped: AtomicU64::new(0),
            cycles: AtomicU64::new(0),
            last_cycle_duration_ns: AtomicU64::new(0),
            total_batches: AtomicU64::new(0),
            _pad: [0; RING0_PAD_BYTES],
            checkpoints_completed: AtomicU64::new(0),
            checkpoints_failed: AtomicU64::new(0),
            last_checkpoint_duration_ms: AtomicU64::new(0),
            checkpoint_epoch: AtomicU64::new(0),
        }
    }

    /// Take a point-in-time snapshot of all counters.
    ///
    /// Loads are `Relaxed` and not mutually consistent: a counter may be
    /// incremented between two loads. That is acceptable for advisory
    /// metrics.
    #[must_use]
    pub fn snapshot(&self) -> CounterSnapshot {
        CounterSnapshot {
            events_ingested: self.events_ingested.load(Ordering::Relaxed),
            events_emitted: self.events_emitted.load(Ordering::Relaxed),
            events_dropped: self.events_dropped.load(Ordering::Relaxed),
            cycles: self.cycles.load(Ordering::Relaxed),
            last_cycle_duration_ns: self.last_cycle_duration_ns.load(Ordering::Relaxed),
            total_batches: self.total_batches.load(Ordering::Relaxed),
            checkpoints_completed: self.checkpoints_completed.load(Ordering::Relaxed),
            checkpoints_failed: self.checkpoints_failed.load(Ordering::Relaxed),
            last_checkpoint_duration_ms: self.last_checkpoint_duration_ms.load(Ordering::Relaxed),
            checkpoint_epoch: self.checkpoint_epoch.load(Ordering::Relaxed),
        }
    }
}

impl Default for PipelineCounters {
    fn default() -> Self {
        Self::new()
    }
}

/// A point-in-time snapshot of [`PipelineCounters`].
#[derive(Debug, Clone, Copy)]
pub struct CounterSnapshot {
    /// Total events ingested.
    pub events_ingested: u64,
    /// Total events emitted.
    pub events_emitted: u64,
    /// Total events dropped.
    pub events_dropped: u64,
    /// Total processing cycles.
    pub cycles: u64,
    /// Last cycle duration in nanoseconds.
    pub last_cycle_duration_ns: u64,
    /// Total batches processed.
    pub total_batches: u64,
    /// Total checkpoints completed.
    pub checkpoints_completed: u64,
    /// Total checkpoints failed.
    pub checkpoints_failed: u64,
    /// Last checkpoint duration in milliseconds.
    pub last_checkpoint_duration_ms: u64,
    /// Current checkpoint epoch.
    pub checkpoint_epoch: u64,
}

/// Pipeline-wide metrics snapshot.
///
/// A plain-data aggregate assembled by the pipeline from its counters and
/// registries; all fields are point-in-time values, not live references.
#[derive(Debug, Clone)]
pub struct PipelineMetrics {
    /// Total events ingested across all sources.
    pub total_events_ingested: u64,
    /// Total events emitted to streams/sinks.
    pub total_events_emitted: u64,
    /// Total events dropped.
    pub total_events_dropped: u64,
    /// Total processing cycles completed.
    pub total_cycles: u64,
    /// Total batches processed.
    pub total_batches: u64,
    /// Time since the pipeline was created.
    pub uptime: Duration,
    /// Current pipeline state.
    pub state: PipelineState,
    /// Duration of the last processing cycle in nanoseconds.
    pub last_cycle_duration_ns: u64,
    /// Number of registered sources.
    pub source_count: usize,
    /// Number of registered streams.
    pub stream_count: usize,
    /// Number of registered sinks.
    pub sink_count: usize,
    /// Global pipeline watermark (minimum across all source watermarks).
    // NOTE(review): the tests use `i64::MIN` here, which looks like the
    // "no watermark yet" sentinel — confirm against the producer.
    pub pipeline_watermark: i64,
}

/// Metrics for a single registered source.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Source name.
    pub name: String,
    /// Total events pushed to this source (sequence number).
    pub total_events: u64,
    /// Number of events currently buffered.
    pub pending: usize,
    /// Buffer capacity.
    pub capacity: usize,
    /// Whether the source is experiencing backpressure
    /// (utilization above `BACKPRESSURE_THRESHOLD`, i.e. >80% full).
    pub is_backpressured: bool,
    /// Current watermark value.
    pub watermark: i64,
    /// Buffer utilization ratio (0.0 to 1.0), i.e. `pending / capacity`.
    pub utilization: f64,
}

/// Metrics for a single registered stream.
#[derive(Debug, Clone)]
pub struct StreamMetrics {
    /// Stream name.
    pub name: String,
    /// Total events pushed to this stream.
    pub total_events: u64,
    /// Number of events currently buffered.
    pub pending: usize,
    /// Buffer capacity.
    pub capacity: usize,
    /// Whether the stream is experiencing backpressure
    /// (utilization above `BACKPRESSURE_THRESHOLD`, i.e. >80% full).
    pub is_backpressured: bool,
    /// Current watermark value.
    pub watermark: i64,
    /// SQL query that defines this stream, if any (`None` for streams not
    /// created from a SQL definition).
    pub sql: Option<String>,
}

/// Backpressure threshold: a buffer counts as backpressured once its
/// utilization strictly exceeds this fraction.
const BACKPRESSURE_THRESHOLD: f64 = 0.8;

/// Return `true` when a buffer holding `pending` of `capacity` items is past
/// the backpressure threshold. A zero-capacity buffer is never backpressured.
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub(crate) fn is_backpressured(pending: usize, capacity: usize) -> bool {
    if capacity == 0 {
        return false;
    }
    pending as f64 / capacity as f64 > BACKPRESSURE_THRESHOLD
}

/// Compute buffer fill ratio (`pending / capacity`) in the range 0.0 to 1.0.
/// A zero-capacity buffer reports 0.0 rather than dividing by zero.
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub(crate) fn utilization(pending: usize, capacity: usize) -> f64 {
    match capacity {
        0 => 0.0,
        cap => pending as f64 / cap as f64,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Fresh counters must read all-zero.
    #[test]
    fn test_pipeline_counters_default() {
        let c = PipelineCounters::new();
        let s = c.snapshot();
        assert_eq!(s.events_ingested, 0);
        assert_eq!(s.events_emitted, 0);
        assert_eq!(s.events_dropped, 0);
        assert_eq!(s.cycles, 0);
        assert_eq!(s.total_batches, 0);
        assert_eq!(s.last_cycle_duration_ns, 0);
    }

    // Increments and stores are visible in a subsequent snapshot.
    #[test]
    fn test_pipeline_counters_increment() {
        let c = PipelineCounters::new();
        c.events_ingested.fetch_add(100, Ordering::Relaxed);
        c.events_emitted.fetch_add(50, Ordering::Relaxed);
        c.events_dropped.fetch_add(3, Ordering::Relaxed);
        c.cycles.fetch_add(10, Ordering::Relaxed);
        c.total_batches.fetch_add(5, Ordering::Relaxed);
        c.last_cycle_duration_ns.store(1234, Ordering::Relaxed);

        let s = c.snapshot();
        assert_eq!(s.events_ingested, 100);
        assert_eq!(s.events_emitted, 50);
        assert_eq!(s.events_dropped, 3);
        assert_eq!(s.cycles, 10);
        assert_eq!(s.total_batches, 5);
        assert_eq!(s.last_cycle_duration_ns, 1234);
    }

    // Relaxed atomics still guarantee no lost updates under contention.
    #[test]
    fn test_pipeline_counters_concurrent_access() {
        use std::sync::Arc;
        let c = Arc::new(PipelineCounters::new());
        let c2 = Arc::clone(&c);

        let t = std::thread::spawn(move || {
            for _ in 0..1000 {
                c2.events_ingested.fetch_add(1, Ordering::Relaxed);
            }
        });

        for _ in 0..1000 {
            c.events_ingested.fetch_add(1, Ordering::Relaxed);
        }

        t.join().unwrap();
        assert_eq!(c.events_ingested.load(Ordering::Relaxed), 2000);
    }

    // Display output must match the variant spelling exactly (used in logs).
    #[test]
    fn test_pipeline_state_display() {
        assert_eq!(PipelineState::Created.to_string(), "Created");
        assert_eq!(PipelineState::Starting.to_string(), "Starting");
        assert_eq!(PipelineState::Running.to_string(), "Running");
        assert_eq!(PipelineState::ShuttingDown.to_string(), "ShuttingDown");
        assert_eq!(PipelineState::Stopped.to_string(), "Stopped");
    }

    #[test]
    fn test_pipeline_state_equality() {
        assert_eq!(PipelineState::Running, PipelineState::Running);
        assert_ne!(PipelineState::Created, PipelineState::Running);
    }

    #[test]
    fn test_backpressure_detection() {
        // Empty buffer: not backpressured
        assert!(!is_backpressured(0, 100));
        // 50% full: not backpressured
        assert!(!is_backpressured(50, 100));
        // 80% full: not backpressured (threshold is >0.8, not >=)
        assert!(!is_backpressured(80, 100));
        // 81% full: backpressured
        assert!(is_backpressured(81, 100));
        // Full: backpressured
        assert!(is_backpressured(100, 100));
        // Zero capacity: not backpressured
        assert!(!is_backpressured(0, 0));
    }

    #[test]
    fn test_utilization() {
        assert!((utilization(0, 100) - 0.0).abs() < f64::EPSILON);
        assert!((utilization(50, 100) - 0.5).abs() < f64::EPSILON);
        assert!((utilization(100, 100) - 1.0).abs() < f64::EPSILON);
        assert!((utilization(0, 0) - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_pipeline_metrics_clone() {
        let m = PipelineMetrics {
            total_events_ingested: 100,
            total_events_emitted: 50,
            total_events_dropped: 0,
            total_cycles: 10,
            total_batches: 5,
            uptime: Duration::from_secs(60),
            state: PipelineState::Running,
            last_cycle_duration_ns: 500,
            source_count: 2,
            stream_count: 1,
            sink_count: 1,
            pipeline_watermark: i64::MIN,
        };
        let m2 = m.clone();
        assert_eq!(m2.total_events_ingested, 100);
        assert_eq!(m2.state, PipelineState::Running);
    }

    #[test]
    fn test_source_metrics_debug() {
        let m = SourceMetrics {
            name: "trades".to_string(),
            total_events: 1000,
            pending: 50,
            capacity: 1024,
            is_backpressured: false,
            watermark: 12345,
            utilization: 0.05,
        };
        let dbg = format!("{m:?}");
        assert!(dbg.contains("trades"));
        assert!(dbg.contains("1000"));
    }

    // Verifies the padding actually pushes Ring 2 off Ring 0's cache line.
    #[test]
    fn test_cache_line_separation() {
        let c = PipelineCounters::new();
        let base = &raw const c as usize;
        let ring0_start = &raw const c.events_ingested as usize;
        let ring2_start = &raw const c.checkpoints_completed as usize;

        // Ring 0 starts at offset 0
        assert_eq!(ring0_start - base, 0);
        // Ring 2 starts at least 64 bytes from Ring 0
        assert!(
            ring2_start - ring0_start >= 64,
            "Ring 2 counters must be on a separate cache line (offset: {})",
            ring2_start - ring0_start
        );
    }

    #[test]
    fn test_checkpoint_counters() {
        let c = PipelineCounters::new();
        c.checkpoints_completed.fetch_add(5, Ordering::Relaxed);
        c.checkpoints_failed.fetch_add(1, Ordering::Relaxed);
        c.last_checkpoint_duration_ms.store(250, Ordering::Relaxed);
        c.checkpoint_epoch.store(10, Ordering::Relaxed);

        let s = c.snapshot();
        assert_eq!(s.checkpoints_completed, 5);
        assert_eq!(s.checkpoints_failed, 1);
        assert_eq!(s.last_checkpoint_duration_ms, 250);
        assert_eq!(s.checkpoint_epoch, 10);
    }

    #[test]
    fn test_stream_metrics_with_sql() {
        let m = StreamMetrics {
            name: "avg_price".to_string(),
            total_events: 500,
            pending: 0,
            capacity: 1024,
            is_backpressured: false,
            watermark: 0,
            sql: Some("SELECT symbol, AVG(price) FROM trades GROUP BY symbol".to_string()),
        };
        assert_eq!(
            m.sql.as_deref(),
            Some("SELECT symbol, AVG(price) FROM trades GROUP BY symbol")
        );
    }
}