1#![allow(clippy::disallowed_types, clippy::disallowed_methods)]
11
12use std::{
13 io::{self, Write},
14 path::PathBuf,
15 sync::{
16 Arc,
17 atomic::{AtomicBool, AtomicU64, Ordering},
18 mpsc::{self, SyncSender, sync_channel},
19 },
20 thread::{self, JoinHandle},
21 time::{Duration, SystemTime, UNIX_EPOCH},
22};
23
24use obs_types::Severity;
25use parking_lot::Mutex;
26
/// Factory for [`Write`] sinks.
///
/// Implementors are shared across threads (`Send + Sync + 'static`) and hand
/// out an owned writer on every call.
pub trait MakeWriter: Send + Sync + 'static {
    /// Concrete sink type produced by this factory.
    type Writer: Write + Send + 'static;

    /// Produces a writer when no severity information is available.
    fn make_writer(&self) -> Self::Writer;

    /// Produces a writer for a record of severity `_sev`.
    ///
    /// The default implementation ignores the severity and delegates to
    /// [`MakeWriter::make_writer`]; factories that route by severity override it.
    fn make_writer_for(&self, _sev: Severity) -> Self::Writer {
        self.make_writer()
    }
}
43
44#[derive(Debug, Default, Clone, Copy)]
46pub struct StdoutWriter;
47
48impl MakeWriter for StdoutWriter {
49 type Writer = io::Stdout;
50 fn make_writer(&self) -> io::Stdout {
51 io::stdout()
52 }
53}
54
55#[derive(Debug, Default, Clone, Copy)]
57pub struct StderrWriter;
58
59impl MakeWriter for StderrWriter {
60 type Writer = io::Stderr;
61 fn make_writer(&self) -> io::Stderr {
62 io::stderr()
63 }
64}
65
/// [`MakeWriter`] that picks one of two inner factories based on severity.
///
/// Records at or above `threshold` are served by `high`; all other records,
/// and requests that carry no severity, are served by `low`.
#[derive(Debug, Clone)]
pub struct LevelSplitWriter<L, H> {
    /// Factory used below `threshold` and for severity-less requests.
    low: L,
    /// Factory used at or above `threshold`.
    high: H,
    /// Lowest severity that is routed to `high`.
    threshold: Severity,
}
74
75impl<L: MakeWriter, H: MakeWriter> LevelSplitWriter<L, H> {
76 #[must_use]
78 pub fn new(low: L, high: H) -> Self {
79 Self {
80 low,
81 high,
82 threshold: Severity::Warn,
83 }
84 }
85
86 #[must_use]
88 pub fn threshold(mut self, threshold: Severity) -> Self {
89 self.threshold = threshold;
90 self
91 }
92}
93
/// A boxed, type-erased [`Write`] sink.
///
/// Lets factories that may return different concrete writer types expose a
/// single `Writer` associated type.
pub struct ErasedWriter(Box<dyn Write + Send + 'static>);

impl ErasedWriter {
    /// Boxes `w` and hides its concrete type.
    #[must_use]
    pub fn new<W: Write + Send + 'static>(w: W) -> Self {
        let boxed: Box<dyn Write + Send + 'static> = Box::new(w);
        Self(boxed)
    }
}

impl std::fmt::Debug for ErasedWriter {
    /// Prints only the type name; the wrapped writer is opaque.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("ErasedWriter");
        repr.finish_non_exhaustive()
    }
}

impl Write for ErasedWriter {
    /// Forwards to the wrapped writer's `write`.
    fn write(&mut self, b: &[u8]) -> io::Result<usize> {
        let Self(sink) = self;
        sink.write(b)
    }

