1use std::sync::Arc;
2
3use crate::conn_context::ConnId;
4use crate::error::SerializedError;
5use crate::ir::NodeId;
6
7pub trait FlowLogSink: Send + Sync {
8 fn emit(&self, event: FlowLogEvent);
9}
10
11#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
12pub struct FlowLogEvent {
13 pub t: u64,
14 pub conn: ConnId,
15 pub seq: u32,
16 pub kind: FlowLogKind,
17 pub node: Option<NodeId>,
18 pub error: Option<Arc<SerializedError>>,
19 pub data: Option<serde_json::Value>,
20}
21
22#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
23pub enum FlowLogKind {
24 Check,
25 Middleware,
26 Fetch,
27 Terminate,
28 Error,
29 SecurityLimit,
30 Upgrade,
31 Trajectory,
35}
36
37#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
38pub enum FlowLogVerbosity {
39 Trajectory,
43 Debug,
46}
47
48#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
49pub struct TrajectoryStep {
50 pub node: NodeId,
51 pub kind: FlowLogKind,
52 pub branch: Option<bool>,
55}
56
57#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
58pub enum TerminatorOutcomeKind {
59 Close,
60 WriteHttpResponse,
61 ByteTunnel,
62}
63
64#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
65pub enum TrajectoryOutcome {
66 Terminated { node: NodeId, terminator: TerminatorOutcomeKind },
67 Error { node: NodeId, message: std::borrow::Cow<'static, str> },
68}
69
70#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
71pub struct FlowTrajectory {
72 pub conn: ConnId,
73 pub entry: NodeId,
74 pub steps: Vec<TrajectoryStep>,
75 pub outcome: TrajectoryOutcome,
76 pub started_at_ms: u64,
77 pub finished_at_ms: u64,
78}
79
80#[derive(Debug)]
85pub struct TrajectoryBuilder {
86 conn: ConnId,
87 entry: NodeId,
88 started_at_ms: u64,
89 steps: Vec<TrajectoryStep>,
90}
91
92impl TrajectoryBuilder {
93 #[must_use]
94 pub fn new(conn: ConnId, entry: NodeId, started_at_ms: u64) -> Self {
95 Self { conn, entry, started_at_ms, steps: Vec::new() }
96 }
97
98 pub fn push(&mut self, step: TrajectoryStep) {
99 self.steps.push(step);
100 }
101
102 #[must_use]
103 pub fn finalize(self, outcome: TrajectoryOutcome, finished_at_ms: u64) -> FlowTrajectory {
104 FlowTrajectory {
105 conn: self.conn,
106 entry: self.entry,
107 steps: self.steps,
108 outcome,
109 started_at_ms: self.started_at_ms,
110 finished_at_ms,
111 }
112 }
113}
114
#[cfg(test)]
mod tests {
    use parking_lot::Mutex;

    use super::*;
    use crate::error::Error;

    /// Sink that stores every emitted event so tests can inspect them.
    struct RecordingSink {
        events: Mutex<Vec<FlowLogEvent>>,
    }

    impl FlowLogSink for RecordingSink {
        fn emit(&self, event: FlowLogEvent) {
            self.events.lock().push(event);
        }
    }

    /// Builds an error-free event with fixed filler values and the given
    /// sequence number and kind.
    fn sample_event(seq: u32, kind: FlowLogKind) -> FlowLogEvent {
        FlowLogEvent {
            t: 1_234_567_890_123,
            conn: ConnId(0x0bad_f00d_dead_beef),
            seq,
            kind,
            node: Some(NodeId::new(42)),
            error: None,
            data: Some(serde_json::json!({ "kv": "v" })),
        }
    }

    #[test]
    fn flow_log_event_round_trips_through_json() {
        let err = Error::internal("boom");
        let event = FlowLogEvent {
            t: 1_700_000_000_000,
            conn: ConnId(7),
            seq: 13,
            kind: FlowLogKind::Error,
            node: Some(NodeId::new(3)),
            error: Some(Arc::new(SerializedError::from(&err))),
            data: Some(serde_json::json!({ "note": "sample" })),
        };
        let encoded = serde_json::to_string(&event).expect("serialize");
        let decoded: FlowLogEvent = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(decoded.t, event.t);
        assert_eq!(decoded.conn, event.conn);
        assert_eq!(decoded.seq, event.seq);
        assert_eq!(decoded.kind, event.kind);
        assert_eq!(decoded.node, event.node);
        assert_eq!(decoded.data, event.data);
        // The error is compared field by field rather than as a whole value.
        let dec_err = decoded.error.as_ref().expect("error preserved");
        let src_err = event.error.as_ref().expect("error set");
        assert_eq!(dec_err.kind, src_err.kind);
        assert_eq!(dec_err.reason, src_err.reason);
        assert_eq!(dec_err.message, src_err.message);
        assert_eq!(dec_err.ctx, src_err.ctx);
        assert_eq!(dec_err.source_chain, src_err.source_chain);
        assert_eq!(dec_err.http_status, src_err.http_status);
        assert_eq!(dec_err.retryable, src_err.retryable);
    }

    #[test]
    fn flow_log_kind_serde_round_trip_per_variant() {
        // Keep this list in sync with `FlowLogKind`: every variant must be
        // present for the test to live up to its name.
        for k in [
            FlowLogKind::Check,
            FlowLogKind::Middleware,
            FlowLogKind::Fetch,
            FlowLogKind::Terminate,
            FlowLogKind::Error,
            FlowLogKind::SecurityLimit,
            FlowLogKind::Upgrade,
            FlowLogKind::Trajectory,
        ] {
            let encoded = serde_json::to_string(&k).expect("serialize");
            let decoded: FlowLogKind = serde_json::from_str(&encoded).expect("deserialize");
            assert_eq!(decoded, k);
        }
    }

    #[test]
    fn flow_log_sink_trait_accepts_concrete_impl_and_records_in_order() {
        let sink = RecordingSink { events: Mutex::new(Vec::new()) };
        let first = sample_event(1, FlowLogKind::Check);
        let second = sample_event(2, FlowLogKind::Middleware);
        sink.emit(first.clone());
        sink.emit(second.clone());
        let recorded = sink.events.lock();
        assert_eq!(recorded.len(), 2);
        assert_eq!(recorded[0].seq, first.seq);
        assert_eq!(recorded[0].kind, first.kind);
        assert_eq!(recorded[1].seq, second.seq);
        assert_eq!(recorded[1].kind, second.kind);
    }

    #[test]
    fn flow_log_sink_is_usable_as_trait_object() {
        let sink = RecordingSink { events: Mutex::new(Vec::new()) };
        // Calling through `&dyn FlowLogSink` only compiles if the trait is
        // object-safe.
        let dyn_sink: &dyn FlowLogSink = &sink;
        dyn_sink.emit(sample_event(1, FlowLogKind::Fetch));
        assert_eq!(sink.events.lock().len(), 1);
    }

    #[test]
    fn trajectory_builder_preserves_inputs_and_step_order() {
        let mut builder = TrajectoryBuilder::new(ConnId(9), NodeId::new(1), 100);
        builder.push(TrajectoryStep {
            node: NodeId::new(1),
            kind: FlowLogKind::Check,
            branch: Some(true),
        });
        builder.push(TrajectoryStep {
            node: NodeId::new(2),
            kind: FlowLogKind::Terminate,
            branch: None,
        });

        let trajectory = builder.finalize(
            TrajectoryOutcome::Terminated {
                node: NodeId::new(2),
                terminator: TerminatorOutcomeKind::Close,
            },
            250,
        );

        assert_eq!(trajectory.conn, ConnId(9));
        assert_eq!(trajectory.entry, NodeId::new(1));
        assert_eq!(trajectory.started_at_ms, 100);
        assert_eq!(trajectory.finished_at_ms, 250);

        assert_eq!(trajectory.steps.len(), 2);
        assert_eq!(trajectory.steps[0].node, NodeId::new(1));
        assert_eq!(trajectory.steps[0].kind, FlowLogKind::Check);
        assert_eq!(trajectory.steps[0].branch, Some(true));
        assert_eq!(trajectory.steps[1].node, NodeId::new(2));
        assert_eq!(trajectory.steps[1].kind, FlowLogKind::Terminate);
        assert_eq!(trajectory.steps[1].branch, None);

        // `TrajectoryOutcome` has no `PartialEq`, so match on its shape.
        let ended_as_expected = match &trajectory.outcome {
            TrajectoryOutcome::Terminated { node, terminator: TerminatorOutcomeKind::Close } => {
                *node == NodeId::new(2)
            }
            _ => false,
        };
        assert!(ended_as_expected, "outcome must be passed through unchanged");
    }
}