1#![doc(html_logo_url = "https://raw.githubusercontent.com/emit-rs/emit/main/asset/logo.svg")]
134#![deny(missing_docs)]
135
136mod internal_metrics;
137
138use std::{
139 fmt,
140 io::{self, Write},
141 mem,
142 path::{Path, PathBuf},
143 sync::Arc,
144 thread,
145};
146
147use emit::{
148 clock::{Clock, ErasedClock},
149 platform::{rand_rng::RandRng, system_clock::SystemClock},
150 rng::{ErasedRng, Rng},
151};
152use emit_batcher::BatchError;
153use internal_metrics::InternalMetrics;
154
155const DEFAULT_ROLL_BY: RollBy = RollBy::Hour;
156const DEFAULT_MAX_FILES: usize = 32;
157const DEFAULT_MAX_FILE_SIZE_BYTES: usize = 1024 * 1024 * 1024; const DEFAULT_REUSE_FILES: bool = false;
159
160pub use internal_metrics::*;
161
/// An error encountered configuring or writing a file set.
pub struct Error(Box<dyn std::error::Error + Send + Sync>);

impl Error {
    // Wrap any convertible error into this crate's error type.
    fn new(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Self {
        Error(e.into())
    }
}

impl fmt::Debug for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the wrapped error's `Debug`.
        fmt::Debug::fmt(&self.0, f)
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the wrapped error's `Display`.
        fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        self.0.source()
    }
}
190
/// Create a [`FileSetBuilder`] for the given file set path, using the
/// default event writer.
#[cfg(feature = "default_writer")]
pub fn set(file_set: impl AsRef<Path>) -> FileSetBuilder {
    FileSetBuilder::new(file_set.as_ref())
}
210
/// Create a [`FileSetBuilder`] for the given file set path with a custom
/// event `writer` and the `separator` bytes appended after each payload.
pub fn set_with_writer(
    file_set: impl AsRef<Path>,
    writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
        + Send
        + Sync
        + 'static,
    separator: &'static [u8],
) -> FileSetBuilder {
    FileSetBuilder::new_with_writer(file_set.as_ref(), writer, separator)
}
230
/// A builder for a [`FileSet`] emitter.
///
/// Configure rolling, retention, and formatting, then call
/// [`FileSetBuilder::spawn`] to start the background writer.
pub struct FileSetBuilder {
    // The path template the set of files is derived from.
    file_set: PathBuf,
    // How often to roll to a new file.
    roll_by: RollBy,
    // Retention: maximum number of files kept on disk.
    max_files: usize,
    // Roll to a new file once the active one would exceed this size.
    max_file_size_bytes: usize,
    // Whether to append to the newest existing file on startup.
    reuse_files: bool,
    // Formats an event into a `FileBuf` payload.
    writer: Box<
        dyn Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
            + Send
            + Sync,
    >,
    // Bytes appended after each payload (e.g. `b"\n"`).
    separator: &'static [u8],
}

// The period after which the active file is rolled and a new one started.
#[derive(Debug, Clone, Copy)]
enum RollBy {
    Day,
    Hour,
    Minute,
}
260
impl FileSetBuilder {
    /// Create a builder for the given file set path, using the default
    /// event writer with a newline separator.
    #[cfg(feature = "default_writer")]
    pub fn new(file_set: impl Into<PathBuf>) -> Self {
        Self::new_with_writer(file_set, default_writer, b"\n")
    }

    /// Create a builder for the given file set path with a custom event
    /// `writer` and `separator`, using default rolling/retention settings.
    pub fn new_with_writer(
        file_set: impl Into<PathBuf>,
        writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
            + Send
            + Sync
            + 'static,
        separator: &'static [u8],
    ) -> Self {
        FileSetBuilder {
            file_set: file_set.into(),
            roll_by: DEFAULT_ROLL_BY,
            max_files: DEFAULT_MAX_FILES,
            max_file_size_bytes: DEFAULT_MAX_FILE_SIZE_BYTES,
            reuse_files: DEFAULT_REUSE_FILES,
            writer: Box::new(writer),
            separator,
        }
    }

    /// Roll to a new file once per day.
    pub fn roll_by_day(mut self) -> Self {
        self.roll_by = RollBy::Day;
        self
    }

    /// Roll to a new file once per hour.
    pub fn roll_by_hour(mut self) -> Self {
        self.roll_by = RollBy::Hour;
        self
    }

    /// Roll to a new file once per minute.
    pub fn roll_by_minute(mut self) -> Self {
        self.roll_by = RollBy::Minute;
        self
    }

    /// Set the maximum number of files retained on disk.
    pub fn max_files(mut self, max_files: usize) -> Self {
        self.max_files = max_files;
        self
    }

    /// Set the maximum size of a single file before rolling to a new one.
    pub fn max_file_size_bytes(mut self, max_file_size_bytes: usize) -> Self {
        self.max_file_size_bytes = max_file_size_bytes;
        self
    }

    /// Whether to append to the newest existing file on startup instead of
    /// always creating a fresh one.
    pub fn reuse_files(mut self, reuse_files: bool) -> Self {
        self.reuse_files = reuse_files;
        self
    }

    /// Replace the event writer and payload separator.
    pub fn writer(
        mut self,
        writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
            + Send
            + Sync
            + 'static,
        separator: &'static [u8],
    ) -> Self {
        self.writer = Box::new(writer);
        self.separator = separator;
        self
    }

    /// Spawn the background writer thread and return the [`FileSet`] emitter.
    ///
    /// If configuration is invalid the returned emitter is inert: the error
    /// is logged through the internal runtime and events are discarded.
    pub fn spawn(self) -> FileSet {
        let metrics = Arc::new(InternalMetrics::default());

        let inner = match self.spawn_inner(metrics.clone()) {
            Ok(inner) => Some(inner),
            Err(err) => {
                emit::error!(
                    rt: emit::runtime::internal(),
                    "file set configuration is invalid; no events will be written: {err}"
                );

                metrics.configuration_failed.increment();

                None
            }
        };

        FileSet { metrics, inner }
    }