    /// Forwards to the wrapped writer's `flush`.
    fn flush(&mut self) -> io::Result<()> {
        let Self(sink) = self;
        sink.flush()
    }
}
121
122impl<L: MakeWriter, H: MakeWriter> MakeWriter for LevelSplitWriter<L, H> {
123 type Writer = ErasedWriter;
124
125 fn make_writer(&self) -> ErasedWriter {
126 ErasedWriter::new(self.low.make_writer())
127 }
128
129 fn make_writer_for(&self, sev: Severity) -> ErasedWriter {
130 if sev >= self.threshold {
131 ErasedWriter::new(self.high.make_writer_for(sev))
132 } else {
133 ErasedWriter::new(self.low.make_writer_for(sev))
134 }
135 }
136}
137
/// [`MakeWriter`] that duplicates every record into two inner factories.
#[derive(Debug, Clone)]
pub struct TeeWriter<A, B> {
    /// First destination; written before `b`.
    a: A,
    /// Second destination.
    b: B,
}
144
145impl<A: MakeWriter, B: MakeWriter> TeeWriter<A, B> {
146 pub fn new(a: A, b: B) -> Self {
148 Self { a, b }
149 }
150}
151
/// Writer that duplicates everything it receives into two inner writers.
pub struct TeeWriterImpl<WA: Write, WB: Write> {
    /// First destination.
    a: WA,
    /// Second destination.
    b: WB,
}

impl<WA: Write, WB: Write> Write for TeeWriterImpl<WA, WB> {
    /// Writes all of `buf` to both destinations.
    ///
    /// Both destinations are always attempted, so a failing first sink does
    /// not starve the second one.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered (the one from `a` if both fail).
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let first = self.a.write_all(buf);
        let second = self.b.write_all(buf);
        first.and(second)?;
        Ok(buf.len())
    }

    /// Flushes both destinations, attempting the second even if the first fails.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered (the one from `a` if both fail).
    fn flush(&mut self) -> io::Result<()> {
        let first = self.a.flush();
        let second = self.b.flush();
        first.and(second)
    }
}

