buswatch_tui/source/
stream.rs

1//! Stream-based data source.
2//!
3//! Receives monitor snapshots from an async byte stream.
4//! This is useful for network-based sources like TCP connections
5//! or message bus subscriptions.
6
7use std::sync::{Arc, Mutex};
8
9use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
10use tokio::sync::mpsc;
11
12use super::{DataSource, Snapshot};
13
14/// A data source that receives monitor snapshots from an async stream.
15///
16/// This source spawns a background task that reads newline-delimited JSON
17/// from the provided async reader and makes snapshots available via `poll()`.
18///
19/// # Example with a byte stream
20///
21/// ```
22/// use std::io::Cursor;
23/// use buswatch_tui::StreamSource;
24///
25/// # tokio_test::block_on(async {
26/// let data = b"{}\n";
27/// let stream = Cursor::new(data.to_vec());
28/// let source = StreamSource::spawn(stream, "example");
29/// # });
30/// ```
31#[derive(Debug)]
32pub struct StreamSource {
33    receiver: mpsc::Receiver<Snapshot>,
34    description: String,
35    last_snapshot: Option<Snapshot>,
36    last_error: Arc<Mutex<Option<String>>>,
37}
38
39impl StreamSource {
40    /// Spawn a background task that reads from the given async reader.
41    ///
42    /// The reader should provide newline-delimited JSON snapshots.
43    /// Each line is parsed as a complete `MonitorSnapshot`.
44    pub fn spawn<R>(reader: R, description: &str) -> Self
45    where
46        R: AsyncRead + Unpin + Send + 'static,
47    {
48        let (tx, rx) = mpsc::channel(16);
49        let last_error = Arc::new(Mutex::new(None));
50        let error_handle = last_error.clone();
51        let desc = description.to_string();
52
53        tokio::spawn(async move {
54            let mut reader = BufReader::new(reader);
55            let mut line = String::new();
56
57            loop {
58                line.clear();
59                match reader.read_line(&mut line).await {
60                    Ok(0) => {
61                        // EOF
62                        *error_handle.lock().unwrap() = Some("Connection closed".to_string());
63                        break;
64                    }
65                    Ok(_) => {
66                        // Try to parse the line as JSON
67                        match serde_json::from_str::<Snapshot>(line.trim()) {
68                            Ok(snapshot) => {
69                                *error_handle.lock().unwrap() = None;
70                                if tx.send(snapshot).await.is_err() {
71                                    // Receiver dropped
72                                    break;
73                                }
74                            }
75                            Err(e) => {
76                                *error_handle.lock().unwrap() = Some(format!("Parse error: {}", e));
77                            }
78                        }
79                    }
80                    Err(e) => {
81                        *error_handle.lock().unwrap() = Some(format!("Read error: {}", e));
82                        break;
83                    }
84                }
85            }
86        });
87
88        Self {
89            receiver: rx,
90            description: format!("stream: {}", desc),
91            last_snapshot: None,
92            last_error,
93        }
94    }
95
96    /// Create a StreamSource from a raw bytes channel.
97    ///
98    /// This is useful when you want to push JSON bytes from another source
99    /// (like a message bus) without using an AsyncRead.
100    ///
101    /// Each `Vec<u8>` sent through the channel should be a complete JSON snapshot.
102    pub fn from_bytes_channel(mut rx: mpsc::Receiver<Vec<u8>>, description: &str) -> Self {
103        let (tx, snapshot_rx) = mpsc::channel(16);
104        let last_error = Arc::new(Mutex::new(None));
105        let error_handle = last_error.clone();
106
107        tokio::spawn(async move {
108            while let Some(bytes) = rx.recv().await {
109                match serde_json::from_slice::<Snapshot>(&bytes) {
110                    Ok(snapshot) => {
111                        *error_handle.lock().unwrap() = None;
112                        if tx.send(snapshot).await.is_err() {
113                            break;
114                        }
115                    }
116                    Err(e) => {
117                        *error_handle.lock().unwrap() = Some(format!("Parse error: {}", e));
118                    }
119                }
120            }
121        });
122
123        Self {
124            receiver: snapshot_rx,
125            description: format!("stream: {}", description),
126            last_snapshot: None,
127            last_error,
128        }
129    }
130}
131
132impl DataSource for StreamSource {
133    fn poll(&mut self) -> Option<Snapshot> {
134        // Try to receive without blocking
135        match self.receiver.try_recv() {
136            Ok(snapshot) => {
137                self.last_snapshot = Some(snapshot.clone());
138                Some(snapshot)
139            }
140            Err(mpsc::error::TryRecvError::Empty) => None,
141            Err(mpsc::error::TryRecvError::Disconnected) => {
142                *self.last_error.lock().unwrap() = Some("Stream disconnected".to_string());
143                None
144            }
145        }
146    }
147
148    fn description(&self) -> &str {
149        &self.description
150    }
151
152    fn error(&self) -> Option<&str> {
153        // This is a bit awkward due to the mutex, but we need interior mutability
154        // for the error state. In practice, this is called infrequently.
155        None // Can't return reference to mutex-guarded data easily
156    }
157}
158
159// Custom error method that returns owned string
160impl StreamSource {
161    /// Get the last error message, if any.
162    pub fn last_error(&self) -> Option<String> {
163        self.last_error.lock().unwrap().clone()
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// How long each test waits for the background task to drain its input.
    const SETTLE: tokio::time::Duration = tokio::time::Duration::from_millis(50);

    /// One complete snapshot containing a single module named `TestModule`.
    fn sample_json() -> &'static str {
        r#"{"version":{"major":1,"minor":0},"timestamp_ms":1703160000000,"modules":{"TestModule":{"reads":{"input":{"count":100}},"writes":{"output":{"count":50}}}}}"#
    }

    #[tokio::test]
    async fn test_stream_source_spawn() {
        // Create a cursor with newline-delimited JSON
        let data = format!("{}\n", sample_json());
        let cursor = Cursor::new(data);

        let mut source = StreamSource::spawn(cursor, "test");

        // Give the background task time to process
        tokio::time::sleep(SETTLE).await;

        // Should receive the snapshot
        let snapshot = source.poll();
        assert!(snapshot.is_some());
        assert!(snapshot.unwrap().modules.contains_key("TestModule"));
    }

    #[tokio::test]
    async fn test_stream_source_multiple_snapshots() {
        let data = format!("{}\n{}\n", sample_json(), sample_json());
        let cursor = Cursor::new(data);

        let mut source = StreamSource::spawn(cursor, "test");

        tokio::time::sleep(SETTLE).await;

        // Should receive both snapshots
        let s1 = source.poll();
        let s2 = source.poll();
        assert!(s1.is_some());
        assert!(s2.is_some());

        // No more data
        assert!(source.poll().is_none());
    }

    #[tokio::test]
    async fn test_stream_source_description() {
        let cursor = Cursor::new("");
        let source = StreamSource::spawn(cursor, "tcp://localhost:9090");
        assert_eq!(source.description(), "stream: tcp://localhost:9090");
    }

    #[tokio::test]
    async fn test_stream_source_from_bytes_channel() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(16);
        let mut source = StreamSource::from_bytes_channel(rx, "test-channel");

        // Send a snapshot
        tx.send(sample_json().as_bytes().to_vec()).await.unwrap();

        tokio::time::sleep(SETTLE).await;

        let snapshot = source.poll();
        assert!(snapshot.is_some());
        assert!(snapshot.unwrap().modules.contains_key("TestModule"));
    }

    #[tokio::test]
    async fn test_stream_source_invalid_json() {
        // The stream carries a single malformed line and nothing else.
        let data = "not valid json\n";
        let cursor = Cursor::new(data);

        let mut source = StreamSource::spawn(cursor, "test");

        tokio::time::sleep(SETTLE).await;

        // Should not receive anything (invalid JSON is skipped)
        assert!(source.poll().is_none());

        // Error may be overwritten by "Connection closed" after EOF,
        // so we just verify no valid snapshot was received
    }

    #[tokio::test]
    async fn test_stream_source_recovers_after_invalid_json() {
        // A malformed line followed by a valid one: the bad line must be
        // skipped without stopping the reader.
        let data = format!("not valid json\n{}\n", sample_json());
        let cursor = Cursor::new(data);

        let mut source = StreamSource::spawn(cursor, "test");

        tokio::time::sleep(SETTLE).await;

        // The reader got past both lines and reached EOF, which replaces the
        // earlier parse error.
        assert_eq!(source.last_error().as_deref(), Some("Connection closed"));

        // The valid snapshot after the malformed line is still delivered.
        let snapshot = source.poll();
        assert!(snapshot.is_some());
        assert!(snapshot.unwrap().modules.contains_key("TestModule"));

        // Nothing else was queued.
        assert!(source.poll().is_none());
    }

    #[tokio::test]
    async fn test_stream_source_empty_stream() {
        let cursor = Cursor::new("");
        let mut source = StreamSource::spawn(cursor, "test");

        tokio::time::sleep(SETTLE).await;

        // No data to receive
        assert!(source.poll().is_none());
    }
}