Skip to main content

tramli_plugins/observability/
mod.rs

1use std::sync::{Arc, Mutex};
2use std::time::Instant;
3use tramli::{FlowEngine, FlowState};
4use crate::api::PluginDescriptor;
5
/// Category of a [`TelemetryEvent`].
///
/// This is a plain, field-less tag, so it derives `Copy`, `PartialEq`, `Eq`
/// and `Hash`: consumers can compare event types with `==`, filter a captured
/// event list by type, or use the type as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TelemetryType {
    /// A flow moved from one state to another.
    Transition,
    /// A guard was evaluated.
    Guard,
    /// A transition ended in an error.
    Error,
    /// State-related event. None of the hooks visible in this file emit it.
    State,
}
14
/// A telemetry event captured by the observability plugin.
///
/// Events are built inside the engine logger hooks and handed to a
/// [`TelemetrySink`].
#[derive(Debug, Clone)]
pub struct TelemetryEvent {
    /// Category of the event.
    pub event_type: TelemetryType,
    /// Identifier of the flow the event was reported for.
    pub flow_id: String,
    /// Human-readable description of what happened.
    pub data: String,
    /// Moment the event was recorded. This is a monotonic [`Instant`], not a
    /// wall-clock time.
    pub timestamp: Instant,
}
23
/// Destination for [`TelemetryEvent`]s.
///
/// Implementations must be `Send + Sync`. Both methods take `&self`, so an
/// implementation that stores events needs some form of interior mutability.
pub trait TelemetrySink: Send + Sync {
    /// Records a single event, taking ownership of it.
    fn emit(&self, event: TelemetryEvent);
    /// Returns the events held by this sink as an owned `Vec`.
    fn events(&self) -> Vec<TelemetryEvent>;
}
29
30/// In-memory telemetry sink.
31pub struct InMemoryTelemetrySink {
32    log: Mutex<Vec<TelemetryEvent>>,
33}
34
35impl InMemoryTelemetrySink {
36    pub fn new() -> Self {
37        Self { log: Mutex::new(Vec::new()) }
38    }
39}
40
41impl Default for InMemoryTelemetrySink {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl TelemetrySink for InMemoryTelemetrySink {
48    fn emit(&self, event: TelemetryEvent) {
49        self.log.lock().unwrap().push(event);
50    }
51
52    fn events(&self) -> Vec<TelemetryEvent> {
53        self.log.lock().unwrap().clone()
54    }
55}
56
/// Observability engine plugin — installs transition, error, and guard logger hooks.
///
/// Every hook converts the engine's log entry into a [`TelemetryEvent`] and
/// emits it into the sink supplied at construction time.
pub struct ObservabilityPlugin {
    /// Shared sink that every installed hook emits into.
    sink: Arc<dyn TelemetrySink>,
}
61
62impl ObservabilityPlugin {
63    pub fn new(sink: Arc<dyn TelemetrySink>) -> Self {
64        Self { sink }
65    }
66
67    pub fn descriptor(&self) -> PluginDescriptor {
68        PluginDescriptor {
69            id: "observability",
70            display_name: "Observability",
71            description: "Telemetry via engine logger hooks",
72        }
73    }
74
75    pub fn install<S: FlowState>(&self, engine: &mut FlowEngine<S>) {
76        let sink = self.sink.clone();
77        engine.set_transition_logger(move |entry| {
78            sink.emit(TelemetryEvent {
79                event_type: TelemetryType::Transition,
80                flow_id: entry.flow_id.clone(),
81                data: format!("{} -> {} via {}", entry.from, entry.to, entry.trigger),
82                timestamp: Instant::now(),
83            });
84        });
85
86        let sink = self.sink.clone();
87        engine.set_error_logger(move |entry| {
88            sink.emit(TelemetryEvent {
89                event_type: TelemetryType::Error,
90                flow_id: entry.flow_id.clone(),
91                data: format!("{} -> {} error: {:?}", entry.from, entry.to, entry.cause),
92                timestamp: Instant::now(),
93            });
94        });
95
96        let sink = self.sink.clone();
97        engine.set_guard_logger(move |entry| {
98            sink.emit(TelemetryEvent {
99                event_type: TelemetryType::Guard,
100                flow_id: entry.flow_id.clone(),
101                data: format!("guard {} at {}: {}", entry.guard_name, entry.state, entry.result),
102                timestamp: Instant::now(),
103            });
104        });
105    }
106}