use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Lifecycle phases of a pipeline, from construction through shutdown.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineState {
    Created,
    Starting,
    Running,
    ShuttingDown,
    Stopped,
}

impl std::fmt::Display for PipelineState {
    /// Writes the variant name exactly as declared (e.g. "ShuttingDown").
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Created => "Created",
            Self::Starting => "Starting",
            Self::Running => "Running",
            Self::ShuttingDown => "ShuttingDown",
            Self::Stopped => "Stopped",
        };
        f.write_str(label)
    }
}
/// Assumed cache-line width in bytes; used to size the padding that
/// separates the hot-path counters from the checkpoint/sink counters.
const CACHE_LINE_SIZE: usize = 64;

/// Lock-free bank of pipeline counters, updated concurrently via
/// `Relaxed` atomics and read via [`PipelineCounters::snapshot`].
///
/// `#[repr(C)]` pins the declared field order so `_pad` really does sit
/// between the two counter groups in memory (verified by
/// `test_cache_line_separation` below).
#[repr(C)]
pub struct PipelineCounters {
    // --- Hot-path counters: first cache line (8 x 8-byte atomics). ---
    pub events_ingested: AtomicU64,
    pub events_emitted: AtomicU64,
    pub events_dropped: AtomicU64,
    pub cycles: AtomicU64,
    // Duration of the most recent cycle, nanoseconds (gauge, not a sum).
    pub last_cycle_duration_ns: AtomicU64,
    pub total_batches: AtomicU64,
    pub queries_compiled: AtomicU64,
    pub queries_cached_plan: AtomicU64,
    // Padding so the counters below land on a different cache line from
    // the hot-path group (false-sharing avoidance).
    // NOTE(review): `%` binds tighter than `-`, so with 8 preceding
    // 8-byte atomics this is 64 - (64 % 64) = 64 — a full spare cache
    // line of padding rather than pad-to-next-boundary (which would be
    // 0 here, since the group already fills line 0 exactly). Confirm the
    // extra guard line is intentional; `new()` repeats this expression
    // and must stay in sync.
    _pad: [u8; CACHE_LINE_SIZE - (8 * std::mem::size_of::<AtomicU64>()) % CACHE_LINE_SIZE],
    // --- Checkpoint, latency-percentile, and sink counters. ---
    pub checkpoints_completed: AtomicU64,
    pub checkpoints_failed: AtomicU64,
    pub last_checkpoint_duration_ms: AtomicU64,
    pub checkpoint_epoch: AtomicU64,
    pub max_state_bytes: AtomicU64,
    // Cycle-duration percentiles, nanoseconds (written by an external
    // aggregator — not computed here).
    pub cycle_p50_ns: AtomicU64,
    pub cycle_p95_ns: AtomicU64,
    pub cycle_p99_ns: AtomicU64,
    pub sink_precommit_duration_us: AtomicU64,
    pub sink_commit_duration_us: AtomicU64,
    pub sink_write_failures: AtomicU64,
    pub sink_write_timeouts: AtomicU64,
    pub sink_task_channel_closed: AtomicU64,
    pub cycles_backpressured: AtomicU64,
    pub last_checkpoint_size_bytes: AtomicU64,
    pub last_checkpoint_timestamp_ms: AtomicU64,
    // Materialized-view bookkeeping.
    pub mv_updates: AtomicU64,
    pub mv_bytes_stored: AtomicU64,
}
impl PipelineCounters {
    /// Creates a counter bank with every counter zeroed.
    #[must_use]
    pub fn new() -> Self {
        // Tiny helper so every field initializer reads identically.
        let zeroed = || AtomicU64::new(0);
        Self {
            events_ingested: zeroed(),
            events_emitted: zeroed(),
            events_dropped: zeroed(),
            cycles: zeroed(),
            last_cycle_duration_ns: zeroed(),
            total_batches: zeroed(),
            queries_compiled: zeroed(),
            queries_cached_plan: zeroed(),
            // Must mirror the `_pad` length expression in the struct definition.
            _pad: [0; CACHE_LINE_SIZE - (8 * std::mem::size_of::<AtomicU64>()) % CACHE_LINE_SIZE],
            checkpoints_completed: zeroed(),
            checkpoints_failed: zeroed(),
            last_checkpoint_duration_ms: zeroed(),
            checkpoint_epoch: zeroed(),
            max_state_bytes: zeroed(),
            cycle_p50_ns: zeroed(),
            cycle_p95_ns: zeroed(),
            cycle_p99_ns: zeroed(),
            sink_precommit_duration_us: zeroed(),
            sink_commit_duration_us: zeroed(),
            sink_write_failures: zeroed(),
            sink_write_timeouts: zeroed(),
            sink_task_channel_closed: zeroed(),
            cycles_backpressured: zeroed(),
            last_checkpoint_size_bytes: zeroed(),
            last_checkpoint_timestamp_ms: zeroed(),
            mv_updates: zeroed(),
            mv_bytes_stored: zeroed(),
        }
    }

