vane_core/flow_ctx.rs
1use std::sync::Arc;
2
3use tokio_util::sync::CancellationToken;
4
5use crate::flow_log::{FlowLogSink, FlowLogVerbosity, TrajectoryBuilder};
6
7/// Per-walk execution context. Constructed once per L4 connection (and
8/// re-constructed per L7 request when a hyper service-fn dispatches into
9/// the L7 sub-graph). Fields are *owned* — no lifetime parameter — so the
10/// struct survives `tokio::spawn` and `move` closures (notably hyper's
11/// service-fn closure at `Node::Upgrade`, which captures `log` / `cancel`
12/// / `verbosity` per request).
13///
14/// `Arc<dyn FlowLogSink>` and `CancellationToken` clone cheaply (each is
15/// internally an `Arc`), and `tracing::Span` is also `Arc`-backed; the
16/// per-request clones in the hyper bridge are O(1).
17pub struct FlowCtx {
18 pub span: tracing::Span,
19 pub log: Arc<dyn FlowLogSink>,
20 /// Hard-cancel token. Fired by the listener's `force_cancel` tier
21 /// when shutdown's soft-drain window expires. Long-lived
22 /// terminators (`ByteTunnel`, UDP tunnel) `select!` on this and
23 /// surface `CloseReason::Cancelled`. Per-request hyper drivers
24 /// hand this to their per-request executor wiring.
25 pub cancel: CancellationToken,
26 /// Soft-drain token. Fired by the listener's `accept_cancel` tier
27 /// at the start of shutdown / listener removal — well before
28 /// `cancel` (`force_cancel`) trips. Hyper H1 / H2 drivers wire
29 /// this into `graceful_shutdown()` so idle keep-alive clients see
30 /// `Connection: close` (H1) or `GOAWAY` (H2) immediately rather
31 /// than camping until the drain budget runs out. Defaults to a
32 /// fresh detached `CancellationToken` so call sites that have not
33 /// been wired through yet keep their pre-split semantics.
34 pub accept_cancel: CancellationToken,
35 /// Verbosity selected when this connection was accepted. The listener
36 /// reads `engine::VerbosityState` once at `FlowCtx` construction;
37 /// in-flight connections retain the value they were built with.
38 pub verbosity: FlowLogVerbosity,
39 /// Walker-internal step accumulator. The executor pushes one entry
40 /// per node-visit and emits a single `FlowLogKind::Trajectory` event
41 /// from `finalize()` at terminate or error.
42 pub trajectory: TrajectoryBuilder,
43}
44
45#[cfg(test)]
46mod tests {
47 use parking_lot::Mutex;
48
49 use super::*;
50 use crate::conn_context::ConnId;
51 use crate::flow_log::FlowLogEvent;
52 use crate::ir::NodeId;
53
54 struct NullSink {
55 count: Mutex<u32>,
56 }
57
58 impl FlowLogSink for NullSink {
59 fn emit(&self, _event: FlowLogEvent) {
60 *self.count.lock() += 1;
61 }
62 }
63
64 // Compile-gate: a FlowCtx must be constructible from a concrete sink
65 // wrapped in `Arc<dyn FlowLogSink>`, alongside an owned tracing::Span
66 // and CancellationToken, plus the verbosity / trajectory fields the
67 // walker reads. Field visibility regressions break this.
68 #[test]
69 fn flow_ctx_accepts_arc_dyn_sink_and_owned_fields() {
70 let sink: Arc<dyn FlowLogSink> = Arc::new(NullSink { count: Mutex::new(0) });
71 let span = tracing::Span::none();
72 let cancel = CancellationToken::new();
73 let ctx = FlowCtx {
74 span,
75 log: sink,
76 cancel,
77 accept_cancel: CancellationToken::new(),
78 verbosity: FlowLogVerbosity::Trajectory,
79 trajectory: TrajectoryBuilder::new(ConnId(0), NodeId::new(0), 0),
80 };
81 let _ = &ctx.span;
82 let _ = &ctx.log;
83 let _ = &ctx.cancel;
84 let _ = &ctx.accept_cancel;
85 let _ = &ctx.verbosity;
86 let _ = &ctx.trajectory;
87 }
88}