1use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8
/// Lifecycle states of a pipeline, from construction through shutdown.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineState {
    /// Constructed but not yet started.
    Created,
    /// Startup in progress.
    Starting,
    /// Actively processing.
    Running,
    /// Shutdown in progress.
    ShuttingDown,
    /// Fully stopped.
    Stopped,
}

impl std::fmt::Display for PipelineState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the variant to its canonical label, then emit it once.
        let label = match self {
            Self::Created => "Created",
            Self::Starting => "Starting",
            Self::Running => "Running",
            Self::ShuttingDown => "ShuttingDown",
            Self::Stopped => "Stopped",
        };
        f.write_str(label)
    }
}
35
/// Size of a CPU cache line, in bytes, on mainstream x86-64/aarch64 targets.
const CACHE_LINE_SIZE: usize = 64;

/// Lock-free counter block shared across pipeline threads.
///
/// `#[repr(C)]` fixes the field order so the manual `_pad` below is
/// meaningful, and `align(64)` anchors the struct itself to a cache-line
/// boundary. Without the alignment, the padding only separates the two
/// counter groups *relative to the struct start* — the whole struct could
/// still begin mid-line, defeating the false-sharing protection this layout
/// (and `test_cache_line_separation`) is designed for.
#[repr(C, align(64))]
pub struct PipelineCounters {
    // --- hot-path counters (first cache line) ---
    /// Events ingested into the pipeline.
    pub events_ingested: AtomicU64,
    /// Events emitted by the pipeline.
    pub events_emitted: AtomicU64,
    /// Events dropped.
    pub events_dropped: AtomicU64,
    /// Processing cycles completed.
    pub cycles: AtomicU64,
    /// Duration of the most recent cycle, in nanoseconds.
    pub last_cycle_duration_ns: AtomicU64,
    /// Total batches processed.
    pub total_batches: AtomicU64,

    // 6 counters x 8 bytes = 48 bytes above; pad (64 - 48 = 16 bytes) so the
    // checkpoint counters below start exactly at the next cache line.
    _pad: [u8; CACHE_LINE_SIZE - (6 * std::mem::size_of::<AtomicU64>()) % CACHE_LINE_SIZE],

