1use crate::error::{Error, Result};
4use crate::reader::read_file_content;
5use crate::watcher::{FileWatcher, is_event_relevant_to_file};
6use futures::Stream;
7use std::path::{Path, PathBuf};
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use tokio::sync::{broadcast, mpsc};
11use tokio::task::JoinHandle;
12
13pub struct LogStream {
15 receiver: mpsc::UnboundedReceiver<Result<Vec<String>>>,
16 _shutdown_tx: broadcast::Sender<()>,
17 _task_handle: JoinHandle<()>,
18}
19
20impl LogStream {
21 pub async fn new<P: AsRef<Path>>(path: P, separator: Option<String>) -> Result<Self> {
23 let file_path = path.as_ref().to_path_buf();
24 let separator = separator.unwrap_or_else(|| "\n".to_string());
25
26 let (tx, rx) = mpsc::unbounded_channel();
27 let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
28
29 let task_file_path = file_path.clone();
31 let task_separator = separator.clone();
32 let task_tx = tx.clone();
33
34 let task_handle = tokio::spawn(async move {
35 if let Err(e) =
36 file_reader_task(task_file_path, task_separator, task_tx, shutdown_rx).await
37 {
38 eprintln!("File reader task error: {}", e);
40 }
41 });
42
43 Ok(LogStream {
44 receiver: rx,
45 _shutdown_tx: shutdown_tx,
46 _task_handle: task_handle,
47 })
48 }
49
50 #[cfg(test)]
52 pub fn is_closed(&self) -> bool {
53 self.receiver.is_closed()
54 }
55}
56
57impl Drop for LogStream {
58 fn drop(&mut self) {
59 let _ = self._shutdown_tx.send(());
61
62 }
65}
66
67async fn file_reader_task(
69 file_path: PathBuf,
70 separator: String,
71 tx: mpsc::UnboundedSender<Result<Vec<String>>>,
72 mut shutdown_rx: broadcast::Receiver<()>,
73) -> Result<()> {
74 let mut last_position = 0u64;
75
76 if file_path.exists() {
78 if let Err(e) = read_file_content(&file_path, &mut last_position, &separator, &tx).await {
79 let _ = tx.send(Err(e));
80 return Ok(());
81 }
82 }
83
84 let mut watcher = FileWatcher::new(&file_path)?;
86 watcher.start_watching()?;
87
88 let file_name = file_path
90 .file_name()
91 .map(|name| name.to_string_lossy().to_string())
92 .unwrap_or_default();
93
94 loop {
96 tokio::select! {
97 _ = shutdown_rx.recv() => {
99 break;
101 }
102
103 event = watcher.next_event() => {
105 match event {
106 Some(Ok(event)) => {
107 if is_event_relevant_to_file(&event, &file_name) {
109 if let Err(e) = read_file_content(&file_path, &mut last_position, &separator, &tx).await {
110 let _ = tx.send(Err(e));
111 break;
112 }
113 }
114 }
115 Some(Err(e)) => {
116 let _ = tx.send(Err(Error::Watcher(e)));
117 break;
118 }
119 None => {
120 break;
122 }
123 }
124 }
125 }
126 }
127
128 Ok(())
129}
130
131impl Stream for LogStream {
132 type Item = Result<Vec<String>>;
133
134 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
135 Pin::new(&mut self.receiver).poll_recv(cx)
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142 use std::time::Duration;
143 use tokio_stream::StreamExt;
144
145 #[tokio::test]
146 async fn test_log_stream_creation() {
147 let stream = LogStream::new("fixtures/simple_append.log", None).await;
148 assert!(stream.is_ok());
149
150 let stream = stream.unwrap();
151 assert!(!stream.is_closed());
152 }
153
154 #[tokio::test]
155 async fn test_log_stream_creation_with_custom_separator() {
156 let stream =
157 LogStream::new("fixtures/different_separators.log", Some("|".to_string())).await;
158 assert!(stream.is_ok());
159
160 let stream = stream.unwrap();
161 assert!(!stream.is_closed());
162 }
163
164 #[tokio::test]
165 async fn test_log_stream_creation_nonexistent_file() {
166 let stream = LogStream::new("fixtures/nonexistent.log", None).await;
167 assert!(stream.is_ok());
168
169 let stream = stream.unwrap();
170 assert!(!stream.is_closed());
171 }
172
173 #[tokio::test]
174 async fn test_log_stream_graceful_shutdown_on_drop() {
175 let mut stream = LogStream::new("fixtures/simple_append.log", None)
176 .await
177 .unwrap();
178
179 let first_item = tokio::time::timeout(Duration::from_millis(100), stream.next()).await;
181 assert!(first_item.is_ok());
182
183 drop(stream);
185
186 tokio::time::sleep(Duration::from_millis(10)).await;
188
189 }
191
192 #[tokio::test]
193 async fn test_log_stream_multiple_streams_independence() {
194 let stream1 = LogStream::new("fixtures/simple_append.log", None)
195 .await
196 .unwrap();
197 let stream2 = LogStream::new("fixtures/simple_append.log", None)
198 .await
199 .unwrap();
200
201 assert!(!stream1.is_closed());
202 assert!(!stream2.is_closed());
203
204 drop(stream1);
206
207 assert!(!stream2.is_closed());
209
210 drop(stream2);
211 }
212
213 #[tokio::test]
214 async fn test_log_stream_reading_existing_content() {
215 let mut stream = LogStream::new("fixtures/simple_append.log", None)
216 .await
217 .unwrap();
218
219 let items = collect_stream_items(&mut stream, 5, Duration::from_millis(100)).await;
221
222 assert!(!items.is_empty());
223 assert!(items[0].len() > 0);
225 assert!(items[0][0].contains("Starting application"));
226 }
227
228 #[tokio::test]
229 async fn test_log_stream_with_custom_separator() {
230 let mut stream = LogStream::new("fixtures/different_separators.log", Some("|".to_string()))
231 .await
232 .unwrap();
233
234 let items = collect_stream_items(&mut stream, 3, Duration::from_millis(100)).await;
235
236 assert!(!items.is_empty());
237 assert_eq!(items.len(), 1);
239 let lines = &items[0];
240 assert!(lines.len() > 1);
241 }
242
243 #[tokio::test]
244 async fn test_log_stream_empty_file() {
245 let mut stream = LogStream::new("fixtures/empty.log", None).await.unwrap();
246
247 let items = collect_stream_items(&mut stream, 1, Duration::from_millis(50)).await;
249 assert!(items.is_empty());
250 }
251
252 #[tokio::test]
253 async fn test_file_reader_task_shutdown_signal() {
254 let file_path = PathBuf::from("fixtures/simple_append.log");
255 let separator = "\n".to_string();
256 let (tx, mut rx) = mpsc::unbounded_channel();
257 let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
258
259 let task_handle =
261 tokio::spawn(
262 async move { file_reader_task(file_path, separator, tx, shutdown_rx).await },
263 );
264
265 tokio::time::sleep(Duration::from_millis(10)).await;
267
268 let _ = shutdown_tx.send(());
270
271 let result = tokio::time::timeout(Duration::from_millis(100), task_handle).await;
273 assert!(result.is_ok());
274 assert!(result.unwrap().is_ok());
275
276 let mut message_count = 0;
278 while rx.try_recv().is_ok() {
279 message_count += 1;
280 }
281 assert!(message_count > 0);
282 }
283
284 #[tokio::test]
285 async fn test_file_reader_task_error_handling() {
286 let file_path = PathBuf::from("/invalid/path/that/does/not/exist.log");
287 let separator = "\n".to_string();
288 let (tx, mut rx) = mpsc::unbounded_channel();
289 let (_shutdown_tx, shutdown_rx) = broadcast::channel(1);
290
291 let result = file_reader_task(file_path, separator, tx, shutdown_rx).await;
293
294 assert!(result.is_ok() || result.is_err());
296
297 while rx.try_recv().is_ok() {
299 }
301 }
302
303 async fn collect_stream_items(
305 stream: &mut LogStream,
306 max_items: usize,
307 timeout: Duration,
308 ) -> Vec<Vec<String>> {
309 let mut items = Vec::new();
310 let start = tokio::time::Instant::now();
311
312 while items.len() < max_items && start.elapsed() < timeout {
313 match tokio::time::timeout(Duration::from_millis(10), stream.next()).await {
314 Ok(Some(Ok(item))) => items.push(item),
315 Ok(Some(Err(_))) => break, Ok(None) => break, Err(_) => break, }
319 }
320
321 items
322 }
323}