microsandbox-utils 0.2.6

`microsandbox-utils` is a library containing general utilities for the microsandbox project.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
//! Log rotation implementation for the Microsandbox runtime.
//!
//! This module provides a rotating log implementation that automatically rotates log files
//! when they reach a specified size. The rotation process involves:
//! 1. Renaming the current log file to .old extension
//! 2. Creating a new empty log file
//! 3. Continuing writing to the new file
//!
//! The implementation is fully asynchronous and implements AsyncWrite.

use futures::future::BoxFuture;
use std::{
    io::{self, Write},
    path::{Path, PathBuf},
    pin::Pin,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    task::{Context, Poll},
};
use tokio::{
    fs::{remove_file, rename, File, OpenOptions},
    io::{AsyncWrite, AsyncWriteExt},
    sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
    task::JoinHandle,
};

use crate::DEFAULT_LOG_MAX_SIZE;

//--------------------------------------------------------------------------------------------------
// Types
//--------------------------------------------------------------------------------------------------

/// A rotating log file that automatically rotates when reaching a maximum size.
///
/// The log rotation process preserves the last full log file with a ".old" extension
/// while continuing to write to a new log file with the original name.
///
/// # Example
///
/// ```no_run
/// use microsandbox_utils::log::RotatingLog;
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
///     let log = RotatingLog::new("app.log").await?; // 1MB max size
///     Ok(())
/// }
/// ```
pub struct RotatingLog {
    /// The current log file being written to
    file: File,

    /// Path to the current log file
    path: PathBuf,

    /// Maximum size in bytes before rotation
    max_size: u64,

    /// Current size of the log file (shared between sync and async paths)
    current_size: Arc<AtomicU64>,

    /// Current state of the log rotation
    state: State,

    /// Channel for sending data to sync writer
    tx: UnboundedSender<Vec<u8>>,

    /// Background task handle
    _background_task: JoinHandle<()>,
}

/// Internal state machine for managing log rotation
enum State {
    /// Normal operation, ready to accept writes
    Idle,

    /// Currently performing log rotation
    Rotating(RotationFuture),

    /// Currently writing data
    Writing,
}

/// A sync writer that sends all written data to a channel.
pub struct SyncChannelWriter {
    tx: UnboundedSender<Vec<u8>>,
}

type RotationFuture = BoxFuture<'static, io::Result<(File, PathBuf)>>;

//--------------------------------------------------------------------------------------------------
// Methods
//--------------------------------------------------------------------------------------------------

impl RotatingLog {
    /// Creates a new rotating log file with the default maximum size.
    ///
    /// This is a convenience wrapper around [`with_max_size`] that uses the default
    /// maximum log file size defined in `DEFAULT_LOG_MAX_SIZE`.
    ///
    /// ## Arguments
    ///
    /// * `path` - Path to the log file
    ///
    /// ## Errors
    ///
    /// Will return an error if:
    /// * The file cannot be created or opened
    /// * File metadata cannot be read
    pub async fn new(path: impl AsRef<Path>) -> io::Result<Self> {
        Self::with_max_size(path, DEFAULT_LOG_MAX_SIZE).await
    }

    /// Creates a new rotating log file.
    ///
    /// ## Errors
    ///
    /// Will return an error if:
    /// * The file cannot be created or opened
    /// * File metadata cannot be read
    pub async fn with_max_size(path: impl AsRef<Path>, max_size: u64) -> io::Result<Self> {
        let path = path.as_ref().to_path_buf();
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .await?;
        let metadata = file.metadata().await?;
        let (tx, rx) = mpsc::unbounded_channel();

        // Create shared atomic counter for current size
        let current_size = Arc::new(AtomicU64::new(metadata.len()));

        // Create a clone of the file and size counter for the background task
        let bg_file = file.try_clone().await?;
        let bg_path = path.clone();
        let bg_max_size = max_size;
        let bg_size = Arc::clone(&current_size);

        // Spawn background task to handle channel data
        let background_task = tokio::spawn(async move {
            handle_channel_data(rx, bg_file, bg_path, bg_max_size, bg_size).await
        });

        Ok(Self {
            file,
            path,
            max_size,
            current_size,
            state: State::Idle,
            tx,
            _background_task: background_task,
        })
    }

