Skip to main content

emit_file/
lib.rs

1/*!
2Emit diagnostic events to rolling files.
3
4All file IO is performed on batches in a dedicated background thread.
5
6This library writes newline delimited JSON by default, like:
7
8```text
9{"ts_start":"2024-05-29T03:35:13.922768000Z","ts":"2024-05-29T03:35:13.943506000Z","module":"my_app","msg":"in_ctxt failed with `a` is odd","tpl":"in_ctxt failed with `err`","a":1,"err":"`a` is odd","lvl":"warn","span_id":"0a3686d1b788b277","span_parent":"1a50b58f2ef93f3b","trace_id":"8dd5d1f11af6ba1db4124072024933cb"}
10```
11
12# Getting started
13
14Add `emit` and `emit_file` to your `Cargo.toml`:
15
16```toml
17[dependencies.emit]
18version = "1.17.2"
19
20[dependencies.emit_file]
21version = "1.17.2"
22```
23
24Initialize `emit` using a rolling file set:
25
26```
27fn main() {
28    let rt = emit::setup()
29        .emit_to(emit_file::set("./target/logs/my_app.txt").spawn())
30        .init();
31
32    // Your app code goes here
33
34    rt.blocking_flush(std::time::Duration::from_secs(30));
35}
36```
37
38The input to [`set`] is a template for log file naming. The example earlier used `./target/logs/my_app.txt`. From this template, log files will be written to `./target/logs`, each log file name will start with `my_app`, and use `.txt` as its extension.
39
40# File naming
41
42Log files are created using the following naming scheme:
43
44```text
45{prefix}.{date}.{counter}.{id}.{ext}
46```
47
48where:
49
50- `prefix`: A user-defined name that groups all log files related to the same application together.
51- `date`: The rollover interval the file was created in. This isn't necessarily related to the timestamps of events within the file.
52- `counter`: The number of milliseconds since the start of the current rollover interval when the file was created.
53- `id`: A unique identifier for the file in the interval.
54- `ext`: A user-defined file extension.
55
56In the following log file:
57
58```text
59log.2024-05-27-03-00.00012557.37c57fa1.txt
60```
61
62the parts are:
63
64- `prefix`: `log`.
65- `date`: `2024-05-27-03-00`.
66- `counter`: `00012557`.
67- `id`: `37c57fa1`.
68- `ext`: `txt`.
69
70# When files roll
71
72Diagnostic events are only ever written to a single file at a time. That file changes when:
73
741. The application restarts and [`FileSetBuilder::reuse_files`] is false.
752. The rollover period changes. This is set by [`FileSetBuilder::roll_by_day`], [`FileSetBuilder::roll_by_hour`], and [`FileSetBuilder::roll_by_minute`].
763. The size of the file exceeds [`FileSetBuilder::max_file_size_bytes`].
774. Writing to the file fails.
78
79# Durability
80
81Diagnostic events are written to files in asynchronous batches. Under normal operation, after a call to [`emit::Emitter::blocking_flush`], all events emitted before the call are guaranteed to be written and synced via Rust's [`std::fs::File::sync_all`] method. This is usually enough to guarantee durability.
82
83# Handling IO failures
84
85If writing a batch fails while attempting to write to a file then the file being written to is considered poisoned and no future attempts will be made to write to it. The batch will instead be retried on a new file. Batches that fail attempting to sync are not retried. Since batches don't have explicit transactions, it's possible on failure for part or all of the failed batch to actually be present in the original file. That means diagnostic events may be duplicated in the case of an IO error while writing them.
86
87# Troubleshooting
88
If you're not seeing diagnostics appear in files as expected, you can rule out configuration issues in `emit_file` by configuring `emit`'s internal logger and collecting metrics from it:
90
91```
92# mod emit_term {
93#     pub fn stdout() -> impl emit::runtime::InternalEmitter + Send + Sync + 'static {
94#        emit::runtime::AssertInternal(emit::emitter::from_fn(|_| {}))
95#     }
96# }
97use emit::metric::Source;
98
99fn main() {
100    // 1. Initialize the internal logger
101    //    Diagnostics produced by `emit_file` itself will go here
102    let internal = emit::setup()
103        .emit_to(emit_term::stdout())
104        .init_internal();
105
106    let mut reporter = emit::metric::Reporter::new();
107
108    let rt = emit::setup()
109        .emit_to({
110            let files = emit_file::set("./target/logs/my_app.txt").spawn();
111
112            // 2. Add `emit_file`'s metrics to a reporter so we can see what it's up to
113            //    You can do this independently of the internal emitter
114            reporter.add_source(files.metric_source());
115
116            files
117        })
118        .init();
119
120    // Your app code goes here
121
122    rt.blocking_flush(std::time::Duration::from_secs(30));
123
124    // 3. Report metrics after attempting to flush
125    //    You could also do this periodically as your application runs
126    reporter.emit_metrics(&internal.emitter());
127}
128```
129
130Diagnostics include when batches are written, and any failures observed along the way.
131*/
132
133#![doc(html_logo_url = "https://raw.githubusercontent.com/emit-rs/emit/main/asset/logo.svg")]
134#![deny(missing_docs)]
135
136mod internal_metrics;
137
138use std::{
139    fmt,
140    io::{self, Write},
141    mem,
142    path::{Path, PathBuf},
143    sync::Arc,
144    thread,
145};
146
147use emit::{
148    clock::{Clock, ErasedClock},
149    platform::{rand_rng::RandRng, system_clock::SystemClock},
150    rng::{ErasedRng, Rng},
151};
152use emit_batcher::BatchError;
153use internal_metrics::InternalMetrics;
154
// Builder defaults; applied in `FileSetBuilder::new_with_writer` and
// documented on `set` and the builder's constructors.
const DEFAULT_ROLL_BY: RollBy = RollBy::Hour;
const DEFAULT_MAX_FILES: usize = 32;
const DEFAULT_MAX_FILE_SIZE_BYTES: usize = 1024 * 1024 * 1024; // 1GiB
const DEFAULT_REUSE_FILES: bool = false;
159
160pub use internal_metrics::*;
161
/**
An error attempting to create a [`FileSet`].
*/
// Newtype over a boxed error so the crate exposes a single opaque error type;
// the `Debug`/`Display`/`Error` impls below all delegate to the inner value.
pub struct Error(Box<dyn std::error::Error + Send + Sync>);
166
167impl Error {
168    fn new(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Self {
169        Error(e.into())
170    }
171}
172
173impl fmt::Debug for Error {
174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175        fmt::Debug::fmt(&self.0, f)
176    }
177}
178
179impl fmt::Display for Error {
180    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181        fmt::Display::fmt(&self.0, f)
182    }
183}
184
impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Surface the wrapped error's own cause chain rather than the
        // wrapper itself.
        self.0.source()
    }
}
190
/**
Begin building a [`FileSet`] that formats events as newline-delimited JSON.

`file_set` is the naming template for log files; see the crate root documentation for how the directory, file prefix, and extension are derived from it.

Defaults applied by the builder:

- Roll by hour.
- 32 max files.
- 1GiB max file size.

Use the [`FileSetBuilder`]'s methods to tune the number and size of log files, then call [`FileSetBuilder::spawn`] to complete the builder and pass the resulting [`FileSet`] to [`emit::Setup::emit_to`].
*/
#[cfg(feature = "default_writer")]
pub fn set(file_set: impl AsRef<Path>) -> FileSetBuilder {
    let template = file_set.as_ref();
    FileSetBuilder::new(template)
}
210
211/**
212Create a builder for a [`FileSet`].
213
214The builder will use `file_set` as its template for naming log files. See the crate root documentation for details on how this argument is interpreted.
215
216The `writer` is used to format incoming [`emit::Event`]s into their on-disk format. If formatting fails then the event will be discarded.
217
218The `writer` may finish each event with the separator. If it doesn't, then it will be added automatically.
219*/
220pub fn set_with_writer(
221    file_set: impl AsRef<Path>,
222    writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
223        + Send
224        + Sync
225        + 'static,
226    separator: &'static [u8],
227) -> FileSetBuilder {
228    FileSetBuilder::new_with_writer(file_set.as_ref(), writer, separator)
229}
230
/**
A builder for a [`FileSet`].

Use [`set`] or [`set_with_writer`] to begin a [`FileSetBuilder`].

The [`FileSetBuilder`] has configuration options for managing the number and size of log files.

Once configured, call [`FileSetBuilder::spawn`] to complete the builder, passing the resulting [`FileSet`] to [`emit::Setup::emit_to`].
*/
pub struct FileSetBuilder {
    // Template path the directory, file prefix, and extension are derived from.
    file_set: PathBuf,
    // How often the active file rolls over to a new one (default: hourly).
    roll_by: RollBy,
    // Retention limit; older files are deleted to stay under this count.
    max_files: usize,
    // Size threshold at which writes roll over to a new file.
    max_file_size_bytes: usize,
    // Whether to append to an existing file on startup instead of creating
    // a fresh one (default: false).
    reuse_files: bool,
    // Formats each event into a `FileBuf`; failures cause the event to be
    // discarded.
    writer: Box<
        dyn Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
            + Send
            + Sync,
    >,
    // Byte sequence written between events (appended if the writer didn't
    // finish with it).
    separator: &'static [u8],
}
253
// Granularity of the rollover interval encoded into each file name's `date`
// part; selected via `FileSetBuilder::roll_by_{day,hour,minute}`
// (default: `Hour`, see `DEFAULT_ROLL_BY`).
#[derive(Debug, Clone, Copy)]
enum RollBy {
    Day,
    Hour,
    Minute,
}
260
impl FileSetBuilder {
    /**
    Create a new [`FileSetBuilder`] using the default newline-delimited JSON format.

    The builder will use `file_set` as its template for naming log files. See the crate root documentation for details on how this argument is interpreted.

    It will use the other following defaults:

    - Roll by hour.
    - 32 max files.
    - 1GiB max file size.
    */
    #[cfg(feature = "default_writer")]
    pub fn new(file_set: impl Into<PathBuf>) -> Self {
        Self::new_with_writer(file_set, default_writer, b"\n")
    }