    /// Copies every counter into a plain-value [`CounterSnapshot`].
    ///
    /// Each field is loaded individually with `Relaxed` ordering; the
    /// snapshot is NOT an atomic view of the whole bank and may mix
    /// values from concurrent updates.
    #[must_use]
    pub fn snapshot(&self) -> CounterSnapshot {
        // Single place that fixes the load ordering for every field.
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        CounterSnapshot {
            events_ingested: read(&self.events_ingested),
            events_emitted: read(&self.events_emitted),
            events_dropped: read(&self.events_dropped),
            cycles: read(&self.cycles),
            last_cycle_duration_ns: read(&self.last_cycle_duration_ns),
            total_batches: read(&self.total_batches),
            queries_compiled: read(&self.queries_compiled),
            queries_cached_plan: read(&self.queries_cached_plan),
            checkpoints_completed: read(&self.checkpoints_completed),
            checkpoints_failed: read(&self.checkpoints_failed),
            last_checkpoint_duration_ms: read(&self.last_checkpoint_duration_ms),
            checkpoint_epoch: read(&self.checkpoint_epoch),
            max_state_bytes: read(&self.max_state_bytes),
            cycle_p50_ns: read(&self.cycle_p50_ns),
            cycle_p95_ns: read(&self.cycle_p95_ns),
            cycle_p99_ns: read(&self.cycle_p99_ns),
            sink_precommit_duration_us: read(&self.sink_precommit_duration_us),
            sink_commit_duration_us: read(&self.sink_commit_duration_us),
            sink_write_failures: read(&self.sink_write_failures),
            sink_write_timeouts: read(&self.sink_write_timeouts),
            sink_task_channel_closed: read(&self.sink_task_channel_closed),
            cycles_backpressured: read(&self.cycles_backpressured),
            last_checkpoint_size_bytes: read(&self.last_checkpoint_size_bytes),
            last_checkpoint_timestamp_ms: read(&self.last_checkpoint_timestamp_ms),
            mv_updates: read(&self.mv_updates),
            mv_bytes_stored: read(&self.mv_bytes_stored),
        }
    }
}
impl Default for PipelineCounters {
fn default() -> Self {
Self::new()
}
}
/// Plain-`u64` copy of [`PipelineCounters`] taken at one point in time.
///
/// Produced by `PipelineCounters::snapshot`; fields mirror the atomic
/// counters one-to-one (minus the padding). Loads are `Relaxed`, so the
/// fields are individually — not jointly — consistent.
#[derive(Debug, Clone, Copy)]
pub struct CounterSnapshot {
    pub events_ingested: u64,
    pub events_emitted: u64,
    pub events_dropped: u64,
    pub cycles: u64,
    pub last_cycle_duration_ns: u64,
    pub total_batches: u64,
    pub queries_compiled: u64,
    pub queries_cached_plan: u64,
    pub checkpoints_completed: u64,
    pub checkpoints_failed: u64,
    pub last_checkpoint_duration_ms: u64,
    pub checkpoint_epoch: u64,
    pub max_state_bytes: u64,
    pub cycle_p50_ns: u64,
    pub cycle_p95_ns: u64,
    pub cycle_p99_ns: u64,
    pub sink_precommit_duration_us: u64,
    pub sink_commit_duration_us: u64,
    pub sink_write_failures: u64,
    pub sink_write_timeouts: u64,
    pub sink_task_channel_closed: u64,
    pub cycles_backpressured: u64,
    pub last_checkpoint_size_bytes: u64,
    pub last_checkpoint_timestamp_ms: u64,
    pub mv_updates: u64,
    pub mv_bytes_stored: u64,
}
/// Aggregated, human-consumable view of a pipeline's health.
///
/// Assembled by the pipeline from counters plus topology info; all
/// fields are plain values (no atomics), so cloning is cheap.
#[derive(Debug, Clone)]
pub struct PipelineMetrics {
    pub total_events_ingested: u64,
    pub total_events_emitted: u64,
    pub total_events_dropped: u64,
    pub total_cycles: u64,
    pub total_batches: u64,
    // Wall-clock time since the pipeline started.
    pub uptime: Duration,
    pub state: PipelineState,
    pub last_cycle_duration_ns: u64,
    // Topology sizes at the time the metrics were assembled.
    pub source_count: usize,
    pub stream_count: usize,
    pub sink_count: usize,
    // Presumably the event-time watermark; i64::MIN is used as the
    // initial value in tests below — confirm semantics against producer.
    pub pipeline_watermark: i64,
    pub mv_updates: u64,
    pub mv_bytes_stored: u64,
}
/// Per-source channel statistics.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    pub name: String,
    pub total_events: u64,
    // Current queue depth vs. its capacity.
    pub pending: usize,
    pub capacity: usize,
    // Expected to follow `is_backpressured(pending, capacity)` — set by
    // the caller, not enforced here.
    pub is_backpressured: bool,
    pub watermark: i64,
    // Occupancy fraction; see `utilization()` (0.0 when capacity is 0).
    pub utilization: f64,
}
/// Per-derived-stream channel statistics.
#[derive(Debug, Clone)]
pub struct StreamMetrics {
    pub name: String,
    pub total_events: u64,
    // Current queue depth vs. its capacity.
    pub pending: usize,
    pub capacity: usize,
    pub is_backpressured: bool,
    pub watermark: i64,
    // The SQL text that defines this stream, if it is query-backed.
    pub sql: Option<String>,
}
/// Queue occupancy fraction above which a channel counts as backpressured.
const BACKPRESSURE_THRESHOLD: f64 = 0.8;

