Skip to main content

actionqueue_storage/wal/
writer.rs

1//! WAL writer interface.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5
6use crate::wal::event::WalEvent;
7
/// Copy of the WAL append outcome counters, as returned by
/// `WalAppendTelemetry::snapshot`.
///
/// The struct is `Copy`, so snapshots can be passed around and compared freely
/// without touching the live atomic counters.
#[must_use]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalAppendTelemetrySnapshot {
    /// Number of appends that returned `Ok` (saturates at `u64::MAX`).
    pub append_success_total: u64,
    /// Number of appends that returned `Err` (saturates at `u64::MAX`).
    pub append_failure_total: u64,
}
17
18/// Shared authoritative telemetry for WAL append outcomes.
19///
20/// This type stores monotonic append-success and append-failure totals and is
21/// safe to share across threads. Counter updates use saturating semantics at
22/// `u64::MAX` and never wrap or panic.
23#[derive(Debug, Clone)]
24pub struct WalAppendTelemetry {
25    inner: Arc<WalAppendTelemetryInner>,
26}
27
/// Backing storage shared by every clone of a `WalAppendTelemetry` handle.
///
/// The derived `Default` starts both counters at zero.
#[derive(Default, Debug)]
struct WalAppendTelemetryInner {
    /// Running count of appends that succeeded.
    append_success_total: AtomicU64,
    /// Running count of appends that failed.
    append_failure_total: AtomicU64,
}
33
34impl Default for WalAppendTelemetry {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl WalAppendTelemetry {
41    /// Creates a new zero-initialized WAL append telemetry handle.
42    pub fn new() -> Self {
43        Self { inner: Arc::new(WalAppendTelemetryInner::default()) }
44    }
45
46    /// Returns a point-in-time snapshot of append outcome totals.
47    pub fn snapshot(&self) -> WalAppendTelemetrySnapshot {
48        WalAppendTelemetrySnapshot {
49            append_success_total: self.inner.append_success_total.load(Ordering::Relaxed),
50            append_failure_total: self.inner.append_failure_total.load(Ordering::Relaxed),
51        }
52    }
53
54    fn record_append_success(&self) {
55        Self::saturating_increment(&self.inner.append_success_total);
56    }
57
58    fn record_append_failure(&self) {
59        Self::saturating_increment(&self.inner.append_failure_total);
60    }
61
62    fn saturating_increment(counter: &AtomicU64) {
63        let mut current = counter.load(Ordering::Relaxed);
64        loop {
65            if current == u64::MAX {
66                return;
67            }
68
69            let next = current.saturating_add(1);
70            match counter.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed)
71            {
72                Ok(_) => return,
73                Err(observed) => current = observed,
74            }
75        }
76    }
77}
78
79/// WAL writer wrapper that records authoritative append outcomes.
80///
81/// This wrapper preserves the underlying writer behavior and error propagation.
82/// It only records append success/failure totals after each `append` result is
83/// known. `flush` and `close` are transparent pass-through operations.
84#[derive(Debug)]
85pub struct InstrumentedWalWriter<W: WalWriter> {
86    inner: W,
87    telemetry: WalAppendTelemetry,
88}
89
90impl<W: WalWriter> InstrumentedWalWriter<W> {
91    /// Wraps an inner writer with authoritative append telemetry.
92    pub fn new(inner: W, telemetry: WalAppendTelemetry) -> Self {
93        Self { inner, telemetry }
94    }
95
96    /// Returns a shared telemetry handle.
97    pub fn telemetry(&self) -> &WalAppendTelemetry {
98        &self.telemetry
99    }
100
101    /// Returns a reference to the inner writer.
102    pub fn inner(&self) -> &W {
103        &self.inner
104    }
105}
106
107impl<W: WalWriter> WalWriter for InstrumentedWalWriter<W> {
108    fn append(&mut self, event: &WalEvent) -> Result<(), WalWriterError> {
109        match self.inner.append(event) {
110            Ok(()) => {
111                self.telemetry.record_append_success();
112                Ok(())
113            }
114            Err(error) => {
115                self.telemetry.record_append_failure();
116                Err(error)
117            }
118        }
119    }
120
121    fn flush(&mut self) -> Result<(), WalWriterError> {
122        self.inner.flush()
123    }
124
125    fn close(self) -> Result<(), WalWriterError> {
126        self.inner.close()
127    }
128}
129
130/// A writer that can append events to the WAL.
131pub trait WalWriter {
132    /// Append an event to the WAL.
133    fn append(&mut self, event: &WalEvent) -> Result<(), WalWriterError>;
134
135    /// Flush pending writes to durable storage.
136    fn flush(&mut self) -> Result<(), WalWriterError>;
137
138    /// Close the writer, releasing any resources.
139    fn close(self) -> Result<(), WalWriterError>;
140}
141
/// Failure reported by a `WalWriter` operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WalWriterError {
    /// The writer has already been closed.
    Closed,
    /// An I/O error occurred while writing; carries a message describing it.
    IoError(String),
    /// The event could not be encoded; carries a message describing why.
    EncodeError(String),
    /// The supplied sequence number was non-increasing or a duplicate.
    SequenceViolation {
        /// Sequence number the writer expected next.
        expected: u64,
        /// Sequence number that was actually supplied.
        provided: u64,
    },
    /// The writer is permanently poisoned after a truncation-recovery failure;
    /// callers must restart the process.
    Poisoned,
}
162
163impl std::fmt::Display for WalWriterError {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        match self {
166            WalWriterError::Closed => write!(f, "WAL writer is closed"),
167            WalWriterError::IoError(e) => write!(f, "I/O error: {e}"),
168            WalWriterError::EncodeError(e) => write!(f, "Encode error: {e}"),
169            WalWriterError::SequenceViolation { expected, provided } => {
170                write!(f, "Sequence violation: expected {expected}, got {provided}")
171            }
172            WalWriterError::Poisoned => {
173                write!(f, "WAL writer is permanently poisoned after truncation failure")
174            }
175        }
176    }
177}
178
179impl std::error::Error for WalWriterError {}
180
#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;

    use super::{InstrumentedWalWriter, WalAppendTelemetry, WalWriter, WalWriterError};
    use crate::wal::event::{WalEvent, WalEventType};

    /// Inner writer whose operations always succeed.
    #[derive(Debug)]
    struct SuccessWriter;

    impl WalWriter for SuccessWriter {
        fn append(&mut self, _event: &WalEvent) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn flush(&mut self) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn close(self) -> Result<(), WalWriterError> {
            Ok(())
        }
    }

    /// Inner writer whose `append` always fails with a fixed error.
    #[derive(Debug)]
    struct FailureWriter {
        error: WalWriterError,
    }

    impl WalWriter for FailureWriter {
        fn append(&mut self, _event: &WalEvent) -> Result<(), WalWriterError> {
            Err(self.error.clone())
        }

        fn flush(&mut self) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn close(self) -> Result<(), WalWriterError> {
            Ok(())
        }
    }

    /// Inner writer whose `append` succeeds but whose `flush` always fails.
    #[derive(Debug)]
    struct FlushErrorWriter {
        flush_error: WalWriterError,
    }

    impl WalWriter for FlushErrorWriter {
        fn append(&mut self, _event: &WalEvent) -> Result<(), WalWriterError> {
            Ok(())
        }

        fn flush(&mut self) -> Result<(), WalWriterError> {
            Err(self.flush_error.clone())
        }

        fn close(self) -> Result<(), WalWriterError> {
            Ok(())
        }
    }

    /// Builds a minimal event tagged with `sequence`.
    fn sample_event(sequence: u64) -> WalEvent {
        WalEvent::new(sequence, WalEventType::EnginePaused { timestamp: sequence })
    }

    #[test]
    fn instrumented_writer_increments_success_on_append_success() {
        let telemetry = WalAppendTelemetry::new();
        let mut writer = InstrumentedWalWriter::new(SuccessWriter, telemetry.clone());

        writer.append(&sample_event(1)).expect("append should succeed");

        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot.append_success_total, 1);
        assert_eq!(snapshot.append_failure_total, 0);
    }

    #[test]
    fn instrumented_writer_increments_failure_and_preserves_error_identity() {
        let telemetry = WalAppendTelemetry::new();
        let expected_error = WalWriterError::IoError("append failed".to_string());
        let mut writer = InstrumentedWalWriter::new(
            FailureWriter { error: expected_error.clone() },
            telemetry.clone(),
        );

        let observed_error =
            writer.append(&sample_event(1)).expect_err("append should return the underlying error");

        assert_eq!(observed_error, expected_error);
        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot.append_success_total, 0);
        assert_eq!(snapshot.append_failure_total, 1);
    }

    #[test]
    fn flush_errors_do_not_alter_append_counters() {
        let flush_error = WalWriterError::IoError("flush failed".to_string());
        let telemetry = WalAppendTelemetry::new();
        let mut writer = InstrumentedWalWriter::new(
            FlushErrorWriter { flush_error: flush_error.clone() },
            telemetry.clone(),
        );

        writer.append(&sample_event(1)).expect("append should succeed");
        // The wrapper must hand back the inner flush error unchanged.
        assert_eq!(writer.flush(), Err(flush_error));

        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot.append_success_total, 1);
        assert_eq!(snapshot.append_failure_total, 0);
    }

    #[test]
    fn telemetry_accessor_shares_counters_with_caller_handle() {
        let telemetry = WalAppendTelemetry::new();
        let mut writer = InstrumentedWalWriter::new(SuccessWriter, telemetry.clone());

        writer.append(&sample_event(1)).expect("append should succeed");

        assert_eq!(writer.telemetry().snapshot(), telemetry.snapshot());
        assert_eq!(writer.telemetry().snapshot().append_success_total, 1);
    }

    #[test]
    fn counter_updates_saturate_at_u64_max_without_panic() {
        let telemetry = WalAppendTelemetry::new();
        telemetry.inner.append_success_total.store(u64::MAX, Ordering::Relaxed);
        telemetry.inner.append_failure_total.store(u64::MAX, Ordering::Relaxed);

        let mut success_writer = InstrumentedWalWriter::new(SuccessWriter, telemetry.clone());
        let mut failure_writer = InstrumentedWalWriter::new(
            FailureWriter { error: WalWriterError::IoError("append failed".to_string()) },
            telemetry.clone(),
        );

        success_writer.append(&sample_event(1)).expect("append should still return success");
        let _ = failure_writer.append(&sample_event(2));

        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot.append_success_total, u64::MAX);
        assert_eq!(snapshot.append_failure_total, u64::MAX);
    }

    #[test]
    fn concurrent_append_success_paths_preserve_monotonic_totals() {
        let telemetry = WalAppendTelemetry::new();
        let threads = 8u64;
        let appends_per_thread = 2_000u64;

        // `WalAppendTelemetry` is already an `Arc`-backed handle, so each worker
        // simply takes its own clone; no extra `Arc` wrapper is needed.
        let handles: Vec<_> = (0..threads)
            .map(|_| {
                let telemetry = telemetry.clone();
                std::thread::spawn(move || {
                    let mut writer = InstrumentedWalWriter::new(SuccessWriter, telemetry);
                    for sequence in 1..=appends_per_thread {
                        writer
                            .append(&sample_event(sequence))
                            .expect("append should succeed on every iteration");
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("concurrent append worker should not panic");
        }

        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot.append_success_total, threads * appends_per_thread);
        assert_eq!(snapshot.append_failure_total, 0);
    }
}