Skip to main content

vane_core/
flow_log.rs

1use std::sync::Arc;
2
3use crate::conn_context::ConnId;
4use crate::error::SerializedError;
5use crate::ir::NodeId;
6
7pub trait FlowLogSink: Send + Sync {
8	fn emit(&self, event: FlowLogEvent);
9}
10
11#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
12pub struct FlowLogEvent {
13	pub t: u64,
14	pub conn: ConnId,
15	pub seq: u32,
16	pub kind: FlowLogKind,
17	pub node: Option<NodeId>,
18	pub error: Option<Arc<SerializedError>>,
19	pub data: Option<serde_json::Value>,
20}
21
22#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
23pub enum FlowLogKind {
24	Check,
25	Middleware,
26	Fetch,
27	Terminate,
28	Error,
29	SecurityLimit,
30	Upgrade,
31	/// Per-request summary event. The `data` field carries a serialized
32	/// [`FlowTrajectory`]. Always emitted exactly once per request,
33	/// regardless of verbosity.
34	Trajectory,
35}
36
37#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
38pub enum FlowLogVerbosity {
39	/// Default. One `Trajectory` event per request, plus the existing
40	/// per-connection milestone events (`Terminate`, `Error`, `Upgrade`,
41	/// `SecurityLimit`).
42	Trajectory,
43	/// Adds a per-step event for each `Check` / `Middleware` / `Fetch` /
44	/// `Upgrade` node. Used at incident time; not for production volumes.
45	Debug,
46}
47
48#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
49pub struct TrajectoryStep {
50	pub node: NodeId,
51	pub kind: FlowLogKind,
52	/// `Some(true)` = Check matched, `Some(false)` = Check missed; `None`
53	/// for non-Check steps.
54	pub branch: Option<bool>,
55}
56
57#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
58pub enum TerminatorOutcomeKind {
59	Close,
60	WriteHttpResponse,
61	ByteTunnel,
62}
63
64#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
65pub enum TrajectoryOutcome {
66	Terminated { node: NodeId, terminator: TerminatorOutcomeKind },
67	Error { node: NodeId, message: std::borrow::Cow<'static, str> },
68}
69
70#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
71pub struct FlowTrajectory {
72	pub conn: ConnId,
73	pub entry: NodeId,
74	pub steps: Vec<TrajectoryStep>,
75	pub outcome: TrajectoryOutcome,
76	pub started_at_ms: u64,
77	pub finished_at_ms: u64,
78}
79
80/// Per-walker accumulator that the executor pushes steps into and
81/// converts to a [`FlowTrajectory`] at terminate/error time. Not a
82/// `FlowLogSink` — the executor explicitly emits one event from the
83/// finalized trajectory.
84#[derive(Debug)]
85pub struct TrajectoryBuilder {
86	conn: ConnId,
87	entry: NodeId,
88	started_at_ms: u64,
89	steps: Vec<TrajectoryStep>,
90}
91
92impl TrajectoryBuilder {
93	#[must_use]
94	pub fn new(conn: ConnId, entry: NodeId, started_at_ms: u64) -> Self {
95		Self { conn, entry, started_at_ms, steps: Vec::new() }
96	}
97
98	pub fn push(&mut self, step: TrajectoryStep) {
99		self.steps.push(step);
100	}
101
102	#[must_use]
103	pub fn finalize(self, outcome: TrajectoryOutcome, finished_at_ms: u64) -> FlowTrajectory {
104		FlowTrajectory {
105			conn: self.conn,
106			entry: self.entry,
107			steps: self.steps,
108			outcome,
109			started_at_ms: self.started_at_ms,
110			finished_at_ms,
111		}
112	}
113}
114
#[cfg(test)]
mod tests {
	use parking_lot::Mutex;

	use super::*;
	use crate::error::Error;

	/// Sink used by the tests: keeps every emitted event, in call order.
	struct RecordingSink {
		events: Mutex<Vec<FlowLogEvent>>,
	}

	impl FlowLogSink for RecordingSink {
		fn emit(&self, event: FlowLogEvent) {
			self.events.lock().push(event);
		}
	}

	/// Builds an error-free event with fixed filler values; only `seq` and
	/// `kind` vary between calls.
	fn sample_event(seq: u32, kind: FlowLogKind) -> FlowLogEvent {
		FlowLogEvent {
			t: 1_234_567_890_123,
			conn: ConnId(0x0bad_f00d_dead_beef),
			seq,
			kind,
			node: Some(NodeId::new(42)),
			error: None,
			data: Some(serde_json::json!({ "kv": "v" })),
		}
	}

	#[test]
	fn flow_log_event_round_trips_through_json() {
		let err = Error::internal("boom");
		let event = FlowLogEvent {
			t: 1_700_000_000_000,
			conn: ConnId(7),
			seq: 13,
			kind: FlowLogKind::Error,
			node: Some(NodeId::new(3)),
			error: Some(Arc::new(SerializedError::from(&err))),
			data: Some(serde_json::json!({ "note": "sample" })),
		};
		let encoded = serde_json::to_string(&event).expect("serialize");
		let decoded: FlowLogEvent = serde_json::from_str(&encoded).expect("deserialize");

		assert_eq!(decoded.t, event.t);
		assert_eq!(decoded.conn, event.conn);
		assert_eq!(decoded.seq, event.seq);
		assert_eq!(decoded.kind, event.kind);
		assert_eq!(decoded.node, event.node);
		assert_eq!(decoded.data, event.data);
		// The error is compared field by field rather than as a whole value.
		let dec_err = decoded.error.as_ref().expect("error preserved");
		let src_err = event.error.as_ref().expect("error set");
		assert_eq!(dec_err.kind, src_err.kind);
		assert_eq!(dec_err.reason, src_err.reason);
		assert_eq!(dec_err.message, src_err.message);
		assert_eq!(dec_err.ctx, src_err.ctx);
		assert_eq!(dec_err.source_chain, src_err.source_chain);
		assert_eq!(dec_err.http_status, src_err.http_status);
		assert_eq!(dec_err.retryable, src_err.retryable);
	}

	#[test]
	fn flow_log_kind_serde_round_trip_per_variant() {
		for k in [
			FlowLogKind::Check,
			FlowLogKind::Middleware,
			FlowLogKind::Fetch,
			FlowLogKind::Terminate,
			FlowLogKind::Error,
			FlowLogKind::SecurityLimit,
			FlowLogKind::Upgrade,
			FlowLogKind::Trajectory,
		] {
			// Wildcard-free match: adding a variant to `FlowLogKind` stops
			// this test from compiling until the variant is handled here,
			// which is the reminder to add it to the list above as well.
			match k {
				FlowLogKind::Check
				| FlowLogKind::Middleware
				| FlowLogKind::Fetch
				| FlowLogKind::Terminate
				| FlowLogKind::Error
				| FlowLogKind::SecurityLimit
				| FlowLogKind::Upgrade
				| FlowLogKind::Trajectory => {}
			}
			let encoded = serde_json::to_string(&k).expect("serialize");
			let decoded: FlowLogKind = serde_json::from_str(&encoded).expect("deserialize");
			assert_eq!(decoded, k);
		}
	}

	#[test]
	fn flow_log_sink_trait_accepts_concrete_impl_and_records_in_order() {
		let sink = RecordingSink { events: Mutex::new(Vec::new()) };
		let first = sample_event(1, FlowLogKind::Check);
		let second = sample_event(2, FlowLogKind::Middleware);
		sink.emit(first.clone());
		sink.emit(second.clone());
		let recorded = sink.events.lock();
		assert_eq!(recorded.len(), 2);
		assert_eq!(recorded[0].seq, first.seq);
		assert_eq!(recorded[0].kind, first.kind);
		assert_eq!(recorded[1].seq, second.seq);
		assert_eq!(recorded[1].kind, second.kind);
	}

	#[test]
	fn flow_log_sink_is_usable_as_trait_object() {
		let sink = RecordingSink { events: Mutex::new(Vec::new()) };
		// Coerce to trait object and invoke through the vtable; validates
		// that the trait's `fn emit(&self, ...)` signature is object-safe.
		let dyn_sink: &dyn FlowLogSink = &sink;
		dyn_sink.emit(sample_event(1, FlowLogKind::Fetch));
		assert_eq!(sink.events.lock().len(), 1);
	}
}