    /**
    Create a builder for a [`FileSet`].

    The builder will use `file_set` as its template for naming log files. See the crate root documentation for details on how this argument is interpreted.

    The `writer` is used to format incoming [`emit::Event`]s into their on-disk format. If formatting fails then the event will be discarded.

    The `writer` may finish each event with the separator. If it doesn't, then it will be added automatically.

    It will use the other following defaults:

    - Roll by hour.
    - 32 max files.
    - 1GiB max file size.
    */
    pub fn new_with_writer(
        file_set: impl Into<PathBuf>,
        writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
            + Send
            + Sync
            + 'static,
        separator: &'static [u8],
    ) -> Self {
        FileSetBuilder {
            file_set: file_set.into(),
            roll_by: DEFAULT_ROLL_BY,
            max_files: DEFAULT_MAX_FILES,
            max_file_size_bytes: DEFAULT_MAX_FILE_SIZE_BYTES,
            reuse_files: DEFAULT_REUSE_FILES,
            writer: Box::new(writer),
            separator,
        }
    }

    /**
    Create rolling log files based on the calendar day of when they're written to.
    */
    pub fn roll_by_day(mut self) -> Self {
        self.roll_by = RollBy::Day;
        self
    }

    /**
    Create rolling log files based on the calendar day and hour of when they're written to.
    */
    pub fn roll_by_hour(mut self) -> Self {
        self.roll_by = RollBy::Hour;
        self
    }

    /**
    Create rolling log files based on the calendar day, hour, and minute of when they're written to.
    */
    pub fn roll_by_minute(mut self) -> Self {
        self.roll_by = RollBy::Minute;
        self
    }

    /**
    The maximum number of log files to keep.

    Files are deleted from oldest first whenever a new file is created. Older files are determined based on the time period they belong to.
    */
    pub fn max_files(mut self, max_files: usize) -> Self {
        self.max_files = max_files;
        self
    }

    /**
    The maximum size of a file before new writes will roll over to a new one.

    The same time period can contain multiple log files. More recently created log files will sort ahead of older ones.
    */
    pub fn max_file_size_bytes(mut self, max_file_size_bytes: usize) -> Self {
        self.max_file_size_bytes = max_file_size_bytes;
        self
    }

    /**
    Whether to re-use log files when first attempting to write to them.

    This method can be used for applications that are started a lot and may result in lots of active log files.

    Before writing new events to the log file, it will have the configured separator defensively written to it. This ensures any previous partial write doesn't corrupt any new writes.
    */
    pub fn reuse_files(mut self, reuse_files: bool) -> Self {
        self.reuse_files = reuse_files;
        self
    }

    /**
    Specify a writer for incoming [`emit::Event`]s.

    The `writer` is used to format incoming [`emit::Event`]s into their on-disk format. If formatting fails then the event will be discarded.

    The `writer` may finish each event with the separator. If it doesn't, then it will be added automatically.
    */
    pub fn writer(
        mut self,
        writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
            + Send
            + Sync
            + 'static,
        separator: &'static [u8],
    ) -> Self {
        self.writer = Box::new(writer);
        self.separator = separator;
        self
    }

    /**
    Complete the builder, returning a [`FileSet`] to pass to [`emit::Setup::emit_to`].

    If the file set configuration is invalid this method won't fail or panic, it will discard any events emitted to it. In these cases it will log to [`emit::runtime::internal`] and increment the `configuration_failed` metric on [`FileSet::metric_source`]. See the _Troubleshooting_ section of the crate root docs for more details.
    */
    pub fn spawn(self) -> FileSet {
        let metrics = Arc::new(InternalMetrics::default());

        // A configuration failure deliberately produces a `FileSet` with no
        // inner state (`inner: None`) instead of an error, so `spawn` itself
        // never fails; emitted events are silently discarded.
        let inner = match self.spawn_inner(metrics.clone()) {
            Ok(inner) => Some(inner),
            Err(err) => {
                emit::error!(
                    rt: emit::runtime::internal(),
                    "file set configuration is invalid; no events will be written: {err}"
                );

                metrics.configuration_failed.increment();

                None
            }
        };

        FileSet { metrics, inner }
    }

