1use std::sync::Arc;
2
3use crate::conn_context::ConnId;
4use crate::error::SerializedError;
5use crate::ir::NodeId;
6
7pub trait FlowLogSink: Send + Sync {
8 fn emit(&self, event: FlowLogEvent);
9}
10
11#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
12pub struct FlowLogEvent {
13 pub t: u64,
14 pub conn: ConnId,
15 pub seq: u32,
16 pub kind: FlowLogKind,
17 pub node: Option<NodeId>,
18 pub error: Option<Arc<SerializedError>>,
19 pub data: Option<serde_json::Value>,
20}
21
22#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
23pub enum FlowLogKind {
24 Check,
25 Middleware,
26 Fetch,
27 Terminate,
28 Error,
29 SecurityLimit,
30 Upgrade,
31 Trajectory,
35}
36
37#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
38pub enum FlowLogVerbosity {
39 Trajectory,
43 Debug,
46}
47
48#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
49pub struct TrajectoryStep {
50 pub node: NodeId,
51 pub kind: FlowLogKind,
52 pub branch: Option<bool>,
55}
56
57#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
58pub enum TerminatorOutcomeKind {
59 Close,
60 WriteHttpResponse,
61 ByteTunnel,
62}
63
64#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
65pub enum TrajectoryOutcome {
66 Terminated { node: NodeId, terminator: TerminatorOutcomeKind },
67 Error { node: NodeId, message: TrajectoryErrorMessage },
68}
69
70#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
88#[serde(transparent)]
89pub struct TrajectoryErrorMessage(String);
90
91impl TrajectoryErrorMessage {
92 pub const MAX_BYTES: usize = crate::error::SERIALIZED_MESSAGE_CAP;
97
98 #[must_use]
101 pub fn from_static(s: &'static str) -> Self {
102 Self(cap_message_for_traj(s.to_owned()))
103 }
104
105 #[must_use]
107 pub fn from_truncated(s: String) -> Self {
108 Self(cap_message_for_traj(s))
109 }
110
111 #[must_use]
112 pub fn as_str(&self) -> &str {
113 &self.0
114 }
115}
116
117impl From<&crate::error::Error> for TrajectoryErrorMessage {
118 fn from(err: &crate::error::Error) -> Self {
119 Self(SerializedError::from(err).message)
120 }
121}
122
123fn cap_message_for_traj(s: String) -> String {
124 const SUFFIX: &str = "… [truncated]";
125 if s.len() <= TrajectoryErrorMessage::MAX_BYTES {
126 return s;
127 }
128 let budget = TrajectoryErrorMessage::MAX_BYTES.saturating_sub(SUFFIX.len());
129 let mut end = budget.min(s.len());
130 while end > 0 && !s.is_char_boundary(end) {
131 end -= 1;
132 }
133 let mut out = String::with_capacity(end + SUFFIX.len());
134 out.push_str(&s[..end]);
135 out.push_str(SUFFIX);
136 out
137}
138
139#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
140pub struct FlowTrajectory {
141 pub conn: ConnId,
142 pub entry: NodeId,
143 pub steps: Vec<TrajectoryStep>,
144 pub outcome: TrajectoryOutcome,
145 pub started_at_ms: u64,
146 pub finished_at_ms: u64,
147}
148
149#[derive(Debug)]
154pub struct TrajectoryBuilder {
155 conn: ConnId,
156 entry: NodeId,
157 started_at_ms: u64,
158 steps: Vec<TrajectoryStep>,
159}
160
161impl TrajectoryBuilder {
162 #[must_use]
163 pub fn new(conn: ConnId, entry: NodeId, started_at_ms: u64) -> Self {
164 Self { conn, entry, started_at_ms, steps: Vec::new() }
165 }
166
167 #[must_use]
177 pub fn placeholder(conn: ConnId, started_at_ms: u64) -> Self {
178 Self { conn, entry: NodeId::new(0), started_at_ms, steps: Vec::new() }
179 }
180
181 pub fn push(&mut self, step: TrajectoryStep) {
182 self.steps.push(step);
183 }
184
185 #[must_use]
186 pub fn finalize(self, outcome: TrajectoryOutcome, finished_at_ms: u64) -> FlowTrajectory {
187 FlowTrajectory {
188 conn: self.conn,
189 entry: self.entry,
190 steps: self.steps,
191 outcome,
192 started_at_ms: self.started_at_ms,
193 finished_at_ms,
194 }
195 }
196}
197
#[cfg(test)]
mod tests {
    use parking_lot::Mutex;

