Skip to main content

vane_core/
flow_log.rs

1use std::sync::Arc;
2
3use crate::conn_context::ConnId;
4use crate::error::SerializedError;
5use crate::ir::NodeId;
6
7pub trait FlowLogSink: Send + Sync {
8	fn emit(&self, event: FlowLogEvent);
9}
10
11#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
12pub struct FlowLogEvent {
13	pub t: u64,
14	pub conn: ConnId,
15	pub seq: u32,
16	pub kind: FlowLogKind,
17	pub node: Option<NodeId>,
18	pub error: Option<Arc<SerializedError>>,
19	pub data: Option<serde_json::Value>,
20}
21
22#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
23pub enum FlowLogKind {
24	Check,
25	Middleware,
26	Fetch,
27	Terminate,
28	Error,
29	SecurityLimit,
30	Upgrade,
31	/// Per-request summary event. The `data` field carries a serialized
32	/// [`FlowTrajectory`]. Always emitted exactly once per request,
33	/// regardless of verbosity.
34	Trajectory,
35}
36
37#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
38pub enum FlowLogVerbosity {
39	/// Default. One `Trajectory` event per request, plus the existing
40	/// per-connection milestone events (`Terminate`, `Error`, `Upgrade`,
41	/// `SecurityLimit`).
42	Trajectory,
43	/// Adds a per-step event for each `Check` / `Middleware` / `Fetch` /
44	/// `Upgrade` node. Used at incident time; not for production volumes.
45	Debug,
46}
47
48#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
49pub struct TrajectoryStep {
50	pub node: NodeId,
51	pub kind: FlowLogKind,
52	/// `Some(true)` = Check matched, `Some(false)` = Check missed; `None`
53	/// for non-Check steps.
54	pub branch: Option<bool>,
55}
56
57#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
58pub enum TerminatorOutcomeKind {
59	Close,
60	WriteHttpResponse,
61	ByteTunnel,
62}
63
64#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
65pub enum TrajectoryOutcome {
66	Terminated { node: NodeId, terminator: TerminatorOutcomeKind },
67	Error { node: NodeId, message: TrajectoryErrorMessage },
68}
69
70/// Capped error-message payload for [`TrajectoryOutcome::Error`].
71///
72/// Wraps a `String` that has been truncated to
73/// [`TrajectoryErrorMessage::MAX_BYTES`]. `Cow<'static, str>` (the
74/// previous shape) made `err.to_string().into()` look harmless even
75/// though the full `Display` form can carry many KiB of context —
76/// every event then balloons the size of the flow-log sinks. Constrain
77/// the type so a caller can't accidentally bypass the cap.
78///
79/// Construction:
80///
81/// - `From<&Error>` — the production path: routes the message through
82///   [`SerializedError::from`], inheriting its byte cap.
83/// - `from_static(&'static str)` — convenience for tests / fixtures.
84/// - `from_truncated(String)` — explicit cap on an already-built
85///   string; useful when the caller already has a message they don't
86///   want to re-wrap.
87#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
88#[serde(transparent)]
89pub struct TrajectoryErrorMessage(String);
90
91impl TrajectoryErrorMessage {
92	/// Hard cap on the rendered message. Matches the
93	/// `SerializedError::message` ceiling so the two carriers stay in
94	/// lock-step; anything beyond this is truncated with a
95	/// `… [truncated]` suffix.
96	pub const MAX_BYTES: usize = crate::error::SERIALIZED_MESSAGE_CAP;
97
98	/// Build from a static string slice. No truncation needed at the
99	/// type level — call sites that exceed the cap are caller-error.
100	#[must_use]
101	pub fn from_static(s: &'static str) -> Self {
102		Self(cap_message_for_traj(s.to_owned()))
103	}
104
105	/// Cap an already-built `String` to [`Self::MAX_BYTES`].
106	#[must_use]
107	pub fn from_truncated(s: String) -> Self {
108		Self(cap_message_for_traj(s))
109	}
110
111	#[must_use]
112	pub fn as_str(&self) -> &str {
113		&self.0
114	}
115}
116
117impl From<&crate::error::Error> for TrajectoryErrorMessage {
118	fn from(err: &crate::error::Error) -> Self {
119		Self(SerializedError::from(err).message)
120	}
121}
122
123fn cap_message_for_traj(s: String) -> String {
124	const SUFFIX: &str = "… [truncated]";
125	if s.len() <= TrajectoryErrorMessage::MAX_BYTES {
126		return s;
127	}
128	let budget = TrajectoryErrorMessage::MAX_BYTES.saturating_sub(SUFFIX.len());
129	let mut end = budget.min(s.len());
130	while end > 0 && !s.is_char_boundary(end) {
131		end -= 1;
132	}
133	let mut out = String::with_capacity(end + SUFFIX.len());
134	out.push_str(&s[..end]);
135	out.push_str(SUFFIX);
136	out
137}
138
139#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
140pub struct FlowTrajectory {
141	pub conn: ConnId,
142	pub entry: NodeId,
143	pub steps: Vec<TrajectoryStep>,
144	pub outcome: TrajectoryOutcome,
145	pub started_at_ms: u64,
146	pub finished_at_ms: u64,
147}
148
149/// Per-walker accumulator that the executor pushes steps into and
150/// converts to a [`FlowTrajectory`] at terminate/error time. Not a
151/// `FlowLogSink` — the executor explicitly emits one event from the
152/// finalized trajectory.
153#[derive(Debug)]
154pub struct TrajectoryBuilder {
155	conn: ConnId,
156	entry: NodeId,
157	started_at_ms: u64,
158	steps: Vec<TrajectoryStep>,
159}
160
161impl TrajectoryBuilder {
162	#[must_use]
163	pub fn new(conn: ConnId, entry: NodeId, started_at_ms: u64) -> Self {
164		Self { conn, entry, started_at_ms, steps: Vec::new() }
165	}
166
167	/// Detached builder used as a transient placeholder when the
168	/// owning [`FlowCtx`](crate::flow_ctx::FlowCtx) needs to swap its
169	/// trajectory out via [`std::mem::replace`] (finalize consumes by
170	/// value, so the `FlowCtx` must hold *something* in the slot
171	/// during the call).
172	///
173	/// The resulting builder records no entry node and is discarded
174	/// immediately after the swap — callers must not push steps to
175	/// it or finalize it as if it represented a real trace.
176	#[must_use]
177	pub fn placeholder(conn: ConnId, started_at_ms: u64) -> Self {
178		Self { conn, entry: NodeId::new(0), started_at_ms, steps: Vec::new() }
179	}
180
181	pub fn push(&mut self, step: TrajectoryStep) {
182		self.steps.push(step);
183	}
184
185	#[must_use]
186	pub fn finalize(self, outcome: TrajectoryOutcome, finished_at_ms: u64) -> FlowTrajectory {
187		FlowTrajectory {
188			conn: self.conn,
189			entry: self.entry,
190			steps: self.steps,
191			outcome,
192			started_at_ms: self.started_at_ms,
193			finished_at_ms,
194		}
195	}
196}
197
#[cfg(test)]
mod tests {
	use parking_lot::Mutex;

