laminar-db 0.18.11

Unified database facade for LaminarDB
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
//! Pipeline observability metrics types.
//!
//! Provides atomic counters for pipeline-loop aggregates and snapshot types
//! for querying source, stream, and pipeline-wide metrics from user code.

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// The state of a streaming pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineState {
    /// Pipeline has been created but not started.
    Created,
    /// Pipeline is in the process of starting.
    Starting,
    /// Pipeline is actively processing events.
    Running,
    /// Pipeline is gracefully shutting down.
    ShuttingDown,
    /// Pipeline has stopped.
    Stopped,
}

impl std::fmt::Display for PipelineState {
    /// Writes the variant name verbatim (e.g. `"ShuttingDown"`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Created => "Created",
            Self::Starting => "Starting",
            Self::Running => "Running",
            Self::ShuttingDown => "ShuttingDown",
            Self::Stopped => "Stopped",
        };
        f.write_str(name)
    }
}

/// Cache line size (bytes) assumed for padding between hot/cold counter
/// groups in [`PipelineCounters`]. 64 bytes matches common x86-64/aarch64
/// data-cache lines; used only to size the `_pad` field.
const CACHE_LINE_SIZE: usize = 64;

/// Shared atomic counters incremented by the pipeline processing loop.
///
/// Counters are separated into two groups on different cache lines to
/// prevent false sharing between Ring 0 (hot path) and Ring 2 (checkpoint):
///
/// - **Ring 0 group** (`events_ingested` … `queries_cached_plan`): incremented
///   on every processing cycle from the reactor thread.
/// - **Ring 2 group** (`checkpoints_completed` onward): updated
///   from the async checkpoint coordinator.
///
/// All reads and writes use `Ordering::Relaxed` — metrics are advisory,
/// not transactional.
#[repr(C)]
pub struct PipelineCounters {
    // ── Ring 0 counters (hot path, tight loop) ──
    /// Total events ingested from sources.
    pub events_ingested: AtomicU64,
    /// Total events emitted to streams/sinks.
    pub events_emitted: AtomicU64,
    /// Total events dropped (e.g. backpressure).
    pub events_dropped: AtomicU64,
    /// Total processing cycles completed.
    pub cycles: AtomicU64,
    /// Duration of the last processing cycle in nanoseconds.
    pub last_cycle_duration_ns: AtomicU64,
    /// Total batches processed.
    pub total_batches: AtomicU64,

    /// Queries using compiled `PhysicalExpr` (zero-overhead per cycle).
    pub queries_compiled: AtomicU64,
    /// Queries using cached logical plan (physical planning per cycle).
    pub queries_cached_plan: AtomicU64,

    // ── Cache line padding ──
    // The Ring 0 group is 8 × 8 = 64 bytes. The expression below evaluates to
    // `64 - (64 % 64)` = 64, i.e. one FULL cache line of padding (not zero).
    // That is deliberate slack: the struct itself is only 8-byte aligned
    // (`#[repr(C)]` with `AtomicU64` fields, no `align(64)`), so its base
    // address can fall anywhere within a cache line; a full line of padding
    // guarantees the Ring 0 and Ring 2 groups never share a line regardless
    // of where the struct starts. If counters are added/removed, the `8 *`
    // factor must be updated by hand to match the Ring 0 field count.
    _pad: [u8; CACHE_LINE_SIZE - (8 * std::mem::size_of::<AtomicU64>()) % CACHE_LINE_SIZE],

    // ── Ring 2 counters (checkpoint coordinator, async) ──
    /// Total checkpoints completed successfully.
    pub checkpoints_completed: AtomicU64,
    /// Total checkpoints that failed.
    pub checkpoints_failed: AtomicU64,
    /// Duration of the last checkpoint in milliseconds.
    pub last_checkpoint_duration_ms: AtomicU64,
    /// Current checkpoint epoch.
    pub checkpoint_epoch: AtomicU64,
    /// Maximum configured state bytes per operator (0 = unlimited).
    pub max_state_bytes: AtomicU64,
    /// Cycle duration p50 in nanoseconds (updated periodically).
    pub cycle_p50_ns: AtomicU64,
    /// Cycle duration p95 in nanoseconds (updated periodically).
    pub cycle_p95_ns: AtomicU64,
    /// Cycle duration p99 in nanoseconds (updated periodically).
    pub cycle_p99_ns: AtomicU64,