    // Validate the template, build the worker, and spawn the background
    // writer thread. Returns `Err` when the path template can't be split or
    // the thread fails to spawn.
    fn spawn_inner(self, metrics: Arc<InternalMetrics>) -> Result<FileSetInner, Error> {
        // Split the template path into directory, file prefix, and extension.
        let (dir, file_prefix, file_ext) = dir_prefix_ext(self.file_set).map_err(Error::new)?;

        let mut worker = Worker::new(
            metrics.clone(),
            StdFilesystem::new(),
            SystemClock::new(),
            RandRng::new(),
            dir,
            file_prefix,
            file_ext,
            self.roll_by,
            self.reuse_files,
            self.max_files,
            self.max_file_size_bytes,
            self.separator,
        );

        // NOTE(review): channel capacity of 10,000 formatted events between
        // emitters and the worker thread — confirm overflow behavior against
        // `emit_batcher`'s docs.
        let (sender, receiver) = emit_batcher::bounded(10_000);

        let handle = emit_batcher::sync::spawn("emit_file_worker", receiver, move |batch| {
            worker.on_batch(batch)
        })
        .map_err(Error::new)?;

        Ok(FileSetInner {
            sender,
            metrics,
            writer: self.writer,
            separator: self.separator,
            _handle: handle,
        })
    }
}
447
/**
A handle to an asynchronous, background, rolling file writer.

Create a file set through the [`set`] function, calling [`FileSetBuilder::spawn`] to complete configuration. Pass the resulting [`FileSet`] to [`emit::Setup::emit_to`] to configure `emit` to write diagnostic events through it.
*/
pub struct FileSet {
    // `None` when the configuration passed to `FileSetBuilder::spawn` was
    // invalid; emitted events are then discarded (see `spawn`'s docs).
    inner: Option<FileSetInner>,
    metrics: Arc<InternalMetrics>,
}
457
// The live half of a successfully-configured `FileSet`: the event formatter
// plus the channel feeding the background "emit_file_worker" thread.
struct FileSetInner {
    // Producer side of the batching channel consumed by the worker thread.
    sender: emit_batcher::Sender<EventBatch>,
    metrics: Arc<InternalMetrics>,
    // Formats each event into a `FileBuf`; failures discard the event.
    writer: Box<
        dyn Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
            + Send
            + Sync,
    >,
    // Byte sequence separating events on disk.
    separator: &'static [u8],
    // Keeps the background worker thread's handle alive alongside the set.
    _handle: thread::JoinHandle<()>,
}
469
impl emit::Emitter for FileSet {
    fn emit<E: emit::event::ToEvent>(&self, evt: E) {
        // Delegates through `Option<FileSetInner>`; when configuration failed
        // at spawn time `inner` is `None` and the event is discarded, as
        // documented on `FileSetBuilder::spawn`.
        self.inner.emit(evt)
    }

    fn blocking_flush(&self, timeout: std::time::Duration) -> bool {
        // NOTE(review): when `inner` is `None` this relies on `Option`'s
        // `Emitter` impl from the `emit` crate — presumably a no-op flush;
        // confirm its return value against the `emit` docs.
        self.inner.blocking_flush(timeout)
    }
}
479
impl emit::Emitter for FileSetInner {
    // Format the event on the caller's thread, then hand the bytes to the
    // background worker; only the file IO itself is asynchronous.
    fn emit<E: emit::event::ToEvent>(&self, evt: E) {
        let evt = evt.to_event();

        // NOTE: We could use a rolling capacity to pre-allocate this if we want
        let mut buf = FileBuf::new();

        match (self.writer)(&mut buf, &evt.erase()) {
            Ok(()) => {
                // If the buffer didn't finish with the configured separator
                // then write it now
                if !buf.0.ends_with(self.separator) {
                    buf.extend_from_slice(self.separator);
                }

                self.sender.send(buf.into_boxed_slice());
            }
            Err(err) => {
                // Formatting failed: the event is discarded, counted, and
                // reported through the internal runtime.
                self.metrics.event_format_failed.increment();

                emit::warn!(
                    rt: emit::runtime::internal(),
                    "failed to format file event payload: {err}",
                )
            }
        };
    }

    fn blocking_flush(&self, timeout: std::time::Duration) -> bool {
        emit_batcher::blocking_flush(&self.sender, timeout)
    }
}
512
513impl FileSet {
514    /**
515    Get an [`emit::metric::Source`] for instrumentation produced by the file set.
516
517    These metrics can be used to monitor the running health of your diagnostic pipeline.
518    */
519    pub fn metric_source(&self) -> FileSetMetrics {
520        FileSetMetrics {
521            channel_metrics: self
522                .inner
523                .as_ref()
524                .map(|inner| inner.sender.metric_source()),
525            metrics: self.metrics.clone(),
526        }
527    }
528}
529
/**
A buffer to format [`emit::Event`]s into before writing them to a file.
*/
pub struct FileBuf(Vec<u8>);

impl FileBuf {
    // Start with an empty, unallocated buffer.
    fn new() -> Self {
        FileBuf(Vec::new())
    }

    /**
    Push a byte onto the end of the buffer.
    */
    pub fn push(&mut self, byte: u8) {
        self.0.push(byte);
    }

    /**
    Push a slice of bytes onto the end of the buffer.
    */
    pub fn extend_from_slice(&mut self, bytes: &[u8]) {
        self.0.extend_from_slice(bytes);
    }

    // Hand the formatted bytes off as a boxed slice, dropping spare capacity.
    fn into_boxed_slice(self) -> Box<[u8]> {
        self.0.into_boxed_slice()
    }
}

// Writing to a `FileBuf` appends to the in-memory buffer and never fails,
// matching `Vec<u8>`'s `Write` behavior.
impl io::Write for FileBuf {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        self.0.extend_from_slice(buf);
        Ok(())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Nothing to flush: the bytes are already in memory.
        Ok(())
    }
}
572
// Default formatter: streams the event as a single JSON object using `sval`;
// the trailing newline is appended by the caller via the configured separator.
#[cfg(feature = "default_writer")]
fn default_writer(
    buf: &mut FileBuf,
    evt: &emit::Event<&dyn emit::props::ErasedProps>,
) -> io::Result<()> {
    use std::ops::ControlFlow;

    use emit::{
        well_known::{KEY_MDL, KEY_MSG, KEY_TPL, KEY_TS, KEY_TS_START},
        Props as _,
    };

    // Adapter that lets an `emit::Event` be streamed as an `sval` record.
    struct EventValue<'a, P>(&'a emit::Event<'a, P>);

    impl<'a, P: emit::Props> sval::Value for EventValue<'a, P> {
        fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(
            &'sval self,
            stream: &mut S,
        ) -> sval::Result {
            stream.record_begin(None, None, None, None)?;

            // Timestamps: `ts_start` is only written when the extent is a
            // range; `ts` is always written (the range end, or the point).
            if let Some(extent) = self.0.extent() {
                if let Some(range) = extent.as_range() {
                    stream.record_value_begin(None, &sval::Label::new(KEY_TS_START))?;
                    sval::stream_display(&mut *stream, &range.start)?;
                    stream.record_value_end(None, &sval::Label::new(KEY_TS_START))?;
                }

                stream.record_value_begin(None, &sval::Label::new(KEY_TS))?;
                sval::stream_display(&mut *stream, extent.as_point())?;
                stream.record_value_end(None, &sval::Label::new(KEY_TS))?;
            }

            // Well-known fields: module, rendered message, and template.
            stream.record_value_begin(None, &sval::Label::new(KEY_MDL))?;
            sval::stream_display(&mut *stream, self.0.mdl())?;
            stream.record_value_end(None, &sval::Label::new(KEY_MDL))?;

            stream.record_value_begin(None, &sval::Label::new(KEY_MSG))?;
            sval::stream_display(&mut *stream, self.0.msg())?;
            stream.record_value_end(None, &sval::Label::new(KEY_MSG))?;

            stream.record_value_begin(None, &sval::Label::new(KEY_TPL))?;
            sval::stream_display(&mut *stream, self.0.tpl())?;
            stream.record_value_end(None, &sval::Label::new(KEY_TPL))?;

            // Remaining (deduplicated) properties; a property that fails to
            // stream stops iteration early, and the error is not surfaced
            // from this loop itself.
            let _ = self.0.props().dedup().for_each(|k, v| {
                match (|| {
                    stream.record_value_begin(None, &sval::Label::new_computed(k.get()))?;
                    stream.value_computed(&v)?;
                    stream.record_value_end(None, &sval::Label::new_computed(k.get()))?;

                    Ok::<(), sval::Error>(())
                })() {
                    Ok(()) => ControlFlow::Continue(()),
                    Err(_) => ControlFlow::Break(()),
                }
            });

            stream.record_end(None, None, None)
        }
    }

    sval_json::stream_to_io_write(buf, EventValue(evt))
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

    Ok(())
}
640
// A batch of pre-formatted events waiting to be written to a file.
struct EventBatch {
    bufs: Vec<Box<[u8]>>,
    remaining_bytes: usize,
    index: usize,
}

