use super::*;
/// Maps a [`RuntimeState`] to the lowercase label exposed in admin snapshots
/// and as the `state` label on Prometheus metrics.
///
/// The match is exhaustive on purpose: adding a new state variant must force
/// a decision about its label here.
fn runtime_state_label(state: RuntimeState) -> &'static str {
    match state {
        RuntimeState::Idle => "idle",
        RuntimeState::Running => "running",
        RuntimeState::Stopping => "stopping",
        RuntimeState::Stopped => "stopped",
    }
}
impl<C, H> CdcRuntime<C, H>
where
    C: crate::checkpoint::Checkpoint + Send + Sync + 'static,
    H: SchemaHistory + Send + Sync + 'static,
{
    /// Returns the runtime's current lifecycle state.
    pub fn state(&self) -> RuntimeState {
        self.state
    }

    /// Returns the capability flags reported by the configured source.
    pub fn source_capabilities(&self) -> ConnectorCapabilities {
        self.config.source.capabilities()
    }

    /// Captures a point-in-time view of runtime health and counters for admin
    /// endpoints.
    ///
    /// `now_millis()` is read exactly once, so `checkpoint_age_ms` and
    /// `replication_lag_ms` are both measured against the same instant.
    ///
    /// * `readiness` is true only while the runtime is `Running` and either the
    ///   source is `Disabled` or a stream or snapshot is active.
    /// * `liveness` is true in every state except `Stopped`.
    /// * `buffer_depth` sums buffered, injected and pending source events.
    /// * `in_flight_events` is the size of the pending delivery batch (0 if none).
    pub fn admin_snapshot(&self) -> RuntimeAdminSnapshot {
        let now_ms = now_millis();
        let checkpoint_age_ms = self
            .last_checkpoint_saved_at_ms
            .map(|saved_at| now_ms.saturating_sub(saved_at));
        // A disabled source has nothing to attach, so it never blocks readiness.
        let source_attached = matches!(self.config.source, RuntimeSourceConfig::Disabled)
            || self.stream.is_some()
            || self.snapshot.is_some();
        RuntimeAdminSnapshot {
            source_type: self.config.source.source_type().map(str::to_string),
            state: runtime_state_label(self.state).to_string(),
            readiness: self.state == RuntimeState::Running && source_attached,
            liveness: self.state != RuntimeState::Stopped,
            capabilities: self.source_capabilities(),
            buffer_depth: self.buffered_events.len()
                + self.injected_events.len()
                + self.pending_source_events.len(),
            in_flight_events: self
                .pending_delivery
                .as_ref()
                .map_or(0, |pending| pending.events.len()),
            snapshot_active: self.snapshot.is_some(),
            stream_active: self.stream.is_some(),
            handoff_complete: self.handoff_complete,
            total_events_polled: self.total_events_polled,
            total_events_committed: self.total_events_committed,
            total_events_deduplicated: self.total_events_deduplicated,
            started_at_ms: self.started_at_ms,
            last_poll_at_ms: self.last_poll_at_ms,
            last_commit_at_ms: self.last_commit_at_ms,
            checkpoint_age_ms,
            replication_lag_ms: self.replication_lag_ms_at(now_ms),
        }
    }

    /// Estimates replication lag as the milliseconds elapsed since the most
    /// recent source event timestamp, measured against `now_millis()`.
    ///
    /// Returns `None` until a source event timestamp has been recorded.
    pub(super) fn estimate_replication_lag_ms(&self) -> Option<u64> {
        self.replication_lag_ms_at(now_millis())
    }

    /// Replication lag relative to a caller-supplied `now_ms`.
    ///
    /// If the source timestamp is ahead of `now_ms` (for example because the
    /// source clock runs ahead of the local one), the result clamps to 0.
    fn replication_lag_ms_at(&self, now_ms: u64) -> Option<u64> {
        self.last_source_event_ts_ms
            .map(|source_ts| now_ms.saturating_sub(source_ts))
    }

    /// Serializes [`Self::admin_snapshot`] to a JSON string.
    ///
    /// # Errors
    ///
    /// Returns `Error::SerializationError` if `serde_json` fails to serialize
    /// the snapshot.
    pub fn admin_snapshot_json(&self) -> Result<String> {
        serde_json::to_string(&self.admin_snapshot())
            .map_err(|error| Error::SerializationError(error.to_string()))
    }

    /// Writes the admin snapshot to `w` in the Prometheus text exposition
    /// format.
    ///
    /// `cdc_runtime_checkpoint_age_ms` and `cdc_runtime_replication_lag_ms`
    /// are omitted while the corresponding snapshot values are `None`.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by `w`.
    pub fn write_admin_metrics_prometheus<W: std::io::Write>(
        &self,
        w: &mut W,
    ) -> std::io::Result<()> {
        let admin = self.admin_snapshot();
        // `admin.state` comes from `runtime_state_label` (a fixed set of
        // lowercase ASCII words), so it needs no label-value escaping.
        let state_label = format!("{{state=\"{}\"}}", admin.state);

        Self::write_prometheus_metric(
            w,
            "cdc_runtime_readiness",
            "Runtime readiness (1=ready, 0=not ready).",
            "gauge",
            &state_label,
            u8::from(admin.readiness),
        )?;
        Self::write_prometheus_metric(
            w,
            "cdc_runtime_liveness",
            "Runtime liveness (1=alive, 0=stopped).",
            "gauge",
            &state_label,
            u8::from(admin.liveness),
        )?;
        Self::write_prometheus_metric(
            w,
            "cdc_runtime_buffer_depth",
            "Number of buffered events waiting for delivery.",
            "gauge",
            "",
            admin.buffer_depth,
        )?;
        Self::write_prometheus_metric(
            w,
            "cdc_runtime_in_flight_events",
            "Number of delivered but uncommitted events.",
            "gauge",
            "",
            admin.in_flight_events,
        )?;
        Self::write_prometheus_metric(
            w,
            "cdc_runtime_events_polled_total",
            "Total events delivered by runtime batches.",
            "counter",
            "",
            admin.total_events_polled,
        )?;
        Self::write_prometheus_metric(
            w,
            "cdc_runtime_events_committed_total",
            "Total events acknowledged and checkpointed.",
            "counter",
            "",
            admin.total_events_committed,
        )?;
        Self::write_prometheus_metric(
            w,
            "cdc_runtime_events_deduplicated_total",
            "Total events suppressed by runtime idempotency guard.",
            "counter",
            "",
            admin.total_events_deduplicated,
        )?;
        if let Some(checkpoint_age_ms) = admin.checkpoint_age_ms {
            Self::write_prometheus_metric(
                w,
                "cdc_runtime_checkpoint_age_ms",
                "Age of last durable checkpoint in milliseconds.",
                "gauge",
                "",
                checkpoint_age_ms,
            )?;
        }
        if let Some(lag_ms) = admin.replication_lag_ms {
            // The lag is derived solely from the latest source event
            // timestamp; there is no poll-based fallback, so the HELP text
            // must not claim one.
            Self::write_prometheus_metric(
                w,
                "cdc_runtime_replication_lag_ms",
                "Estimated replication lag in milliseconds (time since the latest source event timestamp).",
                "gauge",
                "",
                lag_ms,
            )?;
        }

        // One metric family with a sample per capability flag.
        Self::write_prometheus_header(
            w,
            "cdc_runtime_source_capability",
            "Connector capability flags.",
            "gauge",
        )?;
        for (name, enabled) in [
            ("snapshot", admin.capabilities.snapshot),
            ("handoff", admin.capabilities.handoff),
            ("ddl_capture", admin.capabilities.ddl_capture),
            ("heartbeat", admin.capabilities.heartbeat),
            ("tls", admin.capabilities.tls),
            (
                "schema_introspection",
                admin.capabilities.schema_introspection,
            ),
            ("truncate", admin.capabilities.truncate),
            (
                "incremental_snapshot",
                admin.capabilities.incremental_snapshot,
            ),
        ] {
            writeln!(
                w,
                "cdc_runtime_source_capability{{capability=\"{name}\"}} {}",
                u8::from(enabled)
            )?;
        }
        Ok(())
    }

    /// Renders [`Self::write_admin_metrics_prometheus`] into a `String`.
    ///
    /// # Panics
    ///
    /// Never in practice: writing to a `Vec<u8>` cannot fail, and the output is
    /// produced only by Rust formatting, so it is always valid UTF-8.
    pub fn admin_metrics_prometheus(&self) -> String {
        let mut buf = Vec::with_capacity(2048);
        self.write_admin_metrics_prometheus(&mut buf)
            .expect("writing to Vec<u8> is infallible");
        String::from_utf8(buf).expect("prometheus metrics output is always valid UTF-8")
    }

    /// Writes the `# HELP` and `# TYPE` lines for the metric family `name`.
    ///
    /// `help` must not contain backslashes or newlines; Prometheus would
    /// require those to be escaped.
    fn write_prometheus_header<W: std::io::Write>(
        w: &mut W,
        name: &str,
        help: &str,
        metric_type: &str,
    ) -> std::io::Result<()> {
        writeln!(w, "# HELP {name} {help}")?;
        writeln!(w, "# TYPE {name} {metric_type}")
    }

    /// Writes a single-sample metric family: HELP, TYPE, then `name{labels} value`.
    ///
    /// `labels` is either empty or a pre-rendered `{key="value",...}` set whose
    /// values need no escaping.
    fn write_prometheus_metric<W: std::io::Write>(
        w: &mut W,
        name: &str,
        help: &str,
        metric_type: &str,
        labels: &str,
        value: impl std::fmt::Display,
    ) -> std::io::Result<()> {
        Self::write_prometheus_header(w, name, help, metric_type)?;
        writeln!(w, "{name}{labels} {value}")
    }
}