    // ── Sink 2PC timing (checkpoint coordinator, async) ──
    /// Duration of the last sink pre-commit phase in microseconds.
    pub sink_precommit_duration_us: AtomicU64,
    /// Duration of the last sink commit phase in microseconds.
    pub sink_commit_duration_us: AtomicU64,

    // ── Checkpoint size / lag ──
    /// Size of the last checkpoint in bytes (sidecar + manifest).
    pub last_checkpoint_size_bytes: AtomicU64,
    /// Wall-clock timestamp (ms since epoch) of the last successful checkpoint.
    pub last_checkpoint_timestamp_ms: AtomicU64,
}

impl PipelineCounters {
    /// Create a fresh set of counters, every value starting at zero.
    #[must_use]
    pub fn new() -> Self {
        // Padding bytes are zero-initialized; their contents are never read.
        Self {
            _pad: [0; CACHE_LINE_SIZE - (8 * std::mem::size_of::<AtomicU64>()) % CACHE_LINE_SIZE],
            // Ring 0 (hot path) group.
            events_ingested: AtomicU64::new(0),
            events_emitted: AtomicU64::new(0),
            events_dropped: AtomicU64::new(0),
            cycles: AtomicU64::new(0),
            last_cycle_duration_ns: AtomicU64::new(0),
            total_batches: AtomicU64::new(0),
            queries_compiled: AtomicU64::new(0),
            queries_cached_plan: AtomicU64::new(0),
            // Ring 2 (checkpoint coordinator) group.
            checkpoints_completed: AtomicU64::new(0),
            checkpoints_failed: AtomicU64::new(0),
            last_checkpoint_duration_ms: AtomicU64::new(0),
            checkpoint_epoch: AtomicU64::new(0),
            max_state_bytes: AtomicU64::new(0),
            cycle_p50_ns: AtomicU64::new(0),
            cycle_p95_ns: AtomicU64::new(0),
            cycle_p99_ns: AtomicU64::new(0),
            sink_precommit_duration_us: AtomicU64::new(0),
            sink_commit_duration_us: AtomicU64::new(0),
            last_checkpoint_size_bytes: AtomicU64::new(0),
            last_checkpoint_timestamp_ms: AtomicU64::new(0),
        }
    }

    /// Read every counter once and return the values as a plain struct.
    ///
    /// Each field is an independent relaxed load, so the snapshot is not an
    /// atomic view of all counters at one instant — values may be mutually
    /// inconsistent if writers are active. Metrics are advisory only.
    #[must_use]
    pub fn snapshot(&self) -> CounterSnapshot {
        let read = |c: &AtomicU64| c.load(Ordering::Relaxed);
        CounterSnapshot {
            events_ingested: read(&self.events_ingested),
            events_emitted: read(&self.events_emitted),
            events_dropped: read(&self.events_dropped),
            cycles: read(&self.cycles),
            last_cycle_duration_ns: read(&self.last_cycle_duration_ns),
            total_batches: read(&self.total_batches),
            queries_compiled: read(&self.queries_compiled),
            queries_cached_plan: read(&self.queries_cached_plan),
            checkpoints_completed: read(&self.checkpoints_completed),
            checkpoints_failed: read(&self.checkpoints_failed),
            last_checkpoint_duration_ms: read(&self.last_checkpoint_duration_ms),
            checkpoint_epoch: read(&self.checkpoint_epoch),
            max_state_bytes: read(&self.max_state_bytes),
            cycle_p50_ns: read(&self.cycle_p50_ns),
            cycle_p95_ns: read(&self.cycle_p95_ns),
            cycle_p99_ns: read(&self.cycle_p99_ns),
            sink_precommit_duration_us: read(&self.sink_precommit_duration_us),
            sink_commit_duration_us: read(&self.sink_commit_duration_us),
            last_checkpoint_size_bytes: read(&self.last_checkpoint_size_bytes),
            last_checkpoint_timestamp_ms: read(&self.last_checkpoint_timestamp_ms),
        }
    }
}

