use std::sync::Arc;
use crate::db::{
LaminarDB, STATE_CREATED, STATE_RUNNING, STATE_SHUTTING_DOWN, STATE_STARTING, STATE_STOPPED,
};
use crate::error::DbError;
impl LaminarDB {
    /// Human-readable name of the current pipeline lifecycle state.
    ///
    /// Unlike [`Self::pipeline_state_enum`], an unrecognized state value
    /// is reported as `"Unknown"` rather than collapsed to `Stopped`.
    #[must_use]
    pub fn pipeline_state(&self) -> &'static str {
        match self.state.load(std::sync::atomic::Ordering::Acquire) {
            STATE_CREATED => "Created",
            STATE_STARTING => "Starting",
            STATE_RUNNING => "Running",
            STATE_SHUTTING_DOWN => "ShuttingDown",
            STATE_STOPPED => "Stopped",
            _ => "Unknown",
        }
    }

    /// Point-in-time snapshot of pipeline-wide metrics.
    ///
    /// All counter-derived fields come from a single `counters.snapshot()`
    /// call so they are mutually consistent with one another.
    #[must_use]
    pub fn metrics(&self) -> crate::metrics::PipelineMetrics {
        let snap = self.counters.snapshot();
        crate::metrics::PipelineMetrics {
            total_events_ingested: snap.events_ingested,
            total_events_emitted: snap.events_emitted,
            total_events_dropped: snap.events_dropped,
            total_cycles: snap.cycles,
            total_batches: snap.total_batches,
            uptime: self.start_time.elapsed(),
            state: self.pipeline_state_enum(),
            last_cycle_duration_ns: snap.last_cycle_duration_ns,
            // Reuse the public accessors instead of re-deriving the counts
            // from the catalog, so there is one definition of each count.
            source_count: self.source_count(),
            stream_count: self.catalog.list_streams().len(),
            sink_count: self.sink_count(),
            pipeline_watermark: self.pipeline_watermark(),
            mv_updates: snap.mv_updates,
            mv_bytes_stored: snap.mv_bytes_stored,
        }
    }

    /// Metrics for a single registered source, or `None` if no source
    /// named `name` exists in the catalog.
    #[must_use]
    pub fn source_metrics(&self, name: &str) -> Option<crate::metrics::SourceMetrics> {
        let entry = self.catalog.get_source(name)?;
        // Read pending/capacity once so backpressure and utilization are
        // computed from the same observation.
        let pending = entry.source.pending();
        let capacity = entry.source.capacity();
        Some(crate::metrics::SourceMetrics {
            name: entry.name.clone(),
            total_events: entry.source.sequence(),
            pending,
            capacity,
            is_backpressured: crate::metrics::is_backpressured(pending, capacity),
            watermark: entry.source.current_watermark(),
            utilization: crate::metrics::utilization(pending, capacity),
        })
    }

    /// Metrics for every registered source.
    #[must_use]
    pub fn all_source_metrics(&self) -> Vec<crate::metrics::SourceMetrics> {
        self.catalog
            .list_sources()
            .iter()
            .filter_map(|name| self.source_metrics(name))
            .collect()
    }

    /// Metrics for a single registered stream, or `None` if no stream
    /// named `name` exists in the catalog.
    #[must_use]
    pub fn stream_metrics(&self, name: &str) -> Option<crate::metrics::StreamMetrics> {
        let entry = self.catalog.get_stream_entry(name)?;
        // Read pending/capacity once so the backpressure figure matches
        // the reported pending value.
        let pending = entry.source.pending();
        let capacity = entry.source.capacity();
        // The defining SQL lives in the connector manager's stream
        // registry; it may be absent for this name.
        let sql = self
            .connector_manager
            .lock()
            .streams()
            .get(name)
            .map(|reg| reg.query_sql.clone());
        Some(crate::metrics::StreamMetrics {
            name: entry.name.clone(),
            total_events: entry.source.sequence(),
            pending,
            capacity,
            is_backpressured: crate::metrics::is_backpressured(pending, capacity),
            watermark: entry.source.current_watermark(),
            sql,
        })
    }

    /// Metrics for every registered stream.
    #[must_use]
    pub fn all_stream_metrics(&self) -> Vec<crate::metrics::StreamMetrics> {
        self.catalog
            .list_streams()
            .iter()
            .filter_map(|name| self.stream_metrics(name))
            .collect()
    }

    /// Total events handled by the pipeline (ingested + emitted).
    #[must_use]
    pub fn total_events_processed(&self) -> u64 {
        let snap = self.counters.snapshot();
        snap.events_ingested + snap.events_emitted
    }

    /// Shared handle to the pipeline's raw counters.
    #[must_use]
    pub fn counters(&self) -> &Arc<crate::metrics::PipelineCounters> {
        &self.counters
    }

    /// Current pipeline watermark.
    #[must_use]
    pub fn pipeline_watermark(&self) -> i64 {
        self.pipeline_watermark
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Typed form of the lifecycle state. Unknown state values collapse
    /// to [`crate::metrics::PipelineState::Stopped`] (unlike
    /// [`Self::pipeline_state`], which reports them as `"Unknown"`).
    pub(crate) fn pipeline_state_enum(&self) -> crate::metrics::PipelineState {
        match self.state.load(std::sync::atomic::Ordering::Acquire) {
            STATE_CREATED => crate::metrics::PipelineState::Created,
            STATE_STARTING => crate::metrics::PipelineState::Starting,
            STATE_RUNNING => crate::metrics::PipelineState::Running,
            STATE_SHUTTING_DOWN => crate::metrics::PipelineState::ShuttingDown,
            _ => crate::metrics::PipelineState::Stopped,
        }
    }

    /// Deactivate the query identified by `query_id`.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::QueryNotFound`] when the catalog reports that no
    /// query was deactivated for this id (presumably the id is unknown or
    /// the query is already inactive — depends on `deactivate_query`'s
    /// contract).
    pub fn cancel_query(&self, query_id: u64) -> Result<(), DbError> {
        if self.catalog.deactivate_query(query_id) {
            Ok(())
        } else {
            Err(DbError::QueryNotFound(query_id.to_string()))
        }
    }

    /// Number of registered sources.
    #[must_use]
    pub fn source_count(&self) -> usize {
        self.catalog.list_sources().len()
    }

    /// Number of registered sinks.
    #[must_use]
    pub fn sink_count(&self) -> usize {
        self.catalog.list_sinks().len()
    }

    /// Cycle-duration percentiles as `(p50, p95, p99)` in nanoseconds.
    #[must_use]
    pub fn cycle_duration_percentiles(&self) -> (u64, u64, u64) {
        let snap = self.counters.snapshot();
        (snap.cycle_p50_ns, snap.cycle_p95_ns, snap.cycle_p99_ns)
    }

    /// Checkpoint statistics without blocking.
    ///
    /// Returns `None` if the coordinator lock is currently held elsewhere
    /// (`try_lock` fails) or if no coordinator has been installed.
    #[must_use]
    pub fn checkpoint_stats_nonblocking(
        &self,
    ) -> Option<crate::checkpoint_coordinator::CheckpointStats> {
        let guard = self.coordinator.try_lock().ok()?;
        guard
            .as_ref()
            .map(crate::checkpoint_coordinator::CheckpointCoordinator::stats)
    }

    /// Number of queries currently marked active in the catalog.
    #[must_use]
    pub fn active_query_count(&self) -> usize {
        self.catalog
            .list_queries()
            .iter()
            .filter(|(_, _, active)| *active)
            .count()
    }
}