1use std::net::SocketAddr;
10use std::sync::{Arc, Mutex};
11
12use futures_util::{SinkExt, StreamExt};
13use nucleus_itm::Decoder;
14use tokio::io::{AsyncRead, AsyncReadExt};
15use tokio::net::{TcpListener, TcpStream};
16use tokio::sync::broadcast;
17use tokio::task::JoinHandle;
18use tokio_tungstenite::tungstenite::Message;
19
20use crate::translate::{TraceEvent, Translator, VariableMap};
21
22const BROADCAST_CAPACITY: usize = 4096;
25
26struct Pipeline {
27 decoder: Decoder,
28 translator: Translator,
29}
30
31pub struct TraceServer {
33 tx: broadcast::Sender<String>,
34 pipeline: Mutex<Pipeline>,
35}
36
37impl TraceServer {
38 pub fn new(vars: VariableMap) -> Arc<TraceServer> {
40 let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
41 Arc::new(TraceServer {
42 tx,
43 pipeline: Mutex::new(Pipeline {
44 decoder: Decoder::new(),
45 translator: Translator::new(vars),
46 }),
47 })
48 }
49
50 pub fn subscribe(&self) -> broadcast::Receiver<String> {
52 self.tx.subscribe()
53 }
54
55 pub fn ingest(&self, bytes: &[u8]) -> Vec<TraceEvent> {
58 let events = {
59 let mut p = self.pipeline.lock().expect("pipeline mutex poisoned");
60 let packets = p.decoder.decode(bytes);
61 let mut events = Vec::new();
62 for packet in &packets {
63 events.extend(p.translator.translate(packet));
64 }
65 events
66 };
67 self.broadcast(&events);
68 events
69 }
70
71 pub fn flush(&self) -> Vec<TraceEvent> {
73 let events = {
74 let mut p = self.pipeline.lock().expect("pipeline mutex poisoned");
75 p.translator.flush()
76 };
77 self.broadcast(&events);
78 events
79 }
80
81 fn broadcast(&self, events: &[TraceEvent]) {
82 for event in events {
83 if let Ok(json) = serde_json::to_string(event) {
84 let _ = self.tx.send(json);
86 }
87 }
88 }
89
90 pub async fn serve_ws(
93 self: &Arc<Self>,
94 addr: &str,
95 ) -> std::io::Result<(SocketAddr, JoinHandle<()>)> {
96 let listener = TcpListener::bind(addr).await?;
97 let local = listener.local_addr()?;
98 let server = Arc::clone(self);
99 let handle = tokio::spawn(async move {
100 while let Ok((stream, _peer)) = listener.accept().await {
101 let rx = server.subscribe();
102 tokio::spawn(client_loop(stream, rx));
103 }
104 });
105 Ok((local, handle))
106 }
107
108 pub async fn run_source<R: AsyncRead + Unpin>(
110 self: &Arc<Self>,
111 mut reader: R,
112 ) -> std::io::Result<()> {
113 let mut buf = [0u8; 4096];
114 loop {
115 let n = reader.read(&mut buf).await?;
116 if n == 0 {
117 break;
118 }
119 self.ingest(&buf[..n]);
120 }
121 self.flush();
122 Ok(())
123 }
124}
125
126async fn client_loop(stream: TcpStream, mut rx: broadcast::Receiver<String>) {
128 let Ok(ws) = tokio_tungstenite::accept_async(stream).await else {
129 return;
130 };
131 let (mut sink, mut source) = ws.split();
132 loop {
133 tokio::select! {
134 event = rx.recv() => match event {
135 Ok(text) => {
136 if sink.send(Message::Text(text)).await.is_err() {
137 break;
138 }
139 }
140 Err(broadcast::error::RecvError::Lagged(_)) => continue,
142 Err(broadcast::error::RecvError::Closed) => break,
143 },
144 incoming = source.next() => match incoming {
145 Some(Ok(Message::Close(_))) | None => break,
146 Some(Err(_)) => break,
147 Some(Ok(_)) => {} },
149 }
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156
157 #[tokio::test]
158 async fn ingest_broadcasts_json_to_a_subscriber() {
159 let mut vars = VariableMap::new();
160 vars.insert(1, "temperature", crate::translate::VarType::F32);
161 let server = TraceServer::new(vars);
162 let mut rx = server.subscribe();
163
164 server.ingest(b"\x01o\x01k\x01\n");
166 let mut stream = vec![0x0Bu8];
167 stream.extend_from_slice(&2.0f32.to_le_bytes());
168 server.ingest(&stream);
169
170 let log: serde_json::Value = serde_json::from_str(&rx.recv().await.unwrap()).unwrap();
171 assert_eq!(log["kind"], "log");
172 assert_eq!(log["message"], "ok");
173
174 let var: serde_json::Value = serde_json::from_str(&rx.recv().await.unwrap()).unwrap();
175 assert_eq!(var["kind"], "variable");
176 assert_eq!(var["name"], "temperature");
177 assert_eq!(var["value"], 2.0);
178 }
179}