/// Returns `true` when the queue is strictly more than
/// [`BACKPRESSURE_THRESHOLD`] full.
///
/// A zero-capacity queue is never backpressured (its utilization is
/// defined as 0.0).
#[must_use]
pub(crate) fn is_backpressured(pending: usize, capacity: usize) -> bool {
    // Delegate to `utilization` so both helpers share one definition of
    // occupancy; for capacity == 0 it returns 0.0, which keeps this false.
    utilization(pending, capacity) > BACKPRESSURE_THRESHOLD
}

/// Fraction of `capacity` currently occupied by `pending` items.
///
/// Returns `0.0` for a zero-capacity queue instead of dividing by zero
/// (which would yield NaN/inf in `f64`).
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub(crate) fn utilization(pending: usize, capacity: usize) -> f64 {
    if capacity == 0 {
        0.0
    } else {
        pending as f64 / capacity as f64
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh counters must snapshot as all zeros.
    #[test]
    fn test_pipeline_counters_default() {
        let c = PipelineCounters::new();
        let s = c.snapshot();
        assert_eq!(s.events_ingested, 0);
        assert_eq!(s.events_emitted, 0);
        assert_eq!(s.events_dropped, 0);
        assert_eq!(s.cycles, 0);
        assert_eq!(s.total_batches, 0);
        assert_eq!(s.last_cycle_duration_ns, 0);
    }

    /// Increments through the atomic fields must be visible in a snapshot.
    #[test]
    fn test_pipeline_counters_increment() {
        let c = PipelineCounters::new();
        c.events_ingested.fetch_add(100, Ordering::Relaxed);
        c.events_emitted.fetch_add(50, Ordering::Relaxed);
        c.events_dropped.fetch_add(3, Ordering::Relaxed);
        c.cycles.fetch_add(10, Ordering::Relaxed);
        c.total_batches.fetch_add(5, Ordering::Relaxed);
        c.last_cycle_duration_ns.store(1234, Ordering::Relaxed);
        let s = c.snapshot();
        assert_eq!(s.events_ingested, 100);
        assert_eq!(s.events_emitted, 50);
        assert_eq!(s.events_dropped, 3);
        assert_eq!(s.cycles, 10);
        assert_eq!(s.total_batches, 5);
        assert_eq!(s.last_cycle_duration_ns, 1234);
    }

    /// Two threads incrementing the same counter must lose no updates
    /// (fetch_add is atomic even with Relaxed ordering).
    #[test]
    fn test_pipeline_counters_concurrent_access() {
        use std::sync::Arc;
        let c = Arc::new(PipelineCounters::new());
        let c2 = Arc::clone(&c);
        let t = std::thread::spawn(move || {
            for _ in 0..1000 {
                c2.events_ingested.fetch_add(1, Ordering::Relaxed);
            }
        });
        for _ in 0..1000 {
            c.events_ingested.fetch_add(1, Ordering::Relaxed);
        }
        t.join().unwrap();
        assert_eq!(c.events_ingested.load(Ordering::Relaxed), 2000);
    }

    /// Display output must equal the variant name for every state.
    #[test]
    fn test_pipeline_state_display() {
        assert_eq!(PipelineState::Created.to_string(), "Created");
        assert_eq!(PipelineState::Starting.to_string(), "Starting");
        assert_eq!(PipelineState::Running.to_string(), "Running");
        assert_eq!(PipelineState::ShuttingDown.to_string(), "ShuttingDown");
        assert_eq!(PipelineState::Stopped.to_string(), "Stopped");
    }

    #[test]
    fn test_pipeline_state_equality() {
        assert_eq!(PipelineState::Running, PipelineState::Running);
        assert_ne!(PipelineState::Created, PipelineState::Running);
    }

    /// Threshold is strict: exactly 80% of capacity is NOT backpressured.
    #[test]
    fn test_backpressure_detection() {
        assert!(!is_backpressured(0, 100));
        assert!(!is_backpressured(50, 100));
        assert!(!is_backpressured(80, 100));
        assert!(is_backpressured(81, 100));
        assert!(is_backpressured(100, 100));
        // Zero capacity never counts as backpressured.
        assert!(!is_backpressured(0, 0));
    }

    /// Utilization is pending/capacity, with 0.0 for zero capacity.
    #[test]
    fn test_utilization() {
        assert!((utilization(0, 100) - 0.0).abs() < f64::EPSILON);
        assert!((utilization(50, 100) - 0.5).abs() < f64::EPSILON);
        assert!((utilization(100, 100) - 1.0).abs() < f64::EPSILON);
        assert!((utilization(0, 0) - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_pipeline_metrics_clone() {
        let m = PipelineMetrics {
            total_events_ingested: 100,
            total_events_emitted: 50,
            total_events_dropped: 0,
            total_cycles: 10,
            total_batches: 5,
            uptime: Duration::from_secs(60),
            state: PipelineState::Running,
            last_cycle_duration_ns: 500,
            source_count: 2,
            stream_count: 1,
            sink_count: 1,
            // i64::MIN serves as the "no watermark yet" initial value.
            pipeline_watermark: i64::MIN,
            mv_updates: 0,
            mv_bytes_stored: 0,
        };
        let m2 = m.clone();
        assert_eq!(m2.total_events_ingested, 100);
        assert_eq!(m2.state, PipelineState::Running);
    }

    /// Debug output should include the field values (name and count).
    #[test]
    fn test_source_metrics_debug() {
        let m = SourceMetrics {
            name: "trades".to_string(),
            total_events: 1000,
            pending: 50,
            capacity: 1024,
            is_backpressured: false,
            watermark: 12345,
            utilization: 0.05,
        };
        let dbg = format!("{m:?}");
        assert!(dbg.contains("trades"));
        assert!(dbg.contains("1000"));
    }

    /// Layout check: the checkpoint counter group must start at least one
    /// cache line (64 B) past the hot-path group. Uses `&raw const` to
    /// take field addresses without creating intermediate references.
    #[test]
    fn test_cache_line_separation() {
        let c = PipelineCounters::new();
        let base = &raw const c as usize;
        let ring0_start = &raw const c.events_ingested as usize;
        let ring2_start = &raw const c.checkpoints_completed as usize;
        // repr(C): the first field sits at offset 0.
        assert_eq!(ring0_start - base, 0);
        assert!(
            ring2_start - ring0_start >= 64,
            "Ring 2 counters must be on a separate cache line (offset: {})",
            ring2_start - ring0_start
        );
    }

    /// Checkpoint counters round-trip through a snapshot.
    #[test]
    fn test_checkpoint_counters() {
        let c = PipelineCounters::new();
        c.checkpoints_completed.fetch_add(5, Ordering::Relaxed);
        c.checkpoints_failed.fetch_add(1, Ordering::Relaxed);
        c.last_checkpoint_duration_ms.store(250, Ordering::Relaxed);
        c.checkpoint_epoch.store(10, Ordering::Relaxed);
        let s = c.snapshot();
        assert_eq!(s.checkpoints_completed, 5);
        assert_eq!(s.checkpoints_failed, 1);
        assert_eq!(s.last_checkpoint_duration_ms, 250);
        assert_eq!(s.checkpoint_epoch, 10);
    }

    #[test]
    fn test_stream_metrics_with_sql() {
        let m = StreamMetrics {
            name: "avg_price".to_string(),
            total_events: 500,
            pending: 0,
            capacity: 1024,
            is_backpressured: false,
            watermark: 0,
            sql: Some("SELECT symbol, AVG(price) FROM trades GROUP BY symbol".to_string()),
        };
        assert_eq!(
            m.sql.as_deref(),
            Some("SELECT symbol, AVG(price) FROM trades GROUP BY symbol")
        );
    }
}