	use super::*;
	use crate::error::Error;

	/// Sink that keeps every event it receives, in arrival order.
	struct RecordingSink {
		events: Mutex<Vec<FlowLogEvent>>,
	}

	impl FlowLogSink for RecordingSink {
		fn emit(&self, event: FlowLogEvent) {
			self.events.lock().push(event);
		}
	}

	/// Fixture event; only `seq` and `kind` vary between callers.
	fn sample_event(seq: u32, kind: FlowLogKind) -> FlowLogEvent {
		FlowLogEvent {
			t: 1_234_567_890_123,
			conn: ConnId(0x0bad_f00d_dead_beef),
			seq,
			kind,
			node: Some(NodeId::new(42)),
			error: None,
			data: Some(serde_json::json!({ "kv": "v" })),
		}
	}

	#[test]
	fn flow_log_event_round_trips_through_json() {
		let err = Error::internal("boom");
		let event = FlowLogEvent {
			t: 1_700_000_000_000,
			conn: ConnId(7),
			seq: 13,
			kind: FlowLogKind::Error,
			node: Some(NodeId::new(3)),
			error: Some(Arc::new(SerializedError::from(&err))),
			data: Some(serde_json::json!({ "note": "sample" })),
		};
		let encoded = serde_json::to_string(&event).expect("serialize");
		let decoded: FlowLogEvent = serde_json::from_str(&encoded).expect("deserialize");

		assert_eq!(decoded.t, event.t);
		assert_eq!(decoded.conn, event.conn);
		assert_eq!(decoded.seq, event.seq);
		assert_eq!(decoded.kind, event.kind);
		assert_eq!(decoded.node, event.node);
		assert_eq!(decoded.data, event.data);
		let dec_err = decoded.error.as_ref().expect("error preserved");
		let src_err = event.error.as_ref().expect("error set");
		assert_eq!(dec_err.kind, src_err.kind);
		assert_eq!(dec_err.reason, src_err.reason);
		assert_eq!(dec_err.message, src_err.message);
		assert_eq!(dec_err.ctx, src_err.ctx);
		assert_eq!(dec_err.source_chain, src_err.source_chain);
		assert_eq!(dec_err.http_status, src_err.http_status);
		assert_eq!(dec_err.retryable, src_err.retryable);
	}