    // --- checkpoint counters (second cache line) ---
    /// Checkpoints completed successfully.
    pub checkpoints_completed: AtomicU64,
    /// Checkpoints that failed.
    pub checkpoints_failed: AtomicU64,
    /// Duration of the most recent checkpoint, in milliseconds.
    pub last_checkpoint_duration_ms: AtomicU64,
    /// Epoch number of the most recent checkpoint.
    pub checkpoint_epoch: AtomicU64,
}
82
83impl PipelineCounters {
84 #[must_use]
86 pub fn new() -> Self {
87 Self {
88 events_ingested: AtomicU64::new(0),
89 events_emitted: AtomicU64::new(0),
90 events_dropped: AtomicU64::new(0),
91 cycles: AtomicU64::new(0),
92 last_cycle_duration_ns: AtomicU64::new(0),
93 total_batches: AtomicU64::new(0),
94 _pad: [0; CACHE_LINE_SIZE - (6 * std::mem::size_of::<AtomicU64>()) % CACHE_LINE_SIZE],
95 checkpoints_completed: AtomicU64::new(0),
96 checkpoints_failed: AtomicU64::new(0),
97 last_checkpoint_duration_ms: AtomicU64::new(0),
98 checkpoint_epoch: AtomicU64::new(0),
99 }
100 }
101
102 #[must_use]
104 pub fn snapshot(&self) -> CounterSnapshot {
105 CounterSnapshot {
106 events_ingested: self.events_ingested.load(Ordering::Relaxed),
107 events_emitted: self.events_emitted.load(Ordering::Relaxed),
108 events_dropped: self.events_dropped.load(Ordering::Relaxed),
109 cycles: self.cycles.load(Ordering::Relaxed),
110 last_cycle_duration_ns: self.last_cycle_duration_ns.load(Ordering::Relaxed),
111 total_batches: self.total_batches.load(Ordering::Relaxed),
112 checkpoints_completed: self.checkpoints_completed.load(Ordering::Relaxed),
113 checkpoints_failed: self.checkpoints_failed.load(Ordering::Relaxed),
114 last_checkpoint_duration_ms: self.last_checkpoint_duration_ms.load(Ordering::Relaxed),
115 checkpoint_epoch: self.checkpoint_epoch.load(Ordering::Relaxed),
116 }
117 }
118}
119
/// `Default` delegates to [`PipelineCounters::new`]: all counters start at zero.
impl Default for PipelineCounters {
    fn default() -> Self {
        Self::new()
    }
}
125
/// Plain-value copy of [`PipelineCounters`] taken by `snapshot()`.
///
/// Field-for-field mirror of the atomic counters; cheap to copy and pass
/// around without touching the shared atomics again.
#[derive(Debug, Clone, Copy)]
pub struct CounterSnapshot {
    /// Events ingested at snapshot time.
    pub events_ingested: u64,
    /// Events emitted at snapshot time.
    pub events_emitted: u64,
    /// Events dropped at snapshot time.
    pub events_dropped: u64,
    /// Processing cycles completed.
    pub cycles: u64,
    /// Duration of the most recent cycle, in nanoseconds.
    pub last_cycle_duration_ns: u64,
    /// Total batches processed.
    pub total_batches: u64,
    /// Checkpoints completed successfully.
    pub checkpoints_completed: u64,
    /// Checkpoints that failed.
    pub checkpoints_failed: u64,
    /// Duration of the most recent checkpoint, in milliseconds.
    pub last_checkpoint_duration_ms: u64,
    /// Epoch number of the most recent checkpoint.
    pub checkpoint_epoch: u64,
}
150
/// Aggregated, pipeline-wide metrics for reporting/observability.
#[derive(Debug, Clone)]
pub struct PipelineMetrics {
    /// Total events ingested over the pipeline's lifetime.
    pub total_events_ingested: u64,
    /// Total events emitted over the pipeline's lifetime.
    pub total_events_emitted: u64,
    /// Total events dropped over the pipeline's lifetime.
    pub total_events_dropped: u64,
    /// Total processing cycles completed.
    pub total_cycles: u64,
    /// Total batches processed.
    pub total_batches: u64,
    /// Time since the pipeline started.
    pub uptime: Duration,
    /// Current lifecycle state.
    pub state: PipelineState,
    /// Duration of the most recent cycle, in nanoseconds.
    pub last_cycle_duration_ns: u64,
    /// Number of registered sources.
    pub source_count: usize,
    /// Number of registered streams.
    pub stream_count: usize,
    /// Number of registered sinks.
    pub sink_count: usize,
    /// Pipeline-level watermark. NOTE(review): tests use `i64::MIN` here,
    /// presumably as a "no watermark yet" sentinel — confirm with producers.
    pub pipeline_watermark: i64,
}
179
/// Per-source metrics.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Source name.
    pub name: String,
    /// Total events produced by this source.
    pub total_events: u64,
    /// Events currently queued (pending) for this source.
    pub pending: usize,
    /// Queue capacity for this source.
    pub capacity: usize,
    /// Whether the source's queue is over the backpressure threshold.
    pub is_backpressured: bool,
    /// Source watermark.
    pub watermark: i64,
    /// Queue fullness as a ratio in `[0.0, 1.0]` (`pending / capacity`).
    pub utilization: f64,
}
198
/// Per-stream metrics.
#[derive(Debug, Clone)]
pub struct StreamMetrics {
    /// Stream name.
    pub name: String,
    /// Total events processed by this stream.
    pub total_events: u64,
    /// Events currently queued (pending) for this stream.
    pub pending: usize,
    /// Queue capacity for this stream.
    pub capacity: usize,
    /// Whether the stream's queue is over the backpressure threshold.
    pub is_backpressured: bool,
    /// Stream watermark.
    pub watermark: i64,
    /// SQL text defining this stream, if it was SQL-defined.
    pub sql: Option<String>,
}
217
/// Queue-fullness ratio above which a channel is considered backpressured.
const BACKPRESSURE_THRESHOLD: f64 = 0.8;