    // Validate the path, build the worker, and start the batching thread.
    fn spawn_inner(self, metrics: Arc<InternalMetrics>) -> Result<FileSetInner, Error> {
        let (dir, file_prefix, file_ext) = dir_prefix_ext(self.file_set).map_err(Error::new)?;

        let mut worker = Worker::new(
            metrics.clone(),
            StdFilesystem::new(),
            SystemClock::new(),
            RandRng::new(),
            dir,
            file_prefix,
            file_ext,
            self.roll_by,
            self.reuse_files,
            self.max_files,
            self.max_file_size_bytes,
            self.separator,
        );

        // Bounded channel: callers won't block, but events may be dropped
        // under sustained backpressure.
        let (sender, receiver) = emit_batcher::bounded(10_000);

        let handle = emit_batcher::sync::spawn("emit_file_worker", receiver, move |batch| {
            worker.on_batch(batch)
        })
        .map_err(Error::new)?;

        Ok(FileSetInner {
            sender,
            metrics,
            writer: self.writer,
            separator: self.separator,
            _handle: handle,
        })
    }
}
447
/// An [`emit::Emitter`] that writes events to a set of rolling files.
pub struct FileSet {
    // `None` when configuration failed at spawn time; see
    // `FileSetBuilder::spawn`.
    inner: Option<FileSetInner>,
    metrics: Arc<InternalMetrics>,
}

struct FileSetInner {
    // Channel to the background writer thread.
    sender: emit_batcher::Sender<EventBatch>,
    metrics: Arc<InternalMetrics>,
    // Formats an event into a `FileBuf` payload.
    writer: Box<
        dyn Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
            + Send
            + Sync,
    >,
    // Bytes appended after each payload.
    separator: &'static [u8],
    // Held so the worker thread's handle lives as long as the emitter.
    _handle: thread::JoinHandle<()>,
}
469
impl emit::Emitter for FileSet {
    fn emit<E: emit::event::ToEvent>(&self, evt: E) {
        // Delegates to the inner emitter. NOTE(review): relies on the
        // `Emitter` impl for `Option<_>` discarding events when `None` —
        // confirm against the `emit` crate.
        self.inner.emit(evt)
    }

    fn blocking_flush(&self, timeout: std::time::Duration) -> bool {
        self.inner.blocking_flush(timeout)
    }
}
479
impl emit::Emitter for FileSetInner {
    fn emit<E: emit::event::ToEvent>(&self, evt: E) {
        let evt = evt.to_event();

        // Format on the caller's thread; only the formatted bytes cross the
        // channel to the background worker.
        let mut buf = FileBuf::new();

        match (self.writer)(&mut buf, &evt.erase()) {
            Ok(()) => {
                // Ensure every payload ends with the separator so the worker
                // can concatenate buffers directly.
                if !buf.0.ends_with(self.separator) {
                    buf.extend_from_slice(self.separator);
                }

                self.sender.send(buf.into_boxed_slice());
            }
            Err(err) => {
                // The event is dropped; count it and warn through the
                // internal runtime.
                self.metrics.event_format_failed.increment();

                emit::warn!(
                    rt: emit::runtime::internal(),
                    "failed to format file event payload: {err}",
                )
            }
        };
    }

    fn blocking_flush(&self, timeout: std::time::Duration) -> bool {
        emit_batcher::blocking_flush(&self.sender, timeout)
    }
}
512
impl FileSet {
    /// Get a metric source describing this emitter's internal activity
    /// (files created/deleted, write failures, channel state).
    pub fn metric_source(&self) -> FileSetMetrics {
        FileSetMetrics {
            // Channel metrics are only available when configuration succeeded.
            channel_metrics: self
                .inner
                .as_ref()
                .map(|inner| inner.sender.metric_source()),
            metrics: self.metrics.clone(),
        }
    }
}
529
/// A byte buffer a single event payload is formatted into before being sent
/// to the writer thread.
pub struct FileBuf(Vec<u8>);

impl FileBuf {
    // Start with an empty, unallocated buffer.
    fn new() -> Self {
        FileBuf(Vec::new())
    }

    /// Append a single byte to the payload.
    pub fn push(&mut self, byte: u8) {
        self.0.push(byte)
    }

    /// Append a slice of bytes to the payload.
    pub fn extend_from_slice(&mut self, bytes: &[u8]) {
        self.0.extend_from_slice(bytes)
    }

    // Convert into the owned form sent over the batching channel.
    fn into_boxed_slice(self) -> Box<[u8]> {
        self.0.into_boxed_slice()
    }
}

// `io::Write` support so formatters that expect a writer (like the default
// JSON writer) can target the buffer directly.
impl io::Write for FileBuf {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        self.0.write_all(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}
572
// The default event writer: streams the event as a single JSON object into
// `buf` using `sval_json`.
//
// The object carries the well-known extent/module/message/template fields
// followed by the event's deduplicated properties.
#[cfg(feature = "default_writer")]
fn default_writer(
    buf: &mut FileBuf,
    evt: &emit::Event<&dyn emit::props::ErasedProps>,
) -> io::Result<()> {
    use std::ops::ControlFlow;

    use emit::{
        well_known::{KEY_MDL, KEY_MSG, KEY_TPL, KEY_TS, KEY_TS_START},
        Props as _,
    };

    // Adapter that streams an `emit::Event` through `sval`'s record API.
    struct EventValue<'a, P>(&'a emit::Event<'a, P>);

    impl<'a, P: emit::Props> sval::Value for EventValue<'a, P> {
        fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(
            &'sval self,
            stream: &mut S,
        ) -> sval::Result {
            stream.record_begin(None, None, None, None)?;

            // Extent: a span gets both a start and end timestamp; a point
            // gets just `ts`.
            if let Some(extent) = self.0.extent() {
                if let Some(range) = extent.as_range() {
                    stream.record_value_begin(None, &sval::Label::new(KEY_TS_START))?;
                    sval::stream_display(&mut *stream, &range.start)?;
                    stream.record_value_end(None, &sval::Label::new(KEY_TS_START))?;
                }

                stream.record_value_begin(None, &sval::Label::new(KEY_TS))?;
                sval::stream_display(&mut *stream, extent.as_point())?;
                stream.record_value_end(None, &sval::Label::new(KEY_TS))?;
            }

            stream.record_value_begin(None, &sval::Label::new(KEY_MDL))?;
            sval::stream_display(&mut *stream, self.0.mdl())?;
            stream.record_value_end(None, &sval::Label::new(KEY_MDL))?;

            stream.record_value_begin(None, &sval::Label::new(KEY_MSG))?;
            sval::stream_display(&mut *stream, self.0.msg())?;
            stream.record_value_end(None, &sval::Label::new(KEY_MSG))?;

            stream.record_value_begin(None, &sval::Label::new(KEY_TPL))?;
            sval::stream_display(&mut *stream, self.0.tpl())?;
            stream.record_value_end(None, &sval::Label::new(KEY_TPL))?;

            // Stream each property; stop early if the underlying stream
            // reports an error (the error itself surfaces from `sval_json`).
            let _ = self.0.props().dedup().for_each(|k, v| {
                match (|| {
                    stream.record_value_begin(None, &sval::Label::new_computed(k.get()))?;
                    stream.value_computed(&v)?;
                    stream.record_value_end(None, &sval::Label::new_computed(k.get()))?;

                    Ok::<(), sval::Error>(())
                })() {
                    Ok(()) => ControlFlow::Continue(()),
                    Err(_) => ControlFlow::Break(()),
                }
            });

            stream.record_end(None, None, None)
        }
    }

    sval_json::stream_to_io_write(buf, EventValue(evt))
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

    Ok(())
}
640
// A batch of pre-formatted event payloads queued for the writer thread.
struct EventBatch {
    bufs: Vec<Box<[u8]>>,
    // Total bytes in `bufs` not yet written; kept in sync by `push`/`advance`.
    remaining_bytes: usize,
    // Cursor into `bufs`: everything before `index` has been written.
    index: usize,
}

impl EventBatch {
    fn new() -> Self {
        EventBatch {
            bufs: Vec::new(),
            remaining_bytes: 0,
            index: 0,
        }
    }