impl<WA: Write, WB: Write> std::fmt::Debug for TeeWriterImpl<WA, WB> {
    /// Prints only the type name; the inner writers need not be `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TeeWriterImpl").finish_non_exhaustive()
    }
}
177
178impl<A: MakeWriter, B: MakeWriter> MakeWriter for TeeWriter<A, B> {
179 type Writer = TeeWriterImpl<A::Writer, B::Writer>;
180
181 fn make_writer(&self) -> Self::Writer {
182 TeeWriterImpl {
183 a: self.a.make_writer(),
184 b: self.b.make_writer(),
185 }
186 }
187}
188
/// Decides when the rolling file writer starts a new log file.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub enum RollingPolicy {
    /// Never rotate once a file has been opened.
    Never,
    /// Rotate once the tracked byte count reaches `max_bytes`.
    SizeBased {
        /// Byte count at or above which the next access starts a new file.
        max_bytes: u64,
    },
    /// Rotate when the day number, counted in 86 400-second steps from the
    /// UNIX epoch, differs from the one the file was opened in.
    Daily,
    /// Rotate when the hour number, counted in 3 600-second steps from the
    /// UNIX epoch, differs from the one the file was opened in.
    Hourly,
    /// Rotate when either the size limit or the age limit is reached.
    SizeOrAge {
        /// Byte count at or above which the next access starts a new file.
        max_bytes: u64,
        /// Time since the file was opened at or above which it is rotated.
        max_age: Duration,
    },
}
214
/// Builder for [`RollingFileWriter`].
#[derive(Debug, Clone)]
pub struct RollingFileWriterBuilder {
    /// Directory that receives the log files; `build` fails if it is unset.
    directory: Option<PathBuf>,
    /// Leading part of every file name; `build` falls back to `"obs"`.
    prefix: Option<String>,
    /// Trailing part of every file name, appended right after the stamp.
    suffix: String,
    /// Rule that decides when a new file is started.
    policy: RollingPolicy,
    /// Number of matching files left in place after a rotation; `None` disables eviction.
    keep: Option<usize>,
}
224
225impl Default for RollingFileWriterBuilder {
226 fn default() -> Self {
227 Self {
228 directory: None,
229 prefix: None,
230 suffix: ".ndjson".to_string(),
231 policy: RollingPolicy::Daily,
232 keep: None,
233 }
234 }
235}
236
237impl RollingFileWriterBuilder {
238 #[must_use]
240 pub fn directory(mut self, dir: impl Into<PathBuf>) -> Self {
241 self.directory = Some(dir.into());
242 self
243 }
244
245 #[must_use]
247 pub fn filename_prefix(mut self, p: impl Into<String>) -> Self {
248 self.prefix = Some(p.into());
249 self
250 }
251
252 #[must_use]
254 pub fn filename_suffix(mut self, s: impl Into<String>) -> Self {
255 self.suffix = s.into();
256 self
257 }
258
259 #[must_use]
261 pub fn policy(mut self, p: RollingPolicy) -> Self {
262 self.policy = p;
263 self
264 }
265
266 #[must_use]
269 pub fn keep(mut self, n: usize) -> Self {
270 self.keep = Some(n);
271 self
272 }
273
274 pub fn build(self) -> io::Result<RollingFileWriter> {
281 let dir = self
282 .directory
283 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "directory is required"))?;
284 let prefix = self.prefix.unwrap_or_else(|| "obs".to_string());
285 std::fs::create_dir_all(&dir)?;
286 let inner = RollingInner {
287 directory: dir,
288 prefix,
289 suffix: self.suffix,
290 policy: self.policy,
291 keep: self.keep,
292 current: Mutex::new(None),
293 };
294 Ok(RollingFileWriter {
295 inner: Arc::new(inner),
296 })
297 }
298}
299
300#[derive(Clone)]
302pub struct RollingFileWriter {
303 inner: Arc<RollingInner>,
304}
305
306impl std::fmt::Debug for RollingFileWriter {
307 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308 f.debug_struct("RollingFileWriter")
309 .field("directory", &self.inner.directory)
310 .field("prefix", &self.inner.prefix)
311 .field("policy", &self.inner.policy)
312 .finish()
313 }
314}
315
/// State shared between a [`RollingFileWriter`], its clones and its handles.
struct RollingInner {
    /// Directory that holds the log files.
    directory: PathBuf,
    /// Leading part of every log file name.
    prefix: String,
    /// Trailing part of every log file name.
    suffix: String,
    /// Rule that decides when a new file is started.
    policy: RollingPolicy,
    /// Number of matching files left in place after a rotation, if limited.
    keep: Option<usize>,
    /// Currently open file; `None` until the first write or flush opens one.
    current: Mutex<Option<RollingState>>,
}
324
/// Bookkeeping for the log file that is currently open.
struct RollingState {
    /// The open log file.
    file: std::fs::File,
    /// Running byte count that size-based policies compare against their limit.
    bytes: u64,
    /// Wall-clock time recorded when the file was opened; used by time-based policies.
    opened_at: SystemTime,
}
330
331impl RollingFileWriter {
332 #[must_use]
334 pub fn builder() -> RollingFileWriterBuilder {
335 RollingFileWriterBuilder::default()
336 }
337}
338
339impl MakeWriter for RollingFileWriter {
340 type Writer = RollingFileHandle;
341 fn make_writer(&self) -> RollingFileHandle {
342 RollingFileHandle {
343 inner: Arc::clone(&self.inner),
344 }
345 }
346}
347
348pub struct RollingFileHandle {
351 inner: Arc<RollingInner>,
352}
353
354impl std::fmt::Debug for RollingFileHandle {
355 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
356 f.debug_struct("RollingFileHandle").finish_non_exhaustive()
357 }
358}
359
360impl Write for RollingFileHandle {
361 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
362 self.inner.with_state(|state| state.file.write_all(buf))?;
363 self.inner.note_bytes(buf.len() as u64);
364 Ok(buf.len())
365 }
366 fn flush(&mut self) -> io::Result<()> {
367 self.inner.with_state(|state| {
372 state.file.flush()?;
373 state.file.sync_data()
374 })
375 }
376}
377
378impl RollingInner {
379 fn with_state<F, R>(&self, f: F) -> io::Result<R>
380 where
381 F: FnOnce(&mut RollingState) -> io::Result<R>,
382 {
383 let mut guard = self.current.lock();
384 if guard.is_none() {
385 *guard = Some(self.open_new()?);
386 }
387 let needs_rotate = match guard.as_ref() {
388 Some(state) => self.should_rotate(state),
389 None => false,
390 };
391 if needs_rotate {
392 *guard = Some(self.open_new()?);
393 self.maybe_evict_old();
394 }
395 let state = guard
396 .as_mut()
397 .ok_or_else(|| io::Error::other("rolling state missing after open"))?;
398 f(state)
399 }
400
401 fn note_bytes(&self, n: u64) {
402 if let Some(state) = self.current.lock().as_mut() {
403 state.bytes += n;
404 }
405 }
406
407 fn should_rotate(&self, state: &RollingState) -> bool {
408 match self.policy {
409 RollingPolicy::Never => false,
410 RollingPolicy::SizeBased { max_bytes } => state.bytes >= max_bytes,
411 RollingPolicy::Daily => {
412 let opened = state
413 .opened_at
414 .duration_since(UNIX_EPOCH)
415 .map(|d| d.as_secs() / 86_400)
416 .ok();
417 opened != now_unix_secs().map(|s| s / 86_400)
418 }
419 RollingPolicy::Hourly => {
420 let opened = state
421 .opened_at
422 .duration_since(UNIX_EPOCH)
423 .map(|d| d.as_secs() / 3600)
424 .ok();
425 opened != now_unix_secs().map(|s| s / 3600)
426 }
427 RollingPolicy::SizeOrAge { max_bytes, max_age } => {
428 if state.bytes >= max_bytes {
429 return true;
430 }
431 state.opened_at.elapsed().unwrap_or_default() >= max_age
432 }
433 }
434 }
435
436 fn open_new(&self) -> io::Result<RollingState> {
437 let stamp = match self.policy {
444 RollingPolicy::Daily => format_date_stamp(now_unix_secs().unwrap_or(0)),
445 RollingPolicy::Hourly => format_hour_stamp(now_unix_secs().unwrap_or(0)),
446 RollingPolicy::SizeBased { .. } => {
447 let counter = ROLL_COUNTER.fetch_add(1, Ordering::Relaxed);
448 format!("{counter:06}")
449 }
450 _ => {
451 let now = now_unix_secs().unwrap_or(0);
452 let counter = ROLL_COUNTER.fetch_add(1, Ordering::Relaxed);
453 format!("{now}-{counter}")
454 }
455 };
456 let filename = format!("{}.{stamp}{}", self.prefix, self.suffix);
457 let path = self.directory.join(&filename);
458 let file = std::fs::OpenOptions::new()
459 .create(true)
460 .append(true)
461 .open(&path)?;
462 if let Ok(dir) = std::fs::File::open(&self.directory) {
466 let _ = dir.sync_all();
467 }
468 Ok(RollingState {
469 file,
470 bytes: 0,
471 opened_at: SystemTime::now(),
472 })
473 }
474
475 #[allow(dead_code)]
476 fn _ensure_helpers_used(&self) {
477 let _ = format_date_stamp;
480 let _ = format_hour_stamp;
481 }
482
483 fn maybe_evict_old(&self) {
484 let Some(keep) = self.keep else { return };
485 let Ok(read_dir) = std::fs::read_dir(&self.directory) else {
486 return;
487 };
488 let mut entries: Vec<_> = read_dir
489 .filter_map(Result::ok)
490 .filter(|e| {
491 e.file_name()
492 .to_str()
493 .is_some_and(|n| n.starts_with(&self.prefix) && n.ends_with(&self.suffix))
494 })
495 .collect();
496 entries.sort_by_key(|e| {
497 e.metadata()
498 .and_then(|m| m.modified())
499 .unwrap_or(SystemTime::UNIX_EPOCH)
500 });
501 if entries.len() > keep {
502 let extras = entries.len() - keep;
503 for entry in entries.into_iter().take(extras) {
504 let _ = std::fs::remove_file(entry.path());
505 }
506 }
507 }
508}
509
/// Process-wide counter used to make counter-stamped log file names distinct.
///
/// It starts at zero in every process and is shared by all rolling writers.
static ROLL_COUNTER: AtomicU64 = AtomicU64::new(0);
511
/// Returns the current wall-clock time as whole seconds since the UNIX epoch,
/// or `None` when the system clock reads earlier than the epoch.
fn now_unix_secs() -> Option<u64> {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => Some(since_epoch.as_secs()),
        Err(_) => None,
    }
}
518
/// Formats `secs` (seconds since the UNIX epoch) as `YYYY-MM-DD`.
fn format_date_stamp(secs: u64) -> String {
    let (year, month, day) = ymd_from_secs(secs);
    format!("{year:04}-{month:02}-{day:02}")
}

