tramli_plugins/observability/
mod.rs1use std::sync::{Arc, Mutex};
2use std::time::Instant;
3use tramli::{FlowEngine, FlowState};
4use crate::api::PluginDescriptor;
5
/// Category of a [`TelemetryEvent`].
///
/// The enum is fieldless, so it is cheap to copy and can be compared or used
/// as a map/set key (for example to filter or count recorded events by kind).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TelemetryType {
    /// Event produced from a transition log entry.
    Transition,
    /// Event produced from a guard log entry.
    Guard,
    /// Event produced from an error log entry.
    Error,
    /// State-related event.
    // NOTE(review): no hook visible in this module emits this variant — confirm
    // whether it is produced elsewhere.
    State,
}
14
15#[derive(Debug, Clone)]
17pub struct TelemetryEvent {
18 pub event_type: TelemetryType,
19 pub flow_id: String,
20 pub flow_name: String,
21 pub data: String,
22 pub timestamp: Instant,
23 pub duration_micros: u64,
24}
25
26pub trait TelemetrySink: Send + Sync {
28 fn emit(&self, event: TelemetryEvent);
29 fn events(&self) -> Vec<TelemetryEvent>;
30}
31
32pub struct InMemoryTelemetrySink {
34 log: Mutex<Vec<TelemetryEvent>>,
35}
36
37impl InMemoryTelemetrySink {
38 pub fn new() -> Self {
39 Self { log: Mutex::new(Vec::new()) }
40 }
41}
42
43impl Default for InMemoryTelemetrySink {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49impl TelemetrySink for InMemoryTelemetrySink {
50 fn emit(&self, event: TelemetryEvent) {
51 self.log.lock().unwrap().push(event);
52 }
53
54 fn events(&self) -> Vec<TelemetryEvent> {
55 self.log.lock().unwrap().clone()
56 }
57}
58
59pub struct ObservabilityPlugin {
61 sink: Arc<dyn TelemetrySink>,
62}
63
64impl ObservabilityPlugin {
65 pub fn new(sink: Arc<dyn TelemetrySink>) -> Self {
66 Self { sink }
67 }
68
69 pub fn descriptor(&self) -> PluginDescriptor {
70 PluginDescriptor {
71 id: "observability",
72 display_name: "Observability",
73 description: "Telemetry via engine logger hooks",
74 }
75 }
76
77 pub fn install<S: FlowState>(&self, engine: &mut FlowEngine<S>) {
78 self.install_with_options(engine, false);
79 }
80
81 pub fn install_with_options<S: FlowState>(&self, engine: &mut FlowEngine<S>, append: bool) {
82 let prev_transition = if append { engine.take_transition_logger() } else { None };
83 let prev_error = if append { engine.take_error_logger() } else { None };
84 let prev_guard = if append { engine.take_guard_logger() } else { None };
85
86 let sink = self.sink.clone();
87 engine.set_transition_logger(move |entry| {
88 if let Some(ref prev) = prev_transition { prev(entry); }
89 sink.emit(TelemetryEvent {
90 event_type: TelemetryType::Transition,
91 flow_id: entry.flow_id.clone(),
92 flow_name: entry.flow_name.clone(),
93 data: format!("{} -> {} via {}", entry.from, entry.to, entry.trigger),
94 timestamp: Instant::now(),
95 duration_micros: entry.duration_micros,
96 });
97 });
98
99 let sink = self.sink.clone();
100 engine.set_error_logger(move |entry| {
101 if let Some(ref prev) = prev_error { prev(entry); }
102 sink.emit(TelemetryEvent {
103 event_type: TelemetryType::Error,
104 flow_id: entry.flow_id.clone(),
105 flow_name: entry.flow_name.clone(),
106 data: format!("{} -> {} error: {:?}", entry.from, entry.to, entry.cause),
107 timestamp: Instant::now(),
108 duration_micros: entry.duration_micros,
109 });
110 });
111
112 let sink = self.sink.clone();
113 engine.set_guard_logger(move |entry| {
114 if let Some(ref prev) = prev_guard { prev(entry); }
115 sink.emit(TelemetryEvent {
116 event_type: TelemetryType::Guard,
117 flow_id: entry.flow_id.clone(),
118 flow_name: entry.flow_name.clone(),
119 data: format!("guard {} at {}: {}", entry.guard_name, entry.state, entry.result),
120 timestamp: Instant::now(),
121 duration_micros: entry.duration_micros,
122 });
123 });
124 }
125}