    use super::*;
    use crate::error::Error;

    /// Sink that keeps every emitted event so tests can inspect them.
    struct RecordingSink {
        events: Mutex<Vec<FlowLogEvent>>,
    }

    impl FlowLogSink for RecordingSink {
        fn emit(&self, event: FlowLogEvent) {
            self.events.lock().push(event);
        }
    }

    /// Builds an event with fixed filler values; only `seq` and `kind` vary.
    fn sample_event(seq: u32, kind: FlowLogKind) -> FlowLogEvent {
        FlowLogEvent {
            t: 1_234_567_890_123,
            conn: ConnId(0x0bad_f00d_dead_beef),
            seq,
            kind,
            node: Some(NodeId::new(42)),
            error: None,
            data: Some(serde_json::json!({ "kv": "v" })),
        }
    }

    #[test]
    fn flow_log_event_round_trips_through_json() {
        let err = Error::internal("boom");
        let event = FlowLogEvent {
            t: 1_700_000_000_000,
            conn: ConnId(7),
            seq: 13,
            kind: FlowLogKind::Error,
            node: Some(NodeId::new(3)),
            error: Some(Arc::new(SerializedError::from(&err))),
            data: Some(serde_json::json!({ "note": "sample" })),
        };
        let encoded = serde_json::to_string(&event).expect("serialize");
        let decoded: FlowLogEvent = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(decoded.t, event.t);
        assert_eq!(decoded.conn, event.conn);
        assert_eq!(decoded.seq, event.seq);
        assert_eq!(decoded.kind, event.kind);
        assert_eq!(decoded.node, event.node);
        assert_eq!(decoded.data, event.data);

        // The error is compared field by field rather than as a whole.
        let dec_err = decoded.error.as_ref().expect("error preserved");
        let src_err = event.error.as_ref().expect("error set");
        assert_eq!(dec_err.kind, src_err.kind);
        assert_eq!(dec_err.reason, src_err.reason);
        assert_eq!(dec_err.message, src_err.message);
        assert_eq!(dec_err.ctx, src_err.ctx);
        assert_eq!(dec_err.source_chain, src_err.source_chain);
        assert_eq!(dec_err.http_status, src_err.http_status);
        assert_eq!(dec_err.retryable, src_err.retryable);
    }

    #[test]
    fn flow_log_kind_serde_round_trip_per_variant() {
        // `FlowLogKind::Trajectory` is covered by its own test further down.
        for k in [
            FlowLogKind::Check,
            FlowLogKind::Middleware,
            FlowLogKind::Fetch,
            FlowLogKind::Terminate,
            FlowLogKind::Error,
            FlowLogKind::SecurityLimit,
            FlowLogKind::Upgrade,
        ] {
            let encoded = serde_json::to_string(&k).expect("serialize");
            let decoded: FlowLogKind = serde_json::from_str(&encoded).expect("deserialize");
            assert_eq!(decoded, k);
        }
    }

    #[test]
    fn flow_log_sink_trait_accepts_concrete_impl_and_records_in_order() {
        let sink = RecordingSink { events: Mutex::new(Vec::new()) };
        let first = sample_event(1, FlowLogKind::Check);
        let second = sample_event(2, FlowLogKind::Middleware);
        sink.emit(first.clone());
        sink.emit(second.clone());
        let recorded = sink.events.lock();
        assert_eq!(recorded.len(), 2);
        assert_eq!(recorded[0].seq, first.seq);
        assert_eq!(recorded[0].kind, first.kind);
        assert_eq!(recorded[1].seq, second.seq);
        assert_eq!(recorded[1].kind, second.kind);
    }

