// tramli_plugins/observability/mod.rs
use std::sync::{Arc, Mutex};
use std::time::Instant;

use tramli::{FlowEngine, FlowState};

use crate::api::PluginDescriptor;

/// Category of a telemetry event, identifying which kind of engine activity
/// produced it.
///
/// The type is a plain tag with no payload, so it is `Copy` and can be
/// compared, hashed and used as a map key when filtering or grouping events.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TelemetryType {
    /// Recorded by the transition logger hook.
    Transition,
    /// Recorded by the guard logger hook.
    Guard,
    /// Recorded by the error logger hook.
    Error,
    /// State-related events; nothing in this module emits this variant yet.
    State,
}

15#[derive(Debug, Clone)]
17pub struct TelemetryEvent {
18 pub event_type: TelemetryType,
19 pub flow_id: String,
20 pub data: String,
21 pub timestamp: Instant,
22}
23
24pub trait TelemetrySink: Send + Sync {
26 fn emit(&self, event: TelemetryEvent);
27 fn events(&self) -> Vec<TelemetryEvent>;
28}
29
30pub struct InMemoryTelemetrySink {
32 log: Mutex<Vec<TelemetryEvent>>,
33}
34
35impl InMemoryTelemetrySink {
36 pub fn new() -> Self {
37 Self { log: Mutex::new(Vec::new()) }
38 }
39}
40
41impl Default for InMemoryTelemetrySink {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl TelemetrySink for InMemoryTelemetrySink {
48 fn emit(&self, event: TelemetryEvent) {
49 self.log.lock().unwrap().push(event);
50 }
51
52 fn events(&self) -> Vec<TelemetryEvent> {
53 self.log.lock().unwrap().clone()
54 }
55}
56
57pub struct ObservabilityPlugin {
59 sink: Arc<dyn TelemetrySink>,
60}
61
62impl ObservabilityPlugin {
63 pub fn new(sink: Arc<dyn TelemetrySink>) -> Self {
64 Self { sink }
65 }
66
67 pub fn descriptor(&self) -> PluginDescriptor {
68 PluginDescriptor {
69 id: "observability",
70 display_name: "Observability",
71 description: "Telemetry via engine logger hooks",
72 }
73 }
74
75 pub fn install<S: FlowState>(&self, engine: &mut FlowEngine<S>) {
76 let sink = self.sink.clone();
77 engine.set_transition_logger(move |entry| {
78 sink.emit(TelemetryEvent {
79 event_type: TelemetryType::Transition,
80 flow_id: entry.flow_id.clone(),
81 data: format!("{} -> {} via {}", entry.from, entry.to, entry.trigger),
82 timestamp: Instant::now(),
83 });
84 });
85
86 let sink = self.sink.clone();
87 engine.set_error_logger(move |entry| {
88 sink.emit(TelemetryEvent {
89 event_type: TelemetryType::Error,
90 flow_id: entry.flow_id.clone(),
91 data: format!("{} -> {} error: {:?}", entry.from, entry.to, entry.cause),
92 timestamp: Instant::now(),
93 });
94 });
95
96 let sink = self.sink.clone();
97 engine.set_guard_logger(move |entry| {
98 sink.emit(TelemetryEvent {
99 event_type: TelemetryType::Guard,
100 flow_id: entry.flow_id.clone(),
101 data: format!("guard {} at {}: {}", entry.guard_name, entry.state, entry.result),
102 timestamp: Instant::now(),
103 });
104 });
105 }
106}