impl EventBatch {
    // An empty batch: no buffered events, cursor at the start.
    fn new() -> Self {
        EventBatch {
            bufs: Vec::new(),
            index: 0,
            remaining_bytes: 0,
        }
    }

    // Append one formatted event, tracking its size against the batch total.
    fn push(&mut self, buf: impl Into<Box<[u8]>>) {
        let buf = buf.into();

        self.remaining_bytes += buf.len();
        self.bufs.push(buf);
    }
}
663
664impl emit_batcher::Channel for EventBatch {
665    type Item = Box<[u8]>;
666
667    fn new() -> Self {
668        EventBatch::new()
669    }
670
671    fn push<'a>(&mut self, item: Self::Item) {
672        self.push(item)
673    }
674
675    fn len(&self) -> usize {
676        self.bufs.len() - self.index
677    }
678
679    fn clear(&mut self) {
680        self.bufs.clear()
681    }
682}
683
684impl EventBatch {
685    fn current(&self) -> Option<&[u8]> {
686        self.bufs.get(self.index).map(|buf| &**buf)
687    }
688
689    fn advance(&mut self) {
690        let advanced = mem::take(&mut self.bufs[self.index]);
691
692        self.index += 1;
693        self.remaining_bytes -= advanced.len();
694    }
695}
696
// State owned by the background writer thread; consumes event batches and
// manages the currently-active log file (see `Worker::on_batch`).
struct Worker {
    metrics: Arc<InternalMetrics>,
    clock: Box<dyn ErasedClock + Send + Sync>,
    rng: Box<dyn ErasedRng + Send + Sync>,
    fs: Box<dyn Filesystem + Send + Sync>,
    // The file currently being written to, if any; `None` until the first
    // batch, and not restored after a failed or rolled-over write.
    active_file: Option<ActiveFile>,
    roll_by: RollBy,
    max_files: usize,
    max_file_size_bytes: usize,
    reuse_files: bool,
    // Destination directory and file-name parts derived from the set template.
    dir: String,
    file_prefix: String,
    file_ext: String,
    separator: &'static [u8],
}
712
713impl Worker {
714    fn new(
715        metrics: Arc<InternalMetrics>,
716        fs: impl Filesystem + Send + Sync + 'static,
717        clock: impl Clock + Send + Sync + 'static,
718        rng: impl Rng + Send + Sync + 'static,
719        dir: String,
720        file_prefix: String,
721        file_ext: String,
722        roll_by: RollBy,
723        reuse_files: bool,
724        max_files: usize,
725        max_file_size_bytes: usize,
726        separator: &'static [u8],
727    ) -> Self {
728        Worker {
729            metrics,
730            fs: Box::new(fs),
731            clock: Box::new(clock),
732            rng: Box::new(rng),
733            active_file: None,
734            roll_by,
735            max_files,
736            max_file_size_bytes,
737            reuse_files,
738            dir,
739            file_prefix,
740            file_ext,
741            separator,
742        }
743    }
744
745    #[emit::span(rt: emit::runtime::internal(), guard: span, "write file batch")]
746    fn on_batch(&mut self, mut batch: EventBatch) -> Result<(), BatchError<EventBatch>> {
747        let ts = self.clock.now().unwrap();
748        let parts = ts.to_parts();
749
750        let file_ts = file_ts(self.roll_by, parts);
751
752        let mut file = self.active_file.take();
753        let mut file_set = ActiveFileSet::empty(&self.metrics, &self.dir);
754
755        if file.is_none() {
756            if let Err(err) = self.fs.create_dir_all(Path::new(&self.dir)) {
757                span.complete_with(emit::span::completion::from_fn(|span| {
758                    emit::warn!(
759                        rt: emit::runtime::internal(),
760                        extent: span.extent(),
761                        props: span.props(),
762                        "failed to create root directory {path}: {err}",
763                        #[emit::as_debug]
764                        path: &self.dir,
765                        err,
766                    )
767                }));
768
769                return Err(emit_batcher::BatchError::retry(err, batch));
770            }
771
772            let _ = file_set
773                .read(&self.fs, &self.file_prefix, &self.file_ext)
774                .map_err(|err| {
775                    self.metrics.file_set_read_failed.increment();
776
777                    emit::warn!(
778                        rt: emit::runtime::internal(),
779                        "failed to files in read {path}: {err}",
780                        #[emit::as_debug]
781                        path: &file_set.dir,
782                        err,
783                    );
784
785                    err
786                });
787
788            if self.reuse_files {
789                if let Some(file_name) = file_set.current_file_name() {
790                    let mut path = PathBuf::from(&self.dir);
791                    path.push(file_name);
792
793                    file = ActiveFile::try_open_reuse(&self.fs, &path)
794                        .map_err(|err| {
795                            self.metrics.file_open_failed.increment();
796
797                            emit::warn!(
798                                rt: emit::runtime::internal(),
799                                "failed to open {path}: {err}",
800                                #[emit::as_debug]
801                                path,
802                                err,
803                            );
804
805                            err
806                        })
807                        .ok()
808                }
809            }
810        }
811
812        file = file.filter(|file| {
813            file.file_size_bytes + batch.remaining_bytes <= self.max_file_size_bytes
814                && file.file_ts == file_ts
815        });
816
817        let mut file = if let Some(file) = file {
818            file
819        } else {
820            // Leave room for the file we're about to create
821            file_set.apply_retention(&self.fs, self.max_files.saturating_sub(1));
822
823            let mut path = PathBuf::from(self.dir.clone());
824
825            let file_id = file_id(
826                rolling_millis(self.roll_by, ts, parts),
827                rolling_id(&self.rng),
828            );
829
830            path.push(file_name(
831                &self.file_prefix,
832                &self.file_ext,
833                &file_ts,
834                &file_id,
835            ));
836
837            match ActiveFile::try_open_create(&self.fs, &path) {
838                Ok(file) => {
839                    self.metrics.file_create.increment();
840
841                    emit::debug!(
842                        rt: emit::runtime::internal(),
843                        "created {path}",
844                        #[emit::as_debug]
845                        path: file.file_path,
846                    );
847
848                    file
849                }
850                Err(err) => {
851                    self.metrics.file_create_failed.increment();
852
853                    emit::warn!(
854                        rt: emit::runtime::internal(),
855                        "failed to create {path}: {err}",
856                        #[emit::as_debug]
857                        path,
858                        err,
859                    );
860
861                    return Err(emit_batcher::BatchError::retry(err, batch));
862                }
863            }
864        };
865
866        let written_bytes = batch.remaining_bytes;
867
868        while let Some(buf) = batch.current() {
869            if let Err(err) = file.write_event(buf, self.separator) {
870                self.metrics.file_write_failed.increment();
871
872                span.complete_with(emit::span::completion::from_fn(|span| {
873                    emit::warn!(
874                        rt: emit::runtime::internal(),
875                        extent: span.extent(),
876                        props: span.props(),
877                        "failed to write event to {path}: {err}",
878                        #[emit::as_debug]
879                        path: file.file_path,
880                        err,
881                    )
882                }));
883
884                return Err(emit_batcher::BatchError::retry(err, batch));
885            }
886
887            batch.advance();
888        }
889
890        file.file
891            .flush()
892            .map_err(|e| emit_batcher::BatchError::no_retry(e))?;
893        file.file
894            .sync_all()
895            .map_err(|e| emit_batcher::BatchError::no_retry(e))?;
896
897        span.complete_with(emit::span::completion::from_fn(|span| {
898            emit::debug!(
899                rt: emit::runtime::internal(),
900                extent: span.extent(),
901                props: span.props(),
902                "wrote {written_bytes} bytes to {path}",
903                written_bytes,
904                #[emit::as_debug]
905                path: file.file_path,
906            )
907        }));
908
909        // Set the active file so the next batch can attempt to use it
910        // At this point the file is expected to be valid
911        self.active_file = Some(file);
912
913        Ok(())
914    }
915}
916
// A snapshot of the log files currently on disk for this set, used for
// reuse and retention decisions while handling a batch.
struct ActiveFileSet<'a> {
    dir: &'a str,
    metrics: &'a InternalMetrics,
    // Matching file names sorted in reverse lexical order (newest first);
    // populated by `read`.
    file_set: Vec<String>,
}
922
923impl<'a> ActiveFileSet<'a> {
924    fn empty(metrics: &'a InternalMetrics, dir: &'a str) -> Self {
925        ActiveFileSet {
926            metrics,
927            dir,
928            file_set: Vec::new(),
929        }
930    }
931
932    fn read(
933        &mut self,
934        fs: impl Filesystem,
935        file_prefix: &str,
936        file_ext: &str,
937    ) -> Result<(), io::Error> {
938        self.file_set = Vec::new();
939
940        let read_dir = fs.read_dir_files(Path::new(&self.dir))?;
941
942        let mut file_set = Vec::new();
943
944        for path in read_dir {
945            let Some(file_name) = path.file_name() else {
946                continue;
947            };
948
949            let Some(file_name) = file_name.to_str() else {
950                continue;
951            };
952
953            if file_name.starts_with(&file_prefix) && file_name.ends_with(&file_ext) {
954                file_set.push(file_name.to_owned());
955            }
956        }
957
958        file_set.sort_by(|a, b| a.cmp(b).reverse());
959
960        self.file_set = file_set;
961
962        Ok(())
963    }
964
965    fn current_file_name(&self) -> Option<&str> {
966        // NOTE: If the clock shifts back (either jitters or through daylight savings)
967        // Then we may return a file from the future here instead of one that better
968        // matches the current timestamp. In these cases we'll end up creating a new file
969        // instead of potentially reusing one that does match.
970        self.file_set.first().map(|file_name| &**file_name)
971    }
972
973    fn apply_retention(&mut self, fs: impl Filesystem, max_files: usize) {
974        while self.file_set.len() >= max_files {
975            let mut path = PathBuf::from(self.dir);
976            path.push(self.file_set.pop().unwrap());
977
978            if let Err(err) = fs.remove_file(&path) {
979                self.metrics.file_delete_failed.increment();
980
981                emit::warn!(
982                    rt: emit::runtime::internal(),
983                    "failed to delete {path}: {err}",
984                    #[emit::as_debug]
985                    path,
986                    err,
987                );
988            } else {
989                self.metrics.file_delete.increment();
990
991                emit::debug!(
992                    rt: emit::runtime::internal(),
993                    "deleted {path}",
994                    #[emit::as_debug]
995                    path,
996                );
997            }
998        }
999    }
1000}
1001
// A log file that's currently open for appending.
struct ActiveFile {
    // The open handle, in append mode
    file: Box<dyn File + Send + Sync>,
    // Full path the file was opened from
    file_path: PathBuf,
    // The rollover-interval timestamp segment parsed from the file name
    file_ts: String,
    // True while a write may have been left incomplete; forces a
    // separator before the next event so records don't get mangled
    file_needs_recovery: bool,
    // Running count of bytes written (or attempted) to the file
    file_size_bytes: usize,
}
1009
impl ActiveFile {
    // Reopen an existing log file for appending.
    //
    // The timestamp segment is re-parsed from the file name, and the
    // file's current length seeds `file_size_bytes`.
    fn try_open_reuse(
        fs: impl Filesystem,
        file_path: impl AsRef<Path>,
    ) -> Result<ActiveFile, io::Error> {
        let file_path = file_path.as_ref();

        let file_ts = read_file_path_ts(file_path)?.to_owned();

        let file = fs.open_existing(file_path)?;

        let file_size_bytes = file.len()?;

        Ok(ActiveFile {
            file,
            file_ts,
            file_path: file_path.into(),
            // The file is in an unknown state, so defensively assume
            // it needs to be recovered
            file_needs_recovery: true,
            file_size_bytes,
        })
    }