    // Append a payload and account for its size.
    fn push(&mut self, buf: impl Into<Box<[u8]>>) {
        let item = buf.into();

        self.remaining_bytes += item.len();
        self.bufs.push(item);
    }
}
663
664impl emit_batcher::Channel for EventBatch {
665 type Item = Box<[u8]>;
666
667 fn new() -> Self {
668 EventBatch::new()
669 }
670
671 fn push<'a>(&mut self, item: Self::Item) {
672 self.push(item)
673 }
674
675 fn len(&self) -> usize {
676 self.bufs.len() - self.index
677 }
678
679 fn clear(&mut self) {
680 self.bufs.clear()
681 }
682}
683
impl EventBatch {
    // The next unwritten payload, if any.
    fn current(&self) -> Option<&[u8]> {
        self.bufs.get(self.index).map(|buf| &**buf)
    }

    // Mark the current payload as written, releasing its memory immediately
    // (`mem::take` swaps in an empty boxed slice).
    fn advance(&mut self) {
        let advanced = mem::take(&mut self.bufs[self.index]);

        self.index += 1;
        self.remaining_bytes -= advanced.len();
    }
}
696
// The background worker that owns the active file and all rolling state.
//
// Dependencies (filesystem, clock, rng) are boxed trait objects so tests can
// substitute in-memory implementations.
struct Worker {
    metrics: Arc<InternalMetrics>,
    clock: Box<dyn ErasedClock + Send + Sync>,
    rng: Box<dyn ErasedRng + Send + Sync>,
    fs: Box<dyn Filesystem + Send + Sync>,
    // The file currently being appended to, if one has been opened.
    active_file: Option<ActiveFile>,
    roll_by: RollBy,
    max_files: usize,
    max_file_size_bytes: usize,
    reuse_files: bool,
    dir: String,
    file_prefix: String,
    file_ext: String,
    separator: &'static [u8],
}
712
713impl Worker {
714 fn new(
715 metrics: Arc<InternalMetrics>,
716 fs: impl Filesystem + Send + Sync + 'static,
717 clock: impl Clock + Send + Sync + 'static,
718 rng: impl Rng + Send + Sync + 'static,
719 dir: String,
720 file_prefix: String,
721 file_ext: String,
722 roll_by: RollBy,
723 reuse_files: bool,
724 max_files: usize,
725 max_file_size_bytes: usize,
726 separator: &'static [u8],
727 ) -> Self {
728 Worker {
729 metrics,
730 fs: Box::new(fs),
731 clock: Box::new(clock),
732 rng: Box::new(rng),
733 active_file: None,
734 roll_by,
735 max_files,
736 max_file_size_bytes,
737 reuse_files,
738 dir,
739 file_prefix,
740 file_ext,
741 separator,
742 }
743 }
744
745 #[emit::span(rt: emit::runtime::internal(), guard: span, "write file batch")]
746 fn on_batch(&mut self, mut batch: EventBatch) -> Result<(), BatchError<EventBatch>> {
747 let ts = self.clock.now().unwrap();
748 let parts = ts.to_parts();
749
750 let file_ts = file_ts(self.roll_by, parts);
751
752 let mut file = self.active_file.take();
753 let mut file_set = ActiveFileSet::empty(&self.metrics, &self.dir);
754
755 if file.is_none() {
756 if let Err(err) = self.fs.create_dir_all(Path::new(&self.dir)) {
757 span.complete_with(emit::span::completion::from_fn(|span| {
758 emit::warn!(
759 rt: emit::runtime::internal(),
760 extent: span.extent(),
761 props: span.props(),
762 "failed to create root directory {path}: {err}",
763 #[emit::as_debug]
764 path: &self.dir,
765 err,
766 )
767 }));
768
769 return Err(emit_batcher::BatchError::retry(err, batch));
770 }
771
772 let _ = file_set
773 .read(&self.fs, &self.file_prefix, &self.file_ext)
774 .map_err(|err| {
775 self.metrics.file_set_read_failed.increment();
776
777 emit::warn!(
778 rt: emit::runtime::internal(),
779 "failed to files in read {path}: {err}",
780 #[emit::as_debug]
781 path: &file_set.dir,
782 err,
783 );
784
785 err
786 });
787
788 if self.reuse_files {
789 if let Some(file_name) = file_set.current_file_name() {
790 let mut path = PathBuf::from(&self.dir);
791 path.push(file_name);
792
793 file = ActiveFile::try_open_reuse(&self.fs, &path)
794 .map_err(|err| {
795 self.metrics.file_open_failed.increment();
796
797 emit::warn!(
798 rt: emit::runtime::internal(),
799 "failed to open {path}: {err}",
800 #[emit::as_debug]
801 path,
802 err,
803 );
804
805 err
806 })
807 .ok()
808 }
809 }
810 }
811
812 file = file.filter(|file| {
813 file.file_size_bytes + batch.remaining_bytes <= self.max_file_size_bytes
814 && file.file_ts == file_ts
815 });
816
817 let mut file = if let Some(file) = file {
818 file
819 } else {
820 file_set.apply_retention(&self.fs, self.max_files.saturating_sub(1));
822
823 let mut path = PathBuf::from(self.dir.clone());
824
825 let file_id = file_id(
826 rolling_millis(self.roll_by, ts, parts),
827 rolling_id(&self.rng),
828 );
829
830 path.push(file_name(
831 &self.file_prefix,
832 &self.file_ext,
833 &file_ts,
834 &file_id,
835 ));
836
837 match ActiveFile::try_open_create(&self.fs, &path) {
838 Ok(file) => {
839 self.metrics.file_create.increment();
840
841 emit::debug!(
842 rt: emit::runtime::internal(),
843 "created {path}",
844 #[emit::as_debug]
845 path: file.file_path,
846 );
847
848 file
849 }
850 Err(err) => {
851 self.metrics.file_create_failed.increment();
852
853 emit::warn!(
854 rt: emit::runtime::internal(),
855 "failed to create {path}: {err}",
856 #[emit::as_debug]
857 path,
858 err,
859 );
860
861 return Err(emit_batcher::BatchError::retry(err, batch));
862 }
863 }
864 };
865
866 let written_bytes = batch.remaining_bytes;
867
868 while let Some(buf) = batch.current() {
869 if let Err(err) = file.write_event(buf, self.separator) {
870 self.metrics.file_write_failed.increment();
871
872 span.complete_with(emit::span::completion::from_fn(|span| {
873 emit::warn!(
874 rt: emit::runtime::internal(),
875 extent: span.extent(),
876 props: span.props(),
877 "failed to write event to {path}: {err}",
878 #[emit::as_debug]
879 path: file.file_path,
880 err,
881 )
882 }));
883
884 return Err(emit_batcher::BatchError::retry(err, batch));
885 }
886
887 batch.advance();
888 }
889
890 file.file
891 .flush()
892 .map_err(|e| emit_batcher::BatchError::no_retry(e))?;
893 file.file
894 .sync_all()
895 .map_err(|e| emit_batcher::BatchError::no_retry(e))?;
896
897 span.complete_with(emit::span::completion::from_fn(|span| {
898 emit::debug!(
899 rt: emit::runtime::internal(),
900 extent: span.extent(),
901 props: span.props(),
902 "wrote {written_bytes} bytes to {path}",
903 written_bytes,
904 #[emit::as_debug]
905 path: file.file_path,
906 )
907 }));
908
909 self.active_file = Some(file);
912
913 Ok(())
914 }
915}
916
917struct ActiveFileSet<'a> {
918 dir: &'a str,
919 metrics: &'a InternalMetrics,
920 file_set: Vec<String>,
921}
922
923impl<'a> ActiveFileSet<'a> {
924 fn empty(metrics: &'a InternalMetrics, dir: &'a str) -> Self {
925 ActiveFileSet {
926 metrics,
927 dir,
928 file_set: Vec::new(),
929 }
930 }
931
932 fn read(
933 &mut self,
934 fs: impl Filesystem,
935 file_prefix: &str,
936 file_ext: &str,
937 ) -> Result<(), io::Error> {
938 self.file_set = Vec::new();
939
940 let read_dir = fs.read_dir_files(Path::new(&self.dir))?;
941
942 let mut file_set = Vec::new();
943
944 for path in read_dir {
945 let Some(file_name) = path.file_name() else {
946 continue;
947 };
948
949 let Some(file_name) = file_name.to_str() else {
950 continue;
951 };
952
953 if file_name.starts_with(&file_prefix) && file_name.ends_with(&file_ext) {
954 file_set.push(file_name.to_owned());
955 }
956 }
957
958 file_set.sort_by(|a, b| a.cmp(b).reverse());
959
960 self.file_set = file_set;
961
962 Ok(())
963 }
964
965 fn current_file_name(&self) -> Option<&str> {
966 self.file_set.first().map(|file_name| &**file_name)
971 }
972
973 fn apply_retention(&mut self, fs: impl Filesystem, max_files: usize) {
974 while self.file_set.len() >= max_files {
975 let mut path = PathBuf::from(self.dir);
976 path.push(self.file_set.pop().unwrap());
977
978 if let Err(err) = fs.remove_file(&path) {
979 self.metrics.file_delete_failed.increment();
980
981 emit::warn!(
982 rt: emit::runtime::internal(),
983 "failed to delete {path}: {err}",
984 #[emit::as_debug]
985 path,
986 err,
987 );
988 } else {
989 self.metrics.file_delete.increment();
990
991 emit::debug!(
992 rt: emit::runtime::internal(),
993 "deleted {path}",
994 #[emit::as_debug]
995 path,
996 );
997 }
998 }
999 }
1000}
1001
// The file currently being appended to, plus the bookkeeping needed to
// decide when to roll.
struct ActiveFile {
    file: Box<dyn File + Send + Sync>,
    file_path: PathBuf,
    // The timestamp segment baked into the file's name; compared against the
    // current roll period to decide whether to roll.
    file_ts: String,
    // When set, the next write emits a separator first to terminate any
    // partial trailing payload.
    file_needs_recovery: bool,
    file_size_bytes: usize,
}

