Skip to main content

obs_core/sink/
writer.rs

1//! `MakeWriter` and the writer family (Stdout/Stderr/LevelSplit/Tee/
2//! `RollingFile` / `NonBlocking`). Spec 20 §§ 3.3–3.5.
3//!
4//! `RollingFileWriter` deliberately uses `std::fs` synchronously
5//! because it implements `std::io::Write`; sinks run in the per-tier
6//! tokio worker that already owns the write call, so converting
7//! everything to `tokio::fs` would force a `block_on` round-trip per
8//! batch. The blanket `disallowed-{methods,types}` guidance is
9//! file-overridden here.
10#![allow(clippy::disallowed_types, clippy::disallowed_methods)]
11
12use std::{
13    io::{self, Write},
14    path::PathBuf,
15    sync::{
16        Arc,
17        atomic::{AtomicBool, AtomicU64, Ordering},
18        mpsc::{self, SyncSender, sync_channel},
19    },
20    thread::{self, JoinHandle},
21    time::{Duration, SystemTime, UNIX_EPOCH},
22};
23
24use obs_proto::obs::v1::Severity;
25use parking_lot::Mutex;
26
27/// A factory that yields one `io::Write` per batch. Cheap to call.
28/// Spec 20 § 3.3.
29pub trait MakeWriter: Send + Sync + 'static {
30    /// The writer type produced by [`Self::make_writer`]; usually
31    /// `Stdout`, `Stderr`, or a guard around a file handle.
32    type Writer: Write + Send + 'static;
33
34    /// Yield a writer for this batch.
35    fn make_writer(&self) -> Self::Writer;
36
37    /// Yield a severity-specific writer; defaults to
38    /// [`Self::make_writer`].
39    fn make_writer_for(&self, _sev: Severity) -> Self::Writer {
40        self.make_writer()
41    }
42}
43
44/// Writes to stdout.
45#[derive(Debug, Default, Clone, Copy)]
46pub struct StdoutWriter;
47
48impl MakeWriter for StdoutWriter {
49    type Writer = io::Stdout;
50    fn make_writer(&self) -> io::Stdout {
51        io::stdout()
52    }
53}
54
55/// Writes to stderr.
56#[derive(Debug, Default, Clone, Copy)]
57pub struct StderrWriter;
58
59impl MakeWriter for StderrWriter {
60    type Writer = io::Stderr;
61    fn make_writer(&self) -> io::Stderr {
62        io::stderr()
63    }
64}
65
66/// Composes two writers — INFO+ goes through `low`, WARN+ through
67/// `high`. The conventional shape for cargo binaries.
68#[derive(Debug, Clone)]
69pub struct LevelSplitWriter<L, H> {
70    low: L,
71    high: H,
72    threshold: Severity,
73}
74
75impl<L: MakeWriter, H: MakeWriter> LevelSplitWriter<L, H> {
76    /// New split writer with default `WARN` threshold.
77    #[must_use]
78    pub fn new(low: L, high: H) -> Self {
79        Self {
80            low,
81            high,
82            threshold: Severity::Warn,
83        }
84    }
85
86    /// Override the threshold.
87    #[must_use]
88    pub fn threshold(mut self, threshold: Severity) -> Self {
89        self.threshold = threshold;
90        self
91    }
92}
93
94/// Erased `Box<dyn Write + Send>` so `LevelSplitWriter::Writer` can
95/// be a single concrete type (the trait associated type is fixed,
96/// can't be `match`-ed at the type level).
97pub struct ErasedWriter(Box<dyn Write + Send + 'static>);
98
99impl ErasedWriter {
100    /// Construct an erased writer.
101    #[must_use]
102    pub fn new<W: Write + Send + 'static>(w: W) -> Self {
103        Self(Box::new(w))
104    }
105}
106
107impl std::fmt::Debug for ErasedWriter {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct("ErasedWriter").finish_non_exhaustive()
110    }
111}
112
113impl Write for ErasedWriter {
114    fn write(&mut self, b: &[u8]) -> io::Result<usize> {
115        self.0.write(b)
116    }
117    fn flush(&mut self) -> io::Result<()> {
118        self.0.flush()
119    }
120}
121
122impl<L: MakeWriter, H: MakeWriter> MakeWriter for LevelSplitWriter<L, H> {
123    type Writer = ErasedWriter;
124
125    fn make_writer(&self) -> ErasedWriter {
126        ErasedWriter::new(self.low.make_writer())
127    }
128
129    fn make_writer_for(&self, sev: Severity) -> ErasedWriter {
130        if sev >= self.threshold {
131            ErasedWriter::new(self.high.make_writer_for(sev))
132        } else {
133            ErasedWriter::new(self.low.make_writer_for(sev))
134        }
135    }
136}
137
138/// Tee writer — writes to both branches.
139#[derive(Debug, Clone)]
140pub struct TeeWriter<A, B> {
141    a: A,
142    b: B,
143}
144
145impl<A: MakeWriter, B: MakeWriter> TeeWriter<A, B> {
146    /// New tee writer.
147    pub fn new(a: A, b: B) -> Self {
148        Self { a, b }
149    }
150}
151
152/// Concrete writer returned by `TeeWriter::make_writer` — writes
153/// every byte to both inner writers; if either errors the call
154/// returns the first error.
155pub struct TeeWriterImpl<WA: Write, WB: Write> {
156    a: WA,
157    b: WB,
158}
159
160impl<WA: Write, WB: Write> Write for TeeWriterImpl<WA, WB> {
161    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
162        self.a.write_all(buf)?;
163        self.b.write_all(buf)?;
164        Ok(buf.len())
165    }
166    fn flush(&mut self) -> io::Result<()> {
167        self.a.flush()?;
168        self.b.flush()
169    }
170}
171
172impl<WA: Write, WB: Write> std::fmt::Debug for TeeWriterImpl<WA, WB> {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        f.debug_struct("TeeWriterImpl").finish_non_exhaustive()
175    }
176}
177
178impl<A: MakeWriter, B: MakeWriter> MakeWriter for TeeWriter<A, B> {
179    type Writer = TeeWriterImpl<A::Writer, B::Writer>;
180
181    fn make_writer(&self) -> Self::Writer {
182        TeeWriterImpl {
183            a: self.a.make_writer(),
184            b: self.b.make_writer(),
185        }
186    }
187}
188
189// ─── RollingFileWriter ────────────────────────────────────────────────
190
191/// Rolling policy. Spec 20 § 3.4.
192#[derive(Debug, Clone, Copy)]
193#[non_exhaustive]
194pub enum RollingPolicy {
195    /// Never rotate.
196    Never,
197    /// Rotate when file size hits `max_bytes`.
198    SizeBased {
199        /// Per-file size cap.
200        max_bytes: u64,
201    },
202    /// Rotate at midnight UTC.
203    Daily,
204    /// Rotate at the top of every hour UTC.
205    Hourly,
206    /// Rotate on size or age, whichever first.
207    SizeOrAge {
208        /// Per-file size cap.
209        max_bytes: u64,
210        /// Per-file age cap.
211        max_age: Duration,
212    },
213}
214
215/// Builder for [`RollingFileWriter`].
216#[derive(Debug, Clone)]
217pub struct RollingFileWriterBuilder {
218    directory: Option<PathBuf>,
219    prefix: Option<String>,
220    suffix: String,
221    policy: RollingPolicy,
222    keep: Option<usize>,
223}
224
225impl Default for RollingFileWriterBuilder {
226    fn default() -> Self {
227        Self {
228            directory: None,
229            prefix: None,
230            suffix: ".ndjson".to_string(),
231            policy: RollingPolicy::Daily,
232            keep: None,
233        }
234    }
235}
236
237impl RollingFileWriterBuilder {
238    /// Output directory (created if absent).
239    #[must_use]
240    pub fn directory(mut self, dir: impl Into<PathBuf>) -> Self {
241        self.directory = Some(dir.into());
242        self
243    }
244
245    /// Filename prefix (e.g. `obs`). Required.
246    #[must_use]
247    pub fn filename_prefix(mut self, p: impl Into<String>) -> Self {
248        self.prefix = Some(p.into());
249        self
250    }
251
252    /// Filename suffix (default `.ndjson`).
253    #[must_use]
254    pub fn filename_suffix(mut self, s: impl Into<String>) -> Self {
255        self.suffix = s.into();
256        self
257    }
258
259    /// Set the rolling policy.
260    #[must_use]
261    pub fn policy(mut self, p: RollingPolicy) -> Self {
262        self.policy = p;
263        self
264    }
265
266    /// Retain the last `n` rolled files. Older files are deleted at
267    /// rotation time.
268    #[must_use]
269    pub fn keep(mut self, n: usize) -> Self {
270        self.keep = Some(n);
271        self
272    }
273
274    /// Build the writer. Creates the directory if absent.
275    ///
276    /// # Errors
277    ///
278    /// Returns `io::Error` if the directory cannot be created or the
279    /// initial file cannot be opened.
280    pub fn build(self) -> io::Result<RollingFileWriter> {
281        let dir = self
282            .directory
283            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "directory is required"))?;
284        let prefix = self.prefix.unwrap_or_else(|| "obs".to_string());
285        std::fs::create_dir_all(&dir)?;
286        let inner = RollingInner {
287            directory: dir,
288            prefix,
289            suffix: self.suffix,
290            policy: self.policy,
291            keep: self.keep,
292            current: Mutex::new(None),
293        };
294        Ok(RollingFileWriter {
295            inner: Arc::new(inner),
296        })
297    }
298}
299
300/// Rolling file writer with size + time policies. Spec 20 § 3.4.
301#[derive(Clone)]
302pub struct RollingFileWriter {
303    inner: Arc<RollingInner>,
304}
305
306impl std::fmt::Debug for RollingFileWriter {
307    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308        f.debug_struct("RollingFileWriter")
309            .field("directory", &self.inner.directory)
310            .field("prefix", &self.inner.prefix)
311            .field("policy", &self.inner.policy)
312            .finish()
313    }
314}
315
316struct RollingInner {
317    directory: PathBuf,
318    prefix: String,
319    suffix: String,
320    policy: RollingPolicy,
321    keep: Option<usize>,
322    current: Mutex<Option<RollingState>>,
323}
324
325struct RollingState {
326    file: std::fs::File,
327    bytes: u64,
328    opened_at: SystemTime,
329}
330
331impl RollingFileWriter {
332    /// Builder entry.
333    #[must_use]
334    pub fn builder() -> RollingFileWriterBuilder {
335        RollingFileWriterBuilder::default()
336    }
337}
338
339impl MakeWriter for RollingFileWriter {
340    type Writer = RollingFileHandle;
341    fn make_writer(&self) -> RollingFileHandle {
342        RollingFileHandle {
343            inner: Arc::clone(&self.inner),
344        }
345    }
346}
347
348/// Handle returned per-batch by `RollingFileWriter`. Each `write_all`
349/// rotates if the policy demands it.
350pub struct RollingFileHandle {
351    inner: Arc<RollingInner>,
352}
353
354impl std::fmt::Debug for RollingFileHandle {
355    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
356        f.debug_struct("RollingFileHandle").finish_non_exhaustive()
357    }
358}
359
360impl Write for RollingFileHandle {
361    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
362        self.inner.with_state(|state| state.file.write_all(buf))?;
363        self.inner.note_bytes(buf.len() as u64);
364        Ok(buf.len())
365    }
366    fn flush(&mut self) -> io::Result<()> {
367        // Spec 20 § 3.4 / spec 93 P2-3: pair `flush` with `sync_data`
368        // so a kill -9 between batches does not lose recent appends.
369        // The cost is one fsync per logical flush (one per batch),
370        // matching the AUDIT-spool default in P0-7.
371        self.inner.with_state(|state| {
372            state.file.flush()?;
373            state.file.sync_data()
374        })
375    }
376}
377
378impl RollingInner {
379    fn with_state<F, R>(&self, f: F) -> io::Result<R>
380    where
381        F: FnOnce(&mut RollingState) -> io::Result<R>,
382    {
383        let mut guard = self.current.lock();
384        if guard.is_none() {
385            *guard = Some(self.open_new()?);
386        }
387        let needs_rotate = match guard.as_ref() {
388            Some(state) => self.should_rotate(state),
389            None => false,
390        };
391        if needs_rotate {
392            *guard = Some(self.open_new()?);
393            self.maybe_evict_old();
394        }
395        let state = guard
396            .as_mut()
397            .ok_or_else(|| io::Error::other("rolling state missing after open"))?;
398        f(state)
399    }
400
401    fn note_bytes(&self, n: u64) {
402        if let Some(state) = self.current.lock().as_mut() {
403            state.bytes += n;
404        }
405    }
406
407    fn should_rotate(&self, state: &RollingState) -> bool {
408        match self.policy {
409            RollingPolicy::Never => false,
410            RollingPolicy::SizeBased { max_bytes } => state.bytes >= max_bytes,
411            RollingPolicy::Daily => {
412                let opened = state
413                    .opened_at
414                    .duration_since(UNIX_EPOCH)
415                    .map(|d| d.as_secs() / 86_400)
416                    .ok();
417                opened != now_unix_secs().map(|s| s / 86_400)
418            }
419            RollingPolicy::Hourly => {
420                let opened = state
421                    .opened_at
422                    .duration_since(UNIX_EPOCH)
423                    .map(|d| d.as_secs() / 3600)
424                    .ok();
425                opened != now_unix_secs().map(|s| s / 3600)
426            }
427            RollingPolicy::SizeOrAge { max_bytes, max_age } => {
428                if state.bytes >= max_bytes {
429                    return true;
430                }
431                state.opened_at.elapsed().unwrap_or_default() >= max_age
432            }
433        }
434    }
435
436    fn open_new(&self) -> io::Result<RollingState> {
437        // Spec 20 § 3.4 / spec 93 P2-3: filename layout follows the
438        // policy. Time-based rolls embed `YYYY-MM-DD[.HH]` so files
439        // sort chronologically; size-based rolls use a six-digit
440        // zero-padded counter so directories stay scannable. Falling
441        // back to `unix-counter` preserves the legacy behaviour for
442        // `Never` and `SizeOrAge` (where neither stamp suffices alone).
443        let stamp = match self.policy {
444            RollingPolicy::Daily => format_date_stamp(now_unix_secs().unwrap_or(0)),
445            RollingPolicy::Hourly => format_hour_stamp(now_unix_secs().unwrap_or(0)),
446            RollingPolicy::SizeBased { .. } => {
447                let counter = ROLL_COUNTER.fetch_add(1, Ordering::Relaxed);
448                format!("{counter:06}")
449            }
450            _ => {
451                let now = now_unix_secs().unwrap_or(0);
452                let counter = ROLL_COUNTER.fetch_add(1, Ordering::Relaxed);
453                format!("{now}-{counter}")
454            }
455        };
456        let filename = format!("{}.{stamp}{}", self.prefix, self.suffix);
457        let path = self.directory.join(&filename);
458        let file = std::fs::OpenOptions::new()
459            .create(true)
460            .append(true)
461            .open(&path)?;
462        // Spec 20 § 3.5: durably commit the directory entry so the new
463        // file survives a host crash that strands the inode in the
464        // page cache.
465        if let Ok(dir) = std::fs::File::open(&self.directory) {
466            let _ = dir.sync_all();
467        }
468        Ok(RollingState {
469            file,
470            bytes: 0,
471            opened_at: SystemTime::now(),
472        })
473    }
474
475    #[allow(dead_code)]
476    fn _ensure_helpers_used(&self) {
477        // Pin format_date_stamp / format_hour_stamp to silence dead-code
478        // warnings on platforms where the policy is never Daily/Hourly.
479        let _ = format_date_stamp;
480        let _ = format_hour_stamp;
481    }
482
483    fn maybe_evict_old(&self) {
484        let Some(keep) = self.keep else { return };
485        let Ok(read_dir) = std::fs::read_dir(&self.directory) else {
486            return;
487        };
488        let mut entries: Vec<_> = read_dir
489            .filter_map(Result::ok)
490            .filter(|e| {
491                e.file_name()
492                    .to_str()
493                    .is_some_and(|n| n.starts_with(&self.prefix) && n.ends_with(&self.suffix))
494            })
495            .collect();
496        entries.sort_by_key(|e| {
497            e.metadata()
498                .and_then(|m| m.modified())
499                .unwrap_or(SystemTime::UNIX_EPOCH)
500        });
501        if entries.len() > keep {
502            let extras = entries.len() - keep;
503            for entry in entries.into_iter().take(extras) {
504                let _ = std::fs::remove_file(entry.path());
505            }
506        }
507    }
508}
509
510static ROLL_COUNTER: AtomicU64 = AtomicU64::new(0);
511
512fn now_unix_secs() -> Option<u64> {
513    SystemTime::now()
514        .duration_since(UNIX_EPOCH)
515        .map(|d| d.as_secs())
516        .ok()
517}
518
519/// Format `secs_since_epoch` as `YYYY-MM-DD` in UTC. Used by daily
520/// rotation. Spec 20 § 3.4 / spec 93 P2-3.
521fn format_date_stamp(secs: u64) -> String {
522    let (y, m, d) = ymd_from_secs(secs);
523    format!("{y:04}-{m:02}-{d:02}")
524}
525
526/// Format `secs_since_epoch` as `YYYY-MM-DD.HH` in UTC. Used by hourly
527/// rotation.
528fn format_hour_stamp(secs: u64) -> String {
529    let (y, m, d) = ymd_from_secs(secs);
530    let h = (secs / 3600) % 24;
531    format!("{y:04}-{m:02}-{d:02}.{h:02}")
532}
533
534/// Convert UTC seconds-since-epoch to (year, month, day). Sentinel
535/// algorithm per Howard Hinnant's date library: avoids pulling in
536/// `chrono` for what is otherwise a single function.
537fn ymd_from_secs(secs: u64) -> (u32, u32, u32) {
538    let z = (secs / 86_400) as i64 + 719_468; // days since 0000-03-01
539    let era = z.div_euclid(146_097);
540    let doe = z - era * 146_097; // [0, 146096]
541    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
542    let y_internal = yoe + era * 400;
543    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // [0, 365]
544    let mp = (5 * doy + 2) / 153; // [0, 11]
545    let d = (doy - (153 * mp + 2) / 5 + 1) as u32; // [1, 31]
546    let m = if mp < 10 { mp + 3 } else { mp - 9 } as u32; // [1, 12]
547    let y = if m <= 2 { y_internal + 1 } else { y_internal } as u32;
548    (y, m, d)
549}
550
551// ─── NonBlockingWriter ────────────────────────────────────────────────
552
553const NON_BLOCKING_DEFAULT_CAPACITY: usize = 8192;
554
555/// Wraps a `MakeWriter` with a background thread + bounded
556/// `mpsc::SyncSender` channel. Overflow drops the line and increments
557/// the dropped counter. Spec 20 § 3.5.
558#[derive(Debug, Clone)]
559pub struct NonBlockingWriter {
560    sender: SyncSender<Vec<u8>>,
561    dropped: Arc<AtomicU64>,
562}
563
564/// Returned alongside `NonBlockingWriter`; flushes + joins the bg
565/// thread on drop.
566pub struct WorkerGuard {
567    shutdown: Arc<AtomicBool>,
568    join: Option<JoinHandle<()>>,
569}
570
571impl std::fmt::Debug for WorkerGuard {
572    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
573        f.debug_struct("WorkerGuard").finish_non_exhaustive()
574    }
575}
576
577impl Drop for WorkerGuard {
578    fn drop(&mut self) {
579        self.shutdown.store(true, Ordering::SeqCst);
580        if let Some(j) = self.join.take() {
581            let _ = j.join();
582        }
583    }
584}
585
586impl NonBlockingWriter {
587    /// Wrap `inner` with a background thread. Returns the
588    /// non-blocking writer and a `WorkerGuard` whose `Drop` flushes +
589    /// joins.
590    pub fn new<M>(inner: M, capacity: usize) -> (Self, WorkerGuard)
591    where
592        M: MakeWriter,
593    {
594        let cap = if capacity == 0 {
595            NON_BLOCKING_DEFAULT_CAPACITY
596        } else {
597            capacity
598        };
599        let (tx, rx) = sync_channel::<Vec<u8>>(cap);
600        let dropped = Arc::new(AtomicU64::new(0));
601        let shutdown = Arc::new(AtomicBool::new(false));
602        let shutdown_in_thread = Arc::clone(&shutdown);
603        let inner = Arc::new(inner);
604        let join = thread::spawn(move || run_loop(inner, rx, shutdown_in_thread));
605        (
606            Self {
607                sender: tx,
608                dropped,
609            },
610            WorkerGuard {
611                shutdown,
612                join: Some(join),
613            },
614        )
615    }
616
617    /// Total bytes dropped due to channel pressure.
618    #[must_use]
619    pub fn dropped_total(&self) -> u64 {
620        self.dropped.load(Ordering::Relaxed)
621    }
622}
623
624fn run_loop<M: MakeWriter>(inner: Arc<M>, rx: mpsc::Receiver<Vec<u8>>, shutdown: Arc<AtomicBool>) {
625    while let Ok(buf) = rx.recv_timeout(Duration::from_millis(200)) {
626        let mut w = inner.make_writer();
627        let _ = w.write_all(&buf);
628        let _ = w.flush();
629        if shutdown.load(Ordering::Relaxed) {
630            break;
631        }
632    }
633    // Drain remaining queued buffers on shutdown.
634    while let Ok(buf) = rx.try_recv() {
635        let mut w = inner.make_writer();
636        let _ = w.write_all(&buf);
637        let _ = w.flush();
638    }
639}
640
641impl MakeWriter for NonBlockingWriter {
642    type Writer = NonBlockingHandle;
643    fn make_writer(&self) -> NonBlockingHandle {
644        NonBlockingHandle {
645            sender: self.sender.clone(),
646            dropped: Arc::clone(&self.dropped),
647            buf: Vec::with_capacity(256),
648        }
649    }
650}
651
652/// Per-batch handle. Buffers bytes, flushes to the bg sender on
653/// `flush()` / `Drop`.
654pub struct NonBlockingHandle {
655    sender: SyncSender<Vec<u8>>,
656    dropped: Arc<AtomicU64>,
657    buf: Vec<u8>,
658}
659
660impl std::fmt::Debug for NonBlockingHandle {
661    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
662        f.debug_struct("NonBlockingHandle")
663            .field("buffered", &self.buf.len())
664            .finish()
665    }
666}
667
668impl Write for NonBlockingHandle {
669    fn write(&mut self, b: &[u8]) -> io::Result<usize> {
670        self.buf.extend_from_slice(b);
671        Ok(b.len())
672    }
673
674    fn flush(&mut self) -> io::Result<()> {
675        if self.buf.is_empty() {
676            return Ok(());
677        }
678        let buf = std::mem::take(&mut self.buf);
679        match self.sender.try_send(buf) {
680            Ok(()) => Ok(()),
681            Err(mpsc::TrySendError::Full(_) | mpsc::TrySendError::Disconnected(_)) => {
682                self.dropped.fetch_add(1, Ordering::Relaxed);
683                Ok(())
684            }
685        }
686    }
687}
688
689impl Drop for NonBlockingHandle {
690    fn drop(&mut self) {
691        let _ = self.flush();
692    }
693}
694
695#[cfg(test)]
696mod tests {
697    use super::*;
698
699    #[test]
700    fn test_should_rotate_size_based() {
701        let dir = tempfile::tempdir().unwrap();
702        let writer = RollingFileWriter::builder()
703            .directory(dir.path())
704            .filename_prefix("test")
705            .policy(RollingPolicy::SizeBased { max_bytes: 16 })
706            .build()
707            .unwrap();
708        for _ in 0..5 {
709            let mut h = writer.make_writer();
710            h.write_all(b"hello world!").unwrap();
711            h.flush().unwrap();
712        }
713        let entries: Vec<_> = std::fs::read_dir(dir.path())
714            .unwrap()
715            .filter_map(Result::ok)
716            .collect();
717        assert!(
718            entries.len() >= 2,
719            "expected size-based rotation to produce >1 file"
720        );
721    }
722
723    #[test]
724    fn test_non_blocking_writer_should_flush_on_drop() {
725        let captured = Arc::new(parking_lot::Mutex::new(Vec::<u8>::new()));
726        struct FakeWriter(Arc<parking_lot::Mutex<Vec<u8>>>);
727        impl MakeWriter for FakeWriter {
728            type Writer = FakeHandle;
729            fn make_writer(&self) -> FakeHandle {
730                FakeHandle(Arc::clone(&self.0))
731            }
732        }
733        struct FakeHandle(Arc<parking_lot::Mutex<Vec<u8>>>);
734        impl Write for FakeHandle {
735            fn write(&mut self, b: &[u8]) -> io::Result<usize> {
736                self.0.lock().extend_from_slice(b);
737                Ok(b.len())
738            }
739            fn flush(&mut self) -> io::Result<()> {
740                Ok(())
741            }
742        }
743
744        let (writer, _guard) = NonBlockingWriter::new(FakeWriter(Arc::clone(&captured)), 16);
745        {
746            let mut h = writer.make_writer();
747            h.write_all(b"hello\n").unwrap();
748            h.flush().unwrap();
749        }
750        // Allow the bg thread to drain.
751        std::thread::sleep(Duration::from_millis(50));
752        assert!(captured.lock().starts_with(b"hello\n"));
753    }
754}