    // Create a brand new log file for appending.
    //
    // Fails if the file already exists (see `Filesystem::open_new`).
    fn try_open_create(
        fs: impl Filesystem,
        file_path: impl AsRef<Path>,
    ) -> Result<ActiveFile, io::Error> {
        let file_path = file_path.as_ref();

        let file_ts = read_file_path_ts(file_path)?.to_owned();

        let file = fs.open_new(file_path)?;

        // Sync the existence of this new file to the parent directory
        // This is only important on some platforms and filesystems
        fs.sync_parent(file_path)?;

        Ok(ActiveFile {
            file,
            file_ts,
            file_path: file_path.into(),
            file_needs_recovery: false,
            file_size_bytes: 0,
        })
    }

    // Append a single pre-serialized event to the file.
    //
    // `file_needs_recovery` is set before the write and cleared only
    // after it succeeds, so a failed or interrupted write leaves the flag
    // set and the next call prepends `separator` to terminate the partial
    // record. Note that `file_size_bytes` is bumped before each write, so
    // on failure it over-counts rather than under-counts.
    fn write_event(&mut self, event_buf: &[u8], separator: &'static [u8]) -> Result<(), io::Error> {
        // If the file may be corrupted then terminate
        // any previously written content with a separator.
        // This ensures the event that's about to be written
        // isn't mangled together with an incomplete one written
        // previously
        if self.file_needs_recovery {
            self.file_size_bytes += separator.len();
            self.file.write_all(separator)?;
        }

        self.file_needs_recovery = true;

        self.file_size_bytes += event_buf.len();
        self.file.write_all(event_buf)?;

        self.file_needs_recovery = false;
        Ok(())
    }
}
1077
1078fn dir_prefix_ext(file_set: impl AsRef<Path>) -> Result<(String, String, String), Error> {
1079    let file_set = file_set.as_ref();
1080
1081    let dir = if let Some(parent) = file_set.parent() {
1082        parent
1083            .to_str()
1084            .ok_or_else(|| "paths must be valid UTF8")
1085            .map_err(Error::new)?
1086            .to_owned()
1087    } else {
1088        String::new()
1089    };
1090
1091    let prefix = file_set
1092        .file_stem()
1093        .ok_or_else(|| "paths must include a file name")
1094        .map_err(Error::new)?
1095        .to_str()
1096        .ok_or_else(|| "paths must be valid UTF8")
1097        .map_err(Error::new)?
1098        .to_owned();
1099
1100    let ext = if let Some(ext) = file_set.extension() {
1101        ext.to_str()
1102            .ok_or_else(|| "paths must be valid UTF8")
1103            .map_err(Error::new)?
1104            .to_owned()
1105    } else {
1106        String::from("log")
1107    };
1108
1109    Ok((dir, prefix, ext))
1110}
1111
1112fn rolling_millis(roll_by: RollBy, ts: emit::Timestamp, parts: emit::timestamp::Parts) -> u32 {
1113    let truncated = match roll_by {
1114        RollBy::Day => emit::Timestamp::from_parts(emit::timestamp::Parts {
1115            years: parts.years,
1116            months: parts.months,
1117            days: parts.days,
1118            ..Default::default()
1119        })
1120        .unwrap(),
1121        RollBy::Hour => emit::Timestamp::from_parts(emit::timestamp::Parts {
1122            years: parts.years,
1123            months: parts.months,
1124            days: parts.days,
1125            hours: parts.hours,
1126            ..Default::default()
1127        })
1128        .unwrap(),
1129        RollBy::Minute => emit::Timestamp::from_parts(emit::timestamp::Parts {
1130            years: parts.years,
1131            months: parts.months,
1132            days: parts.days,
1133            hours: parts.hours,
1134            minutes: parts.minutes,
1135            ..Default::default()
1136        })
1137        .unwrap(),
1138    };
1139
1140    ts.duration_since(truncated).unwrap().as_millis() as u32
1141}
1142
1143fn rolling_id(rng: impl emit::Rng) -> u32 {
1144    rng.gen_u64().unwrap() as u32
1145}
1146
// Format the timestamp segment of a log file name for the rollover
// interval containing the event.
//
// Precision matches the rollover granularity (day/hour/minute). Each
// component is zero-padded (the `0` flag in `{:>04}`/`{:>02}` zero-pads;
// the explicit alignment is redundant for numbers), so file names sort
// lexicographically by time.
fn file_ts(roll_by: RollBy, parts: emit::timestamp::Parts) -> String {
    match roll_by {
        RollBy::Day => format!(
            "{:>04}-{:>02}-{:>02}",
            parts.years, parts.months, parts.days,
        ),
        RollBy::Hour => format!(
            "{:>04}-{:>02}-{:>02}-{:>02}",
            parts.years, parts.months, parts.days, parts.hours,
        ),
        RollBy::Minute => format!(
            "{:>04}-{:>02}-{:>02}-{:>02}-{:>02}",
            parts.years, parts.months, parts.days, parts.hours, parts.minutes,
        ),
    }
}
1163
// Format the `{counter}.{id}` segment of a log file name: the
// zero-padded millisecond counter followed by the zero-padded hex id.
//
// The original spec `{:<08}` mixed a left-align with the `0` flag;
// sign-aware zero padding ignores fill/alignment, so `{:08}` is the
// equivalent, unambiguous form (output is unchanged).
fn file_id(rolling_millis: u32, rolling_id: u32) -> String {
    format!("{:08}.{:08x}", rolling_millis, rolling_id)
}
1167
// Extract the timestamp segment from a log file name.
//
// File names follow `{prefix}.{ts}.{counter}.{id}.{ext}`, so the
// timestamp is the second `.`-separated segment. Fails when the name
// has fewer than two segments.
fn read_file_name_ts(file_name: &str) -> Result<&str, io::Error> {
    // `nth(1)` is the idiomatic form of `skip(1).next()`
    file_name.split('.').nth(1).ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::Other,
            "could not determine timestamp from filename",
        )
    })
}
1176
1177fn read_file_path_ts(path: &Path) -> Result<&str, io::Error> {
1178    let file_name = path
1179        .file_name()
1180        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "unable to determine filename"))?
1181        .to_str()
1182        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "file names must be valid UTF8"))?;
1183
1184    read_file_name_ts(file_name)
1185}
1186
// Assemble a log file name following the
// `{prefix}.{ts}.{counter}.{id}.{ext}` scheme (`id` here carries the
// `{counter}.{id}` segment produced by `file_id`).
fn file_name(file_prefix: &str, file_ext: &str, ts: &str, id: &str) -> String {
    [file_prefix, ts, id, file_ext].join(".")
}
1190
// A minimal filesystem abstraction so the worker can run against either
// `std::fs` or an in-memory implementation in tests.
trait Filesystem {
    // Recursively create a directory and any missing parents.
    fn create_dir_all(&self, path: &Path) -> io::Result<()>;

