Skip to main content

crispy_stream_checker/
checkpoint.rs

//! Resume checkpoint writer with buffered writes.
//!
//! Translated from the IPTVChecker-Python `CheckpointWriter` class:
//!
//! ```python
//! class CheckpointWriter:
//!     def __init__(self, log_file, flush_interval=0.25, flush_threshold=128):
//!         self._log_file = log_file
//!         self._flush_interval = flush_interval
//!         self._flush_threshold = flush_threshold
//!         self._buffer = []
//!         self._lock = threading.Lock()
//!         self._last_flush = time.monotonic()
//!
//!     def write(self, entry):
//!         with self._lock:
//!             self._buffer.append(entry)
//!             now = time.monotonic()
//!             if len(self._buffer) >= self._flush_threshold or \
//!                (now - self._last_flush) >= self._flush_interval:
//!                 self._flush_locked()
//!
//!     def _flush_locked(self):
//!         if not self._buffer:
//!             return
//!         try:
//!             with open(self._log_file, 'a', encoding='utf-8', errors='replace') as f:
//!                 for entry in self._buffer:
//!                     f.write(entry + "\n")
//!         except OSError as exc:
//!             logging.error(...)
//!         self._buffer.clear()
//!         self._last_flush = time.monotonic()
//!
//!     def flush(self):
//!         with self._lock:
//!             self._flush_locked()
//!
//!     def close(self):
//!         self.flush()
//! ```
//!
//! Note that the Rust constructor takes `flush_threshold` before
//! `flush_interval`, the reverse of the Python keyword order.
42
43use std::io::Write;
44use std::path::{Path, PathBuf};
45use std::sync::Mutex;
46use std::time::Instant;
47
48use tracing::error;
49
/// Buffered checkpoint writer for resume logging.
///
/// Entries are held in memory and appended to the log file, one per line,
/// when a call to [`write`](Self::write) finds that either:
/// - the buffer has reached `flush_threshold` entries, or
/// - `flush_interval` has elapsed since the last flush.
///
/// The interval is only checked when `write` is called; there is no
/// background timer. Call [`flush`](Self::flush) or [`close`](Self::close)
/// to persist whatever is still buffered.
///
/// All methods take `&self` and serialize access through an internal
/// [`Mutex`], so a single writer can be shared between threads (e.g. behind
/// an `Arc`).
pub struct CheckpointWriter {
    // Buffer, flush settings and bookkeeping, all guarded by one lock.
    inner: Mutex<CheckpointInner>,
}
58
/// Mutable state of a [`CheckpointWriter`], guarded by its mutex.
struct CheckpointInner {
    /// Path of the checkpoint log; opened in append mode on each non-empty flush.
    log_file: PathBuf,
    /// Entries written since the last flush, in arrival order.
    buffer: Vec<String>,
    /// Buffer length at which `write` triggers a flush.
    flush_threshold: usize,
    /// Time since `last_flush` after which the next `write` triggers a flush.
    flush_interval: std::time::Duration,
    /// Time of the last non-empty flush (or of construction, before any flush).
    last_flush: Instant,
}
66
67impl CheckpointWriter {
68    /// Create a new checkpoint writer.
69    ///
70    /// - `log_file`: Path to the checkpoint log file (appended to).
71    /// - `flush_threshold`: Maximum entries before auto-flush (default: 128).
72    /// - `flush_interval`: Maximum time between flushes (default: 250ms).
73    pub fn new(
74        log_file: impl AsRef<Path>,
75        flush_threshold: usize,
76        flush_interval: std::time::Duration,
77    ) -> Self {
78        Self {
79            inner: Mutex::new(CheckpointInner {
80                log_file: log_file.as_ref().to_path_buf(),
81                buffer: Vec::new(),
82                flush_threshold,
83                flush_interval,
84                last_flush: Instant::now(),
85            }),
86        }
87    }
88
89    /// Create with default settings (threshold=128, interval=250ms).
90    pub fn with_defaults(log_file: impl AsRef<Path>) -> Self {
91        Self::new(log_file, 128, std::time::Duration::from_millis(250))
92    }
93
94    /// Buffer a checkpoint entry, flushing if thresholds are reached.
95    pub fn write(&self, entry: impl Into<String>) {
96        let mut inner = self.inner.lock().expect("checkpoint lock poisoned");
97        inner.buffer.push(entry.into());
98
99        let now = Instant::now();
100        if inner.buffer.len() >= inner.flush_threshold
101            || now.duration_since(inner.last_flush) >= inner.flush_interval
102        {
103            flush_locked(&mut inner);
104        }
105    }
106
107    /// Force-flush all buffered entries to disk.
108    pub fn flush(&self) {
109        let mut inner = self.inner.lock().expect("checkpoint lock poisoned");
110        flush_locked(&mut inner);
111    }
112
113    /// Flush and close (consumes the writer).
114    pub fn close(self) {
115        self.flush();
116    }
117
118    /// Return the number of currently buffered (unflushed) entries.
119    #[cfg(test)]
120    fn buffered_count(&self) -> usize {
121        let inner = self.inner.lock().expect("checkpoint lock poisoned");
122        inner.buffer.len()
123    }
124}
125
126/// Flush the internal buffer to disk (called with lock held).
127fn flush_locked(inner: &mut CheckpointInner) {
128    if inner.buffer.is_empty() {
129        return;
130    }
131
132    match std::fs::OpenOptions::new()
133        .create(true)
134        .append(true)
135        .open(&inner.log_file)
136    {
137        Ok(mut f) => {
138            for entry in &inner.buffer {
139                if let Err(e) = writeln!(f, "{entry}") {
140                    error!(
141                        file = inner.log_file.display().to_string(),
142                        error = %e,
143                        "failed to write checkpoint entry"
144                    );
145                    break;
146                }
147            }
148        }
149        Err(e) => {
150            error!(
151                file = inner.log_file.display().to_string(),
152                error = %e,
153                "failed to open checkpoint log"
154            );
155        }
156    }
157
158    inner.buffer.clear();
159    inner.last_flush = Instant::now();
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Interval long enough that it never triggers during a test.
    const NEVER: Duration = Duration::from_secs(3600);

    #[test]
    fn buffers_entries_below_threshold() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("checkpoint.log");

        // High threshold and interval so entries stay buffered.
        let writer = CheckpointWriter::new(&path, 1000, NEVER);

        writer.write("entry1");
        writer.write("entry2");

        assert_eq!(writer.buffered_count(), 2);
        assert!(!path.exists() || std::fs::read_to_string(&path).unwrap().is_empty());
    }

    #[test]
    fn flushes_on_threshold() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("checkpoint.log");

        let writer = CheckpointWriter::new(&path, 2, NEVER);

        writer.write("entry1");
        writer.write("entry2"); // triggers flush

        assert_eq!(writer.buffered_count(), 0);
        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("entry1"));
        assert!(content.contains("entry2"));
    }

    #[test]
    fn flushes_on_interval() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("checkpoint.log");

        // A zero interval has always elapsed, so every write flushes even
        // though the threshold is never reached.
        let writer = CheckpointWriter::new(&path, 1000, Duration::ZERO);

        writer.write("x");
        assert_eq!(writer.buffered_count(), 0);
        writer.write("y");
        assert_eq!(writer.buffered_count(), 0);

        let content = std::fs::read_to_string(&path).unwrap();
        assert_eq!(content.lines().collect::<Vec<_>>(), vec!["x", "y"]);
    }

    #[test]
    fn manual_flush_writes_all() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("checkpoint.log");

        let writer = CheckpointWriter::new(&path, 1000, NEVER);

        writer.write("a");
        writer.write("b");
        writer.write("c");
        writer.flush();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines, vec!["a", "b", "c"]);
    }

    #[test]
    fn appends_to_existing_log() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("checkpoint.log");
        // A resume log from a previous run must be extended, never truncated.
        std::fs::write(&path, "previous\n").unwrap();

        let writer = CheckpointWriter::new(&path, 1000, NEVER);
        writer.write("a");
        writer.flush();
        writer.write("b");
        writer.flush();

        let content = std::fs::read_to_string(&path).unwrap();
        assert_eq!(content, "previous\na\nb\n");
    }

    #[test]
    fn empty_flush_does_not_create_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("checkpoint.log");

        let writer = CheckpointWriter::new(&path, 1000, NEVER);
        writer.flush();

        assert!(!path.exists());
    }

    #[test]
    fn close_flushes() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("checkpoint.log");

        let writer = CheckpointWriter::new(&path, 1000, NEVER);
        writer.write("final");
        writer.close();

        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("final"));
    }
}