crispy_stream_checker/
checkpoint.rs1use std::io::Write;
44use std::path::{Path, PathBuf};
45use std::sync::Mutex;
46use std::time::Instant;
47
48use tracing::error;
49
/// Buffered, mutex-guarded writer that appends checkpoint entries to a log file.
///
/// All mutable state lives behind a single [`Mutex`], so every method takes
/// `&self`.
#[derive(Debug)]
pub struct CheckpointWriter {
    /// Buffer, target path and flush policy, guarded by one lock.
    inner: Mutex<CheckpointInner>,
}

/// State protected by the lock inside [`CheckpointWriter`].
#[derive(Debug)]
struct CheckpointInner {
    /// File that buffered entries are appended to when flushed.
    log_file: PathBuf,
    /// Entries accepted so far but not yet handed to a flush.
    buffer: Vec<String>,
    /// Buffer length at (or above) which a write triggers a flush.
    flush_threshold: usize,
    /// Time since the last flush after which a write triggers a flush.
    flush_interval: std::time::Duration,
    /// Moment the buffer was last flushed (or the writer was created).
    last_flush: Instant,
}
66
67impl CheckpointWriter {
68 pub fn new(
74 log_file: impl AsRef<Path>,
75 flush_threshold: usize,
76 flush_interval: std::time::Duration,
77 ) -> Self {
78 Self {
79 inner: Mutex::new(CheckpointInner {
80 log_file: log_file.as_ref().to_path_buf(),
81 buffer: Vec::new(),
82 flush_threshold,
83 flush_interval,
84 last_flush: Instant::now(),
85 }),
86 }
87 }
88
89 pub fn with_defaults(log_file: impl AsRef<Path>) -> Self {
91 Self::new(log_file, 128, std::time::Duration::from_millis(250))
92 }
93
94 pub fn write(&self, entry: impl Into<String>) {
96 let mut inner = self.inner.lock().expect("checkpoint lock poisoned");
97 inner.buffer.push(entry.into());
98
99 let now = Instant::now();
100 if inner.buffer.len() >= inner.flush_threshold
101 || now.duration_since(inner.last_flush) >= inner.flush_interval
102 {
103 flush_locked(&mut inner);
104 }
105 }
106
107 pub fn flush(&self) {
109 let mut inner = self.inner.lock().expect("checkpoint lock poisoned");
110 flush_locked(&mut inner);
111 }
112
113 pub fn close(self) {
115 self.flush();
116 }
117
118 #[cfg(test)]
120 fn buffered_count(&self) -> usize {
121 let inner = self.inner.lock().expect("checkpoint lock poisoned");
122 inner.buffer.len()
123 }
124}
125
126fn flush_locked(inner: &mut CheckpointInner) {
128 if inner.buffer.is_empty() {
129 return;
130 }
131
132 match std::fs::OpenOptions::new()
133 .create(true)
134 .append(true)
135 .open(&inner.log_file)
136 {
137 Ok(mut f) => {
138 for entry in &inner.buffer {
139 if let Err(e) = writeln!(f, "{entry}") {
140 error!(
141 file = inner.log_file.display().to_string(),
142 error = %e,
143 "failed to write checkpoint entry"
144 );
145 break;
146 }
147 }
148 }
149 Err(e) => {
150 error!(
151 file = inner.log_file.display().to_string(),
152 error = %e,
153 "failed to open checkpoint log"
154 );
155 }
156 }
157
158 inner.buffer.clear();
159 inner.last_flush = Instant::now();
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    /// Interval used by every test: one hour.
    const LONG_INTERVAL: std::time::Duration = std::time::Duration::from_secs(3600);

    /// Builds a writer targeting `checkpoint.log` inside a fresh temp directory.
    ///
    /// The directory guard is returned alongside the path and writer so the
    /// caller can keep it alive for the duration of the test.
    fn writer_with_threshold(
        threshold: usize,
    ) -> (tempfile::TempDir, std::path::PathBuf, CheckpointWriter) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("checkpoint.log");
        let writer = CheckpointWriter::new(&path, threshold, LONG_INTERVAL);
        (dir, path, writer)
    }

    #[test]
    fn buffers_entries_below_threshold() {
        let (_dir, path, writer) = writer_with_threshold(1000);

        for entry in ["entry1", "entry2"] {
            writer.write(entry);
        }

        assert_eq!(writer.buffered_count(), 2);
        // Nothing should have reached disk yet: either no file, or an empty one.
        let nothing_on_disk =
            !path.exists() || std::fs::read_to_string(&path).unwrap().is_empty();
        assert!(nothing_on_disk);
    }

    #[test]
    fn flushes_on_threshold() {
        let (_dir, path, writer) = writer_with_threshold(2);

        writer.write("entry1");
        writer.write("entry2");

        // The second write reaches the threshold, so the buffer must be drained.
        assert_eq!(writer.buffered_count(), 0);

        let content = std::fs::read_to_string(&path).unwrap();
        for expected in ["entry1", "entry2"] {
            assert!(content.contains(expected));
        }
    }

    #[test]
    fn manual_flush_writes_all() {
        let (_dir, path, writer) = writer_with_threshold(1000);

        for entry in ["a", "b", "c"] {
            writer.write(entry);
        }
        writer.flush();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines, vec!["a", "b", "c"]);
    }

    #[test]
    fn close_flushes() {
        let (_dir, path, writer) = writer_with_threshold(1000);

        writer.write("final");
        writer.close();

        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("final"));
    }
}