    #[test]
    fn flow_log_sink_is_usable_as_trait_object() {
        let sink = RecordingSink { events: Mutex::new(Vec::new()) };
        // Coercing to `&dyn` only compiles while the trait stays object-safe.
        let dyn_sink: &dyn FlowLogSink = &sink;
        dyn_sink.emit(sample_event(1, FlowLogKind::Fetch));
        assert_eq!(sink.events.lock().len(), 1);
    }

    #[test]
    fn trajectory_builder_pushes_in_order_and_finalizes_terminated() {
        let mut b = TrajectoryBuilder::new(ConnId(7), NodeId::new(0), 1_000);
        b.push(TrajectoryStep { node: NodeId::new(1), kind: FlowLogKind::Check, branch: Some(true) });
        b.push(TrajectoryStep { node: NodeId::new(2), kind: FlowLogKind::Middleware, branch: None });
        b.push(TrajectoryStep { node: NodeId::new(3), kind: FlowLogKind::Fetch, branch: None });

        let traj = b.finalize(
            TrajectoryOutcome::Terminated {
                node: NodeId::new(4),
                terminator: TerminatorOutcomeKind::WriteHttpResponse,
            },
            1_500,
        );

        assert_eq!(traj.conn, ConnId(7));
        assert_eq!(traj.entry, NodeId::new(0));
        assert_eq!(traj.started_at_ms, 1_000);
        assert_eq!(traj.finished_at_ms, 1_500);
        assert_eq!(traj.steps.len(), 3);

        assert_eq!(traj.steps[0].node, NodeId::new(1));
        assert_eq!(traj.steps[0].kind, FlowLogKind::Check);
        assert_eq!(traj.steps[0].branch, Some(true));

        assert_eq!(traj.steps[1].node, NodeId::new(2));
        assert_eq!(traj.steps[1].kind, FlowLogKind::Middleware);
        assert_eq!(traj.steps[1].branch, None);

        assert_eq!(traj.steps[2].node, NodeId::new(3));
        assert_eq!(traj.steps[2].kind, FlowLogKind::Fetch);
        assert_eq!(traj.steps[2].branch, None);

        match traj.outcome {
            TrajectoryOutcome::Terminated { node, terminator } => {
                assert_eq!(node, NodeId::new(4));
                assert_eq!(terminator, TerminatorOutcomeKind::WriteHttpResponse);
            }
            other @ TrajectoryOutcome::Error { .. } => {
                panic!("expected Terminated outcome, got {other:?}")
            }
        }
    }

    #[test]
    fn trajectory_builder_finalizes_with_error_outcome() {
        let b = TrajectoryBuilder::new(ConnId(7), NodeId::new(0), 1_000);
        let traj = b.finalize(
            TrajectoryOutcome::Error {
                node: NodeId::new(0),
                message: TrajectoryErrorMessage::from_static("boom"),
            },
            2_000,
        );

        assert!(traj.steps.is_empty(), "no pushes → no steps in finalized trajectory");
        match &traj.outcome {
            TrajectoryOutcome::Error { node, message } => {
                assert_eq!(*node, NodeId::new(0));
                assert_eq!(message.as_str(), "boom");
            }
            other @ TrajectoryOutcome::Terminated { .. } => {
                panic!("expected Error outcome, got {other:?}")
            }
        }
        assert_eq!(traj.finished_at_ms, 2_000);
    }