    // Durably record the existence of `path` in its parent directory.
    fn sync_parent(&self, path: &Path) -> io::Result<()>;

    // Iterate the files (not subdirectories) under `path`.
    fn read_dir_files(&self, path: &Path) -> io::Result<Box<dyn Iterator<Item = PathBuf>>>;

    // Delete the file at `path`.
    fn remove_file(&self, path: &Path) -> io::Result<()>;

    // Create a new file for appending, failing if it already exists.
    fn open_new(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>>;

    // Open an existing file for appending.
    fn open_existing(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>>;
}
1204
// Forward the filesystem API through shared references.
impl<'a, F: Filesystem + ?Sized> Filesystem for &'a F {
    fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        (**self).create_dir_all(path)
    }

    fn sync_parent(&self, path: &Path) -> io::Result<()> {
        (**self).sync_parent(path)
    }

    fn read_dir_files(&self, path: &Path) -> io::Result<Box<dyn Iterator<Item = PathBuf>>> {
        (**self).read_dir_files(path)
    }

    fn remove_file(&self, path: &Path) -> io::Result<()> {
        (**self).remove_file(path)
    }

    fn open_new(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
        (**self).open_new(path)
    }

    fn open_existing(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
        (**self).open_existing(path)
    }
}
1230
// Forward the filesystem API through boxed trait objects.
impl<F: Filesystem + ?Sized> Filesystem for Box<F> {
    fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        (**self).create_dir_all(path)
    }

    fn sync_parent(&self, path: &Path) -> io::Result<()> {
        (**self).sync_parent(path)
    }

    fn read_dir_files(&self, path: &Path) -> io::Result<Box<dyn Iterator<Item = PathBuf>>> {
        (**self).read_dir_files(path)
    }

    fn remove_file(&self, path: &Path) -> io::Result<()> {
        (**self).remove_file(path)
    }

    fn open_new(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
        (**self).open_new(path)
    }

    fn open_existing(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
        (**self).open_existing(path)
    }
}
1256
// `Filesystem` implementation backed by `std::fs`.
struct StdFilesystem;

impl StdFilesystem {
    fn new() -> Self {
        StdFilesystem
    }
}
1264
impl Filesystem for StdFilesystem {
    fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        std::fs::create_dir_all(path)
    }

    // Make a new file's directory entry durable by fsyncing its parent
    // directory. This matters on Linux/macOS; Windows doesn't need it.
    //
    // NOTE(review): there's no branch for targets other than
    // linux/macos/windows, so on those this fn body is empty and won't
    // compile — confirm the supported-platform set is intentional.
    fn sync_parent(&self, path: &Path) -> io::Result<()> {
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            if let Some(parent) = path.parent() {
                // A failure opening the directory propagates via `?`, but a
                // failure of the sync itself is deliberately best-effort
                let _ = std::fs::OpenOptions::new()
                    .read(true)
                    .open(parent)?
                    .sync_all();
            }

            Ok(())
        }

        #[cfg(target_os = "windows")]
        {
            let _ = path;

            Ok(())
        }
    }

    fn read_dir_files(&self, path: &Path) -> io::Result<Box<dyn Iterator<Item = PathBuf>>> {
        // Unreadable entries and non-files are silently skipped
        let iter = std::fs::read_dir(path)?.filter_map(|entry| {
            let entry = entry.ok()?;

            if entry.metadata().ok()?.is_file() {
                Some(entry.path())
            } else {
                None
            }
        });

        Ok(Box::new(iter))
    }

    fn remove_file(&self, path: &Path) -> io::Result<()> {
        std::fs::remove_file(path)
    }

    fn open_new(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
        // `create_new` guarantees a fresh file: the call fails if the
        // path already exists
        let file = std::fs::OpenOptions::new()
            .create_new(true)
            .read(false)
            .append(true)
            .open(path)?;

        Ok(Box::new(StdFile::new(file)))
    }

    fn open_existing(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
        // Append-only; existing contents are never rewritten
        let file = std::fs::OpenOptions::new()
            .read(false)
            .append(true)
            .open(path)?;

        Ok(Box::new(StdFile::new(file)))
    }
}
1328
// A minimal file abstraction: an appendable byte sink that can report
// its length and be durably synced.
trait File: Write {
    // Current length of the file in bytes.
    fn len(&self) -> io::Result<usize>;