/// Formats `secs` (seconds since the UNIX epoch) as `YYYY-MM-DD.HH`.
fn format_hour_stamp(secs: u64) -> String {
    let (year, month, day) = ymd_from_secs(secs);
    let hour = (secs % 86_400) / 3600;
    format!("{year:04}-{month:02}-{day:02}.{hour:02}")
}

/// Converts seconds since the UNIX epoch into a proleptic Gregorian
/// `(year, month, day)` triple, with no time-zone offset applied.
fn ymd_from_secs(secs: u64) -> (u32, u32, u32) {
    // Shift the day count so that it starts on 0000-03-01; with years that
    // begin in March the leap day falls at the very end of a year.
    let shifted_days = (secs / 86_400) as i64 + 719_468;

    // A 400-year era always spans 146 097 days.
    let era = shifted_days.div_euclid(146_097);
    let day_of_era = shifted_days.rem_euclid(146_097);
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let march_based_year = year_of_era + era * 400;

    // Day within the March-based year, then the March-based month index
    // (0 = March, 11 = February).
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_index = (5 * day_of_year + 2) / 153;

    let day = (day_of_year - (153 * month_index + 2) / 5 + 1) as u32;
    let month = if month_index < 10 {
        (month_index + 3) as u32
    } else {
        (month_index - 9) as u32
    };
    // January and February belong to the following calendar year.
    let year = if month <= 2 {
        (march_based_year + 1) as u32
    } else {
        march_based_year as u32
    };
    (year, month, day)
}
550
/// Queue capacity (in buffers) used when a non-blocking writer is created with capacity `0`.
const NON_BLOCKING_DEFAULT_CAPACITY: usize = 8192;
554
/// [`MakeWriter`] that hands records to a background worker thread through a
/// bounded queue instead of writing them on the calling thread.
#[derive(Debug, Clone)]
pub struct NonBlockingWriter {
    /// Sending side of the bounded queue that the worker drains.
    sender: SyncSender<Vec<u8>>,
    /// Number of buffers discarded because they could not be queued.
    dropped: Arc<AtomicU64>,
}
563
/// Keeps the background worker of a non-blocking writer alive.
///
/// Dropping the guard asks the worker to stop and blocks until it has exited.
#[must_use = "dropping the guard immediately shuts the background worker down"]
pub struct WorkerGuard {
    /// Flag the worker polls to learn that it should stop.
    shutdown: Arc<AtomicBool>,
    /// Handle of the worker thread; taken (and joined) on drop.
    join: Option<JoinHandle<()>>,
}