impl ActiveFile {
    // Open an existing file for appending, preserving its current length.
    fn try_open_reuse(
        fs: impl Filesystem,
        file_path: impl AsRef<Path>,
    ) -> Result<ActiveFile, io::Error> {
        let file_path = file_path.as_ref();

        let file_ts = read_file_path_ts(file_path)?.to_owned();

        let file = fs.open_existing(file_path)?;

        let file_size_bytes = file.len()?;

        Ok(ActiveFile {
            file,
            file_ts,
            file_path: file_path.into(),
            // A reused file may end mid-payload from a previous crash, so
            // write a separator before the first new event.
            file_needs_recovery: true,
            file_size_bytes,
        })
    }

    // Create a brand new file, syncing the parent directory so the entry
    // itself is durable.
    fn try_open_create(
        fs: impl Filesystem,
        file_path: impl AsRef<Path>,
    ) -> Result<ActiveFile, io::Error> {
        let file_path = file_path.as_ref();

        let file_ts = read_file_path_ts(file_path)?.to_owned();

        let file = fs.open_new(file_path)?;

        fs.sync_parent(file_path)?;

        Ok(ActiveFile {
            file,
            file_ts,
            file_path: file_path.into(),
            file_needs_recovery: false,
            file_size_bytes: 0,
        })
    }

