Skip to main content

tramli_plugins/observability/
mod.rs

1use std::sync::{Arc, Mutex};
2use std::time::Instant;
3use tramli::{FlowEngine, FlowState};
4use crate::api::PluginDescriptor;
5
/// Category tag attached to every captured telemetry event.
///
/// The variants carry no payload, so the type is `Copy` and supports
/// equality and hashing: events can be filtered with `==` or grouped by
/// type in a `HashMap`/`HashSet` without extra boilerplate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TelemetryType {
    /// Event recorded from a transition logger entry.
    Transition,
    /// Event recorded from a guard logger entry.
    Guard,
    /// Event recorded from an error logger entry.
    Error,
    /// State-related event; none of the logger hooks in this module emit it.
    State,
}
14
15/// A telemetry event captured by the observability plugin.
16#[derive(Debug, Clone)]
17pub struct TelemetryEvent {
18    pub event_type: TelemetryType,
19    pub flow_id: String,
20    pub flow_name: String,
21    pub data: String,
22    pub timestamp: Instant,
23    pub duration_micros: u64,
24}
25
26/// Telemetry sink trait.
27pub trait TelemetrySink: Send + Sync {
28    fn emit(&self, event: TelemetryEvent);
29    fn events(&self) -> Vec<TelemetryEvent>;
30}
31
32/// In-memory telemetry sink.
33pub struct InMemoryTelemetrySink {
34    log: Mutex<Vec<TelemetryEvent>>,
35}
36
37impl InMemoryTelemetrySink {
38    pub fn new() -> Self {
39        Self { log: Mutex::new(Vec::new()) }
40    }
41}
42
43impl Default for InMemoryTelemetrySink {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49impl TelemetrySink for InMemoryTelemetrySink {
50    fn emit(&self, event: TelemetryEvent) {
51        self.log.lock().unwrap().push(event);
52    }
53
54    fn events(&self) -> Vec<TelemetryEvent> {
55        self.log.lock().unwrap().clone()
56    }
57}
58
59/// Observability engine plugin — installs transition/error logger hooks.
60pub struct ObservabilityPlugin {
61    sink: Arc<dyn TelemetrySink>,
62}
63
64impl ObservabilityPlugin {
65    pub fn new(sink: Arc<dyn TelemetrySink>) -> Self {
66        Self { sink }
67    }
68
69    pub fn descriptor(&self) -> PluginDescriptor {
70        PluginDescriptor {
71            id: "observability",
72            display_name: "Observability",
73            description: "Telemetry via engine logger hooks",
74        }
75    }
76
77    pub fn install<S: FlowState>(&self, engine: &mut FlowEngine<S>) {
78        self.install_with_options(engine, false);
79    }
80
81    pub fn install_with_options<S: FlowState>(&self, engine: &mut FlowEngine<S>, append: bool) {
82        let prev_transition = if append { engine.take_transition_logger() } else { None };
83        let prev_error = if append { engine.take_error_logger() } else { None };
84        let prev_guard = if append { engine.take_guard_logger() } else { None };
85
86        let sink = self.sink.clone();
87        engine.set_transition_logger(move |entry| {
88            if let Some(ref prev) = prev_transition { prev(entry); }
89            sink.emit(TelemetryEvent {
90                event_type: TelemetryType::Transition,
91                flow_id: entry.flow_id.clone(),
92                flow_name: entry.flow_name.clone(),
93                data: format!("{} -> {} via {}", entry.from, entry.to, entry.trigger),
94                timestamp: Instant::now(),
95                duration_micros: entry.duration_micros,
96            });
97        });
98
99        let sink = self.sink.clone();
100        engine.set_error_logger(move |entry| {
101            if let Some(ref prev) = prev_error { prev(entry); }
102            sink.emit(TelemetryEvent {
103                event_type: TelemetryType::Error,
104                flow_id: entry.flow_id.clone(),
105                flow_name: entry.flow_name.clone(),
106                data: format!("{} -> {} error: {:?}", entry.from, entry.to, entry.cause),
107                timestamp: Instant::now(),
108                duration_micros: entry.duration_micros,
109            });
110        });
111
112        let sink = self.sink.clone();
113        engine.set_guard_logger(move |entry| {
114            if let Some(ref prev) = prev_guard { prev(entry); }
115            sink.emit(TelemetryEvent {
116                event_type: TelemetryType::Guard,
117                flow_id: entry.flow_id.clone(),
118                flow_name: entry.flow_name.clone(),
119                data: format!("guard {} at {}: {}", entry.guard_name, entry.state, entry.result),
120                timestamp: Instant::now(),
121                duration_micros: entry.duration_micros,
122            });
123        });
124    }
125}