Skip to main content

tracing_console_host/
protocol.rs

1//! Wire protocol between the host (this crate) and the console client.
2//!
3//! Encodes via messagepack (per-message length prefix supplied by
4//! `protosocket-messagepack`).  The protosocket-rpc framework needs each
5//! message to carry an id and a control code, so [`Request`] and [`Response`]
6//! both wrap an enum body next to those framework fields.
7
8use protosocket_rpc::{Message, ProtosocketControlCode};
9use serde::{Deserialize, Serialize};
10
/// Wire representation of a captured field value.  Mirrors
/// [`tracing_cache::FieldValue`] but collapses the four string variants
/// (`Str` / `SmallString` / `SharedString` / `String`) to a single
/// `Str(String)` — the heap is unavoidable once we cross the network
/// boundary anyway.
///
/// `Eq` is not derived because the `F64` payload is only `PartialEq`.
///
/// NOTE(review): the type derives `Serialize`/`Deserialize`, so renaming or
/// reordering variants presumably changes the encoded form — confirm against
/// the serializer configuration before touching the declaration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum WireFieldValue {
    /// Unsigned 64-bit integer payload.
    U64(u64),
    /// Signed 64-bit integer payload.
    I64(i64),
    /// 64-bit floating-point payload.
    F64(f64),
    /// Boolean payload.
    Bool(bool),
    /// Owned string payload (the single string form used on the wire).
    Str(String),
}
24
25impl WireFieldValue {
26    /// Render the value as its printable string form.
27    pub fn to_string_value(&self) -> String {
28        match self {
29            WireFieldValue::U64(v) => v.to_string(),
30            WireFieldValue::I64(v) => v.to_string(),
31            WireFieldValue::F64(v) => v.to_string(),
32            WireFieldValue::Bool(v) => v.to_string(),
33            WireFieldValue::Str(s) => s.clone(),
34        }
35    }
36
37    /// Substring-match the printable representation.
38    pub fn contains(&self, needle: &str) -> bool {
39        match self {
40            WireFieldValue::Str(s) => s.contains(needle),
41            other => other.to_string_value().contains(needle),
42        }
43    }
44}
45
46impl std::fmt::Display for WireFieldValue {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        match self {
49            WireFieldValue::U64(v) => write!(f, "{v}"),
50            WireFieldValue::I64(v) => write!(f, "{v}"),
51            WireFieldValue::F64(v) => write!(f, "{v}"),
52            WireFieldValue::Bool(v) => write!(f, "{v}"),
53            WireFieldValue::Str(s) => f.write_str(s),
54        }
55    }
56}
57
58// ── Wire-friendly span / event types ─────────────────────────────────────────
59
/// Wire counterpart to `tracing::Level`: one variant per level constant
/// (`TRACE`, `DEBUG`, `INFO`, `WARN`, `ERROR`).
///
/// NOTE(review): the type derives `Serialize`/`Deserialize`, so renaming or
/// reordering variants presumably changes the encoded form — confirm before
/// editing the declaration.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum WireLevel {
    /// Corresponds to `tracing::Level::TRACE`.
    Trace,
    /// Corresponds to `tracing::Level::DEBUG`.
    Debug,
    /// Corresponds to `tracing::Level::INFO`.
    Info,
    /// Corresponds to `tracing::Level::WARN`.
    Warn,
    /// Corresponds to `tracing::Level::ERROR`.
    Error,
}
68
69impl WireLevel {
70    pub fn from_tracing(level: &tracing::Level) -> Self {
71        // Match exhaustively so a tracing-core change forces an update here.
72        match *level {
73            tracing::Level::TRACE => WireLevel::Trace,
74            tracing::Level::DEBUG => WireLevel::Debug,
75            tracing::Level::INFO => WireLevel::Info,
76            tracing::Level::WARN => WireLevel::Warn,
77            tracing::Level::ERROR => WireLevel::Error,
78        }
79    }
80
81    pub fn to_tracing(self) -> tracing::Level {
82        match self {
83            WireLevel::Trace => tracing::Level::TRACE,
84            WireLevel::Debug => tracing::Level::DEBUG,
85            WireLevel::Info => tracing::Level::INFO,
86            WireLevel::Warn => tracing::Level::WARN,
87            WireLevel::Error => tracing::Level::ERROR,
88        }
89    }
90}
91
/// Wire form of a single event; carried inside [`WireSpan::events`].
///
/// NOTE(review): the type derives `Serialize`/`Deserialize`, so field order
/// and names presumably matter to the encoded form — confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WireEvent {
    /// Event name (presumably taken from the event's metadata — confirm
    /// against the code that builds this struct).
    pub name: String,
    /// Level of this event.
    pub level: WireLevel,
    /// Name/value pairs recorded on the event, kept as an ordered list;
    /// [`WireEvent::field`] returns the first entry with a matching name.
    pub fields: Vec<(String, WireFieldValue)>,
    /// Nanoseconds since the Unix epoch.
    pub recorded_at_ns: u64,
}
100
/// Wire form of a span snapshot, including the events attached to it.
///
/// NOTE(review): the type derives `Serialize`/`Deserialize`, so field order
/// and names presumably matter to the encoded form — confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WireSpan {
    /// Numeric id of this span.
    pub id: u64,
    /// Id of the parent span, if any (presumably `None` for a root span —
    /// confirm against the code that builds this struct).
    pub parent_id: Option<u64>,
    /// Span name.
    pub name: String,
    /// Span target (presumably the `tracing` metadata target — confirm).
    pub target: String,
    /// Level of this span.
    pub level: WireLevel,
    /// Name/value pairs recorded on the span, kept as an ordered list;
    /// [`WireSpan::field`] returns the first entry with a matching name.
    pub fields: Vec<(String, WireFieldValue)>,
    /// Events attached to this span.
    pub events: Vec<WireEvent>,
    /// Nanoseconds since the Unix epoch.
    pub opened_at_ns: u64,
    /// `None` if still in flight at snapshot time (currently only closed spans
    /// are streamed, so this is `Some` in practice).
    pub closed_at_ns: Option<u64>,
}
116
117impl WireSpan {
118    /// Look up a field by name; O(N) over the typically-small field list.
119    pub fn field(&self, name: &str) -> Option<&WireFieldValue> {
120        self.fields.iter().find(|(k, _)| k == name).map(|(_, v)| v)
121    }
122}
123
124impl WireEvent {
125    pub fn field(&self, name: &str) -> Option<&WireFieldValue> {
126        self.fields.iter().find(|(k, _)| k == name).map(|(_, v)| v)
127    }
128}
129
130// ── Level filter (mirrors tracing::LevelFilter, OFF + 5 levels) ─────────────
131
/// Wire counterpart to `tracing::level_filters::LevelFilter`.  Includes
/// `Off` because the cache-side global level can be fully disabled —
/// distinct from `WireLevel` (which is the per-span/event level and
/// therefore can't be "off").
///
/// NOTE(review): the type derives `Serialize`/`Deserialize`, so renaming or
/// reordering variants presumably changes the encoded form — confirm before
/// editing the declaration.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum WireLevelFilter {
    /// Corresponds to `LevelFilter::OFF`.
    Off,
    /// Corresponds to `LevelFilter::ERROR`.
    Error,
    /// Corresponds to `LevelFilter::WARN`.
    Warn,
    /// Corresponds to `LevelFilter::INFO`.
    Info,
    /// Corresponds to `LevelFilter::DEBUG`.
    Debug,
    /// Corresponds to `LevelFilter::TRACE`.
    Trace,
}
145
146impl WireLevelFilter {
147    pub fn from_tracing(filter: tracing::metadata::LevelFilter) -> Self {
148        use tracing::metadata::LevelFilter as L;
149        if filter == L::OFF {
150            WireLevelFilter::Off
151        } else if filter == L::ERROR {
152            WireLevelFilter::Error
153        } else if filter == L::WARN {
154            WireLevelFilter::Warn
155        } else if filter == L::INFO {
156            WireLevelFilter::Info
157        } else if filter == L::DEBUG {
158            WireLevelFilter::Debug
159        } else {
160            WireLevelFilter::Trace
161        }
162    }
163
164    pub fn to_tracing(self) -> tracing::metadata::LevelFilter {
165        use tracing::metadata::LevelFilter as L;
166        match self {
167            WireLevelFilter::Off => L::OFF,
168            WireLevelFilter::Error => L::ERROR,
169            WireLevelFilter::Warn => L::WARN,
170            WireLevelFilter::Info => L::INFO,
171            WireLevelFilter::Debug => L::DEBUG,
172            WireLevelFilter::Trace => L::TRACE,
173        }
174    }
175}
176
177// ── Request body — every command the client can send ─────────────────────────
178
/// Command payload of a [`Request`]: every command the client can send, plus
/// the `Noop` filler used for control-only frames.
///
/// Note the two different units below: `SetCacheChance` is a percentage in
/// `[0.0, 100.0]`, while `SetSamplingRate` is a fraction where `1.0` means
/// every span.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RequestBody {
    /// Filler used by the framework for control-only frames (cancel / end).
    Noop,
    /// Begin streaming closed spans to the client.  Server keeps streaming
    /// until the client cancels the RPC (drops its handle).
    StartStream,
    /// Stop the current stream.  In protosocket-rpc terms, the cleaner way
    /// is to drop the streaming-RPC handle on the client; this command is
    /// here for completeness and explicit shutdown.
    StopStream,
    /// Per-connection minimum span level filter.  Only spans whose level is
    /// at least this severe (per `tracing` ordering) are streamed.
    SetLevel(WireLevel),
    /// Server-wide cache-recording level.  Mutates the subscriber's
    /// `LevelPredicate`, so it affects what the cache *records*, not
    /// just what gets streamed to one client.  The server pushes the
    /// resulting level back to every connected stream as
    /// [`ResponseBody::CacheLevel`].
    SetCacheLevel(WireLevelFilter),
    /// Server-wide chance percentage `[0.0, 100.0]` that a root
    /// span passes the cache's `ChancePredicate`.  Out-of-range or
    /// NaN values are clamped server-side; the resulting effective
    /// chance is broadcast as [`ResponseBody::CacheChance`].
    SetCacheChance(f64),
    /// Sample the stream — `1.0` = every span, `0.5` = ~half.  Applied per
    /// root-span family so a sampled root drops its whole subtree.
    ///
    /// NOTE(review): unlike `SetCacheChance`, nothing here says how
    /// out-of-range or NaN rates are handled — confirm the server-side
    /// behaviour.
    SetSamplingRate(f64),
}
208
/// Client-to-host frame: the id and control code required by the
/// protosocket-rpc framework, next to the command body.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Request {
    /// Message id; read and written through the `Message` impl
    /// (`message_id` / `set_message_id`).
    pub id: u64,
    /// Control code stored as a raw byte (`ProtosocketControlCode::as_u8`);
    /// decoded again with `ProtosocketControlCode::from_u8`.
    pub control: u8,
    /// The command carried by this frame; `RequestBody::Noop` on the
    /// control-only frames built by `cancelled` / `ended`.
    pub body: RequestBody,
}
215
216impl Request {
217    pub fn new(body: RequestBody) -> Self {
218        Self {
219            id: 0,
220            control: ProtosocketControlCode::Normal.as_u8(),
221            body,
222        }
223    }
224}
225
226impl Message for Request {
227    fn message_id(&self) -> u64 {
228        self.id
229    }
230    fn control_code(&self) -> ProtosocketControlCode {
231        ProtosocketControlCode::from_u8(self.control)
232    }
233    fn set_message_id(&mut self, id: u64) {
234        self.id = id
235    }
236    fn cancelled(id: u64) -> Self {
237        Self {
238            id,
239            control: ProtosocketControlCode::Cancel.as_u8(),
240            body: RequestBody::Noop,
241        }
242    }
243    fn ended(id: u64) -> Self {
244        Self {
245            id,
246            control: ProtosocketControlCode::End.as_u8(),
247            body: RequestBody::Noop,
248        }
249    }
250}
251
252// ── Response body — what the server can send ─────────────────────────────────
253
/// Payload of a [`Response`]: everything the server can send, plus the
/// `Noop` filler used for control-only frames.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResponseBody {
    /// Filler used by the framework for control-only frames (cancel / end).
    Noop,
    /// One closed span snapshot.  The streaming response side of `StartStream`
    /// emits these one at a time as the host's span cache produces them.
    Span(WireSpan),
    /// Server-pushed notification of the current cache-recording level.
    /// Sent once when a `StartStream` begins and again every time the
    /// level changes (e.g. another client sent `SetCacheLevel`).  The
    /// client treats this as the source of truth for its UI display.
    CacheLevel(WireLevelFilter),
    /// Server-pushed notification of the current effective chance
    /// percentage `[0.0, 100.0]` for the cache's `ChancePredicate`.
    /// Same lifecycle as [`ResponseBody::CacheLevel`]: sent on
    /// `StartStream` and on every change.
    CacheChance(f64),
    /// Acknowledgement of a unary command (Set*, StopStream).
    Ack,
    /// Server-side error message for a command.
    Error(String),
}
276
/// Host-to-client frame: the id and control code required by the
/// protosocket-rpc framework, next to the response body.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
    /// Message id; `Response::new` leaves it at `0`, and `with_id` /
    /// `set_message_id` overwrite it.
    pub id: u64,
    /// Control code stored as a raw byte (`ProtosocketControlCode::as_u8`);
    /// decoded again with `ProtosocketControlCode::from_u8`.
    pub control: u8,
    /// The payload carried by this frame; `ResponseBody::Noop` on the
    /// control-only frames built by `cancelled` / `ended`.
    pub body: ResponseBody,
}
283
284impl Response {
285    pub fn new(body: ResponseBody) -> Self {
286        Self {
287            id: 0,
288            control: ProtosocketControlCode::Normal.as_u8(),
289            body,
290        }
291    }
292    pub fn ack() -> Self {
293        Self::new(ResponseBody::Ack)
294    }
295    pub fn error(msg: impl Into<String>) -> Self {
296        Self::new(ResponseBody::Error(msg.into()))
297    }
298    pub fn span(s: WireSpan) -> Self {
299        Self::new(ResponseBody::Span(s))
300    }
301    pub fn cache_level(level: WireLevelFilter) -> Self {
302        Self::new(ResponseBody::CacheLevel(level))
303    }
304    pub fn cache_chance(pct: f64) -> Self {
305        Self::new(ResponseBody::CacheChance(pct))
306    }
307    /// Set the message id and return `self` so the call chains.  The
308    /// server must echo the request id on every response so the
309    /// client's completion registry (keyed by id) can route the
310    /// response back to the right pending RPC.  protosocket-rpc does
311    /// NOT auto-assign ids — both endpoints must do it manually, and
312    /// in particular an RPC at id=0 will clobber any other RPC at
313    /// id=0 on the same client connection.
314    pub fn with_id(mut self, id: u64) -> Self {
315        self.id = id;
316        self
317    }
318}
319
320impl Message for Response {
321    fn message_id(&self) -> u64 {
322        self.id
323    }
324    fn control_code(&self) -> ProtosocketControlCode {
325        ProtosocketControlCode::from_u8(self.control)
326    }
327    fn set_message_id(&mut self, id: u64) {
328        self.id = id
329    }
330    fn cancelled(id: u64) -> Self {
331        Self {
332            id,
333            control: ProtosocketControlCode::Cancel.as_u8(),
334            body: ResponseBody::Noop,
335        }
336    }
337    fn ended(id: u64) -> Self {
338        Self {
339            id,
340            control: ProtosocketControlCode::End.as_u8(),
341            body: ResponseBody::Noop,
342        }
343    }
344}