Skip to main content

tracing_console_host/
protocol.rs

1//! Wire protocol between the host (this crate) and the console client.
2//!
3//! Encodes via messagepack (per-message length prefix supplied by
4//! `protosocket-messagepack`).  The protosocket-rpc framework needs each
5//! message to carry an id and a control code, so [`Request`] and [`Response`]
6//! both wrap an enum body next to those framework fields.
7
8use protosocket_rpc::{Message, ProtosocketControlCode};
9use serde::{Deserialize, Serialize};
10
11/// Wire representation of a captured field value.  Mirrors
12/// [`tracing_cache::FieldValue`] but collapses the four string variants
13/// (`Str` / `SmallString` / `SharedString` / `String`) to a single
14/// `Str(String)` — the heap is unavoidable once we cross the network
15/// boundary anyway.
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
17pub enum WireFieldValue {
18    U64(u64),
19    I64(i64),
20    F64(f64),
21    Bool(bool),
22    Str(String),
23}
24
25impl WireFieldValue {
26    /// Render the value as its printable string form.
27    pub fn to_string_value(&self) -> String {
28        match self {
29            WireFieldValue::U64(v) => v.to_string(),
30            WireFieldValue::I64(v) => v.to_string(),
31            WireFieldValue::F64(v) => v.to_string(),
32            WireFieldValue::Bool(v) => v.to_string(),
33            WireFieldValue::Str(s) => s.clone(),
34        }
35    }
36
37    /// Substring-match the printable representation.
38    pub fn contains(&self, needle: &str) -> bool {
39        match self {
40            WireFieldValue::Str(s) => s.contains(needle),
41            other => other.to_string_value().contains(needle),
42        }
43    }
44}
45
46impl std::fmt::Display for WireFieldValue {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        match self {
49            WireFieldValue::U64(v) => write!(f, "{v}"),
50            WireFieldValue::I64(v) => write!(f, "{v}"),
51            WireFieldValue::F64(v) => write!(f, "{v}"),
52            WireFieldValue::Bool(v) => write!(f, "{v}"),
53            WireFieldValue::Str(s) => f.write_str(s),
54        }
55    }
56}
57
58// ── Wire-friendly span / event types ─────────────────────────────────────────
59
60#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
61pub enum WireLevel {
62    Trace,
63    Debug,
64    Info,
65    Warn,
66    Error,
67}
68
69impl WireLevel {
70    pub fn from_tracing(level: &tracing::Level) -> Self {
71        // Match exhaustively so a tracing-core change forces an update here.
72        match *level {
73            tracing::Level::TRACE => WireLevel::Trace,
74            tracing::Level::DEBUG => WireLevel::Debug,
75            tracing::Level::INFO => WireLevel::Info,
76            tracing::Level::WARN => WireLevel::Warn,
77            tracing::Level::ERROR => WireLevel::Error,
78        }
79    }
80
81    pub fn to_tracing(self) -> tracing::Level {
82        match self {
83            WireLevel::Trace => tracing::Level::TRACE,
84            WireLevel::Debug => tracing::Level::DEBUG,
85            WireLevel::Info => tracing::Level::INFO,
86            WireLevel::Warn => tracing::Level::WARN,
87            WireLevel::Error => tracing::Level::ERROR,
88        }
89    }
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct WireEvent {
94    pub name: String,
95    pub level: WireLevel,
96    pub fields: Vec<(String, WireFieldValue)>,
97    /// Nanoseconds since the Unix epoch.
98    pub recorded_at_ns: u64,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct WireSpan {
103    pub id: u64,
104    pub parent_id: Option<u64>,
105    pub name: String,
106    pub target: String,
107    pub level: WireLevel,
108    pub fields: Vec<(String, WireFieldValue)>,
109    pub events: Vec<WireEvent>,
110    /// Nanoseconds since the Unix epoch.
111    pub opened_at_ns: u64,
112    /// `None` if still in flight at snapshot time (currently only closed spans
113    /// are streamed, so this is `Some` in practice).
114    pub closed_at_ns: Option<u64>,
115}
116
117impl WireSpan {
118    /// Look up a field by name; O(N) over the typically-small field list.
119    pub fn field(&self, name: &str) -> Option<&WireFieldValue> {
120        self.fields.iter().find(|(k, _)| k == name).map(|(_, v)| v)
121    }
122}
123
124impl WireEvent {
125    pub fn field(&self, name: &str) -> Option<&WireFieldValue> {
126        self.fields.iter().find(|(k, _)| k == name).map(|(_, v)| v)
127    }
128}
129
130// ── Level filter (mirrors tracing::LevelFilter, OFF + 5 levels) ─────────────
131
132/// Wire counterpart to `tracing::level_filters::LevelFilter`.  Includes
133/// `Off` because the cache-side global level can be fully disabled —
134/// distinct from `WireLevel` (which is the per-span/event level and
135/// therefore can't be "off").
136#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
137pub enum WireLevelFilter {
138    Off,
139    Error,
140    Warn,
141    Info,
142    Debug,
143    Trace,
144}
145
146impl WireLevelFilter {
147    pub fn from_tracing(filter: tracing::metadata::LevelFilter) -> Self {
148        use tracing::metadata::LevelFilter as L;
149        if filter == L::OFF {
150            WireLevelFilter::Off
151        } else if filter == L::ERROR {
152            WireLevelFilter::Error
153        } else if filter == L::WARN {
154            WireLevelFilter::Warn
155        } else if filter == L::INFO {
156            WireLevelFilter::Info
157        } else if filter == L::DEBUG {
158            WireLevelFilter::Debug
159        } else {
160            WireLevelFilter::Trace
161        }
162    }
163
164    pub fn to_tracing(self) -> tracing::metadata::LevelFilter {
165        use tracing::metadata::LevelFilter as L;
166        match self {
167            WireLevelFilter::Off => L::OFF,
168            WireLevelFilter::Error => L::ERROR,
169            WireLevelFilter::Warn => L::WARN,
170            WireLevelFilter::Info => L::INFO,
171            WireLevelFilter::Debug => L::DEBUG,
172            WireLevelFilter::Trace => L::TRACE,
173        }
174    }
175}
176
177// ── Request body — every command the client can send ─────────────────────────
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
180pub enum RequestBody {
181    /// Filler used by the framework for control-only frames (cancel / end).
182    Noop,
183    /// Begin streaming closed spans to the client.  Server keeps streaming
184    /// until the client cancels the RPC (drops its handle).
185    StartStream,
186    /// Stop the current stream.  In protosocket-rpc terms, the cleaner way
187    /// is to drop the streaming-RPC handle on the client; this command is
188    /// here for completeness and explicit shutdown.
189    StopStream,
190    /// Per-connection minimum span level filter.  Only spans whose level is
191    /// at least this severe (per `tracing` ordering) are streamed.
192    SetLevel(WireLevel),
193    /// Server-wide cache-recording level.  Mutates the subscriber's
194    /// `LevelPredicate`, so it affects what the cache *records*, not
195    /// just what gets streamed to one client.  The server pushes the
196    /// resulting level back to every connected stream as
197    /// [`ResponseBody::CacheLevel`].
198    SetCacheLevel(WireLevelFilter),
199    /// Server-wide chance percentage `[0.0, 100.0]` that a root
200    /// span passes the cache's `ChancePredicate`.  Out-of-range or
201    /// NaN values are clamped server-side; the resulting effective
202    /// chance is broadcast as [`ResponseBody::CacheChance`].
203    SetCacheChance(f64),
204    /// Sample the stream — `1.0` = every span, `0.5` = ~half.  Applied per
205    /// root-span family so a sampled root drops its whole subtree.
206    SetSamplingRate(f64),
207    /// Substring filter applied to root span name + fields.  Applies
208    /// transitively: if the root matches, descendants stream too; if not,
209    /// none of the family is streamed.
210    SetFilter(Option<String>),
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct Request {
215    pub id: u64,
216    pub control: u8,
217    pub body: RequestBody,
218}
219
220impl Request {
221    pub fn new(body: RequestBody) -> Self {
222        Self {
223            id: 0,
224            control: ProtosocketControlCode::Normal.as_u8(),
225            body,
226        }
227    }
228}
229
230impl Message for Request {
231    fn message_id(&self) -> u64 {
232        self.id
233    }
234    fn control_code(&self) -> ProtosocketControlCode {
235        ProtosocketControlCode::from_u8(self.control)
236    }
237    fn set_message_id(&mut self, id: u64) {
238        self.id = id
239    }
240    fn cancelled(id: u64) -> Self {
241        Self {
242            id,
243            control: ProtosocketControlCode::Cancel.as_u8(),
244            body: RequestBody::Noop,
245        }
246    }
247    fn ended(id: u64) -> Self {
248        Self {
249            id,
250            control: ProtosocketControlCode::End.as_u8(),
251            body: RequestBody::Noop,
252        }
253    }
254}
255
256// ── Response body — what the server can send ─────────────────────────────────
257
258#[derive(Debug, Clone, Serialize, Deserialize)]
259pub enum ResponseBody {
260    /// Filler used by the framework for control-only frames (cancel / end).
261    Noop,
262    /// One closed span snapshot.  The streaming response side of `StartStream`
263    /// emits these one at a time as the host's span cache produces them.
264    Span(WireSpan),
265    /// Server-pushed notification of the current cache-recording level.
266    /// Sent once when a `StartStream` begins and again every time the
267    /// level changes (e.g. another client sent `SetCacheLevel`).  The
268    /// client treats this as the source of truth for its UI display.
269    CacheLevel(WireLevelFilter),
270    /// Server-pushed notification of the current effective chance
271    /// percentage `[0.0, 100.0]` for the cache's `ChancePredicate`.
272    /// Same lifecycle as [`ResponseBody::CacheLevel`]: sent on
273    /// `StartStream` and on every change.
274    CacheChance(f64),
275    /// Acknowledgement of a unary command (Set*, StopStream).
276    Ack,
277    /// Server-side error message for a command.
278    Error(String),
279}
280
281#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct Response {
283    pub id: u64,
284    pub control: u8,
285    pub body: ResponseBody,
286}
287
288impl Response {
289    pub fn new(body: ResponseBody) -> Self {
290        Self {
291            id: 0,
292            control: ProtosocketControlCode::Normal.as_u8(),
293            body,
294        }
295    }
296    pub fn ack() -> Self {
297        Self::new(ResponseBody::Ack)
298    }
299    pub fn error(msg: impl Into<String>) -> Self {
300        Self::new(ResponseBody::Error(msg.into()))
301    }
302    pub fn span(s: WireSpan) -> Self {
303        Self::new(ResponseBody::Span(s))
304    }
305    pub fn cache_level(level: WireLevelFilter) -> Self {
306        Self::new(ResponseBody::CacheLevel(level))
307    }
308    pub fn cache_chance(pct: f64) -> Self {
309        Self::new(ResponseBody::CacheChance(pct))
310    }
311    /// Set the message id and return `self` so the call chains.  The
312    /// server must echo the request id on every response so the
313    /// client's completion registry (keyed by id) can route the
314    /// response back to the right pending RPC.  protosocket-rpc does
315    /// NOT auto-assign ids — both endpoints must do it manually, and
316    /// in particular an RPC at id=0 will clobber any other RPC at
317    /// id=0 on the same client connection.
318    pub fn with_id(mut self, id: u64) -> Self {
319        self.id = id;
320        self
321    }
322}
323
324impl Message for Response {
325    fn message_id(&self) -> u64 {
326        self.id
327    }
328    fn control_code(&self) -> ProtosocketControlCode {
329        ProtosocketControlCode::from_u8(self.control)
330    }
331    fn set_message_id(&mut self, id: u64) {
332        self.id = id
333    }
334    fn cancelled(id: u64) -> Self {
335        Self {
336            id,
337            control: ProtosocketControlCode::Cancel.as_u8(),
338            body: ResponseBody::Noop,
339        }
340    }
341    fn ended(id: u64) -> Self {
342        Self {
343            id,
344            control: ProtosocketControlCode::End.as_u8(),
345            body: ResponseBody::Noop,
346        }
347    }
348}