    // Append one payload, tracking the file size and marking the file as
    // needing recovery while the write is in flight.
    fn write_event(&mut self, event_buf: &[u8], separator: &'static [u8]) -> Result<(), io::Error> {
        if self.file_needs_recovery {
            // Terminate whatever partial payload precedes us.
            self.file_size_bytes += separator.len();
            self.file.write_all(separator)?;
        }

        // If the write below fails partway, the next write must recover.
        self.file_needs_recovery = true;

        self.file_size_bytes += event_buf.len();
        self.file.write_all(event_buf)?;

        self.file_needs_recovery = false;
        Ok(())
    }
}
1077
1078fn dir_prefix_ext(file_set: impl AsRef<Path>) -> Result<(String, String, String), Error> {
1079 let file_set = file_set.as_ref();
1080
1081 let dir = if let Some(parent) = file_set.parent() {
1082 parent
1083 .to_str()
1084 .ok_or_else(|| "paths must be valid UTF8")
1085 .map_err(Error::new)?
1086 .to_owned()
1087 } else {
1088 String::new()
1089 };
1090
1091 let prefix = file_set
1092 .file_stem()
1093 .ok_or_else(|| "paths must include a file name")
1094 .map_err(Error::new)?
1095 .to_str()
1096 .ok_or_else(|| "paths must be valid UTF8")
1097 .map_err(Error::new)?
1098 .to_owned();
1099
1100 let ext = if let Some(ext) = file_set.extension() {
1101 ext.to_str()
1102 .ok_or_else(|| "paths must be valid UTF8")
1103 .map_err(Error::new)?
1104 .to_owned()
1105 } else {
1106 String::from("log")
1107 };
1108
1109 Ok((dir, prefix, ext))
1110}
1111
1112fn rolling_millis(roll_by: RollBy, ts: emit::Timestamp, parts: emit::timestamp::Parts) -> u32 {
1113 let truncated = match roll_by {
1114 RollBy::Day => emit::Timestamp::from_parts(emit::timestamp::Parts {
1115 years: parts.years,
1116 months: parts.months,
1117 days: parts.days,
1118 ..Default::default()
1119 })
1120 .unwrap(),
1121 RollBy::Hour => emit::Timestamp::from_parts(emit::timestamp::Parts {
1122 years: parts.years,
1123 months: parts.months,
1124 days: parts.days,
1125 hours: parts.hours,
1126 ..Default::default()
1127 })
1128 .unwrap(),
1129 RollBy::Minute => emit::Timestamp::from_parts(emit::timestamp::Parts {
1130 years: parts.years,
1131 months: parts.months,
1132 days: parts.days,
1133 hours: parts.hours,
1134 minutes: parts.minutes,
1135 ..Default::default()
1136 })
1137 .unwrap(),
1138 };
1139
1140 ts.duration_since(truncated).unwrap().as_millis() as u32
1141}
1142
// A random per-file identifier: the low 32 bits of a random `u64`.
//
// NOTE(review): assumes the rng always yields a value — confirm `gen_u64`
// can't return `None` for the rngs wired in here.
fn rolling_id(rng: impl emit::Rng) -> u32 {
    rng.gen_u64().unwrap() as u32
}
1146
1147fn file_ts(roll_by: RollBy, parts: emit::timestamp::Parts) -> String {
1148 match roll_by {
1149 RollBy::Day => format!(
1150 "{:>04}-{:>02}-{:>02}",
1151 parts.years, parts.months, parts.days,
1152 ),
1153 RollBy::Hour => format!(
1154 "{:>04}-{:>02}-{:>02}-{:>02}",
1155 parts.years, parts.months, parts.days, parts.hours,
1156 ),
1157 RollBy::Minute => format!(
1158 "{:>04}-{:>02}-{:>02}-{:>02}-{:>02}",
1159 parts.years, parts.months, parts.days, parts.hours, parts.minutes,
1160 ),
1161 }
1162}
1163
// The unique id segment of a file name: millis-into-the-roll-period joined
// with a random hex id, both padded to 8 characters.
fn file_id(rolling_millis: u32, rolling_id: u32) -> String {
    format!("{rolling_millis:<08}.{rolling_id:<08x}")
}
1167
// Extract the timestamp segment from a file name.
//
// File names have the shape `{prefix}.{ts}.{id}.{ext}` (see `file_name`), so
// the timestamp is the second `.`-separated segment.
fn read_file_name_ts(file_name: &str) -> Result<&str, io::Error> {
    // `nth(1)` is the idiomatic form of `skip(1).next()`.
    file_name.split('.').nth(1).ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::Other,
            "could not determine timestamp from filename",
        )
    })
}
1176
1177fn read_file_path_ts(path: &Path) -> Result<&str, io::Error> {
1178 let file_name = path
1179 .file_name()
1180 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "unable to determine filename"))?
1181 .to_str()
1182 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "file names must be valid UTF8"))?;
1183
1184 read_file_name_ts(file_name)
1185}
1186
// Build a file name in the set: `{prefix}.{ts}.{id}.{ext}`.
fn file_name(file_prefix: &str, file_ext: &str, ts: &str, id: &str) -> String {
    // Reserve the exact length up front: the four segments plus three dots.
    let mut name =
        String::with_capacity(file_prefix.len() + ts.len() + id.len() + file_ext.len() + 3);

    name.push_str(file_prefix);
    name.push('.');
    name.push_str(ts);
    name.push('.');
    name.push_str(id);
    name.push('.');
    name.push_str(file_ext);

    name
}
1190
// The filesystem operations the worker needs, abstracted so tests can run
// against an in-memory implementation.
trait Filesystem {
    // Create `path` and any missing ancestors.
    fn create_dir_all(&self, path: &Path) -> io::Result<()>;

    // Make `path`'s directory entry durable (where the platform supports it).
    fn sync_parent(&self, path: &Path) -> io::Result<()>;

    // Iterate the regular files directly inside `path`.
    fn read_dir_files(&self, path: &Path) -> io::Result<Box<dyn Iterator<Item = PathBuf>>>;

