Skip to main content

tramli_plugins/observability/
mod.rs

1use std::sync::{Arc, Mutex};
2use std::time::Instant;
3use tramli::{FlowEngine, FlowState};
4use crate::api::PluginDescriptor;
5
6/// Telemetry event type.
7#[derive(Debug, Clone)]
8pub enum TelemetryType {
9    Transition,
10    Guard,
11    Error,
12    State,
13}
14
15/// A telemetry event captured by the observability plugin.
16#[derive(Debug, Clone)]
17pub struct TelemetryEvent {
18    pub event_type: TelemetryType,
19    pub flow_id: String,
20    pub flow_name: String,
21    pub data: String,
22    pub timestamp: Instant,
23    pub duration_micros: u64,
24}
25
26/// Telemetry sink trait.
27pub trait TelemetrySink: Send + Sync {
28    fn emit(&self, event: TelemetryEvent);
29    fn events(&self) -> Vec<TelemetryEvent>;
30}
31
32/// In-memory telemetry sink.
33pub struct InMemoryTelemetrySink {
34    log: Mutex<Vec<TelemetryEvent>>,
35}
36
37impl InMemoryTelemetrySink {
38    pub fn new() -> Self {
39        Self { log: Mutex::new(Vec::new()) }
40    }
41}
42
43impl Default for InMemoryTelemetrySink {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49impl TelemetrySink for InMemoryTelemetrySink {
50    fn emit(&self, event: TelemetryEvent) {
51        self.log.lock().unwrap().push(event);
52    }
53
54    fn events(&self) -> Vec<TelemetryEvent> {
55        self.log.lock().unwrap().clone()
56    }
57}
58
59/// No-op telemetry sink for benchmarking baseline.
60pub struct NoopTelemetrySink;
61
62impl TelemetrySink for NoopTelemetrySink {
63    fn emit(&self, _event: TelemetryEvent) {}
64    fn events(&self) -> Vec<TelemetryEvent> { Vec::new() }
65}
66
67/// Observability engine plugin — installs transition/error logger hooks.
68pub struct ObservabilityPlugin {
69    sink: Arc<dyn TelemetrySink>,
70}
71
72impl ObservabilityPlugin {
73    pub fn new(sink: Arc<dyn TelemetrySink>) -> Self {
74        Self { sink }
75    }
76
77    pub fn descriptor(&self) -> PluginDescriptor {
78        PluginDescriptor {
79            id: "observability",
80            display_name: "Observability",
81            description: "Telemetry via engine logger hooks",
82        }
83    }
84
85    pub fn install<S: FlowState>(&self, engine: &mut FlowEngine<S>) {
86        self.install_with_options(engine, false);
87    }
88
89    pub fn install_with_options<S: FlowState>(&self, engine: &mut FlowEngine<S>, append: bool) {
90        let prev_transition = if append { engine.take_transition_logger() } else { None };
91        let prev_error = if append { engine.take_error_logger() } else { None };
92        let prev_guard = if append { engine.take_guard_logger() } else { None };
93
94        let sink = self.sink.clone();
95        engine.set_transition_logger(move |entry| {
96            if let Some(ref prev) = prev_transition { prev(entry); }
97            sink.emit(TelemetryEvent {
98                event_type: TelemetryType::Transition,
99                flow_id: entry.flow_id.clone(),
100                flow_name: entry.flow_name.clone(),
101                data: format!("{} -> {} via {}", entry.from, entry.to, entry.trigger),
102                timestamp: Instant::now(),
103                duration_micros: entry.duration_micros,
104            });
105        });
106
107        let sink = self.sink.clone();
108        engine.set_error_logger(move |entry| {
109            if let Some(ref prev) = prev_error { prev(entry); }
110            sink.emit(TelemetryEvent {
111                event_type: TelemetryType::Error,
112                flow_id: entry.flow_id.clone(),
113                flow_name: entry.flow_name.clone(),
114                data: format!("{} -> {} error: {:?}", entry.from, entry.to, entry.cause),
115                timestamp: Instant::now(),
116                duration_micros: entry.duration_micros,
117            });
118        });
119
120        let sink = self.sink.clone();
121        engine.set_guard_logger(move |entry| {
122            if let Some(ref prev) = prev_guard { prev(entry); }
123            sink.emit(TelemetryEvent {
124                event_type: TelemetryType::Guard,
125                flow_id: entry.flow_id.clone(),
126                flow_name: entry.flow_name.clone(),
127                data: format!("guard {} at {}: {}", entry.guard_name, entry.state, entry.result),
128                timestamp: Instant::now(),
129                duration_micros: entry.duration_micros,
130            });
131        });
132    }
133}