impl Default for PipelineCounters {
    fn default() -> Self {
        Self::new()
    }
}

/// A point-in-time snapshot of [`PipelineCounters`].
///
/// Produced by [`PipelineCounters::snapshot`] via per-field relaxed loads,
/// so individual fields may be mutually inconsistent under concurrent writes.
#[derive(Debug, Clone, Copy)]
pub struct CounterSnapshot {
    /// Total events ingested.
    pub events_ingested: u64,
    /// Total events emitted.
    pub events_emitted: u64,
    /// Total events dropped.
    pub events_dropped: u64,
    /// Total processing cycles.
    pub cycles: u64,
    /// Last cycle duration in nanoseconds.
    pub last_cycle_duration_ns: u64,
    /// Total batches processed.
    pub total_batches: u64,
    /// Queries using compiled `PhysicalExpr` (zero-overhead per cycle).
    pub queries_compiled: u64,
    /// Queries using cached logical plan (physical planning per cycle).
    pub queries_cached_plan: u64,
    /// Total checkpoints completed.
    pub checkpoints_completed: u64,
    /// Total checkpoints failed.
    pub checkpoints_failed: u64,
    /// Last checkpoint duration in milliseconds.
    pub last_checkpoint_duration_ms: u64,
    /// Current checkpoint epoch.
    pub checkpoint_epoch: u64,
    /// Maximum configured state bytes per operator (0 = unlimited).
    pub max_state_bytes: u64,
    /// Cycle duration p50 in nanoseconds.
    pub cycle_p50_ns: u64,
    /// Cycle duration p95 in nanoseconds.
    pub cycle_p95_ns: u64,
    /// Cycle duration p99 in nanoseconds.
    pub cycle_p99_ns: u64,
    /// Last sink pre-commit duration in microseconds.
    pub sink_precommit_duration_us: u64,
    /// Last sink commit duration in microseconds.
    pub sink_commit_duration_us: u64,
    /// Last checkpoint size in bytes.
    pub last_checkpoint_size_bytes: u64,
    /// Wall-clock timestamp (ms since epoch) of last successful checkpoint.
    pub last_checkpoint_timestamp_ms: u64,
}

/// Pipeline-wide metrics snapshot.
#[derive(Debug, Clone)]
pub struct PipelineMetrics {
    /// Total events ingested across all sources.
    pub total_events_ingested: u64,
    /// Total events emitted to streams/sinks.
    pub total_events_emitted: u64,
    /// Total events dropped.
    pub total_events_dropped: u64,
    /// Total processing cycles completed.
    pub total_cycles: u64,
    /// Total batches processed.
    pub total_batches: u64,
    /// Time since the pipeline was created.
    pub uptime: Duration,
    /// Current pipeline state.
    pub state: PipelineState,
    /// Duration of the last processing cycle in nanoseconds.
    pub last_cycle_duration_ns: u64,
    /// Number of registered sources.
    pub source_count: usize,
    /// Number of registered streams.
    pub stream_count: usize,
    /// Number of registered sinks.
    pub sink_count: usize,
    /// Global pipeline watermark (minimum across all source watermarks).
    // NOTE(review): the unit test constructs this with `i64::MIN`, presumably
    // meaning "no watermark observed yet" — confirm against the producer.
    pub pipeline_watermark: i64,
}

/// Metrics for a single registered source.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Source name.
    pub name: String,
    /// Total events pushed to this source (sequence number).
    pub total_events: u64,
    /// Number of events currently buffered.
    pub pending: usize,
    /// Buffer capacity.
    pub capacity: usize,
    /// Whether the source is experiencing backpressure (utilization
    /// strictly above the 0.8 threshold — see `is_backpressured`).
    pub is_backpressured: bool,
    /// Current watermark value.
    pub watermark: i64,
    /// Buffer utilization ratio (0.0 to 1.0); 0.0 for zero-capacity buffers.
    pub utilization: f64,
}

