microsandbox_utils/log/
rotating.rs

1//! Log rotation implementation for the Microsandbox runtime.
2//!
3//! This module provides a rotating log implementation that automatically rotates log files
4//! when they reach a specified size. The rotation process involves:
5//! 1. Renaming the current log file to .old extension
6//! 2. Creating a new empty log file
7//! 3. Continuing writing to the new file
8//!
9//! The implementation is fully asynchronous and implements AsyncWrite.
10
11use futures::future::BoxFuture;
12use std::{
13    io::{self, Write},
14    path::{Path, PathBuf},
15    pin::Pin,
16    sync::{
17        atomic::{AtomicU64, Ordering},
18        Arc,
19    },
20    task::{Context, Poll},
21};
22use tokio::{
23    fs::{remove_file, rename, File, OpenOptions},
24    io::{AsyncWrite, AsyncWriteExt},
25    sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
26    task::JoinHandle,
27};
28
29use crate::DEFAULT_LOG_MAX_SIZE;
30
31//--------------------------------------------------------------------------------------------------
32// Types
33//--------------------------------------------------------------------------------------------------
34
35/// A rotating log file that automatically rotates when reaching a maximum size.
36///
37/// The log rotation process preserves the last full log file with a ".old" extension
38/// while continuing to write to a new log file with the original name.
39///
40/// # Example
41///
42/// ```no_run
43/// use microsandbox_utils::log::RotatingLog;
44///
45/// #[tokio::main]
46/// async fn main() -> std::io::Result<()> {
47///     let log = RotatingLog::new("app.log").await?; // 1MB max size
48///     Ok(())
49/// }
50/// ```
51pub struct RotatingLog {
52    /// The current log file being written to
53    file: File,
54
55    /// Path to the current log file
56    path: PathBuf,
57
58    /// Maximum size in bytes before rotation
59    max_size: u64,
60
61    /// Current size of the log file (shared between sync and async paths)
62    current_size: Arc<AtomicU64>,
63
64    /// Current state of the log rotation
65    state: State,
66
67    /// Channel for sending data to sync writer
68    tx: UnboundedSender<Vec<u8>>,
69
70    /// Background task handle
71    _background_task: JoinHandle<()>,
72}
73
74/// Internal state machine for managing log rotation
75enum State {
76    /// Normal operation, ready to accept writes
77    Idle,
78
79    /// Currently performing log rotation
80    Rotating(RotationFuture),
81
82    /// Currently writing data
83    Writing,
84}
85
86/// A sync writer that sends all written data to a channel.
87pub struct SyncChannelWriter {
88    tx: UnboundedSender<Vec<u8>>,
89}
90
91type RotationFuture = BoxFuture<'static, io::Result<(File, PathBuf)>>;
92
93//--------------------------------------------------------------------------------------------------
94// Methods
95//--------------------------------------------------------------------------------------------------
96
97impl RotatingLog {
98    /// Creates a new rotating log file with the default maximum size.
99    ///
100    /// This is a convenience wrapper around [`with_max_size`] that uses the default
101    /// maximum log file size defined in `DEFAULT_LOG_MAX_SIZE`.
102    ///
103    /// ## Arguments
104    ///
105    /// * `path` - Path to the log file
106    ///
107    /// ## Errors
108    ///
109    /// Will return an error if:
110    /// * The file cannot be created or opened
111    /// * File metadata cannot be read
112    pub async fn new(path: impl AsRef<Path>) -> io::Result<Self> {
113        Self::with_max_size(path, DEFAULT_LOG_MAX_SIZE).await
114    }
115
116    /// Creates a new rotating log file.
117    ///
118    /// ## Errors
119    ///
120    /// Will return an error if:
121    /// * The file cannot be created or opened
122    /// * File metadata cannot be read
123    pub async fn with_max_size(path: impl AsRef<Path>, max_size: u64) -> io::Result<Self> {
124        let path = path.as_ref().to_path_buf();
125        let file = OpenOptions::new()
126            .create(true)
127            .append(true)
128            .open(&path)
129            .await?;
130        let metadata = file.metadata().await?;
131        let (tx, rx) = mpsc::unbounded_channel();
132
133        // Create shared atomic counter for current size
134        let current_size = Arc::new(AtomicU64::new(metadata.len()));
135
136        // Create a clone of the file and size counter for the background task
137        let bg_file = file.try_clone().await?;
138        let bg_path = path.clone();
139        let bg_max_size = max_size;
140        let bg_size = Arc::clone(&current_size);
141
142        // Spawn background task to handle channel data
143        let background_task = tokio::spawn(async move {
144            handle_channel_data(rx, bg_file, bg_path, bg_max_size, bg_size).await
145        });
146
147        Ok(Self {
148            file,
149            path,
150            max_size,
151            current_size,
152            state: State::Idle,
153            tx,
154            _background_task: background_task,
155        })
156    }
157
158    /// Get a sync writer that implements std::io::Write
159    pub fn get_sync_writer(&self) -> SyncChannelWriter {
160        SyncChannelWriter::new(self.tx.clone())
161    }
162}
163
164impl SyncChannelWriter {
165    /// Creates a new `SyncChannelWriter` with the given channel sender.
166    pub fn new(tx: UnboundedSender<Vec<u8>>) -> Self {
167        Self { tx }
168    }
169}
170
171//--------------------------------------------------------------------------------------------------
172// Functions
173//--------------------------------------------------------------------------------------------------
174
175/// Performs the actual log rotation operation.
176///
177/// # Arguments
178///
179/// * `file` - The current log file to be rotated
180/// * `path` - Path to the current log file
181///
182/// # Returns
183///
184/// Returns a tuple containing:
185/// * The newly created log file
186/// * The path to the new log file
187///
188/// # Errors
189///
190/// Will return an error if:
191/// * File synchronization fails
192/// * Old backup file cannot be removed
193/// * File rename operation fails
194/// * New log file cannot be created
195async fn do_rotation(file: File, path: PathBuf) -> io::Result<(File, PathBuf)> {
196    file.sync_all().await?;
197    let backup_path = path.with_extension("old");
198    if backup_path.exists() {
199        remove_file(&backup_path).await?;
200    }
201
202    rename(&path, &backup_path).await?;
203
204    let new_file = OpenOptions::new()
205        .create(true)
206        .append(true)
207        .open(&path)
208        .await?;
209
210    Ok((new_file, path))
211}
212
213/// Background task that handles data from the sync channel
214async fn handle_channel_data(
215    mut rx: UnboundedReceiver<Vec<u8>>,
216    mut file: File,
217    path: PathBuf,
218    max_size: u64,
219    current_size: Arc<AtomicU64>,
220) {
221    while let Some(data) = rx.recv().await {
222        let data_len = data.len() as u64;
223        let size = current_size.fetch_add(data_len, Ordering::Relaxed);
224
225        if size + data_len > max_size {
226            // Clone the file handle before rotation
227            if let Ok(file_clone) = file.try_clone().await {
228                match do_rotation(file_clone, path.clone()).await {
229                    Ok((new_file, _)) => {
230                        file = new_file;
231                        current_size.store(0, Ordering::Relaxed);
232                    }
233                    Err(e) => {
234                        tracing::error!("failed to rotate log file: {}", e);
235                        continue;
236                    }
237                }
238            } else {
239                tracing::error!("failed to clone file handle for rotation");
240                continue;
241            }
242        }
243
244        if let Err(e) = file.write_all(&data).await {
245            tracing::error!("failed to write to log file: {}", e);
246            // On write error, subtract the size we added
247            current_size.fetch_sub(data_len, Ordering::Relaxed);
248        }
249    }
250}
251
252//--------------------------------------------------------------------------------------------------
253// Trait Implementations
254//--------------------------------------------------------------------------------------------------
255
256impl AsyncWrite for RotatingLog {
257    fn poll_write(
258        mut self: Pin<&mut Self>,
259        cx: &mut Context<'_>,
260        buf: &[u8],
261    ) -> Poll<io::Result<usize>> {
262        let this = &mut *self;
263        let buf_len = buf.len() as u64;
264
265        loop {
266            match &mut this.state {
267                State::Idle => {
268                    let size = this.current_size.fetch_add(buf_len, Ordering::Relaxed);
269                    if size + buf_len > this.max_size {
270                        let old_file = std::mem::replace(
271                            &mut this.file,
272                            File::from_std(std::fs::File::open("/dev/null").unwrap()),
273                        );
274                        let old_path = this.path.clone();
275                        let fut = Box::pin(do_rotation(old_file, old_path));
276                        this.state = State::Rotating(fut);
277                    } else {
278                        this.state = State::Writing;
279                    }
280                }
281                State::Rotating(fut) => {
282                    match fut.as_mut().poll(cx) {
283                        Poll::Pending => return Poll::Pending,
284                        Poll::Ready(Err(e)) => {
285                            this.state = State::Idle;
286                            // On rotation error, subtract the size we added
287                            this.current_size.fetch_sub(buf_len, Ordering::Relaxed);
288                            return Poll::Ready(Err(e));
289                        }
290                        Poll::Ready(Ok((new_file, new_path))) => {
291                            this.file = new_file;
292                            this.path = new_path;
293                            this.current_size.store(0, Ordering::Relaxed);
294                            this.state = State::Writing;
295                        }
296                    }
297                }
298                State::Writing => {
299                    let pinned_file = Pin::new(&mut this.file);
300                    match pinned_file.poll_write(cx, buf) {
301                        Poll::Ready(Ok(written)) => {
302                            this.state = State::Idle;
303                            return Poll::Ready(Ok(written));
304                        }
305                        Poll::Ready(Err(e)) => {
306                            this.state = State::Idle;
307                            // On write error, subtract the size we added
308                            this.current_size.fetch_sub(buf_len, Ordering::Relaxed);
309                            return Poll::Ready(Err(e));
310                        }
311                        Poll::Pending => return Poll::Pending,
312                    }
313                }
314            }
315        }
316    }
317
318    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
319        Pin::new(&mut self.file).poll_flush(cx)
320    }
321
322    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
323        Pin::new(&mut self.file).poll_shutdown(cx)
324    }
325}
326
327impl Write for SyncChannelWriter {
328    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
329        let data = buf.to_vec();
330        self.tx.send(data).map_err(|_| {
331            io::Error::new(io::ErrorKind::Other, "failed to send log data to channel")
332        })?;
333        Ok(buf.len())
334    }
335
336    fn flush(&mut self) -> io::Result<()> {
337        Ok(())
338    }
339}
340
341//--------------------------------------------------------------------------------------------------
342// Tests
343//--------------------------------------------------------------------------------------------------
344
345#[cfg(test)]
346mod tests {
347    use super::*;
348    use std::fs;
349    use tempfile::tempdir;
350    use tokio::io::AsyncWriteExt;
351
352    #[tokio::test]
353    async fn test_create_new_log() -> io::Result<()> {
354        let dir = tempdir()?;
355        let log_path = dir.path().join("test.log");
356
357        let log = RotatingLog::with_max_size(&log_path, 1024).await?;
358        assert!(log_path.exists());
359        assert_eq!(log.max_size, 1024);
360        assert_eq!(log.current_size.load(Ordering::Relaxed), 0);
361
362        Ok(())
363    }
364
365    #[tokio::test]
366    async fn test_write_to_log() -> io::Result<()> {
367        let dir = tempdir()?;
368        let log_path = dir.path().join("test.log");
369
370        let mut log = RotatingLog::with_max_size(&log_path, 1024).await?;
371        let test_data = b"test log entry\n";
372        log.write_all(test_data).await?;
373        log.flush().await?;
374
375        let content = fs::read_to_string(&log_path)?;
376        assert_eq!(content, String::from_utf8_lossy(test_data));
377        assert_eq!(
378            log.current_size.load(Ordering::Relaxed),
379            test_data.len() as u64
380        );
381
382        Ok(())
383    }
384
385    #[tokio::test]
386    async fn test_log_rotation() -> io::Result<()> {
387        let dir = tempdir()?;
388        let log_path = dir.path().join("test.log");
389        let max_size = 20; // Small size to trigger rotation
390
391        let mut log = RotatingLog::with_max_size(&log_path, max_size).await?;
392
393        // Write data until we trigger rotation
394        let first_entry = b"first entry\n";
395        log.write_all(first_entry).await?;
396        log.flush().await?;
397
398        let second_entry = b"second entry\n";
399        log.write_all(second_entry).await?;
400        log.flush().await?;
401
402        // Give some time for rotation to complete
403        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
404
405        // Check that both current and old log files exist
406        assert!(log_path.exists());
407        assert!(log_path.with_extension("old").exists());
408
409        // Verify old file contains our first entry
410        let old_content = fs::read_to_string(log_path.with_extension("old"))?;
411        assert_eq!(old_content, String::from_utf8_lossy(first_entry));
412
413        // Verify new file contains our second entry
414        let new_content = fs::read_to_string(&log_path)?;
415        assert_eq!(new_content, String::from_utf8_lossy(second_entry));
416
417        Ok(())
418    }
419
420    #[tokio::test]
421    async fn test_oversized_write() -> io::Result<()> {
422        let dir = tempdir()?;
423        let log_path = dir.path().join("test.log");
424        let max_size = 10; // Small size
425
426        let mut log = RotatingLog::with_max_size(&log_path, max_size).await?;
427
428        // Write data much larger than max_size
429        let large_entry = b"this is a very large log entry that exceeds the maximum size\n";
430        log.write_all(large_entry).await?;
431        log.flush().await?;
432
433        // Give some time for rotation to complete
434        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
435
436        // Verify the content was written (even though it exceeds max_size)
437        assert!(log_path.exists());
438        let content = fs::read_to_string(&log_path)?;
439        assert_eq!(content, String::from_utf8_lossy(large_entry));
440
441        Ok(())
442    }
443
444    #[tokio::test]
445    async fn test_sync_writer() -> io::Result<()> {
446        let dir = tempdir()?;
447        let log_path = dir.path().join("test.log");
448
449        let log = RotatingLog::with_max_size(&log_path, 1024).await?;
450        let mut sync_writer = log.get_sync_writer();
451
452        let test_data = b"sync writer test\n";
453        sync_writer.write_all(test_data)?;
454        sync_writer.flush()?;
455
456        // Give some time for async processing
457        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
458
459        let content = fs::read_to_string(&log_path)?;
460        assert_eq!(content, String::from_utf8_lossy(test_data));
461
462        Ok(())
463    }
464
465    #[tokio::test]
466    async fn test_multiple_rotations() -> io::Result<()> {
467        let dir = tempdir()?;
468        let log_path = dir.path().join("test.log");
469        let max_size = 20;
470
471        let mut log = RotatingLog::with_max_size(&log_path, max_size).await?;
472
473        // Perform multiple writes to trigger multiple rotations
474        for i in 0..3 {
475            let test_data = format!("rotation test {}\n", i).into_bytes();
476            log.write_all(&test_data).await?;
477            log.flush().await?;
478
479            // Give time for rotation
480            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
481        }
482
483        // Verify only one .old file exists (latest rotation)
484        assert!(log_path.exists());
485        assert!(log_path.with_extension("old").exists());
486
487        Ok(())
488    }
489}