Skip to main content

tracing_console_host/
protocol.rs

1//! Wire protocol between the host (this crate) and the console client.
2//!
3//! Encodes via messagepack (per-message length prefix supplied by
4//! `protosocket-messagepack`).  The protosocket-rpc framework needs each
5//! message to carry an id and a control code, so [`Request`] and [`Response`]
6//! both wrap an enum body next to those framework fields.
7
8use protosocket_rpc::{Message, ProtosocketControlCode};
9use serde::{Deserialize, Serialize};
10
/// Wire representation of a captured field value.
///
/// Mirrors [`tracing_cache::FieldValue`] but collapses the four string
/// variants (`Str` / `SmallString` / `SharedString` / `String`) to a single
/// `Str(String)` — the heap is unavoidable once we cross the network
/// boundary anyway.
///
/// Only `PartialEq` is derived (not `Eq`) because the `F64` payload is an
/// `f64`, which does not implement `Eq`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum WireFieldValue {
    /// Unsigned 64-bit integer.
    U64(u64),
    /// Signed 64-bit integer.
    I64(i64),
    /// 64-bit floating-point number.
    F64(f64),
    /// Boolean.
    Bool(bool),
    /// Owned string.
    Str(String),
}
24
25impl WireFieldValue {
26    /// Render the value as its printable string form.
27    pub fn to_string_value(&self) -> String {
28        match self {
29            WireFieldValue::U64(v) => v.to_string(),
30            WireFieldValue::I64(v) => v.to_string(),
31            WireFieldValue::F64(v) => v.to_string(),
32            WireFieldValue::Bool(v) => v.to_string(),
33            WireFieldValue::Str(s) => s.clone(),
34        }
35    }
36
37    /// Substring-match the printable representation.
38    pub fn contains(&self, needle: &str) -> bool {
39        match self {
40            WireFieldValue::Str(s) => s.contains(needle),
41            other => other.to_string_value().contains(needle),
42        }
43    }
44}
45
46impl std::fmt::Display for WireFieldValue {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        match self {
49            WireFieldValue::U64(v) => write!(f, "{v}"),
50            WireFieldValue::I64(v) => write!(f, "{v}"),
51            WireFieldValue::F64(v) => write!(f, "{v}"),
52            WireFieldValue::Bool(v) => write!(f, "{v}"),
53            WireFieldValue::Str(s) => f.write_str(s),
54        }
55    }
56}
57
58// ── Wire-friendly span / event types ─────────────────────────────────────────
59
/// Wire counterpart of [`tracing::Level`]: the severity attached to a single
/// span or event.
///
/// Converted to and from the `tracing` type with [`WireLevel::from_tracing`]
/// and [`WireLevel::to_tracing`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum WireLevel {
    /// Counterpart of `tracing::Level::TRACE`.
    Trace,
    /// Counterpart of `tracing::Level::DEBUG`.
    Debug,
    /// Counterpart of `tracing::Level::INFO`.
    Info,
    /// Counterpart of `tracing::Level::WARN`.
    Warn,
    /// Counterpart of `tracing::Level::ERROR`.
    Error,
}
68
69impl WireLevel {
70    pub fn from_tracing(level: &tracing::Level) -> Self {
71        // Match exhaustively so a tracing-core change forces an update here.
72        match *level {
73            tracing::Level::TRACE => WireLevel::Trace,
74            tracing::Level::DEBUG => WireLevel::Debug,
75            tracing::Level::INFO => WireLevel::Info,
76            tracing::Level::WARN => WireLevel::Warn,
77            tracing::Level::ERROR => WireLevel::Error,
78        }
79    }
80
81    pub fn to_tracing(self) -> tracing::Level {
82        match self {
83            WireLevel::Trace => tracing::Level::TRACE,
84            WireLevel::Debug => tracing::Level::DEBUG,
85            WireLevel::Info => tracing::Level::INFO,
86            WireLevel::Warn => tracing::Level::WARN,
87            WireLevel::Error => tracing::Level::ERROR,
88        }
89    }
90}
91
/// Wire form of a single event, carried in the `events` list of a
/// [`WireSpan`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WireEvent {
    /// Name of the event.
    pub name: String,
    /// Severity of the event.
    pub level: WireLevel,
    /// Captured fields as `(name, value)` pairs; use [`WireEvent::field`]
    /// to look one up by name.
    pub fields: Vec<(String, WireFieldValue)>,
    /// Nanoseconds since the Unix epoch.
    pub recorded_at_ns: u64,
}
100
/// Wire form of a span snapshot, including the events attached to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WireSpan {
    /// Identifier of this span.
    pub id: u64,
    /// Identifier of the parent span, if any.
    // NOTE(review): presumably `None` marks a root span — confirm against
    // the code that builds `WireSpan`s.
    pub parent_id: Option<u64>,
    /// Name of the span.
    pub name: String,
    /// Target of the span.
    // NOTE(review): presumably the `tracing` metadata target — confirm.
    pub target: String,
    /// Severity of the span.
    pub level: WireLevel,
    /// Captured fields as `(name, value)` pairs; use [`WireSpan::field`]
    /// to look one up by name.
    pub fields: Vec<(String, WireFieldValue)>,
    /// Events carried along with this span.
    pub events: Vec<WireEvent>,
    /// Nanoseconds since the Unix epoch.
    pub opened_at_ns: u64,
    /// `None` if still in flight at snapshot time (currently only closed spans
    /// are streamed, so this is `Some` in practice).
    pub closed_at_ns: Option<u64>,
}
116
117impl WireSpan {
118    /// Look up a field by name; O(N) over the typically-small field list.
119    pub fn field(&self, name: &str) -> Option<&WireFieldValue> {
120        self.fields.iter().find(|(k, _)| k == name).map(|(_, v)| v)
121    }
122}
123
124impl WireEvent {
125    pub fn field(&self, name: &str) -> Option<&WireFieldValue> {
126        self.fields.iter().find(|(k, _)| k == name).map(|(_, v)| v)
127    }
128}
129
130// ── Level filter (mirrors tracing::LevelFilter, OFF + 5 levels) ─────────────
131
/// Wire counterpart to `tracing::level_filters::LevelFilter`.
///
/// Includes `Off` because the cache-side global level can be fully
/// disabled — distinct from `WireLevel` (which is the per-span/event level
/// and therefore can't be "off").
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum WireLevelFilter {
    /// Counterpart of `LevelFilter::OFF`.
    Off,
    /// Counterpart of `LevelFilter::ERROR`.
    Error,
    /// Counterpart of `LevelFilter::WARN`.
    Warn,
    /// Counterpart of `LevelFilter::INFO`.
    Info,
    /// Counterpart of `LevelFilter::DEBUG`.
    Debug,
    /// Counterpart of `LevelFilter::TRACE`.
    Trace,
}
145
146impl WireLevelFilter {
147    pub fn from_tracing(filter: tracing::metadata::LevelFilter) -> Self {
148        use tracing::metadata::LevelFilter as L;
149        if filter == L::OFF {
150            WireLevelFilter::Off
151        } else if filter == L::ERROR {
152            WireLevelFilter::Error
153        } else if filter == L::WARN {
154            WireLevelFilter::Warn
155        } else if filter == L::INFO {
156            WireLevelFilter::Info
157        } else if filter == L::DEBUG {
158            WireLevelFilter::Debug
159        } else {
160            WireLevelFilter::Trace
161        }
162    }
163
164    pub fn to_tracing(self) -> tracing::metadata::LevelFilter {
165        use tracing::metadata::LevelFilter as L;
166        match self {
167            WireLevelFilter::Off => L::OFF,
168            WireLevelFilter::Error => L::ERROR,
169            WireLevelFilter::Warn => L::WARN,
170            WireLevelFilter::Info => L::INFO,
171            WireLevelFilter::Debug => L::DEBUG,
172            WireLevelFilter::Trace => L::TRACE,
173        }
174    }
175}
176
177// ── Request body — every command the client can send ─────────────────────────
178
/// Payload of a [`Request`]: every command the client can send.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RequestBody {
    /// Filler used by the framework for control-only frames (cancel / end).
    Noop,
    /// Begin streaming closed spans to the client.  Server keeps streaming
    /// until the client cancels the RPC (drops its handle).
    StartStream,
    /// Stop the current stream.  In protosocket-rpc terms, the cleaner way
    /// is to drop the streaming-RPC handle on the client; this command is
    /// here for completeness and explicit shutdown.
    StopStream,
    /// Per-connection minimum span level filter.  Only spans whose level is
    /// at least this severe (per `tracing` ordering) are streamed.
    SetLevel(WireLevel),
    /// Server-wide cache-recording level.  Mutates the subscriber's
    /// `LevelPredicate`, so it affects what the cache *records*, not
    /// just what gets streamed to one client.  The server pushes the
    /// resulting level back to every connected stream as
    /// [`ResponseBody::CacheLevel`].
    SetCacheLevel(WireLevelFilter),
    /// Server-wide chance percentage `[0.0, 100.0]` that a root
    /// span passes the cache's `ChancePredicate`.  Out-of-range or
    /// NaN values are clamped server-side; the resulting effective
    /// chance is broadcast as [`ResponseBody::CacheChance`].
    SetCacheChance(f64),
    /// Sample the stream — `1.0` = every span, `0.5` = ~half.  Applied per
    /// root-span family so a sampled root drops its whole subtree.
    ///
    /// Note the unit: this is a fraction, whereas
    /// [`RequestBody::SetCacheChance`] takes a percentage.
    SetSamplingRate(f64),
}
208
/// Client-to-host message: a [`RequestBody`] next to the id and control
/// code that the protosocket-rpc framework needs on every message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Request {
    /// Message id; read and written by the framework through the
    /// [`Message`] impl (`message_id` / `set_message_id`).
    pub id: u64,
    /// Raw `u8` form of a [`ProtosocketControlCode`]; decoded again by
    /// `Message::control_code`.
    pub control: u8,
    /// The command itself ([`RequestBody::Noop`] on cancel / end frames).
    pub body: RequestBody,
}
215
216impl Request {
217    pub fn new(body: RequestBody) -> Self {
218        Self {
219            id: 0,
220            control: ProtosocketControlCode::Normal.as_u8(),
221            body,
222        }
223    }
224}
225
226impl Message for Request {
227    fn message_id(&self) -> u64 {
228        self.id
229    }
230    fn control_code(&self) -> ProtosocketControlCode {
231        ProtosocketControlCode::from_u8(self.control)
232    }
233    fn set_message_id(&mut self, id: u64) {
234        self.id = id
235    }
236    fn cancelled(id: u64) -> Self {
237        Self {
238            id,
239            control: ProtosocketControlCode::Cancel.as_u8(),
240            body: RequestBody::Noop,
241        }
242    }
243    fn ended(id: u64) -> Self {
244        Self {
245            id,
246            control: ProtosocketControlCode::End.as_u8(),
247            body: RequestBody::Noop,
248        }
249    }
250}
251
252// ── Response body — what the server can send ─────────────────────────────────
253
/// Handshake describing the host binary the client is talking to.
///
/// One-shot and server-pushed: sent as the very first response on every
/// `StartStream` so the client can verify it's connected to a compatible
/// version (the host crate's version is workspace-pinned to the same
/// number as the client binary).  Kept as a struct so future fields
/// (build sha, supported features, …) don't require a wire-protocol
/// break.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WireServerInfo {
    /// `CARGO_PKG_VERSION` of the `tracing-console-host` crate on the
    /// server.  Use it to spot mismatched client/server pairs.
    pub version: String,
}
267
/// Payload of a [`Response`]: everything the server can send to a client.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResponseBody {
    /// Filler used by the framework for control-only frames (cancel / end).
    Noop,
    /// First message of every `StartStream` — identifies the server
    /// binary.  See [`WireServerInfo`].
    ServerInfo(WireServerInfo),
    /// One closed span snapshot.  The streaming response side of `StartStream`
    /// emits these one at a time as the host's span cache produces them.
    Span(WireSpan),
    /// Server-pushed notification of the current cache-recording level.
    /// Sent once when a `StartStream` begins and again every time the
    /// level changes (e.g. another client sent `SetCacheLevel`).  The
    /// client treats this as the source of truth for its UI display.
    CacheLevel(WireLevelFilter),
    /// Server-pushed notification of the current effective chance
    /// percentage `[0.0, 100.0]` for the cache's `ChancePredicate`.
    /// Same lifecycle as [`ResponseBody::CacheLevel`]: sent on
    /// `StartStream` and on every change.
    CacheChance(f64),
    /// Acknowledgement of a unary command (Set*, StopStream).
    Ack,
    /// Server-side error message for a command.
    Error(String),
}
293
/// Host-to-client message: a [`ResponseBody`] next to the id and control
/// code that the protosocket-rpc framework needs on every message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
    /// Message id; set with [`Response::with_id`] or through the
    /// [`Message`] impl.
    pub id: u64,
    /// Raw `u8` form of a [`ProtosocketControlCode`]; decoded again by
    /// `Message::control_code`.
    pub control: u8,
    /// The payload itself ([`ResponseBody::Noop`] on cancel / end frames).
    pub body: ResponseBody,
}
300
301impl Response {
302    pub fn new(body: ResponseBody) -> Self {
303        Self {
304            id: 0,
305            control: ProtosocketControlCode::Normal.as_u8(),
306            body,
307        }
308    }
309    pub fn ack() -> Self {
310        Self::new(ResponseBody::Ack)
311    }
312    pub fn error(msg: impl Into<String>) -> Self {
313        Self::new(ResponseBody::Error(msg.into()))
314    }
315    pub fn span(s: WireSpan) -> Self {
316        Self::new(ResponseBody::Span(s))
317    }
318    pub fn cache_level(level: WireLevelFilter) -> Self {
319        Self::new(ResponseBody::CacheLevel(level))
320    }
321    pub fn cache_chance(pct: f64) -> Self {
322        Self::new(ResponseBody::CacheChance(pct))
323    }
324    pub fn server_info(version: impl Into<String>) -> Self {
325        Self::new(ResponseBody::ServerInfo(WireServerInfo {
326            version: version.into(),
327        }))
328    }
329    /// Set the message id and return `self` so the call chains.  The
330    /// server must echo the request id on every response so the
331    /// client's completion registry (keyed by id) can route the
332    /// response back to the right pending RPC.  protosocket-rpc does
333    /// NOT auto-assign ids — both endpoints must do it manually, and
334    /// in particular an RPC at id=0 will clobber any other RPC at
335    /// id=0 on the same client connection.
336    pub fn with_id(mut self, id: u64) -> Self {
337        self.id = id;
338        self
339    }
340}
341
342impl Message for Response {
343    fn message_id(&self) -> u64 {
344        self.id
345    }
346    fn control_code(&self) -> ProtosocketControlCode {
347        ProtosocketControlCode::from_u8(self.control)
348    }
349    fn set_message_id(&mut self, id: u64) {
350        self.id = id
351    }
352    fn cancelled(id: u64) -> Self {
353        Self {
354            id,
355            control: ProtosocketControlCode::Cancel.as_u8(),
356            body: ResponseBody::Noop,
357        }
358    }
359    fn ended(id: u64) -> Self {
360        Self {
361            id,
362            control: ProtosocketControlCode::End.as_u8(),
363            body: ResponseBody::Noop,
364        }
365    }
366}