    /// Get a sync writer that implements std::io::Write
    pub fn get_sync_writer(&self) -> SyncChannelWriter {
        SyncChannelWriter::new(self.tx.clone())
    }
}

impl SyncChannelWriter {
    /// Creates a new `SyncChannelWriter` with the given channel sender.
    pub fn new(tx: UnboundedSender<Vec<u8>>) -> Self {
        Self { tx }
    }
}

//--------------------------------------------------------------------------------------------------
// Functions
//--------------------------------------------------------------------------------------------------

/// Performs the actual log rotation operation.
///
/// # Arguments
///
/// * `file` - The current log file to be rotated
/// * `path` - Path to the current log file
///
/// # Returns
///
/// Returns a tuple containing:
/// * The newly created log file
/// * The path to the new log file
///
/// # Errors
///
/// Will return an error if:
/// * File synchronization fails
/// * Old backup file cannot be removed
/// * File rename operation fails
/// * New log file cannot be created
async fn do_rotation(file: File, path: PathBuf) -> io::Result<(File, PathBuf)> {
    file.sync_all().await?;
    let backup_path = path.with_extension("old");
    if backup_path.exists() {
        remove_file(&backup_path).await?;
    }

    rename(&path, &backup_path).await?;

    let new_file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&path)
        .await?;

    Ok((new_file, path))
}

/// Background task that handles data from the sync channel
async fn handle_channel_data(
    mut rx: UnboundedReceiver<Vec<u8>>,
    mut file: File,
    path: PathBuf,
    max_size: u64,
    current_size: Arc<AtomicU64>,
) {
    while let Some(data) = rx.recv().await {
        let data_len = data.len() as u64;
        let size = current_size.fetch_add(data_len, Ordering::Relaxed);

        if size + data_len > max_size {
            // Clone the file handle before rotation
            if let Ok(file_clone) = file.try_clone().await {
                match do_rotation(file_clone, path.clone()).await {
                    Ok((new_file, _)) => {
                        file = new_file;
                        current_size.store(0, Ordering::Relaxed);
                    }
                    Err(e) => {
                        tracing::error!("failed to rotate log file: {}", e);
                        continue;
                    }
                }
            } else {
                tracing::error!("failed to clone file handle for rotation");
                continue;
            }
        }

        if let Err(e) = file.write_all(&data).await {
            tracing::error!("failed to write to log file: {}", e);
            // On write error, subtract the size we added
            current_size.fetch_sub(data_len, Ordering::Relaxed);
        }
    }
}

//--------------------------------------------------------------------------------------------------
// Trait Implementations
//--------------------------------------------------------------------------------------------------

impl AsyncWrite for RotatingLog {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = &mut *self;
        let buf_len = buf.len() as u64;

        loop {
            match &mut this.state {
                State::Idle => {
                    let size = this.current_size.fetch_add(buf_len, Ordering::Relaxed);
                    if size + buf_len > this.max_size {
                        let old_file = std::mem::replace(
                            &mut this.file,
                            File::from_std(std::fs::File::open("/dev/null").unwrap()),
                        );
                        let old_path = this.path.clone();
                        let fut = Box::pin(do_rotation(old_file, old_path));
                        this.state = State::Rotating(fut);
                    } else {
                        this.state = State::Writing;
                    }
                }
                State::Rotating(fut) => {
                    match fut.as_mut().poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Err(e)) => {
                            this.state = State::Idle;
                            // On rotation error, subtract the size we added
                            this.current_size.fetch_sub(buf_len, Ordering::Relaxed);
                            return Poll::Ready(Err(e));
                        }
                        Poll::Ready(Ok((new_file, new_path))) => {
                            this.file = new_file;
                            this.path = new_path;
                            this.current_size.store(0, Ordering::Relaxed);
                            this.state = State::Writing;
                        }
                    }
                }
                State::Writing => {
                    let pinned_file = Pin::new(&mut this.file);
                    match pinned_file.poll_write(cx, buf) {
                        Poll::Ready(Ok(written)) => {
                            this.state = State::Idle;
                            return Poll::Ready(Ok(written));
                        }
                        Poll::Ready(Err(e)) => {
                            this.state = State::Idle;
                            // On write error, subtract the size we added
                            this.current_size.fetch_sub(buf_len, Ordering::Relaxed);
                            return Poll::Ready(Err(e));
                        }
                        Poll::Pending => return Poll::Pending,
                    }
                }
            }
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.file).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.file).poll_shutdown(cx)
    }
}