/// Returns `true` when `pending / capacity` strictly exceeds
/// [`BACKPRESSURE_THRESHOLD`]. A zero `capacity` is never backpressured
/// (also avoids a division by zero).
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub(crate) fn is_backpressured(pending: usize, capacity: usize) -> bool {
    if capacity == 0 {
        return false;
    }
    (pending as f64) / (capacity as f64) > BACKPRESSURE_THRESHOLD
}
228
/// Queue fullness as `pending / capacity`; defined as `0.0` when the
/// capacity is zero (avoids a division by zero).
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub(crate) fn utilization(pending: usize, capacity: usize) -> f64 {
    match capacity {
        0 => 0.0,
        cap => pending as f64 / cap as f64,
    }
}
239
// Unit tests covering the atomic counters, state display, backpressure
// helpers, the metrics structs, and the cache-line layout contract.
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh counter block must snapshot to all zeros.
    #[test]
    fn test_pipeline_counters_default() {
        let c = PipelineCounters::new();
        let s = c.snapshot();
        assert_eq!(s.events_ingested, 0);
        assert_eq!(s.events_emitted, 0);
        assert_eq!(s.events_dropped, 0);
        assert_eq!(s.cycles, 0);
        assert_eq!(s.total_batches, 0);
        assert_eq!(s.last_cycle_duration_ns, 0);
    }

    // Relaxed fetch_add/store must be observable through snapshot().
    #[test]
    fn test_pipeline_counters_increment() {
        let c = PipelineCounters::new();
        c.events_ingested.fetch_add(100, Ordering::Relaxed);
        c.events_emitted.fetch_add(50, Ordering::Relaxed);
        c.events_dropped.fetch_add(3, Ordering::Relaxed);
        c.cycles.fetch_add(10, Ordering::Relaxed);
        c.total_batches.fetch_add(5, Ordering::Relaxed);
        c.last_cycle_duration_ns.store(1234, Ordering::Relaxed);

        let s = c.snapshot();
        assert_eq!(s.events_ingested, 100);
        assert_eq!(s.events_emitted, 50);
        assert_eq!(s.events_dropped, 3);
        assert_eq!(s.cycles, 10);
        assert_eq!(s.total_batches, 5);
        assert_eq!(s.last_cycle_duration_ns, 1234);
    }

    // Two threads incrementing the same counter must not lose updates.
    #[test]
    fn test_pipeline_counters_concurrent_access() {
        use std::sync::Arc;
        let c = Arc::new(PipelineCounters::new());
        let c2 = Arc::clone(&c);

        let t = std::thread::spawn(move || {
            for _ in 0..1000 {
                c2.events_ingested.fetch_add(1, Ordering::Relaxed);
            }
        });

        for _ in 0..1000 {
            c.events_ingested.fetch_add(1, Ordering::Relaxed);
        }

        t.join().unwrap();
        assert_eq!(c.events_ingested.load(Ordering::Relaxed), 2000);
    }

    // Display labels match the variant names exactly.
    #[test]
    fn test_pipeline_state_display() {
        assert_eq!(PipelineState::Created.to_string(), "Created");
        assert_eq!(PipelineState::Starting.to_string(), "Starting");
        assert_eq!(PipelineState::Running.to_string(), "Running");
        assert_eq!(PipelineState::ShuttingDown.to_string(), "ShuttingDown");
        assert_eq!(PipelineState::Stopped.to_string(), "Stopped");
    }

    #[test]
    fn test_pipeline_state_equality() {
        assert_eq!(PipelineState::Running, PipelineState::Running);
        assert_ne!(PipelineState::Created, PipelineState::Running);
    }

    // Threshold is strict (>0.8): exactly 80/100 is NOT backpressured,
    // and zero capacity is never backpressured.
    #[test]
    fn test_backpressure_detection() {
        assert!(!is_backpressured(0, 100));
        assert!(!is_backpressured(50, 100));
        assert!(!is_backpressured(80, 100));
        assert!(is_backpressured(81, 100));
        assert!(is_backpressured(100, 100));
        assert!(!is_backpressured(0, 0));
    }

    // utilization() returns pending/capacity, with 0.0 for zero capacity.
    #[test]
    fn test_utilization() {
        assert!((utilization(0, 100) - 0.0).abs() < f64::EPSILON);
        assert!((utilization(50, 100) - 0.5).abs() < f64::EPSILON);
        assert!((utilization(100, 100) - 1.0).abs() < f64::EPSILON);
        assert!((utilization(0, 0) - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_pipeline_metrics_clone() {
        let m = PipelineMetrics {
            total_events_ingested: 100,
            total_events_emitted: 50,
            total_events_dropped: 0,
            total_cycles: 10,
            total_batches: 5,
            uptime: Duration::from_secs(60),
            state: PipelineState::Running,
            last_cycle_duration_ns: 500,
            source_count: 2,
            stream_count: 1,
            sink_count: 1,
            pipeline_watermark: i64::MIN,
        };
        let m2 = m.clone();
        assert_eq!(m2.total_events_ingested, 100);
        assert_eq!(m2.state, PipelineState::Running);
    }

    #[test]
    fn test_source_metrics_debug() {
        let m = SourceMetrics {
            name: "trades".to_string(),
            total_events: 1000,
            pending: 50,
            capacity: 1024,
            is_backpressured: false,
            watermark: 12345,
            utilization: 0.05,
        };
        let dbg = format!("{m:?}");
        assert!(dbg.contains("trades"));
        assert!(dbg.contains("1000"));
    }

    // Layout contract: the first event counter sits at offset 0 and the
    // checkpoint counter group starts at least one full cache line later,
    // so the hot event path and the checkpoint path don't false-share.
    #[test]
    fn test_cache_line_separation() {
        let c = PipelineCounters::new();
        let base = &raw const c as usize;
        let ring0_start = &raw const c.events_ingested as usize;
        let ring2_start = &raw const c.checkpoints_completed as usize;

        assert_eq!(ring0_start - base, 0);
        assert!(
            ring2_start - ring0_start >= 64,
            "Ring 2 counters must be on a separate cache line (offset: {})",
            ring2_start - ring0_start
        );
    }

    // Checkpoint counters round-trip through snapshot() like event counters.
    #[test]
    fn test_checkpoint_counters() {
        let c = PipelineCounters::new();
        c.checkpoints_completed.fetch_add(5, Ordering::Relaxed);
        c.checkpoints_failed.fetch_add(1, Ordering::Relaxed);
        c.last_checkpoint_duration_ms.store(250, Ordering::Relaxed);
        c.checkpoint_epoch.store(10, Ordering::Relaxed);

        let s = c.snapshot();
        assert_eq!(s.checkpoints_completed, 5);
        assert_eq!(s.checkpoints_failed, 1);
        assert_eq!(s.last_checkpoint_duration_ms, 250);
        assert_eq!(s.checkpoint_epoch, 10);
    }

    #[test]
    fn test_stream_metrics_with_sql() {
        let m = StreamMetrics {
            name: "avg_price".to_string(),
            total_events: 500,
            pending: 0,
            capacity: 1024,
            is_backpressured: false,
            watermark: 0,
            sql: Some("SELECT symbol, AVG(price) FROM trades GROUP BY symbol".to_string()),
        };
        assert_eq!(
            m.sql.as_deref(),
            Some("SELECT symbol, AVG(price) FROM trades GROUP BY symbol")
        );
    }
}