    fn remove_file(&self, path: &Path) -> io::Result<()>;

    // Create a brand new file for appending; fails if it already exists.
    fn open_new(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>>;

    // Open an existing file for appending.
    fn open_existing(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>>;
}
1204
// Blanket impl so `&F` can be passed wherever `impl Filesystem` is expected.
impl<'a, F: Filesystem + ?Sized> Filesystem for &'a F {
    fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        (**self).create_dir_all(path)
    }

    fn sync_parent(&self, path: &Path) -> io::Result<()> {
        (**self).sync_parent(path)
    }

    fn read_dir_files(&self, path: &Path) -> io::Result<Box<dyn Iterator<Item = PathBuf>>> {
        (**self).read_dir_files(path)
    }

    fn remove_file(&self, path: &Path) -> io::Result<()> {
        (**self).remove_file(path)
    }

    fn open_new(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
        (**self).open_new(path)
    }

    fn open_existing(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
        (**self).open_existing(path)
    }
}

// Blanket impl so the worker's boxed `dyn Filesystem` satisfies the trait.
impl<F: Filesystem + ?Sized> Filesystem for Box<F> {
    fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        (**self).create_dir_all(path)
    }

    fn sync_parent(&self, path: &Path) -> io::Result<()> {
        (**self).sync_parent(path)
    }

    fn read_dir_files(&self, path: &Path) -> io::Result<Box<dyn Iterator<Item = PathBuf>>> {
        (**self).read_dir_files(path)
    }

    fn remove_file(&self, path: &Path) -> io::Result<()> {
        (**self).remove_file(path)
    }

    fn open_new(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
        (**self).open_new(path)
    }

    fn open_existing(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
        (**self).open_existing(path)
    }
}
1256
// The production `Filesystem` backed by `std::fs`.
struct StdFilesystem;

impl StdFilesystem {
    fn new() -> Self {
        StdFilesystem
    }
}
1264
1265impl Filesystem for StdFilesystem {
1266 fn create_dir_all(&self, path: &Path) -> io::Result<()> {
1267 std::fs::create_dir_all(path)
1268 }
1269
1270 fn sync_parent(&self, path: &Path) -> io::Result<()> {
1271 #[cfg(any(target_os = "linux", target_os = "macos"))]
1272 {
1273 if let Some(parent) = path.parent() {
1274 let _ = std::fs::OpenOptions::new()
1275 .read(true)
1276 .open(parent)?
1277 .sync_all();
1278 }
1279
1280 Ok(())
1281 }
1282
1283 #[cfg(target_os = "windows")]
1284 {
1285 let _ = path;
1286
1287 Ok(())
1288 }
1289 }
1290
1291 fn read_dir_files(&self, path: &Path) -> io::Result<Box<dyn Iterator<Item = PathBuf>>> {
1292 let iter = std::fs::read_dir(path)?.filter_map(|entry| {
1293 let entry = entry.ok()?;
1294
1295 if entry.metadata().ok()?.is_file() {
1296 Some(entry.path())
1297 } else {
1298 None
1299 }
1300 });
1301
1302 Ok(Box::new(iter))
1303 }
1304
1305 fn remove_file(&self, path: &Path) -> io::Result<()> {
1306 std::fs::remove_file(path)
1307 }
1308
1309 fn open_new(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
1310 let file = std::fs::OpenOptions::new()
1311 .create_new(true)
1312 .read(false)
1313 .append(true)
1314 .open(path)?;
1315
1316 Ok(Box::new(StdFile::new(file)))
1317 }
1318
1319 fn open_existing(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
1320 let file = std::fs::OpenOptions::new()
1321 .read(false)
1322 .append(true)
1323 .open(path)?;
1324
1325 Ok(Box::new(StdFile::new(file)))
1326 }
1327}
1328
// An append-target file: writable, measurable, and durably syncable.
trait File: Write {
    // The file's current length in bytes.
    fn len(&self) -> io::Result<usize>;

    // Flush data and metadata to durable storage.
    fn sync_all(&mut self) -> io::Result<()>;
}

// Blanket impl so `&mut F` can be used wherever `impl File` is expected.
impl<'a, F: File + ?Sized> File for &'a mut F {
    fn len(&self) -> io::Result<usize> {
        (**self).len()
    }

    fn sync_all(&mut self) -> io::Result<()> {
        (**self).sync_all()
    }
}

// Blanket impl so the worker's boxed `dyn File` satisfies the trait.
impl<F: File + ?Sized> File for Box<F> {
    fn len(&self) -> io::Result<usize> {
        (**self).len()
    }

    fn sync_all(&mut self) -> io::Result<()> {
        (**self).sync_all()
    }
}
1354
// The production `File` backed by `std::fs::File`.
struct StdFile(std::fs::File);

impl StdFile {
    fn new(file: std::fs::File) -> Self {
        StdFile(file)
    }
}

impl File for StdFile {
    fn len(&self) -> io::Result<usize> {
        // Query the length through metadata; the file is opened append-only
        // so there's no cursor to consult.
        Ok(self.0.metadata()?.len() as usize)
    }

    fn sync_all(&mut self) -> io::Result<()> {
        self.0.sync_all()
    }
}