	#[test]
	fn flow_log_kind_serde_round_trip_per_variant() {
		// Keep this list in sync with the enum: every variant, including
		// `Trajectory`, must survive a JSON round trip.
		for k in [
			FlowLogKind::Check,
			FlowLogKind::Middleware,
			FlowLogKind::Fetch,
			FlowLogKind::Terminate,
			FlowLogKind::Error,
			FlowLogKind::SecurityLimit,
			FlowLogKind::Upgrade,
			FlowLogKind::Trajectory,
		] {
			let encoded = serde_json::to_string(&k).expect("serialize");
			let decoded: FlowLogKind = serde_json::from_str(&encoded).expect("deserialize");
			assert_eq!(decoded, k);
		}
	}

	#[test]
	fn flow_log_sink_trait_accepts_concrete_impl_and_records_in_order() {
		let sink = RecordingSink { events: Mutex::new(Vec::new()) };
		let first = sample_event(1, FlowLogKind::Check);
		let second = sample_event(2, FlowLogKind::Middleware);
		sink.emit(first.clone());
		sink.emit(second.clone());
		let recorded = sink.events.lock();
		assert_eq!(recorded.len(), 2);
		assert_eq!(recorded[0].seq, first.seq);
		assert_eq!(recorded[0].kind, first.kind);
		assert_eq!(recorded[1].seq, second.seq);
		assert_eq!(recorded[1].kind, second.kind);
	}

	#[test]
	fn flow_log_sink_is_usable_as_trait_object() {
		let sink = RecordingSink { events: Mutex::new(Vec::new()) };
		// Coerce to trait object and invoke through the vtable; validates
		// that the trait's `fn emit(&self, ...)` signature is object-safe.
		let dyn_sink: &dyn FlowLogSink = &sink;
		dyn_sink.emit(sample_event(1, FlowLogKind::Fetch));
		assert_eq!(sink.events.lock().len(), 1);
	}

	#[test]
	fn trajectory_builder_pushes_in_order_and_finalizes_terminated() {
		let mut b = TrajectoryBuilder::new(ConnId(7), NodeId::new(0), 1_000);
		b.push(TrajectoryStep { node: NodeId::new(1), kind: FlowLogKind::Check, branch: Some(true) });
		b.push(TrajectoryStep { node: NodeId::new(2), kind: FlowLogKind::Middleware, branch: None });
		b.push(TrajectoryStep { node: NodeId::new(3), kind: FlowLogKind::Fetch, branch: None });

		let traj = b.finalize(
			TrajectoryOutcome::Terminated {
				node: NodeId::new(4),
				terminator: TerminatorOutcomeKind::WriteHttpResponse,
			},
			1_500,
		);

		assert_eq!(traj.conn, ConnId(7));
		assert_eq!(traj.entry, NodeId::new(0));
		assert_eq!(traj.started_at_ms, 1_000);
		assert_eq!(traj.finished_at_ms, 1_500);
		assert_eq!(traj.steps.len(), 3);

		assert_eq!(traj.steps[0].node, NodeId::new(1));
		assert_eq!(traj.steps[0].kind, FlowLogKind::Check);
		assert_eq!(traj.steps[0].branch, Some(true));

		assert_eq!(traj.steps[1].node, NodeId::new(2));
		assert_eq!(traj.steps[1].kind, FlowLogKind::Middleware);
		assert_eq!(traj.steps[1].branch, None);

		assert_eq!(traj.steps[2].node, NodeId::new(3));
		assert_eq!(traj.steps[2].kind, FlowLogKind::Fetch);
		assert_eq!(traj.steps[2].branch, None);

		match traj.outcome {
			TrajectoryOutcome::Terminated { node, terminator } => {
				assert_eq!(node, NodeId::new(4));
				assert_eq!(terminator, TerminatorOutcomeKind::WriteHttpResponse);
			}
			other @ TrajectoryOutcome::Error { .. } => {
				panic!("expected Terminated outcome, got {other:?}")
			}
		}
	}

	#[test]
	fn trajectory_builder_finalizes_with_error_outcome() {
		let b = TrajectoryBuilder::new(ConnId(7), NodeId::new(0), 1_000);
		let traj = b.finalize(
			TrajectoryOutcome::Error {
				node: NodeId::new(0),
				message: TrajectoryErrorMessage::from_static("boom"),
			},
			2_000,
		);

		assert!(traj.steps.is_empty(), "no pushes → no steps in finalized trajectory");
		match &traj.outcome {
			TrajectoryOutcome::Error { node, message } => {
				assert_eq!(*node, NodeId::new(0));
				assert_eq!(message.as_str(), "boom");
			}
			other @ TrajectoryOutcome::Terminated { .. } => {
				panic!("expected Error outcome, got {other:?}")
			}
		}
		assert_eq!(traj.finished_at_ms, 2_000);
	}

