tramli_plugins/observability/
mod.rs1use std::sync::{Arc, Mutex};
2use std::time::Instant;
3use tramli::{FlowEngine, FlowState};
4use crate::api::PluginDescriptor;
5
6#[derive(Debug, Clone)]
8pub enum TelemetryType {
9 Transition,
10 Guard,
11 Error,
12 State,
13}
14
15#[derive(Debug, Clone)]
17pub struct TelemetryEvent {
18 pub event_type: TelemetryType,
19 pub flow_id: String,
20 pub flow_name: String,
21 pub data: String,
22 pub timestamp: Instant,
23 pub duration_micros: u64,
24}
25
26pub trait TelemetrySink: Send + Sync {
28 fn emit(&self, event: TelemetryEvent);
29 fn events(&self) -> Vec<TelemetryEvent>;
30}
31
32pub struct InMemoryTelemetrySink {
34 log: Mutex<Vec<TelemetryEvent>>,
35}
36
37impl InMemoryTelemetrySink {
38 pub fn new() -> Self {
39 Self { log: Mutex::new(Vec::new()) }
40 }
41}
42
43impl Default for InMemoryTelemetrySink {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49impl TelemetrySink for InMemoryTelemetrySink {
50 fn emit(&self, event: TelemetryEvent) {
51 self.log.lock().unwrap().push(event);
52 }
53
54 fn events(&self) -> Vec<TelemetryEvent> {
55 self.log.lock().unwrap().clone()
56 }
57}
58
59pub struct NoopTelemetrySink;
61
62impl TelemetrySink for NoopTelemetrySink {
63 fn emit(&self, _event: TelemetryEvent) {}
64 fn events(&self) -> Vec<TelemetryEvent> { Vec::new() }
65}
66
67pub struct ObservabilityPlugin {
69 sink: Arc<dyn TelemetrySink>,
70}
71
72impl ObservabilityPlugin {
73 pub fn new(sink: Arc<dyn TelemetrySink>) -> Self {
74 Self { sink }
75 }
76
77 pub fn descriptor(&self) -> PluginDescriptor {
78 PluginDescriptor {
79 id: "observability",
80 display_name: "Observability",
81 description: "Telemetry via engine logger hooks",
82 }
83 }
84
85 pub fn install<S: FlowState>(&self, engine: &mut FlowEngine<S>) {
86 self.install_with_options(engine, false);
87 }
88
89 pub fn install_with_options<S: FlowState>(&self, engine: &mut FlowEngine<S>, append: bool) {
90 let prev_transition = if append { engine.take_transition_logger() } else { None };
91 let prev_error = if append { engine.take_error_logger() } else { None };
92 let prev_guard = if append { engine.take_guard_logger() } else { None };
93
94 let sink = self.sink.clone();
95 engine.set_transition_logger(move |entry| {
96 if let Some(ref prev) = prev_transition { prev(entry); }
97 sink.emit(TelemetryEvent {
98 event_type: TelemetryType::Transition,
99 flow_id: entry.flow_id.clone(),
100 flow_name: entry.flow_name.clone(),
101 data: format!("{} -> {} via {}", entry.from, entry.to, entry.trigger),
102 timestamp: Instant::now(),
103 duration_micros: entry.duration_micros,
104 });
105 });
106
107 let sink = self.sink.clone();
108 engine.set_error_logger(move |entry| {
109 if let Some(ref prev) = prev_error { prev(entry); }
110 sink.emit(TelemetryEvent {
111 event_type: TelemetryType::Error,
112 flow_id: entry.flow_id.clone(),
113 flow_name: entry.flow_name.clone(),
114 data: format!("{} -> {} error: {:?}", entry.from, entry.to, entry.cause),
115 timestamp: Instant::now(),
116 duration_micros: entry.duration_micros,
117 });
118 });
119
120 let sink = self.sink.clone();
121 engine.set_guard_logger(move |entry| {
122 if let Some(ref prev) = prev_guard { prev(entry); }
123 sink.emit(TelemetryEvent {
124 event_type: TelemetryType::Guard,
125 flow_id: entry.flow_id.clone(),
126 flow_name: entry.flow_name.clone(),
127 data: format!("guard {} at {}: {}", entry.guard_name, entry.state, entry.result),
128 timestamp: Instant::now(),
129 duration_micros: entry.duration_micros,
130 });
131 });
132 }
133}