impl Write for StdFile {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}
1382
1383#[cfg(test)]
1384mod tests {
1385 use super::*;
1386
1387 use std::{
1388 cmp,
1389 collections::{HashMap, HashSet},
1390 mem,
1391 sync::Mutex,
1392 time::Duration,
1393 };
1394
    // An in-memory `Filesystem` that models durability explicitly: new files
    // sit in `incoming` until their parent directory is synced, deleted
    // files move to `outgoing`, and only `committed` entries count as
    // durable.
    #[derive(Clone)]
    struct InMemoryFilesystem {
        incoming: Arc<Mutex<HashMap<String, InMemoryFile>>>,
        outgoing: Arc<Mutex<HashMap<String, InMemoryFile>>>,
        committed: Arc<Mutex<HashMap<String, InMemoryFile>>>,
    }

    impl InMemoryFilesystem {
        fn new() -> Self {
            InMemoryFilesystem {
                incoming: Arc::new(Mutex::new(HashMap::new())),
                outgoing: Arc::new(Mutex::new(HashMap::new())),
                committed: Arc::new(Mutex::new(HashMap::new())),
            }
        }

        // Fetch a committed file by path; panics if it isn't durable yet.
        fn get(&self, path: impl AsRef<str>) -> InMemoryFile {
            self.committed
                .lock()
                .unwrap()
                .get(path.as_ref())
                .unwrap()
                .clone()
        }

        // Snapshot all committed files as owned pairs.
        fn iter(&self) -> impl Iterator<Item = (String, InMemoryFile)> {
            self.committed
                .lock()
                .unwrap()
                .iter()
                .map(|(path, file)| (path.to_owned(), file.clone()))
                .collect::<Vec<_>>()
                .into_iter()
        }
    }
1430
    // An in-memory file with the same durability split as the filesystem:
    // written bytes land in `incoming` and only move to `committed` on
    // `sync_all`.
    #[derive(Clone)]
    struct InMemoryFile {
        incoming: Arc<Mutex<Vec<u8>>>,
        committed: Arc<Mutex<Vec<u8>>>,
    }

    impl InMemoryFile {
        fn new() -> Self {
            InMemoryFile {
                incoming: Arc::new(Mutex::new(Vec::new())),
                committed: Arc::new(Mutex::new(Vec::new())),
            }
        }

        // The durably synced contents.
        fn contents(&self) -> Vec<u8> {
            self.committed.lock().unwrap().clone()
        }
    }

    // Normalize a path to a forward-slash string key for the in-memory maps.
    fn pathstr(path: &Path) -> String {
        path.to_str().unwrap().replace('\\', "/")
    }
1453
    impl Filesystem for InMemoryFilesystem {
        fn create_dir_all(&self, _: &Path) -> io::Result<()> {
            Ok(())
        }

        // Syncing a parent commits every incoming file under it and drops
        // any pending deletes — modelling a directory fsync.
        fn sync_parent(&self, path: &Path) -> io::Result<()> {
            let parent = pathstr(path.parent().unwrap());

            let mut incoming = self.incoming.lock().unwrap();
            let mut outgoing = self.outgoing.lock().unwrap();
            let mut committed = self.committed.lock().unwrap();

            // Move matching incoming files to committed; keep the rest.
            let mut retain_incoming = HashSet::new();
            for (path, file) in incoming.iter() {
                if path.starts_with(&*parent) {
                    assert!(
                        committed.insert(path.to_owned(), file.clone()).is_none(),
                        "duplicate file {path}"
                    );
                } else {
                    assert!(retain_incoming.insert(path.to_owned()));
                }
            }

            incoming.retain(|path, _| retain_incoming.contains(&*path));
            outgoing.retain(|path, _| !path.starts_with(&*parent));

            Ok(())
        }

        // Only committed files are visible to directory listings.
        fn read_dir_files(&self, path: &Path) -> io::Result<Box<dyn Iterator<Item = PathBuf>>> {
            let parent = pathstr(path);

            let iter = self
                .committed
                .lock()
                .unwrap()
                .iter()
                .map(|(path, _)| path)
                .filter(|path| path.starts_with(&*parent))
                .map(|path| PathBuf::from(path))
                .collect::<Vec<_>>()
                .into_iter();

            Ok(Box::new(iter))
        }

        // Deletion moves the file to `outgoing` until the parent is synced.
        fn remove_file(&self, path: &Path) -> io::Result<()> {
            let path = pathstr(path);

            let mut outgoing = self.outgoing.lock().unwrap();
            let mut committed = self.committed.lock().unwrap();

            let file = committed.remove(&*path).unwrap();

            assert!(
                outgoing.insert(path.clone(), file).is_none(),
                "already deleted file {path}"
            );

            Ok(())
        }

        // New files start in `incoming`; asserts the path is genuinely new.
        fn open_new(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
            let path = pathstr(path);

            let file = InMemoryFile::new();

            let mut incoming = self.incoming.lock().unwrap();
            let committed = self.committed.lock().unwrap();

            assert!(
                !committed.contains_key(&*path),
                "file {path} already exists"
            );
            assert!(
                incoming.insert(path.clone(), file.clone()).is_none(),
                "file {path} already exists"
            );

            Ok(Box::new(file))
        }

        // Only committed files may be reopened; panics otherwise.
        fn open_existing(&self, path: &Path) -> io::Result<Box<dyn File + Send + Sync>> {
            let path = pathstr(path);

            let committed = self.committed.lock().unwrap();

            Ok(Box::new(committed.get(&*path).unwrap().clone()))
        }
    }
1547
    impl File for InMemoryFile {
        // Length reflects only durably committed bytes.
        fn len(&self) -> io::Result<usize> {
            Ok(self.committed.lock().unwrap().len())
        }

        // `sync_all` moves pending bytes into the committed buffer.
        fn sync_all(&mut self) -> io::Result<()> {
            let incoming = mem::take(&mut *self.incoming.lock().unwrap());
            let mut committed = self.committed.lock().unwrap();

            committed.extend(incoming);

            Ok(())
        }
    }

    impl Write for InMemoryFile {
        // Writes land in `incoming` and only become visible after `sync_all`.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.incoming.lock().unwrap().extend_from_slice(buf);

            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }
1574
    /// A deterministic clock for tests; time only moves when `advance` is called.
    #[derive(Clone)]
    struct TestClock(Arc<Mutex<emit::Timestamp>>);
1577
1578 impl TestClock {
1579 fn new() -> Self {
1580 TestClock(Arc::new(Mutex::new(emit::Timestamp::MIN)))
1581 }
1582
1583 fn advance(&self, by: Duration) {
1584 *self.0.lock().unwrap() += by;
1585 }
1586 }
1587
1588 impl emit::Clock for TestClock {
1589 fn now(&self) -> Option<emit::Timestamp> {
1590 Some(*self.0.lock().unwrap())
1591 }
1592 }
1593
    /// A deterministic rng for tests; the value it yields only changes when
    /// `increment` is called.
    #[derive(Clone)]
    struct TestRng(Arc<Mutex<u128>>);
1596
1597 impl TestRng {
1598 fn new() -> Self {
1599 TestRng(Arc::new(Mutex::new(0)))
1600 }
1601
1602 fn increment(&self) {
1603 *self.0.lock().unwrap() += 1;
1604 }
1605 }
1606
1607 impl emit::Rng for TestRng {
1608 fn fill<A: AsMut<[u8]>>(&self, mut arr: A) -> Option<A> {
1609 let fill = self.0.lock().unwrap().to_le_bytes();
1610
1611 let mut buf = arr.as_mut();
1612
1613 while buf.len() > 0 {
1614 let copy = cmp::min(fill.len(), buf.len());
1615
1616 buf.copy_from_slice(&fill[..copy]);
1617
1618 buf = &mut buf[copy..];
1619 }
1620
1621 Some(arr)
1622 }
1623 }
1624
1625 #[test]
1626 fn worker_basic() {
1627 let fs = InMemoryFilesystem::new();
1628 let clock = TestClock::new();
1629 let rng = TestRng::new();
1630 let metrics = Arc::new(InternalMetrics::default());
1631
1632 let mut worker = Worker::new(
1633 metrics.clone(),
1634 fs.clone(),
1635 clock.clone(),
1636 rng.clone(),
1637 "logs".to_string(),
1638 "test".to_string(),
1639 "log".to_string(),
1640 RollBy::Minute,
1641 false,
1642 10,
1643 1024,
1644 b"\n",
1645 );
1646
1647 let mut batch = EventBatch::new();
1648 batch.push(*b"1\n");
1649 let Ok(()) = worker.on_batch(batch) else {
1650 panic!("failed to write batch");
1651 };
1652
1653 let mut batch = EventBatch::new();
1654 batch.push(*b"2\n");
1655 batch.push(*b"3\n");
1656 let Ok(()) = worker.on_batch(batch) else {
1657 panic!("failed to write batch");
1658 };
1659
1660 assert_eq!(1, fs.iter().count());
1661
1662 clock.advance(Duration::from_secs(120));
1664
1665 let mut batch = EventBatch::new();
1666 batch.push(*b"1\n");
1667 let Ok(()) = worker.on_batch(batch) else {
1668 panic!("failed to write batch");
1669 };
1670
1671 assert_eq!(2, fs.iter().count());
1672
1673 assert_eq!(
1674 *b"1\n2\n3\n",
1675 *fs.get("logs/test.1970-01-01-00-00.00000000.00000000.log")
1676 .contents()
1677 );
1678 assert_eq!(
1679 *b"1\n",
1680 *fs.get("logs/test.1970-01-01-00-02.00000000.00000000.log")
1681 .contents()
1682 );
1683 }
1684
1685 #[test]
1686 fn worker_no_reuse() {
1687 let fs = InMemoryFilesystem::new();
1688 let clock = TestClock::new();
1689 let rng = TestRng::new();
1690 let metrics = Arc::new(InternalMetrics::default());
1691
1692 let mut worker = Worker::new(
1693 metrics.clone(),
1694 fs.clone(),
1695 clock.clone(),
1696 rng.clone(),
1697 "logs".to_string(),
1698 "test".to_string(),
1699 "log".to_string(),
1700 RollBy::Minute,
1701 false,
1702 10,
1703 1024,
1704 b"\n",
1705 );
1706
1707 let mut batch = EventBatch::new();
1708 batch.push(*b"1\n");
1709 let Ok(()) = worker.on_batch(batch) else {
1710 panic!("failed to write batch");
1711 };
1712
1713 drop(worker);
1714
1715 rng.increment();
1716
1717 let mut worker = Worker::new(
1720 metrics.clone(),
1721 fs.clone(),
1722 clock.clone(),
1723 rng.clone(),
1724 "logs".to_string(),
1725 "test".to_string(),
1726 "log".to_string(),
1727 RollBy::Minute,
1728 false,
1729 10,
1730 1024,
1731 b"\n",
1732 );
1733
1734 let mut batch = EventBatch::new();
1735 batch.push(*b"2\n");
1736 let Ok(()) = worker.on_batch(batch) else {
1737 panic!("failed to write batch");
1738 };
1739
1740 assert_eq!(2, fs.iter().count());
1741
1742 assert_eq!(
1743 *b"1\n",
1744 *fs.get("logs/test.1970-01-01-00-00.00000000.00000000.log")
1745 .contents()
1746 );
1747 assert_eq!(
1748 *b"2\n",
1749 *fs.get("logs/test.1970-01-01-00-00.00000000.00000001.log")
1750 .contents()
1751 );
1752 }
1753
1754 #[test]
1755 fn worker_reuse() {
1756 let fs = InMemoryFilesystem::new();
1757 let clock = TestClock::new();
1758 let rng = TestRng::new();
1759 let metrics = Arc::new(InternalMetrics::default());
1760
1761 let mut worker = Worker::new(
1762 metrics.clone(),
1763 fs.clone(),
1764 clock.clone(),
1765 rng.clone(),
1766 "logs".to_string(),
1767 "test".to_string(),
1768 "log".to_string(),
1769 RollBy::Minute,
1770 true,
1771 10,
1772 1024,
1773 b"\n",
1774 );
1775
1776 let mut batch = EventBatch::new();
1777 batch.push(*b"1\n");
1778 let Ok(()) = worker.on_batch(batch) else {
1779 panic!("failed to write batch");
1780 };
1781
1782 drop(worker);
1783
1784 let mut worker = Worker::new(
1787 metrics.clone(),
1788 fs.clone(),
1789 clock.clone(),
1790 rng.clone(),
1791 "logs".to_string(),
1792 "test".to_string(),
1793 "log".to_string(),
1794 RollBy::Minute,
1795 true,
1796 10,
1797 1024,
1798 b"\n",
1799 );
1800
1801 let mut batch = EventBatch::new();
1802 batch.push(*b"2\n");
1803 let Ok(()) = worker.on_batch(batch) else {
1804 panic!("failed to write batch");
1805 };
1806
1807 assert_eq!(1, fs.iter().count());
1808
1809 assert_eq!(
1812 *b"1\n\n2\n",
1813 *fs.get("logs/test.1970-01-01-00-00.00000000.00000000.log")
1814 .contents()
1815 );
1816 }
1817
1818 #[test]
1819 fn file_closes_bg_thread_on_drop() {
1820 let mut files = set_with_writer(
1821 "./target/logs/file_closes_bg_thread_on_drop/logs.txt",
1822 |_, _| Ok(()),
1823 b"\0",
1824 )
1825 .spawn();
1826
1827 let handle = {
1828 let inner = files.inner.take().unwrap();
1829
1830 inner._handle
1831 };
1832
1833 drop(files);
1834
1835 handle.join().unwrap();
1837 }
1838}