	/// Field-by-field comparison; `FlowTrajectory` does not implement
	/// `PartialEq`, so the round-trip tests compare through this helper.
	fn assert_trajectories_match(a: &FlowTrajectory, b: &FlowTrajectory) {
		assert_eq!(a.conn, b.conn);
		assert_eq!(a.entry, b.entry);
		assert_eq!(a.started_at_ms, b.started_at_ms);
		assert_eq!(a.finished_at_ms, b.finished_at_ms);
		assert_eq!(a.steps.len(), b.steps.len());
		for (left, right) in a.steps.iter().zip(b.steps.iter()) {
			assert_eq!(left.node, right.node);
			assert_eq!(left.kind, right.kind);
			assert_eq!(left.branch, right.branch);
		}
		match (&a.outcome, &b.outcome) {
			(
				TrajectoryOutcome::Terminated { node: na, terminator: ta },
				TrajectoryOutcome::Terminated { node: nb, terminator: tb },
			) => {
				assert_eq!(na, nb);
				assert_eq!(ta, tb);
			}
			(
				TrajectoryOutcome::Error { node: na, message: ma },
				TrajectoryOutcome::Error { node: nb, message: mb },
			) => {
				assert_eq!(na, nb);
				assert_eq!(ma.as_str(), mb.as_str());
			}
			(left, right) => panic!("outcome variant mismatch: {left:?} vs {right:?}"),
		}
	}

	#[test]
	fn flow_trajectory_round_trips_through_json() {
		let mut b = TrajectoryBuilder::new(ConnId(0x1234_5678), NodeId::new(0), 100);
		b.push(TrajectoryStep { node: NodeId::new(1), kind: FlowLogKind::Check, branch: Some(false) });
		b.push(TrajectoryStep { node: NodeId::new(2), kind: FlowLogKind::Upgrade, branch: None });
		let term = b.finalize(
			TrajectoryOutcome::Terminated {
				node: NodeId::new(3),
				terminator: TerminatorOutcomeKind::ByteTunnel,
			},
			200,
		);
		let encoded = serde_json::to_string(&term).expect("serialize terminated");
		let decoded: FlowTrajectory = serde_json::from_str(&encoded).expect("deserialize terminated");
		assert_trajectories_match(&term, &decoded);

		let err = TrajectoryBuilder::new(ConnId(42), NodeId::new(7), 0).finalize(
			TrajectoryOutcome::Error {
				node: NodeId::new(8),
				message: TrajectoryErrorMessage::from_static("upstream went away"),
			},
			17,
		);
		let encoded = serde_json::to_string(&err).expect("serialize error");
		let decoded: FlowTrajectory = serde_json::from_str(&encoded).expect("deserialize error");
		assert_trajectories_match(&err, &decoded);
	}

	#[test]
	fn flow_log_kind_trajectory_serde_round_trip() {
		let encoded = serde_json::to_string(&FlowLogKind::Trajectory).expect("serialize");
		let decoded: FlowLogKind = serde_json::from_str(&encoded).expect("deserialize");
		assert_eq!(decoded, FlowLogKind::Trajectory);
	}

	#[test]
	fn flow_log_verbosity_serde_round_trip_per_variant() {
		for v in [FlowLogVerbosity::Trajectory, FlowLogVerbosity::Debug] {
			let encoded = serde_json::to_string(&v).expect("serialize");
			let decoded: FlowLogVerbosity = serde_json::from_str(&encoded).expect("deserialize");
			assert_eq!(decoded, v);
		}
	}

	#[test]
	fn trajectory_error_message_keeps_short_input_verbatim() {
		assert_eq!(TrajectoryErrorMessage::from_static("boom").as_str(), "boom");
		assert_eq!(TrajectoryErrorMessage::from_truncated("boom".to_owned()).as_str(), "boom");
		assert_eq!(
			TrajectoryErrorMessage::from_static("boom"),
			TrajectoryErrorMessage::from_truncated("boom".to_owned()),
		);
	}

	#[test]
	fn trajectory_error_message_caps_oversized_input_on_char_boundary() {
		// Two-byte characters, so a naive byte cut could land mid-character;
		// the input is twice the cap in bytes.
		let oversized = "é".repeat(TrajectoryErrorMessage::MAX_BYTES);
		let capped = TrajectoryErrorMessage::from_truncated(oversized.clone());

		assert!(capped.as_str().len() <= TrajectoryErrorMessage::MAX_BYTES);
		let kept = capped
			.as_str()
			.strip_suffix("… [truncated]")
			.expect("oversized message carries the truncation marker");
		assert!(oversized.starts_with(kept), "kept part must be a prefix of the input");
	}

	#[test]
	fn trajectory_error_message_from_error_matches_serialized_message() {
		let err = Error::internal("boom");
		let msg = TrajectoryErrorMessage::from(&err);
		assert_eq!(msg.as_str(), SerializedError::from(&err).message);
	}
}