impl std::fmt::Debug for WorkerGuard {
    /// Prints only the type name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("WorkerGuard").finish_non_exhaustive()
    }
}

impl Drop for WorkerGuard {
    /// Raises the shutdown flag and waits for the worker thread to finish.
    /// A panic inside the worker is swallowed rather than propagated.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::SeqCst);
        if let Some(worker) = self.join.take() {
            let _ = worker.join();
        }
    }
}
585
586impl NonBlockingWriter {
587 pub fn new<M>(inner: M, capacity: usize) -> (Self, WorkerGuard)
591 where
592 M: MakeWriter,
593 {
594 let cap = if capacity == 0 {
595 NON_BLOCKING_DEFAULT_CAPACITY
596 } else {
597 capacity
598 };
599 let (tx, rx) = sync_channel::<Vec<u8>>(cap);
600 let dropped = Arc::new(AtomicU64::new(0));
601 let shutdown = Arc::new(AtomicBool::new(false));
602 let shutdown_in_thread = Arc::clone(&shutdown);
603 let inner = Arc::new(inner);
604 let join = thread::spawn(move || run_loop(inner, rx, shutdown_in_thread));
605 (
606 Self {
607 sender: tx,
608 dropped,
609 },
610 WorkerGuard {
611 shutdown,
612 join: Some(join),
613 },
614 )
615 }
616
617 #[must_use]
619 pub fn dropped_total(&self) -> u64 {
620 self.dropped.load(Ordering::Relaxed)
621 }
622}
623
624fn run_loop<M: MakeWriter>(inner: Arc<M>, rx: mpsc::Receiver<Vec<u8>>, shutdown: Arc<AtomicBool>) {
625 while let Ok(buf) = rx.recv_timeout(Duration::from_millis(200)) {
626 let mut w = inner.make_writer();
627 let _ = w.write_all(&buf);
628 let _ = w.flush();
629 if shutdown.load(Ordering::Relaxed) {
630 break;
631 }
632 }
633 while let Ok(buf) = rx.try_recv() {
635 let mut w = inner.make_writer();
636 let _ = w.write_all(&buf);
637 let _ = w.flush();
638 }
639}
640
641impl MakeWriter for NonBlockingWriter {
642 type Writer = NonBlockingHandle;
643 fn make_writer(&self) -> NonBlockingHandle {
644 NonBlockingHandle {
645 sender: self.sender.clone(),
646 dropped: Arc::clone(&self.dropped),
647 buf: Vec::with_capacity(256),
648 }
649 }
650}
651
/// Writer handed out by the non-blocking writer.
///
/// Bytes are collected in a local buffer and handed to the worker queue as a
/// single message on `flush` or when the handle is dropped.
pub struct NonBlockingHandle {
    /// Sending side of the worker queue.
    sender: SyncSender<Vec<u8>>,
    /// Shared count of buffers that could not be queued.
    dropped: Arc<AtomicU64>,
    /// Bytes written since the last flush.
    buf: Vec<u8>,
}