    // Durably sync file contents to storage (like `fsync`).
    fn sync_all(&mut self) -> io::Result<()>;
}
1334
// Forward the file API through mutable references.
impl<'a, F: File + ?Sized> File for &'a mut F {
    fn len(&self) -> io::Result<usize> {
        (**self).len()
    }

    fn sync_all(&mut self) -> io::Result<()> {
        (**self).sync_all()
    }
}
1344
// Forward the file API through boxed trait objects.
impl<F: File + ?Sized> File for Box<F> {
    fn len(&self) -> io::Result<usize> {
        (**self).len()
    }

    fn sync_all(&mut self) -> io::Result<()> {
        (**self).sync_all()
    }
}
1354
// `File` implementation backed by a `std::fs::File`.
struct StdFile(std::fs::File);

impl StdFile {
    fn new(file: std::fs::File) -> Self {
        StdFile(file)
    }
}
1362
1363impl File for StdFile {
1364    fn len(&self) -> io::Result<usize> {
1365        Ok(self.0.metadata()?.len() as usize)
1366    }
1367
1368    fn sync_all(&mut self) -> io::Result<()> {
1369        self.0.sync_all()
1370    }
1371}
1372
// Forward writes straight through to the underlying `std::fs::File`.
impl Write for StdFile {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}
1382
1383#[cfg(test)]
1384mod tests {
1385    use super::*;
1386
1387    use std::{
1388        cmp,
1389        collections::{HashMap, HashSet},
1390        mem,
1391        sync::Mutex,
1392        time::Duration,
1393    };
1394
    // An in-memory `Filesystem` that models durability: new files land in
    // `incoming` and only become part of the durable `committed` view once
    // `sync_parent` runs; deleted files are tracked in `outgoing` until
    // their removal is synced.
    #[derive(Clone)]
    struct InMemoryFilesystem {
        // Files created but whose directory entry hasn't been synced yet
        incoming: Arc<Mutex<HashMap<String, InMemoryFile>>>,
        // Files removed from `committed` whose deletion hasn't been synced yet
        outgoing: Arc<Mutex<HashMap<String, InMemoryFile>>>,
        // The durable set of files
        committed: Arc<Mutex<HashMap<String, InMemoryFile>>>,
    }
1401
1402    impl InMemoryFilesystem {
1403        fn new() -> Self {
1404            InMemoryFilesystem {
1405                incoming: Arc::new(Mutex::new(HashMap::new())),
1406                outgoing: Arc::new(Mutex::new(HashMap::new())),
1407                committed: Arc::new(Mutex::new(HashMap::new())),
1408            }
1409        }
1410
1411        fn get(&self, path: impl AsRef<str>) -> InMemoryFile {
1412            self.committed
1413                .lock()
1414                .unwrap()
1415                .get(path.as_ref())
1416                .unwrap()
1417                .clone()
1418        }
1419
1420        fn iter(&self) -> impl Iterator<Item = (String, InMemoryFile)> {
1421            self.committed
1422                .lock()
1423                .unwrap()
1424                .iter()
1425                .map(|(path, file)| (path.to_owned(), file.clone()))
1426                .collect::<Vec<_>>()
1427                .into_iter()
1428        }
1429    }
1430
    // An in-memory `File` where writes stay pending in `incoming` until
    // `sync_all` moves them into `committed`.
    #[derive(Clone)]
    struct InMemoryFile {
        // Bytes written but not yet synced
        incoming: Arc<Mutex<Vec<u8>>>,
        // Bytes made durable by `sync_all`
        committed: Arc<Mutex<Vec<u8>>>,
    }
1436
1437    impl InMemoryFile {
1438        fn new() -> Self {
1439            InMemoryFile {
1440                incoming: Arc::new(Mutex::new(Vec::new())),
1441                committed: Arc::new(Mutex::new(Vec::new())),
1442            }
1443        }
1444
1445        fn contents(&self) -> Vec<u8> {
1446            self.committed.lock().unwrap().clone()
1447        }
1448    }
1449
1450    fn pathstr(path: &Path) -> String {
1451        path.to_str().unwrap().replace('\\', "/")
1452    }
1453
    impl Filesystem for InMemoryFilesystem {
        // Directories are implicit in this model; creation always succeeds.
        fn create_dir_all(&self, _: &Path) -> io::Result<()> {
            Ok(())
        }

        // Commit pending file creations (and forget pending deletions)
        // under `path`'s parent, modelling a directory fsync.
        fn sync_parent(&self, path: &Path) -> io::Result<()> {
            let parent = pathstr(path.parent().unwrap());

            let mut incoming = self.incoming.lock().unwrap();
            let mut outgoing = self.outgoing.lock().unwrap();
            let mut committed = self.committed.lock().unwrap();

            // Add incoming entries to the committed set
            let mut retain_incoming = HashSet::new();
            for (path, file) in incoming.iter() {
                if path.starts_with(&*parent) {
                    assert!(
                        committed.insert(path.to_owned(), file.clone()).is_none(),
                        "duplicate file {path}"
                    );
                } else {
                    // Entries under other directories stay pending
                    assert!(retain_incoming.insert(path.to_owned()));
                }
            }

            // Clean up incoming and outgoing sets
            incoming.retain(|path, _| retain_incoming.contains(&*path));
            outgoing.retain(|path, _| !path.starts_with(&*parent));

            Ok(())
        }

        // List committed (durable) files under `path`.
        fn read_dir_files(&self, path: &Path) -> io::Result<Box<dyn Iterator<Item = PathBuf>>> {
            let parent = pathstr(path);

            let iter = self
                .committed
                .lock()
                .unwrap()
                .iter()
                .map(|(path, _)| path)
                .filter(|path| path.starts_with(&*parent))
                .map(|path| PathBuf::from(path))
                .collect::<Vec<_>>()
                .into_iter();

            Ok(Box::new(iter))
        }

        // Remove a committed file, tracking the pending deletion in
        // `outgoing` until the parent directory is synced. Panics if the
        // file doesn't exist or was already deleted.
        fn remove_file(&self, path: &Path) -> io::Result<()> {
            let path = pathstr(path);

            let mut outgoing = self.outgoing.lock().unwrap();
            let mut committed = self.committed.lock().unwrap();

            let file = committed.remove(&*path).unwrap();

            assert!(
                outgoing.insert(path.clone(), file).is_none(),
                "already deleted file {path}"
            );

            Ok(())
        }

        // Create a file in the pending (`incoming`) state; it only becomes
        // visible to `read_dir_files` after `sync_parent`. Panics if the
        // file exists in either state.
        fn open_new(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
            let path = pathstr(path);

            let file = InMemoryFile::new();

            let mut incoming = self.incoming.lock().unwrap();
            let committed = self.committed.lock().unwrap();

            assert!(
                !committed.contains_key(&*path),
                "file {path} already exists"
            );
            assert!(
                incoming.insert(path.clone(), file.clone()).is_none(),
                "file {path} already exists"
            );

            Ok(Box::new(file))
        }

        // Open a committed file; panics if it was never synced.
        fn open_existing(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
            let path = pathstr(path);

            let committed = self.committed.lock().unwrap();

            Ok(Box::new(committed.get(&*path).unwrap().clone()))
        }
    }
1547
1548    impl File for InMemoryFile {
1549        fn len(&self) -> io::Result<usize> {
1550            Ok(self.committed.lock().unwrap().len())
1551        }
1552
1553        fn sync_all(&mut self) -> io::Result<()> {
1554            let incoming = mem::take(&mut *self.incoming.lock().unwrap());
1555            let mut committed = self.committed.lock().unwrap();
1556
1557            committed.extend(incoming);
1558
1559            Ok(())
1560        }
1561    }
1562
1563    impl Write for InMemoryFile {
1564        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1565            self.incoming.lock().unwrap().extend_from_slice(buf);
1566
1567            Ok(buf.len())
1568        }
1569
1570        fn flush(&mut self) -> io::Result<()> {
1571            Ok(())
1572        }
1573    }
1574
    // A manually-advanced clock for deterministic tests.
    #[derive(Clone)]
    struct TestClock(Arc<Mutex<emit::Timestamp>>);

    impl TestClock {
        // Start the clock at the minimum representable timestamp.
        fn new() -> Self {
            TestClock(Arc::new(Mutex::new(emit::Timestamp::MIN)))
        }

        // Move the clock forward by `by`.
        fn advance(&self, by: Duration) {
            *self.0.lock().unwrap() += by;
        }
    }
