use std::panic::RefUnwindSafe;
use crate::runtimes::observer::{
CheckpointLoadMeta, CheckpointSaveMeta, EventBusEmitMeta, InvocationFinishMeta,
InvocationStartMeta, NodeFinishMeta, NodeOutcome, RuntimeObserver,
};
/// Stateless runtime observer that reports runtime activity through the
/// `metrics` crate macros.
///
/// The type is a field-less unit struct, so copying or cloning it is free and
/// every instance behaves the same.
#[derive(Clone, Copy, Debug)]
pub struct MetricsObserver;

// Explicit marker impl: shared references to the observer are declared safe to
// use across an unwind boundary. The struct holds no data that a panic could
// leave in a broken state.
impl RefUnwindSafe for MetricsObserver {}
/// Translates runtime lifecycle callbacks into `metrics` counters and histograms.
///
/// Every counter is bumped by one per callback. Durations are taken from the
/// `*_ms` fields of the callback metadata and recorded as `f64` histogram samples.
impl RuntimeObserver for MetricsObserver {
    /// Invocation start emits no metric; the callback is a deliberate no-op.
    fn on_invocation_start(&self, _meta: &InvocationStartMeta<'_>) {}

    /// Counts the finished invocation under its `outcome` label and records
    /// how long it took.
    fn on_invocation_finish(&self, meta: &InvocationFinishMeta<'_>) {
        let outcome_label = match meta.outcome {
            crate::runtimes::observer::InvocationOutcome::Completed => "completed",
            crate::runtimes::observer::InvocationOutcome::Error => "error",
        };
        let elapsed_ms = meta.duration_ms as f64;

        metrics::counter!("weavegraph.invocation.count", "outcome" => outcome_label).increment(1);
        metrics::histogram!("weavegraph.invocation.duration_ms").record(elapsed_ms);
    }

    /// Counts the node execution under its `node` and `outcome` labels and,
    /// unless the node was skipped, records the step duration for that node.
    fn on_node_finish(&self, meta: &NodeFinishMeta<'_>) {
        let node_label = meta.node_kind.encode().to_string();
        let outcome_label = match meta.outcome {
            NodeOutcome::Completed => "completed",
            NodeOutcome::Error => "error",
            NodeOutcome::Skipped => "skipped",
        };

        // The label is cloned here because the histogram below takes ownership of it.
        let invocations = metrics::counter!(
            "weavegraph.node.invocations",
            "node" => node_label.clone(),
            "outcome" => outcome_label
        );
        invocations.increment(1);

        // Skipped nodes are counted above but contribute no duration sample.
        if meta.outcome == NodeOutcome::Skipped {
            return;
        }

        let step_ms = meta.step_duration_ms as f64;
        metrics::histogram!("weavegraph.node.step_duration_ms", "node" => node_label)
            .record(step_ms);
    }

    /// Counts a checkpoint load, labelled with the backend it came from.
    fn on_checkpoint_load(&self, meta: &CheckpointLoadMeta<'_>) {
        let backend_label = meta.backend.to_string();
        metrics::counter!("weavegraph.checkpoint.loads", "backend" => backend_label).increment(1);
    }

    /// Counts a checkpoint save and records its duration, both labelled with
    /// the backend that performed it.
    fn on_checkpoint_save(&self, meta: &CheckpointSaveMeta<'_>) {
        let backend_label = meta.backend.to_string();
        let save_ms = meta.duration_ms as f64;

        // One owned copy of the label per metric: each macro consumes its label value.
        let saves = metrics::counter!(
            "weavegraph.checkpoint.saves",
            "backend" => backend_label.clone()
        );
        saves.increment(1);

        let save_duration = metrics::histogram!(
            "weavegraph.checkpoint.save_duration_ms",
            "backend" => backend_label
        );
        save_duration.record(save_ms);
    }

    /// Counts an event-bus emission, labelled with the scope it was emitted in.
    fn on_event_bus_emit(&self, meta: &EventBusEmitMeta<'_>) {
        let scope_label = meta.scope.to_string();
        metrics::counter!("weavegraph.event_bus.emits", "scope" => scope_label).increment(1);
    }
}