impl Write for SyncChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let data = buf.to_vec();
        self.tx.send(data).map_err(|_| {
            io::Error::new(io::ErrorKind::Other, "failed to send log data to channel")
        })?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

//--------------------------------------------------------------------------------------------------
// Tests
//--------------------------------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn test_create_new_log() -> io::Result<()> {
        let dir = tempdir()?;
        let log_path = dir.path().join("test.log");

        let log = RotatingLog::with_max_size(&log_path, 1024).await?;
        assert!(log_path.exists());
        assert_eq!(log.max_size, 1024);
        assert_eq!(log.current_size.load(Ordering::Relaxed), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_write_to_log() -> io::Result<()> {
        let dir = tempdir()?;
        let log_path = dir.path().join("test.log");

        let mut log = RotatingLog::with_max_size(&log_path, 1024).await?;
        let test_data = b"test log entry\n";
        log.write_all(test_data).await?;
        log.flush().await?;

        let content = fs::read_to_string(&log_path)?;
        assert_eq!(content, String::from_utf8_lossy(test_data));
        assert_eq!(
            log.current_size.load(Ordering::Relaxed),
            test_data.len() as u64
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_log_rotation() -> io::Result<()> {
        let dir = tempdir()?;
        let log_path = dir.path().join("test.log");
        let max_size = 20; // Small size to trigger rotation

        let mut log = RotatingLog::with_max_size(&log_path, max_size).await?;

        // Write data until we trigger rotation
        let first_entry = b"first entry\n";
        log.write_all(first_entry).await?;
        log.flush().await?;

        let second_entry = b"second entry\n";
        log.write_all(second_entry).await?;
        log.flush().await?;

        // Give some time for rotation to complete
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Check that both current and old log files exist
        assert!(log_path.exists());
        assert!(log_path.with_extension("old").exists());

        // Verify old file contains our first entry
        let old_content = fs::read_to_string(log_path.with_extension("old"))?;
        assert_eq!(old_content, String::from_utf8_lossy(first_entry));

        // Verify new file contains our second entry
        let new_content = fs::read_to_string(&log_path)?;
        assert_eq!(new_content, String::from_utf8_lossy(second_entry));

        Ok(())
    }

    #[tokio::test]
    async fn test_oversized_write() -> io::Result<()> {
        let dir = tempdir()?;
        let log_path = dir.path().join("test.log");
        let max_size = 10; // Small size

        let mut log = RotatingLog::with_max_size(&log_path, max_size).await?;

        // Write data much larger than max_size
        let large_entry = b"this is a very large log entry that exceeds the maximum size\n";
        log.write_all(large_entry).await?;
        log.flush().await?;

        // Give some time for rotation to complete
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Verify the content was written (even though it exceeds max_size)
        assert!(log_path.exists());
        let content = fs::read_to_string(&log_path)?;
        assert_eq!(content, String::from_utf8_lossy(large_entry));

        Ok(())
    }

    #[tokio::test]
    async fn test_sync_writer() -> io::Result<()> {
        let dir = tempdir()?;
        let log_path = dir.path().join("test.log");

        let log = RotatingLog::with_max_size(&log_path, 1024).await?;
        let mut sync_writer = log.get_sync_writer();

        let test_data = b"sync writer test\n";
        sync_writer.write_all(test_data)?;
        sync_writer.flush()?;

        // Give some time for async processing
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        let content = fs::read_to_string(&log_path)?;
        assert_eq!(content, String::from_utf8_lossy(test_data));

        Ok(())
    }

    #[tokio::test]
    async fn test_multiple_rotations() -> io::Result<()> {
        let dir = tempdir()?;
        let log_path = dir.path().join("test.log");
        let max_size = 20;

        let mut log = RotatingLog::with_max_size(&log_path, max_size).await?;

        // Perform multiple writes to trigger multiple rotations
        for i in 0..3 {
            let test_data = format!("rotation test {}\n", i).into_bytes();
            log.write_all(&test_data).await?;
            log.flush().await?;

            // Give time for rotation
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }

        // Verify only one .old file exists (latest rotation)
        assert!(log_path.exists());
        assert!(log_path.with_extension("old").exists());

        Ok(())
    }
}