1use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5
6use crate::wal::event::WalEvent;
7
/// Point-in-time copy of the append counters held by a [`WalAppendTelemetry`].
///
/// The two totals are loaded one after the other, not atomically as a pair. A snapshot
/// taken while other threads are recording may therefore combine values observed at
/// slightly different instants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use]
pub struct WalAppendTelemetrySnapshot {
    /// Number of successful appends recorded; saturates at `u64::MAX`.
    pub append_success_total: u64,
    /// Number of failed appends recorded; saturates at `u64::MAX`.
    pub append_failure_total: u64,
}

/// Cheaply cloneable handle to a set of WAL append counters.
///
/// All clones point at the same `Arc`-backed storage. A caller can give one clone to a
/// writer and keep another to read snapshots.
#[derive(Debug, Clone, Default)]
pub struct WalAppendTelemetry {
    inner: Arc<WalAppendTelemetryInner>,
}

/// Shared counter storage behind [`WalAppendTelemetry`]; both counters start at zero.
#[derive(Debug, Default)]
struct WalAppendTelemetryInner {
    append_success_total: AtomicU64,
    append_failure_total: AtomicU64,
}

impl WalAppendTelemetry {
    /// Creates a handle whose success and failure counters both start at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Reads the current success and failure totals.
    ///
    /// Uses `Relaxed` loads: each counter is an independent statistic. No ordering with
    /// respect to other memory operations, or between the two counters, is promised.
    pub fn snapshot(&self) -> WalAppendTelemetrySnapshot {
        WalAppendTelemetrySnapshot {
            append_success_total: self.inner.append_success_total.load(Ordering::Relaxed),
            append_failure_total: self.inner.append_failure_total.load(Ordering::Relaxed),
        }
    }

    /// Records one successful append (saturating).
    fn record_append_success(&self) {
        Self::saturating_increment(&self.inner.append_success_total);
    }

    /// Records one failed append (saturating).
    fn record_append_failure(&self) {
        Self::saturating_increment(&self.inner.append_failure_total);
    }

    /// Adds one to `counter`, leaving it unchanged once it has reached `u64::MAX`.
    fn saturating_increment(counter: &AtomicU64) {
        // `fetch_update` runs the compare-exchange retry loop for us. When `checked_add`
        // returns `None` at `u64::MAX`, the update is abandoned; that is exactly the
        // saturation we want. The resulting `Err` (holding the unchanged value) is
        // deliberately ignored.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            current.checked_add(1)
        });
    }
}
78
79#[derive(Debug)]
85pub struct InstrumentedWalWriter<W: WalWriter> {
86 inner: W,
87 telemetry: WalAppendTelemetry,
88}
89
90impl<W: WalWriter> InstrumentedWalWriter<W> {
91 pub fn new(inner: W, telemetry: WalAppendTelemetry) -> Self {
93 Self { inner, telemetry }
94 }
95
96 pub fn telemetry(&self) -> &WalAppendTelemetry {
98 &self.telemetry
99 }
100
101 pub fn inner(&self) -> &W {
103 &self.inner
104 }
105}
106
107impl<W: WalWriter> WalWriter for InstrumentedWalWriter<W> {
108 fn append(&mut self, event: &WalEvent) -> Result<(), WalWriterError> {
109 match self.inner.append(event) {
110 Ok(()) => {
111 self.telemetry.record_append_success();
112 Ok(())
113 }
114 Err(error) => {
115 self.telemetry.record_append_failure();
116 Err(error)
117 }
118 }
119 }
120
121 fn flush(&mut self) -> Result<(), WalWriterError> {
122 self.inner.flush()
123 }
124
125 fn close(self) -> Result<(), WalWriterError> {
126 self.inner.close()
127 }
128}
129
/// Destination for [`WalEvent`]s.
///
/// Every operation reports failures through [`WalWriterError`].
/// NOTE(review): durability semantics (what `append` vs `flush` guarantee) are
/// implementor-defined and not visible here — confirm against concrete writers.
pub trait WalWriter {
    /// Appends a single event; returns a [`WalWriterError`] on failure.
    fn append(&mut self, event: &WalEvent) -> Result<(), WalWriterError>;

    /// Presumably flushes any buffered events to the underlying storage (implementor-
    /// defined); returns a [`WalWriterError`] on failure.
    fn flush(&mut self) -> Result<(), WalWriterError>;

    /// Consumes the writer and closes it; returns a [`WalWriterError`] on failure.
    fn close(self) -> Result<(), WalWriterError>;
}
141
/// Failure modes reported by [`WalWriter`] operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WalWriterError {
    /// The writer is closed.
    Closed,
    /// An I/O failure, carried as a message string.
    IoError(String),
    /// A failure while encoding an event, carried as a message string.
    EncodeError(String),
    /// A sequence number differed from the one the writer expected.
    SequenceViolation {
        /// The sequence number the writer expected.
        expected: u64,
        /// The sequence number actually supplied.
        provided: u64,
    },
    /// The writer is permanently unusable after a truncation failure.
    Poisoned,
}

impl std::fmt::Display for WalWriterError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Closed => f.write_str("WAL writer is closed"),
            Self::IoError(detail) => write!(f, "I/O error: {detail}"),
            Self::EncodeError(detail) => write!(f, "Encode error: {detail}"),
            Self::SequenceViolation { expected, provided } => {
                write!(f, "Sequence violation: expected {expected}, got {provided}")
            }
            Self::Poisoned => {
                f.write_str("WAL writer is permanently poisoned after truncation failure")
            }
        }
    }
}