1587
    // Report the manually-controlled time; never fails.
    impl emit::Clock for TestClock {
        fn now(&self) -> Option<emit::Timestamp> {
            Some(*self.0.lock().unwrap())
        }
    }
1593
    // A deterministic "rng" backed by a counter that only changes when
    // `increment` is called, so generated file ids are predictable.
    #[derive(Clone)]
    struct TestRng(Arc<Mutex<u128>>);

    impl TestRng {
        fn new() -> Self {
            TestRng(Arc::new(Mutex::new(0)))
        }

        // Bump the counter so subsequent fills produce different bytes.
        fn increment(&self) {
            *self.0.lock().unwrap() += 1;
        }
    }
1606
1607    impl emit::Rng for TestRng {
1608        fn fill<A: AsMut<[u8]>>(&self, mut arr: A) -> Option<A> {
1609            let fill = self.0.lock().unwrap().to_le_bytes();
1610
1611            let mut buf = arr.as_mut();
1612
1613            while buf.len() > 0 {
1614                let copy = cmp::min(fill.len(), buf.len());
1615
1616                buf.copy_from_slice(&fill[..copy]);
1617
1618                buf = &mut buf[copy..];
1619            }
1620
1621            Some(arr)
1622        }
1623    }
1624
    // Events written within the same rollover interval share one file;
    // advancing the clock past the interval rolls over to a new file.
    #[test]
    fn worker_basic() {
        let fs = InMemoryFilesystem::new();
        let clock = TestClock::new();
        let rng = TestRng::new();
        let metrics = Arc::new(InternalMetrics::default());

        let mut worker = Worker::new(
            metrics.clone(),
            fs.clone(),
            clock.clone(),
            rng.clone(),
            "logs".to_string(),
            "test".to_string(),
            "log".to_string(),
            RollBy::Minute,
            false,
            10,
            1024,
            b"\n",
        );

        let mut batch = EventBatch::new();
        batch.push(*b"1\n");
        let Ok(()) = worker.on_batch(batch) else {
            panic!("failed to write batch");
        };

        let mut batch = EventBatch::new();
        batch.push(*b"2\n");
        batch.push(*b"3\n");
        let Ok(()) = worker.on_batch(batch) else {
            panic!("failed to write batch");
        };

        // Both batches landed in the same file within the first interval
        assert_eq!(1, fs.iter().count());

        // Advance the clock; this will produce a new file
        clock.advance(Duration::from_secs(120));

        let mut batch = EventBatch::new();
        batch.push(*b"1\n");
        let Ok(()) = worker.on_batch(batch) else {
            panic!("failed to write batch");
        };

        assert_eq!(2, fs.iter().count());

        assert_eq!(
            *b"1\n2\n3\n",
            *fs.get("logs/test.1970-01-01-00-00.00000000.00000000.log")
                .contents()
        );
        assert_eq!(
            *b"1\n",
            *fs.get("logs/test.1970-01-01-00-02.00000000.00000000.log")
                .contents()
        );
    }
1684
    // With file reuse disabled, restarting the worker in the same interval
    // creates a second file (distinguished by its random id) rather than
    // appending to the first.
    #[test]
    fn worker_no_reuse() {
        let fs = InMemoryFilesystem::new();
        let clock = TestClock::new();
        let rng = TestRng::new();
        let metrics = Arc::new(InternalMetrics::default());

        let mut worker = Worker::new(
            metrics.clone(),
            fs.clone(),
            clock.clone(),
            rng.clone(),
            "logs".to_string(),
            "test".to_string(),
            "log".to_string(),
            RollBy::Minute,
            false,
            10,
            1024,
            b"\n",
        );

        let mut batch = EventBatch::new();
        batch.push(*b"1\n");
        let Ok(()) = worker.on_batch(batch) else {
            panic!("failed to write batch");
        };

        drop(worker);

        rng.increment();

        // Re-open the worker
        // This should result in a new file
        let mut worker = Worker::new(
            metrics.clone(),
            fs.clone(),
            clock.clone(),
            rng.clone(),
            "logs".to_string(),
            "test".to_string(),
            "log".to_string(),
            RollBy::Minute,
            false,
            10,
            1024,
            b"\n",
        );

        let mut batch = EventBatch::new();
        batch.push(*b"2\n");
        let Ok(()) = worker.on_batch(batch) else {
            panic!("failed to write batch");
        };

        assert_eq!(2, fs.iter().count());

        assert_eq!(
            *b"1\n",
            *fs.get("logs/test.1970-01-01-00-00.00000000.00000000.log")
                .contents()
        );
        assert_eq!(
            *b"2\n",
            *fs.get("logs/test.1970-01-01-00-00.00000000.00000001.log")
                .contents()
        );
    }
1753
    // With file reuse enabled, restarting the worker in the same interval
    // appends to the existing file, inserting a recovery separator first.
    #[test]
    fn worker_reuse() {
        let fs = InMemoryFilesystem::new();
        let clock = TestClock::new();
        let rng = TestRng::new();
        let metrics = Arc::new(InternalMetrics::default());

        let mut worker = Worker::new(
            metrics.clone(),
            fs.clone(),
            clock.clone(),
            rng.clone(),
            "logs".to_string(),
            "test".to_string(),
            "log".to_string(),
            RollBy::Minute,
            true,
            10,
            1024,
            b"\n",
        );

        let mut batch = EventBatch::new();
        batch.push(*b"1\n");
        let Ok(()) = worker.on_batch(batch) else {
            panic!("failed to write batch");
        };

        drop(worker);

        // Re-open the worker
        // This should re-use the existing file
        let mut worker = Worker::new(
            metrics.clone(),
            fs.clone(),
            clock.clone(),
            rng.clone(),
            "logs".to_string(),
            "test".to_string(),
            "log".to_string(),
            RollBy::Minute,
            true,
            10,
            1024,
            b"\n",
        );

        let mut batch = EventBatch::new();
        batch.push(*b"2\n");
        let Ok(()) = worker.on_batch(batch) else {
            panic!("failed to write batch");
        };

        assert_eq!(1, fs.iter().count());

        // We currently always append a newline on each iteration
        // This could be optimized away in the future if we want
        assert_eq!(
            *b"1\n\n2\n",
            *fs.get("logs/test.1970-01-01-00-00.00000000.00000000.log")
                .contents()
        );
    }
1817
    // Dropping the emitter must shut down the background IO thread so
    // `join` on its handle returns instead of hanging.
    #[test]
    fn file_closes_bg_thread_on_drop() {
        let mut files = set_with_writer(
            "./target/logs/file_closes_bg_thread_on_drop/logs.txt",
            |_, _| Ok(()),
            b"\0",
        )
        .spawn();

        // Steal the thread handle before dropping so we can observe shutdown
        let handle = {
            let inner = files.inner.take().unwrap();

            inner._handle
        };

        drop(files);

        // Ensure the background thread is torn down
        handle.join().unwrap();
    }
1838}