buswatch_tui/source/
stream.rs1use std::sync::{Arc, Mutex};
8
9use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
10use tokio::sync::mpsc;
11
12use super::{DataSource, Snapshot};
13
14#[derive(Debug)]
32pub struct StreamSource {
33 receiver: mpsc::Receiver<Snapshot>,
34 description: String,
35 last_snapshot: Option<Snapshot>,
36 last_error: Arc<Mutex<Option<String>>>,
37}
38
39impl StreamSource {
40 pub fn spawn<R>(reader: R, description: &str) -> Self
45 where
46 R: AsyncRead + Unpin + Send + 'static,
47 {
48 let (tx, rx) = mpsc::channel(16);
49 let last_error = Arc::new(Mutex::new(None));
50 let error_handle = last_error.clone();
51 let desc = description.to_string();
52
53 tokio::spawn(async move {
54 let mut reader = BufReader::new(reader);
55 let mut line = String::new();
56
57 loop {
58 line.clear();
59 match reader.read_line(&mut line).await {
60 Ok(0) => {
61 *error_handle.lock().unwrap() = Some("Connection closed".to_string());
63 break;
64 }
65 Ok(_) => {
66 match serde_json::from_str::<Snapshot>(line.trim()) {
68 Ok(snapshot) => {
69 *error_handle.lock().unwrap() = None;
70 if tx.send(snapshot).await.is_err() {
71 break;
73 }
74 }
75 Err(e) => {
76 *error_handle.lock().unwrap() = Some(format!("Parse error: {}", e));
77 }
78 }
79 }
80 Err(e) => {
81 *error_handle.lock().unwrap() = Some(format!("Read error: {}", e));
82 break;
83 }
84 }
85 }
86 });
87
88 Self {
89 receiver: rx,
90 description: format!("stream: {}", desc),
91 last_snapshot: None,
92 last_error,
93 }
94 }
95
96 pub fn from_bytes_channel(mut rx: mpsc::Receiver<Vec<u8>>, description: &str) -> Self {
103 let (tx, snapshot_rx) = mpsc::channel(16);
104 let last_error = Arc::new(Mutex::new(None));
105 let error_handle = last_error.clone();
106
107 tokio::spawn(async move {
108 while let Some(bytes) = rx.recv().await {
109 match serde_json::from_slice::<Snapshot>(&bytes) {
110 Ok(snapshot) => {
111 *error_handle.lock().unwrap() = None;
112 if tx.send(snapshot).await.is_err() {
113 break;
114 }
115 }
116 Err(e) => {
117 *error_handle.lock().unwrap() = Some(format!("Parse error: {}", e));
118 }
119 }
120 }
121 });
122
123 Self {
124 receiver: snapshot_rx,
125 description: format!("stream: {}", description),
126 last_snapshot: None,
127 last_error,
128 }
129 }
130}
131
132impl DataSource for StreamSource {
133 fn poll(&mut self) -> Option<Snapshot> {
134 match self.receiver.try_recv() {
136 Ok(snapshot) => {
137 self.last_snapshot = Some(snapshot.clone());
138 Some(snapshot)
139 }
140 Err(mpsc::error::TryRecvError::Empty) => None,
141 Err(mpsc::error::TryRecvError::Disconnected) => {
142 *self.last_error.lock().unwrap() = Some("Stream disconnected".to_string());
143 None
144 }
145 }
146 }
147
148 fn description(&self) -> &str {
149 &self.description
150 }
151
152 fn error(&self) -> Option<&str> {
153 None }
157}
158
159impl StreamSource {
161 pub fn last_error(&self) -> Option<String> {
163 self.last_error.lock().unwrap().clone()
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170 use std::io::Cursor;
171
172 fn sample_json() -> &'static str {
173 r#"{"version":{"major":1,"minor":0},"timestamp_ms":1703160000000,"modules":{"TestModule":{"reads":{"input":{"count":100}},"writes":{"output":{"count":50}}}}}"#
174 }
175
176 #[tokio::test]
177 async fn test_stream_source_spawn() {
178 let data = format!("{}\n", sample_json());
180 let cursor = Cursor::new(data);
181
182 let mut source = StreamSource::spawn(cursor, "test");
183
184 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
186
187 let snapshot = source.poll();
189 assert!(snapshot.is_some());
190 assert!(snapshot.unwrap().modules.contains_key("TestModule"));
191 }
192
193 #[tokio::test]
194 async fn test_stream_source_multiple_snapshots() {
195 let data = format!("{}\n{}\n", sample_json(), sample_json());
196 let cursor = Cursor::new(data);
197
198 let mut source = StreamSource::spawn(cursor, "test");
199
200 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
201
202 let s1 = source.poll();
204 let s2 = source.poll();
205 assert!(s1.is_some());
206 assert!(s2.is_some());
207
208 assert!(source.poll().is_none());
210 }
211
212 #[tokio::test]
213 async fn test_stream_source_description() {
214 let cursor = Cursor::new("");
215 let source = StreamSource::spawn(cursor, "tcp://localhost:9090");
216 assert_eq!(source.description(), "stream: tcp://localhost:9090");
217 }
218
219 #[tokio::test]
220 async fn test_stream_source_from_bytes_channel() {
221 let (tx, rx) = mpsc::channel::<Vec<u8>>(16);
222 let mut source = StreamSource::from_bytes_channel(rx, "test-channel");
223
224 tx.send(sample_json().as_bytes().to_vec()).await.unwrap();
226
227 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
228
229 let snapshot = source.poll();
230 assert!(snapshot.is_some());
231 assert!(snapshot.unwrap().modules.contains_key("TestModule"));
232 }
233
234 #[tokio::test]
235 async fn test_stream_source_invalid_json() {
236 let data = "not valid json\n";
238 let cursor = Cursor::new(data);
239
240 let mut source = StreamSource::spawn(cursor, "test");
241
242 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
243
244 assert!(source.poll().is_none());
246
247 }
250
251 #[tokio::test]
252 async fn test_stream_source_empty_stream() {
253 let cursor = Cursor::new("");
254 let mut source = StreamSource::spawn(cursor, "test");
255
256 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
257
258 assert!(source.poll().is_none());
260 }
261}