/// Marker impl: no variant wraps an underlying error, so the default `source()` (`None`)
/// applies.
impl std::error::Error for WalWriterError {}
180
#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;

    use super::{InstrumentedWalWriter, WalAppendTelemetry, WalWriter, WalWriterError};
    use crate::wal::event::{WalEvent, WalEventType};

    /// Writer stub whose operations all succeed.
    #[derive(Debug)]
    struct SuccessWriter;

    impl WalWriter for SuccessWriter {
        fn append(&mut self, _event: &WalEvent) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn flush(&mut self) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn close(self) -> Result<(), WalWriterError> {
            Ok(())
        }
    }

    /// Writer stub whose `append` always returns a clone of `error`.
    #[derive(Debug)]
    struct FailureWriter {
        error: WalWriterError,
    }

    impl WalWriter for FailureWriter {
        fn append(&mut self, _event: &WalEvent) -> Result<(), WalWriterError> {
            Err(self.error.clone())
        }

        fn flush(&mut self) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn close(self) -> Result<(), WalWriterError> {
            Ok(())
        }
    }

    /// Writer stub whose `append` succeeds but whose `flush` always fails.
    #[derive(Debug)]
    struct FlushErrorWriter {
        flush_error: WalWriterError,
    }

    impl WalWriter for FlushErrorWriter {
        fn append(&mut self, _event: &WalEvent) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn flush(&mut self) -> Result<(), WalWriterError> {
            Err(self.flush_error.clone())
        }

        fn close(self) -> Result<(), WalWriterError> {
            Ok(())
        }
    }

    fn sample_event(sequence: u64) -> WalEvent {
        WalEvent::new(sequence, WalEventType::EnginePaused { timestamp: sequence })
    }

    #[test]
    fn instrumented_writer_increments_success_on_append_success() {
        let telemetry = WalAppendTelemetry::new();
        let mut writer = InstrumentedWalWriter::new(SuccessWriter, telemetry.clone());

        writer.append(&sample_event(1)).expect("append should succeed");

        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot.append_success_total, 1);
        assert_eq!(snapshot.append_failure_total, 0);
    }

    #[test]
    fn instrumented_writer_increments_failure_and_preserves_error_identity() {
        let telemetry = WalAppendTelemetry::new();
        let expected_error = WalWriterError::IoError("append failed".to_string());
        let mut writer = InstrumentedWalWriter::new(
            FailureWriter { error: expected_error.clone() },
            telemetry.clone(),
        );

        let observed_error =
            writer.append(&sample_event(1)).expect_err("append should return the underlying error");

        assert_eq!(observed_error, expected_error);
        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot.append_success_total, 0);
        assert_eq!(snapshot.append_failure_total, 1);
    }

    #[test]
    fn flush_errors_do_not_alter_append_counters() {
        let telemetry = WalAppendTelemetry::new();
        let expected_flush_error = WalWriterError::IoError("flush failed".to_string());
        let mut writer = InstrumentedWalWriter::new(
            FlushErrorWriter { flush_error: expected_flush_error.clone() },
            telemetry.clone(),
        );

        writer.append(&sample_event(1)).expect("append should succeed");
        // Assert the failure really happened, so the counter check below is meaningful.
        let observed_flush_error =
            writer.flush().expect_err("flush should surface the underlying error");
        assert_eq!(observed_flush_error, expected_flush_error);

        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot.append_success_total, 1);
        assert_eq!(snapshot.append_failure_total, 0);
    }

    #[test]
    fn counter_updates_saturate_at_u64_max_without_panic() {
        let telemetry = WalAppendTelemetry::new();
        telemetry.inner.append_success_total.store(u64::MAX, Ordering::Relaxed);
        telemetry.inner.append_failure_total.store(u64::MAX, Ordering::Relaxed);

        let mut success_writer = InstrumentedWalWriter::new(SuccessWriter, telemetry.clone());
        let mut failure_writer = InstrumentedWalWriter::new(
            FailureWriter { error: WalWriterError::IoError("append failed".to_string()) },
            telemetry.clone(),
        );

        success_writer.append(&sample_event(1)).expect("append should still return success");
        failure_writer
            .append(&sample_event(2))
            .expect_err("append should still return the underlying error");

        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot.append_success_total, u64::MAX);
        assert_eq!(snapshot.append_failure_total, u64::MAX);
    }

    #[test]
    fn concurrent_append_success_paths_preserve_monotonic_totals() {
        // `WalAppendTelemetry` is already an `Arc`-backed handle; clones share counters,
        // so no extra `Arc` wrapper is needed to hand it to threads.
        let telemetry = WalAppendTelemetry::new();
        let threads = 8u64;
        let appends_per_thread = 2_000u64;

        let mut handles = Vec::new();
        for _ in 0..threads {
            let telemetry = telemetry.clone();
            handles.push(std::thread::spawn(move || {
                let mut writer = InstrumentedWalWriter::new(SuccessWriter, telemetry);
                for sequence in 1..=appends_per_thread {
                    writer
                        .append(&sample_event(sequence))
                        .expect("append should succeed on every iteration");
                }
            }));
        }

        for handle in handles {
            handle.join().expect("concurrent append worker should not panic");
        }

        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot.append_success_total, threads * appends_per_thread);
        assert_eq!(snapshot.append_failure_total, 0);
    }
}