log_reader/
stream.rs

1//! Stream implementation for reading log files with real-time monitoring.
2
3use crate::error::{Error, Result};
4use crate::reader::read_file_content;
5use crate::watcher::{FileWatcher, is_event_relevant_to_file};
6use futures::Stream;
7use std::path::{Path, PathBuf};
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use tokio::sync::{broadcast, mpsc};
11use tokio::task::JoinHandle;
12
13/// A stream that monitors a file for changes and yields new content.
14pub struct LogStream {
15    receiver: mpsc::UnboundedReceiver<Result<Vec<String>>>,
16    _shutdown_tx: broadcast::Sender<()>,
17    _task_handle: JoinHandle<()>,
18}
19
20impl LogStream {
21    /// Creates a new LogStream for the specified file.
22    pub async fn new<P: AsRef<Path>>(path: P, separator: Option<String>) -> Result<Self> {
23        let file_path = path.as_ref().to_path_buf();
24        let separator = separator.unwrap_or_else(|| "\n".to_string());
25
26        let (tx, rx) = mpsc::unbounded_channel();
27        let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
28
29        // Spawn background task to handle file watching and reading
30        let task_file_path = file_path.clone();
31        let task_separator = separator.clone();
32        let task_tx = tx.clone();
33
34        let task_handle = tokio::spawn(async move {
35            if let Err(e) =
36                file_reader_task(task_file_path, task_separator, task_tx, shutdown_rx).await
37            {
38                // Log error or send it through channel
39                eprintln!("File reader task error: {}", e);
40            }
41        });
42
43        Ok(LogStream {
44            receiver: rx,
45            _shutdown_tx: shutdown_tx,
46            _task_handle: task_handle,
47        })
48    }
49
50    /// Check if the stream has been closed/dropped
51    #[cfg(test)]
52    pub fn is_closed(&self) -> bool {
53        self.receiver.is_closed()
54    }
55}
56
57impl Drop for LogStream {
58    fn drop(&mut self) {
59        // Send shutdown signal - ignore errors if already dropped or no receivers
60        let _ = self._shutdown_tx.send(());
61
62        // The task handle will be automatically aborted when it's dropped,
63        // but we've also sent a graceful shutdown signal for clean cleanup
64    }
65}
66
67/// Background task that handles file watching and reading
68async fn file_reader_task(
69    file_path: PathBuf,
70    separator: String,
71    tx: mpsc::UnboundedSender<Result<Vec<String>>>,
72    mut shutdown_rx: broadcast::Receiver<()>,
73) -> Result<()> {
74    let mut last_position = 0u64;
75
76    // Read existing content in the file.
77    if file_path.exists() {
78        if let Err(e) = read_file_content(&file_path, &mut last_position, &separator, &tx).await {
79            let _ = tx.send(Err(e));
80            return Ok(());
81        }
82    }
83
84    // Now start watching for future changes
85    let mut watcher = FileWatcher::new(&file_path)?;
86    watcher.start_watching()?;
87
88    // Get the file name for filtering
89    let file_name = file_path
90        .file_name()
91        .map(|name| name.to_string_lossy().to_string())
92        .unwrap_or_default();
93
94    // Watch for file changes
95    loop {
96        tokio::select! {
97            // Check for shutdown signal
98            _ = shutdown_rx.recv() => {
99                // Graceful shutdown requested
100                break;
101            }
102
103            // Process file events
104            event = watcher.next_event() => {
105                match event {
106                    Some(Ok(event)) => {
107                        // Filter events to only include those affecting our target file
108                        if is_event_relevant_to_file(&event, &file_name) {
109                            if let Err(e) = read_file_content(&file_path, &mut last_position, &separator, &tx).await {
110                                let _ = tx.send(Err(e));
111                                break;
112                            }
113                        }
114                    }
115                    Some(Err(e)) => {
116                        let _ = tx.send(Err(Error::Watcher(e)));
117                        break;
118                    }
119                    None => {
120                        // Watcher closed, shutdown gracefully
121                        break;
122                    }
123                }
124            }
125        }
126    }
127
128    Ok(())
129}
130
131impl Stream for LogStream {
132    type Item = Result<Vec<String>>;
133
134    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
135        Pin::new(&mut self.receiver).poll_recv(cx)
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use super::*;
142    use std::time::Duration;
143    use tokio_stream::StreamExt;
144
145    #[tokio::test]
146    async fn test_log_stream_creation() {
147        let stream = LogStream::new("fixtures/simple_append.log", None).await;
148        assert!(stream.is_ok());
149
150        let stream = stream.unwrap();
151        assert!(!stream.is_closed());
152    }
153
154    #[tokio::test]
155    async fn test_log_stream_creation_with_custom_separator() {
156        let stream =
157            LogStream::new("fixtures/different_separators.log", Some("|".to_string())).await;
158        assert!(stream.is_ok());
159
160        let stream = stream.unwrap();
161        assert!(!stream.is_closed());
162    }
163
164    #[tokio::test]
165    async fn test_log_stream_creation_nonexistent_file() {
166        let stream = LogStream::new("fixtures/nonexistent.log", None).await;
167        assert!(stream.is_ok());
168
169        let stream = stream.unwrap();
170        assert!(!stream.is_closed());
171    }
172
173    #[tokio::test]
174    async fn test_log_stream_graceful_shutdown_on_drop() {
175        let mut stream = LogStream::new("fixtures/simple_append.log", None)
176            .await
177            .unwrap();
178
179        // Consume one item to ensure the stream is active
180        let first_item = tokio::time::timeout(Duration::from_millis(100), stream.next()).await;
181        assert!(first_item.is_ok());
182
183        // Drop the stream
184        drop(stream);
185
186        // Give background task time to shut down
187        tokio::time::sleep(Duration::from_millis(10)).await;
188
189        // Test passes if we reach here without hanging
190    }
191
192    #[tokio::test]
193    async fn test_log_stream_multiple_streams_independence() {
194        let stream1 = LogStream::new("fixtures/simple_append.log", None)
195            .await
196            .unwrap();
197        let stream2 = LogStream::new("fixtures/simple_append.log", None)
198            .await
199            .unwrap();
200
201        assert!(!stream1.is_closed());
202        assert!(!stream2.is_closed());
203
204        // Drop first stream
205        drop(stream1);
206
207        // Second stream should still be functional
208        assert!(!stream2.is_closed());
209
210        drop(stream2);
211    }
212
213    #[tokio::test]
214    async fn test_log_stream_reading_existing_content() {
215        let mut stream = LogStream::new("fixtures/simple_append.log", None)
216            .await
217            .unwrap();
218
219        // Should immediately yield existing content
220        let items = collect_stream_items(&mut stream, 5, Duration::from_millis(100)).await;
221
222        assert!(!items.is_empty());
223        // Now we get Vec<String> items, so check the first Vec contains the expected content
224        assert!(items[0].len() > 0);
225        assert!(items[0][0].contains("Starting application"));
226    }
227
228    #[tokio::test]
229    async fn test_log_stream_with_custom_separator() {
230        let mut stream = LogStream::new("fixtures/different_separators.log", Some("|".to_string()))
231            .await
232            .unwrap();
233
234        let items = collect_stream_items(&mut stream, 3, Duration::from_millis(100)).await;
235
236        assert!(!items.is_empty());
237        // Should split by pipe character - now we get one Vec<String> with multiple parts
238        assert_eq!(items.len(), 1);
239        let lines = &items[0];
240        assert!(lines.len() > 1);
241    }
242
243    #[tokio::test]
244    async fn test_log_stream_empty_file() {
245        let mut stream = LogStream::new("fixtures/empty.log", None).await.unwrap();
246
247        // Empty file should not yield any items immediately
248        let items = collect_stream_items(&mut stream, 1, Duration::from_millis(50)).await;
249        assert!(items.is_empty());
250    }
251
252    #[tokio::test]
253    async fn test_file_reader_task_shutdown_signal() {
254        let file_path = PathBuf::from("fixtures/simple_append.log");
255        let separator = "\n".to_string();
256        let (tx, mut rx) = mpsc::unbounded_channel();
257        let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
258
259        // Start the task
260        let task_handle =
261            tokio::spawn(
262                async move { file_reader_task(file_path, separator, tx, shutdown_rx).await },
263            );
264
265        // Let it run briefly
266        tokio::time::sleep(Duration::from_millis(10)).await;
267
268        // Send shutdown signal
269        let _ = shutdown_tx.send(());
270
271        // Task should complete gracefully
272        let result = tokio::time::timeout(Duration::from_millis(100), task_handle).await;
273        assert!(result.is_ok());
274        assert!(result.unwrap().is_ok());
275
276        // Should have received some messages from reading existing content
277        let mut message_count = 0;
278        while rx.try_recv().is_ok() {
279            message_count += 1;
280        }
281        assert!(message_count > 0);
282    }
283
284    #[tokio::test]
285    async fn test_file_reader_task_error_handling() {
286        let file_path = PathBuf::from("/invalid/path/that/does/not/exist.log");
287        let separator = "\n".to_string();
288        let (tx, mut rx) = mpsc::unbounded_channel();
289        let (_shutdown_tx, shutdown_rx) = broadcast::channel(1);
290
291        // Task should handle invalid paths gracefully
292        let result = file_reader_task(file_path, separator, tx, shutdown_rx).await;
293
294        // Task should complete without panicking
295        assert!(result.is_ok() || result.is_err());
296
297        // Check if any error messages were sent
298        while rx.try_recv().is_ok() {
299            // Consume any messages
300        }
301    }
302
303    // Helper function to collect stream items with timeout
304    async fn collect_stream_items(
305        stream: &mut LogStream,
306        max_items: usize,
307        timeout: Duration,
308    ) -> Vec<Vec<String>> {
309        let mut items = Vec::new();
310        let start = tokio::time::Instant::now();
311
312        while items.len() < max_items && start.elapsed() < timeout {
313            match tokio::time::timeout(Duration::from_millis(10), stream.next()).await {
314                Ok(Some(Ok(item))) => items.push(item),
315                Ok(Some(Err(_))) => break, // Error occurred
316                Ok(None) => break,         // Stream ended
317                Err(_) => break,           // Timeout
318            }
319        }
320
321        items
322    }
323}