impl std::fmt::Debug for NonBlockingHandle {
    /// Shows how many bytes are currently buffered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered = self.buf.len();
        let mut repr = f.debug_struct("NonBlockingHandle");
        repr.field("buffered", &buffered);
        repr.finish()
    }
}

impl Write for NonBlockingHandle {
    /// Appends `b` to the local buffer; never fails and never blocks.
    fn write(&mut self, b: &[u8]) -> io::Result<usize> {
        let accepted = b.len();
        self.buf.extend_from_slice(b);
        Ok(accepted)
    }

    /// Hands the buffered bytes to the worker queue without blocking.
    ///
    /// When the queue is full or the worker is gone the buffer is discarded
    /// and the shared drop counter is incremented; the call still returns `Ok`.
    fn flush(&mut self) -> io::Result<()> {
        if !self.buf.is_empty() {
            let pending = std::mem::take(&mut self.buf);
            if self.sender.try_send(pending).is_err() {
                self.dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
        Ok(())
    }
}

impl Drop for NonBlockingHandle {
    /// Sends any bytes that were written but not yet flushed.
    fn drop(&mut self) {
        let _ = self.flush();
    }
}
694
#[cfg(test)]
mod tests {
    use super::*;

    /// Writing well past `max_bytes` must leave more than one file behind.
    #[test]
    fn test_should_rotate_size_based() {
        let dir = tempfile::tempdir().unwrap();
        let writer = RollingFileWriter::builder()
            .directory(dir.path())
            .filename_prefix("test")
            .policy(RollingPolicy::SizeBased { max_bytes: 16 })
            .build()
            .unwrap();
        for _ in 0..5 {
            let mut h = writer.make_writer();
            h.write_all(b"hello world!").unwrap();
            h.flush().unwrap();
        }
        let file_count = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .count();
        assert!(
            file_count >= 2,
            "expected size-based rotation to produce >1 file"
        );
    }

    /// Bytes written to a handle must reach the inner writer once the handle
    /// is dropped, without an explicit flush.
    #[test]
    fn test_non_blocking_writer_should_flush_on_drop() {
        struct FakeWriter(Arc<parking_lot::Mutex<Vec<u8>>>);
        impl MakeWriter for FakeWriter {
            type Writer = FakeHandle;
            fn make_writer(&self) -> FakeHandle {
                FakeHandle(Arc::clone(&self.0))
            }
        }
        struct FakeHandle(Arc<parking_lot::Mutex<Vec<u8>>>);
        impl Write for FakeHandle {
            fn write(&mut self, b: &[u8]) -> io::Result<usize> {
                self.0.lock().extend_from_slice(b);
                Ok(b.len())
            }
            fn flush(&mut self) -> io::Result<()> {
                Ok(())
            }
        }

        let captured = Arc::new(parking_lot::Mutex::new(Vec::<u8>::new()));
        let (writer, guard) = NonBlockingWriter::new(FakeWriter(Arc::clone(&captured)), 16);
        {
            // No explicit flush: leaving this scope drops the handle, and the
            // drop is what has to queue the bytes.
            let mut h = writer.make_writer();
            h.write_all(b"hello\n").unwrap();
        }
        // Dropping the last sender and then the guard joins the worker, so the
        // assertion below does not depend on timing.
        drop(writer);
        drop(guard);
        assert_eq!(captured.lock().as_slice(), b"hello\n");
    }
}