Skip to main content

nucleus_trace/
server.rs

//! The trace daemon: decode pipeline + WebSocket fan-out.
//!
//! A [`TraceServer`] owns the ITM decoder and the [`Translator`]. Bytes pushed
//! in via [`TraceServer::ingest`] are decoded, translated to [`TraceEvent`]s,
//! serialized to JSON, and broadcast to every connected WebSocket client. The
//! server is transport-agnostic: bytes can come from OpenOCD, a TCP trace port,
//! or a captured-file replay (see [`crate::source`]).
8
9use std::net::SocketAddr;
10use std::sync::{Arc, Mutex};
11
12use futures_util::{SinkExt, StreamExt};
13use nucleus_itm::Decoder;
14use tokio::io::{AsyncRead, AsyncReadExt};
15use tokio::net::{TcpListener, TcpStream};
16use tokio::sync::broadcast;
17use tokio::task::JoinHandle;
18use tokio_tungstenite::tungstenite::Message;
19
20use crate::translate::{TraceEvent, Translator, VariableMap};
21
/// Capacity of the broadcast channel that fans JSON events out to clients.
///
/// A slow client that falls more than this many messages behind loses the
/// messages it missed (its receiver reports `RecvError::Lagged`, which
/// `client_loop` skips over silently) rather than stalling the pipeline.
const BROADCAST_CAPACITY: usize = 4096;
25
26struct Pipeline {
27    decoder: Decoder,
28    translator: Translator,
29}
30
31/// The shared trace server. Clone the `Arc`, not the server.
32pub struct TraceServer {
33    tx: broadcast::Sender<String>,
34    pipeline: Mutex<Pipeline>,
35}
36
37impl TraceServer {
38    /// Build a server that decodes variables according to `vars`.
39    pub fn new(vars: VariableMap) -> Arc<TraceServer> {
40        let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
41        Arc::new(TraceServer {
42            tx,
43            pipeline: Mutex::new(Pipeline {
44                decoder: Decoder::new(),
45                translator: Translator::new(vars),
46            }),
47        })
48    }
49
50    /// Subscribe to the JSON event stream (one JSON object per message).
51    pub fn subscribe(&self) -> broadcast::Receiver<String> {
52        self.tx.subscribe()
53    }
54
55    /// Decode `bytes`, broadcast the resulting events, and return them (handy
56    /// for tests and for callers that want to log locally).
57    pub fn ingest(&self, bytes: &[u8]) -> Vec<TraceEvent> {
58        let events = {
59            let mut p = self.pipeline.lock().expect("pipeline mutex poisoned");
60            let packets = p.decoder.decode(bytes);
61            let mut events = Vec::new();
62            for packet in &packets {
63                events.extend(p.translator.translate(packet));
64            }
65            events
66        };
67        self.broadcast(&events);
68        events
69    }
70
71    /// Emit any buffered partial log line. Call when the source ends.
72    pub fn flush(&self) -> Vec<TraceEvent> {
73        let events = {
74            let mut p = self.pipeline.lock().expect("pipeline mutex poisoned");
75            p.translator.flush()
76        };
77        self.broadcast(&events);
78        events
79    }
80
81    fn broadcast(&self, events: &[TraceEvent]) {
82        for event in events {
83            if let Ok(json) = serde_json::to_string(event) {
84                // `send` errors only when there are no subscribers; ignore.
85                let _ = self.tx.send(json);
86            }
87        }
88    }
89
90    /// Bind a WebSocket listener on `addr` and accept clients in the background.
91    /// Returns the bound address (useful when `addr` uses port 0).
92    pub async fn serve_ws(
93        self: &Arc<Self>,
94        addr: &str,
95    ) -> std::io::Result<(SocketAddr, JoinHandle<()>)> {
96        let listener = TcpListener::bind(addr).await?;
97        let local = listener.local_addr()?;
98        let server = Arc::clone(self);
99        let handle = tokio::spawn(async move {
100            while let Ok((stream, _peer)) = listener.accept().await {
101                let rx = server.subscribe();
102                tokio::spawn(client_loop(stream, rx));
103            }
104        });
105        Ok((local, handle))
106    }
107
108    /// Read SWO bytes from `reader` until EOF, feeding the pipeline.
109    pub async fn run_source<R: AsyncRead + Unpin>(
110        self: &Arc<Self>,
111        mut reader: R,
112    ) -> std::io::Result<()> {
113        let mut buf = [0u8; 4096];
114        loop {
115            let n = reader.read(&mut buf).await?;
116            if n == 0 {
117                break;
118            }
119            self.ingest(&buf[..n]);
120        }
121        self.flush();
122        Ok(())
123    }
124}
125
126/// Forward broadcast events to one WebSocket client until it disconnects.
127async fn client_loop(stream: TcpStream, mut rx: broadcast::Receiver<String>) {
128    let Ok(ws) = tokio_tungstenite::accept_async(stream).await else {
129        return;
130    };
131    let (mut sink, mut source) = ws.split();
132    loop {
133        tokio::select! {
134            event = rx.recv() => match event {
135                Ok(text) => {
136                    if sink.send(Message::Text(text)).await.is_err() {
137                        break;
138                    }
139                }
140                // Slow consumer: skip dropped messages and keep going.
141                Err(broadcast::error::RecvError::Lagged(_)) => continue,
142                Err(broadcast::error::RecvError::Closed) => break,
143            },
144            incoming = source.next() => match incoming {
145                Some(Ok(Message::Close(_))) | None => break,
146                Some(Err(_)) => break,
147                Some(Ok(_)) => {} // clients are read-only; ignore their messages
148            },
149        }
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Events produced by `ingest` reach a subscriber as JSON, in order.
    #[tokio::test]
    async fn ingest_broadcasts_json_to_a_subscriber() {
        let mut vars = VariableMap::new();
        vars.insert(1, "temperature", crate::translate::VarType::F32);
        let server = TraceServer::new(vars);
        // Subscribe before ingesting: only later broadcasts are delivered.
        let mut rx = server.subscribe();

        // Port 0 "ok\n" then a port-1 f32.
        server.ingest(b"\x01o\x01k\x01\n");
        let mut stream = vec![0x0Bu8];
        stream.extend_from_slice(&2.0f32.to_le_bytes());
        server.ingest(&stream);

        let log: serde_json::Value = serde_json::from_str(&rx.recv().await.unwrap()).unwrap();
        assert_eq!(log["kind"], "log");
        assert_eq!(log["message"], "ok");

        let var: serde_json::Value = serde_json::from_str(&rx.recv().await.unwrap()).unwrap();
        assert_eq!(var["kind"], "variable");
        assert_eq!(var["name"], "temperature");
        assert_eq!(var["value"], 2.0);
    }
}