    /// Field-by-field comparison of two trajectories; panics on the first
    /// difference, including differing outcome variants.
    fn assert_trajectories_match(a: &FlowTrajectory, b: &FlowTrajectory) {
        assert_eq!(a.conn, b.conn);
        assert_eq!(a.entry, b.entry);
        assert_eq!(a.started_at_ms, b.started_at_ms);
        assert_eq!(a.finished_at_ms, b.finished_at_ms);
        assert_eq!(a.steps.len(), b.steps.len());
        for (left, right) in a.steps.iter().zip(b.steps.iter()) {
            assert_eq!(left.node, right.node);
            assert_eq!(left.kind, right.kind);
            assert_eq!(left.branch, right.branch);
        }
        match (&a.outcome, &b.outcome) {
            (
                TrajectoryOutcome::Terminated { node: na, terminator: ta },
                TrajectoryOutcome::Terminated { node: nb, terminator: tb },
            ) => {
                assert_eq!(na, nb);
                assert_eq!(ta, tb);
            }
            (
                TrajectoryOutcome::Error { node: na, message: ma },
                TrajectoryOutcome::Error { node: nb, message: mb },
            ) => {
                assert_eq!(na, nb);
                assert_eq!(ma.as_str(), mb.as_str());
            }
            (left, right) => panic!("outcome variant mismatch: {left:?} vs {right:?}"),
        }
    }

    #[test]
    fn flow_trajectory_round_trips_through_json() {
        let mut b = TrajectoryBuilder::new(ConnId(0x1234_5678), NodeId::new(0), 100);
        b.push(TrajectoryStep { node: NodeId::new(1), kind: FlowLogKind::Check, branch: Some(false) });
        b.push(TrajectoryStep { node: NodeId::new(2), kind: FlowLogKind::Upgrade, branch: None });
        let term = b.finalize(
            TrajectoryOutcome::Terminated {
                node: NodeId::new(3),
                terminator: TerminatorOutcomeKind::ByteTunnel,
            },
            200,
        );
        let encoded = serde_json::to_string(&term).expect("serialize terminated");
        let decoded: FlowTrajectory = serde_json::from_str(&encoded).expect("deserialize terminated");
        assert_trajectories_match(&term, &decoded);

        let err = TrajectoryBuilder::new(ConnId(42), NodeId::new(7), 0).finalize(
            TrajectoryOutcome::Error {
                node: NodeId::new(8),
                message: TrajectoryErrorMessage::from_static("upstream went away"),
            },
            17,
        );
        let encoded = serde_json::to_string(&err).expect("serialize error");
        let decoded: FlowTrajectory = serde_json::from_str(&encoded).expect("deserialize error");
        assert_trajectories_match(&err, &decoded);
    }

    #[test]
    fn flow_log_kind_trajectory_serde_round_trip() {
        let encoded = serde_json::to_string(&FlowLogKind::Trajectory).expect("serialize");
        let decoded: FlowLogKind = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(decoded, FlowLogKind::Trajectory);
    }

    #[test]
    fn flow_log_verbosity_serde_round_trip_per_variant() {
        for v in [FlowLogVerbosity::Trajectory, FlowLogVerbosity::Debug] {
            let encoded = serde_json::to_string(&v).expect("serialize");
            let decoded: FlowLogVerbosity = serde_json::from_str(&encoded).expect("deserialize");
            assert_eq!(decoded, v);
        }
    }

    #[test]
    fn trajectory_error_message_caps_oversized_input_on_a_char_boundary() {
        // Two-byte characters put the cut inside a character for odd budgets,
        // which exercises the boundary back-off. Assumes the cap is at least
        // as long as the 15-byte truncation marker.
        let oversized = "é".repeat(TrajectoryErrorMessage::MAX_BYTES + 1);
        let msg = TrajectoryErrorMessage::from_truncated(oversized);
        assert!(msg.as_str().len() <= TrajectoryErrorMessage::MAX_BYTES);
        assert!(msg.as_str().ends_with("… [truncated]"));
    }

    #[test]
    fn trajectory_error_message_serializes_as_bare_string() {
        let msg = TrajectoryErrorMessage::from_static("boom");
        let encoded = serde_json::to_string(&msg).expect("serialize");
        assert_eq!(encoded, "\"boom\"");
        let decoded: TrajectoryErrorMessage = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(decoded, msg);
    }

    #[test]
    fn trajectory_error_message_from_error_stays_within_cap() {
        let err = Error::internal("boom");
        let msg = TrajectoryErrorMessage::from(&err);
        assert!(msg.as_str().len() <= TrajectoryErrorMessage::MAX_BYTES);
    }
}