/// Metrics for a single registered stream.
#[derive(Debug, Clone)]
pub struct StreamMetrics {
    /// Stream name.
    pub name: String,
    /// Total events pushed to this stream.
    pub total_events: u64,
    /// Number of events currently buffered.
    pub pending: usize,
    /// Buffer capacity.
    pub capacity: usize,
    /// Whether the stream is experiencing backpressure (utilization
    /// strictly above the 0.8 threshold — see `is_backpressured`).
    pub is_backpressured: bool,
    /// Current watermark value.
    pub watermark: i64,
    /// SQL query that defines this stream, if any (`None` for streams not
    /// defined by SQL).
    pub sql: Option<String>,
}

/// Backpressure threshold: a buffer counts as backpressured once its
/// utilization STRICTLY exceeds this fraction (80% exactly is not
/// backpressured).
const BACKPRESSURE_THRESHOLD: f64 = 0.8;

/// Returns `true` when `pending / capacity` strictly exceeds
/// [`BACKPRESSURE_THRESHOLD`]. A zero-capacity buffer is never
/// considered backpressured.
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub(crate) fn is_backpressured(pending: usize, capacity: usize) -> bool {
    if capacity == 0 {
        return false;
    }
    (pending as f64) / (capacity as f64) > BACKPRESSURE_THRESHOLD
}

/// Buffer fill ratio in `[0.0, 1.0]`; a zero-capacity buffer reports `0.0`
/// rather than dividing by zero.
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub(crate) fn utilization(pending: usize, capacity: usize) -> f64 {
    match capacity {
        0 => 0.0,
        cap => pending as f64 / cap as f64,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // All counters must read zero immediately after construction.
    #[test]
    fn test_pipeline_counters_default() {
        let c = PipelineCounters::new();
        let s = c.snapshot();
        assert_eq!(s.events_ingested, 0);
        assert_eq!(s.events_emitted, 0);
        assert_eq!(s.events_dropped, 0);
        assert_eq!(s.cycles, 0);
        assert_eq!(s.total_batches, 0);
        assert_eq!(s.last_cycle_duration_ns, 0);
    }

    // Snapshot values must mirror the increments applied to each counter.
    #[test]
    fn test_pipeline_counters_increment() {
        let c = PipelineCounters::new();
        c.events_ingested.fetch_add(100, Ordering::Relaxed);
        c.events_emitted.fetch_add(50, Ordering::Relaxed);
        c.events_dropped.fetch_add(3, Ordering::Relaxed);
        c.cycles.fetch_add(10, Ordering::Relaxed);
        c.total_batches.fetch_add(5, Ordering::Relaxed);
        c.last_cycle_duration_ns.store(1234, Ordering::Relaxed);

        let s = c.snapshot();
        assert_eq!(s.events_ingested, 100);
        assert_eq!(s.events_emitted, 50);
        assert_eq!(s.events_dropped, 3);
        assert_eq!(s.cycles, 10);
        assert_eq!(s.total_batches, 5);
        assert_eq!(s.last_cycle_duration_ns, 1234);
    }

    // Two threads bump the same counter; relaxed fetch_add is still atomic,
    // so no increments may be lost.
    #[test]
    fn test_pipeline_counters_concurrent_access() {
        use std::sync::Arc;
        let c = Arc::new(PipelineCounters::new());
        let c2 = Arc::clone(&c);

        let t = std::thread::spawn(move || {
            for _ in 0..1000 {
                c2.events_ingested.fetch_add(1, Ordering::Relaxed);
            }
        });

        for _ in 0..1000 {
            c.events_ingested.fetch_add(1, Ordering::Relaxed);
        }

        t.join().unwrap();
        assert_eq!(c.events_ingested.load(Ordering::Relaxed), 2000);
    }

    #[test]
    fn test_pipeline_state_display() {
        assert_eq!(PipelineState::Created.to_string(), "Created");
        assert_eq!(PipelineState::Starting.to_string(), "Starting");
        assert_eq!(PipelineState::Running.to_string(), "Running");
        assert_eq!(PipelineState::ShuttingDown.to_string(), "ShuttingDown");
        assert_eq!(PipelineState::Stopped.to_string(), "Stopped");
    }

    #[test]
    fn test_pipeline_state_equality() {
        assert_eq!(PipelineState::Running, PipelineState::Running);
        assert_ne!(PipelineState::Created, PipelineState::Running);
    }

    #[test]
    fn test_backpressure_detection() {
        // Empty buffer: not backpressured
        assert!(!is_backpressured(0, 100));
        // 50% full: not backpressured
        assert!(!is_backpressured(50, 100));
        // 80% full: not backpressured (threshold is >0.8, not >=)
        assert!(!is_backpressured(80, 100));
        // 81% full: backpressured
        assert!(is_backpressured(81, 100));
        // Full: backpressured
        assert!(is_backpressured(100, 100));
        // Zero capacity: not backpressured
        assert!(!is_backpressured(0, 0));
    }

    #[test]
    fn test_utilization() {
        assert!((utilization(0, 100) - 0.0).abs() < f64::EPSILON);
        assert!((utilization(50, 100) - 0.5).abs() < f64::EPSILON);
        assert!((utilization(100, 100) - 1.0).abs() < f64::EPSILON);
        assert!((utilization(0, 0) - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_pipeline_metrics_clone() {
        let m = PipelineMetrics {
            total_events_ingested: 100,
            total_events_emitted: 50,
            total_events_dropped: 0,
            total_cycles: 10,
            total_batches: 5,
            uptime: Duration::from_secs(60),
            state: PipelineState::Running,
            last_cycle_duration_ns: 500,
            source_count: 2,
            stream_count: 1,
            sink_count: 1,
            pipeline_watermark: i64::MIN,
        };
        let m2 = m.clone();
        assert_eq!(m2.total_events_ingested, 100);
        assert_eq!(m2.state, PipelineState::Running);
    }

    #[test]
    fn test_source_metrics_debug() {
        let m = SourceMetrics {
            name: "trades".to_string(),
            total_events: 1000,
            pending: 50,
            capacity: 1024,
            is_backpressured: false,
            watermark: 12345,
            utilization: 0.05,
        };
        let dbg = format!("{m:?}");
        assert!(dbg.contains("trades"));
        assert!(dbg.contains("1000"));
    }

    // Layout check: with #[repr(C)], the first Ring 0 counter must sit at
    // offset 0 and the first Ring 2 counter at least one cache line later.
    #[test]
    fn test_cache_line_separation() {
        let c = PipelineCounters::new();
        let base = &raw const c as usize;
        let ring0_start = &raw const c.events_ingested as usize;
        let ring2_start = &raw const c.checkpoints_completed as usize;

        // Ring 0 starts at offset 0
        assert_eq!(ring0_start - base, 0);
        // Ring 2 starts at least 64 bytes from Ring 0
        assert!(
            ring2_start - ring0_start >= 64,
            "Ring 2 counters must be on a separate cache line (offset: {})",
            ring2_start - ring0_start
        );
    }

    #[test]
    fn test_checkpoint_counters() {
        let c = PipelineCounters::new();
        c.checkpoints_completed.fetch_add(5, Ordering::Relaxed);
        c.checkpoints_failed.fetch_add(1, Ordering::Relaxed);
        c.last_checkpoint_duration_ms.store(250, Ordering::Relaxed);
        c.checkpoint_epoch.store(10, Ordering::Relaxed);

        let s = c.snapshot();
        assert_eq!(s.checkpoints_completed, 5);
        assert_eq!(s.checkpoints_failed, 1);
        assert_eq!(s.last_checkpoint_duration_ms, 250);
        assert_eq!(s.checkpoint_epoch, 10);
    }

    #[test]
    fn test_stream_metrics_with_sql() {
        let m = StreamMetrics {
            name: "avg_price".to_string(),
            total_events: 500,
            pending: 0,
            capacity: 1024,
            is_backpressured: false,
            watermark: 0,
            sql: Some("SELECT symbol, AVG(price) FROM trades GROUP BY symbol".to_string()),
        };
        assert_eq!(
            m.sql.as_deref(),
            Some("SELECT symbol, AVG(price) FROM trades GROUP BY symbol")
        );
    }
}