Skip to main content

kaya_wal/
lib.rs

1mod batch;
2mod codec;
3mod inspect;
4mod recovery;
5mod writer;
6
7pub use codec::{
8    decode_record, encode_record, DecodeRecordResult, WalPayload, WalRecord, WalRecordType,
9    WalWarning, WAL_HEADER_LEN, WAL_MAGIC, WAL_VERSION,
10};
11pub use inspect::{inspect_wal_path, WalInspection, WalInspectionRow};
12pub use recovery::{recover_wal, RecoveredRecord, WalRecoveryReport};
13pub use writer::{AppendResult, SegmentId, WalWriter};
14
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use kaya_core::{DurabilityMode, WalBatchConfig, WalConfig};
    use kaya_io::SimDisk;

    use super::*;

    // Runs `f` to completion on a freshly built current-thread Tokio runtime.
    // No optional runtime drivers are enabled; panics if the runtime cannot be
    // built.
    fn block_on<F: std::future::Future>(f: F) -> F::Output {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        runtime.block_on(f)
    }

    // Runs `f` to completion on a freshly built multi-thread Tokio runtime
    // with `enable_all()` applied; panics if the runtime cannot be built.
    fn block_on_multi<F: std::future::Future>(f: F) -> F::Output {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.enable_all();
        let runtime = builder.build().unwrap();
        runtime.block_on(f)
    }

    // Default `WalConfig` whose batch section has `batch_max_records` set to
    // `records`; every other field keeps its default.
    fn batch_config(records: usize) -> WalConfig {
        let batch = WalBatchConfig {
            batch_max_records: records,
            ..WalBatchConfig::default()
        };
        WalConfig {
            batch,
            ..WalConfig::default()
        }
    }

    // Builds a `WalPayload::Put` holding owned copies of `key` and `value`.
    fn put_payload(key: &[u8], value: &[u8]) -> WalPayload {
        WalPayload::Put {
            key: key.to_vec(),
            value: value.to_vec(),
        }
    }

    // Appends `Put { key: [key], value: [key, key] }` with Strict durability
    // and panics if the append fails.
    async fn strict_put(writer: &WalWriter<SimDisk>, key: u8) {
        let payload = put_payload(&[key], &[key, key]);
        writer
            .append(payload, DurabilityMode::Strict)
            .await
            .unwrap();
    }

    // KD-0206 — durable-prefix invariant:
    // each record appended with Strict durability before the crash must show
    // up in the recovery report, and the report must hold nothing else.
    #[test]
    fn wal_durable_prefix_survives_crash() {
        block_on(async {
            let disk = Arc::new(SimDisk::new());
            let config = WalConfig::default();
            let writer = WalWriter::open(config.clone(), Arc::clone(&disk))
                .await
                .unwrap();

            // Remember the LSN handed back for every strict append.
            let mut strict_lsns = Vec::new();
            for byte in 0_u8..5 {
                let appended = writer
                    .append(put_payload(&[byte], &[byte, byte]), DurabilityMode::Strict)
                    .await
                    .unwrap();
                strict_lsns.push(appended.lsn);
            }

            disk.crash();

            let report = recover_wal(config, disk).await.unwrap();
            for lsn in &strict_lsns {
                let found = report.records.iter().any(|entry| entry.record.lsn == *lsn);
                assert!(found, "strictly written LSN {lsn} missing after crash");
            }
            assert_eq!(
                report.records.len(),
                strict_lsns.len(),
                "expected exactly the strict records"
            );
        });
    }

    // Relaxed appends are not fsynced, so a crash (which resets volatile
    // state back to stable state) must leave none of them behind.
    #[test]
    fn wal_relaxed_writes_lost_after_crash() {
        block_on(async {
            let disk = Arc::new(SimDisk::new());
            let config = WalConfig::default();
            let writer = WalWriter::open(config.clone(), Arc::clone(&disk))
                .await
                .unwrap();

            for byte in 0_u8..3 {
                writer
                    .append(put_payload(&[byte], &[byte]), DurabilityMode::Relaxed)
                    .await
                    .unwrap();
            }

            // No fsync has been requested at this point.
            disk.crash();

            let report = recover_wal(config, disk).await.unwrap();
            assert_eq!(
                report.records.len(),
                0,
                "relaxed-only records should be lost after crash"
            );
        });
    }

    // Recovery must be idempotent: two consecutive recoveries of the same
    // disk report the same records (compared by LSN and payload).
    #[test]
    fn wal_recovery_is_idempotent() {
        block_on(async {
            let disk = Arc::new(SimDisk::new());
            let config = WalConfig::default();
            let writer = WalWriter::open(config.clone(), Arc::clone(&disk))
                .await
                .unwrap();

            for byte in 0_u8..3 {
                writer
                    .append(put_payload(&[byte], &[byte]), DurabilityMode::Strict)
                    .await
                    .unwrap();
            }

            disk.crash();

            let first = recover_wal(config.clone(), Arc::clone(&disk))
                .await
                .unwrap();
            let second = recover_wal(config, disk).await.unwrap();

            assert_eq!(
                first.records.len(),
                second.records.len(),
                "recovery must be idempotent"
            );
            // Lengths are equal here, so the pairwise walk covers every record.
            for (earlier, later) in first.records.iter().zip(second.records.iter()) {
                assert_eq!(earlier.record.lsn, later.record.lsn);
                assert_eq!(earlier.record.payload, later.record.payload);
            }
        });
    }

    // A strictly acknowledged record survives the crash while trailing
    // unsynced data does not: recovery must drop the partial tail and return
    // only the durable prefix.
    #[test]
    fn wal_partial_tail_truncated_after_crash() {
        block_on(async {
            let disk = Arc::new(SimDisk::new());
            let config = WalConfig::default();
            let writer = WalWriter::open(config.clone(), Arc::clone(&disk))
                .await
                .unwrap();

            // Strict append: expected to be fsynced to stable storage.
            writer
                .append(put_payload(b"k1", b"v1"), DurabilityMode::Strict)
                .await
                .unwrap();

            // Relaxed append: expected to remain in volatile storage only.
            writer
                .append(put_payload(b"k2", b"v2"), DurabilityMode::Relaxed)
                .await
                .unwrap();

            disk.crash();

            let report = recover_wal(config, disk).await.unwrap();
            assert_eq!(report.records.len(), 1, "only the strict record survives");
            assert_eq!(
                report.records[0].record.payload,
                put_payload(b"k1", b"v1")
            );
        });
    }

    // Multi-segment WAL: records written across segment boundaries must be
    // recovered in order, exactly as if they lived in one segment.
    #[test]
    fn wal_multi_segment_recovery() {
        block_on(async {
            let disk = Arc::new(SimDisk::new());
            // A very small segment limit, chosen to force segment rotation.
            let config = WalConfig {
                segment_max_bytes: 64,
                ..WalConfig::default()
            };
            let writer = WalWriter::open(config.clone(), Arc::clone(&disk))
                .await
                .unwrap();

            let mut lsns = Vec::new();
            for byte in 0_u8..4 {
                let appended = writer
                    .append(put_payload(&[byte], &[byte]), DurabilityMode::Strict)
                    .await
                    .unwrap();
                lsns.push(appended.lsn);
            }

            disk.crash();

            let report = recover_wal(config, disk).await.unwrap();
            assert_eq!(report.records.len(), lsns.len());
            for (recovered, expected) in report.records.iter().zip(lsns.iter()) {
                assert_eq!(*expected, recovered.record.lsn);
            }
        });
    }

    // Batching disabled (the default config): every strict append is expected
    // to trigger its own fsync, so four appends yield four WAL fsyncs.
    #[test]
    fn wal_batch_disabled_one_fsync_per_strict_append() {
        block_on(async {
            let disk = Arc::new(SimDisk::new());
            let config = WalConfig::default();
            let writer = WalWriter::open(config.clone(), Arc::clone(&disk))
                .await
                .unwrap();

            for key in 0_u8..4 {
                strict_put(&writer, key).await;
            }

            let wal_fsyncs = disk.fsync_file_count(Some("wal"));
            assert_eq!(wal_fsyncs, 4);
        });
    }

    // Batching enabled with a batch size of four: four concurrent strict
    // appends are expected to share a single group fsync.
    #[test]
    fn wal_batch_enabled_group_commit_single_fsync() {
        block_on_multi(async {
            let disk = Arc::new(SimDisk::new());
            let config = batch_config(4);
            let writer = WalWriter::open(config.clone(), Arc::clone(&disk))
                .await
                .unwrap();

            // Spawn all four appenders first, then wait for each of them.
            let handles: Vec<_> = (0_u8..4)
                .map(|key| {
                    let writer = writer.clone();
                    tokio::spawn(async move { strict_put(&writer, key).await })
                })
                .collect();
            for handle in handles {
                handle.await.unwrap();
            }

            let wal_fsyncs = disk.fsync_file_count(Some("wal"));
            assert_eq!(wal_fsyncs, 1);
        });
    }

    // Crash while a partial batch is still only in volatile storage: recovery
    // must return the first, fully committed batch and nothing from the second.
    #[test]
    fn wal_batch_crash_mid_batch_keeps_durable_prefix_only() {
        block_on_multi(async {
            let disk = Arc::new(SimDisk::new());
            let config = batch_config(4);
            let writer = WalWriter::open(config.clone(), Arc::clone(&disk))
                .await
                .unwrap();

            // First batch: four records, awaited until every append returns.
            let flushed: Vec<_> = (0_u8..4)
                .map(|key| {
                    let writer = writer.clone();
                    tokio::spawn(async move { strict_put(&writer, key).await })
                })
                .collect();
            for handle in flushed {
                handle.await.unwrap();
            }

            // Second batch: only three records, one short of the batch size.
            let pending: Vec<_> = (10_u8..13)
                .map(|key| {
                    let writer = writer.clone();
                    tokio::spawn(async move { strict_put(&writer, key).await })
                })
                .collect();
            // NOTE(review): this fixed 50 ms pause is the only thing ordering the
            // pending appends before the crash — timing-dependent; confirm it is
            // robust on slow machines and against the writer's flush policy.
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            disk.crash();
            pending.into_iter().for_each(|handle| handle.abort());

            let report = recover_wal(config, disk).await.unwrap();
            assert_eq!(
                report.records.len(),
                4,
                "only the first batch should survive"
            );

            // Collect the first key byte of every recovered Put, order-insensitively.
            let mut recovered_keys: Vec<u8> = Vec::new();
            for entry in report.records.iter() {
                if let WalPayload::Put { key, .. } = &entry.record.payload {
                    recovered_keys.extend(key.first().copied());
                }
            }
            recovered_keys.sort();
            assert_eq!(recovered_keys, vec![0, 1, 2, 3]);
        });
    }

    // KD-0503: feeding malformed bytes to the WAL record decoder must never
    // panic; the decode result itself is deliberately ignored.
    #[test]
    fn fuzz_wal_decoder_no_panic() {
        // Empty, tiny, all-zero / all-0xff buffers of several lengths, an ASCII
        // "KAYA" prefix followed by junk, and arbitrary bytes.
        // NOTE(review): 39/40 bytes presumably straddle WAL_HEADER_LEN — confirm.
        let malformed: &[&[u8]] = &[
            b"",
            &[0u8; 1],
            &[0u8; 39],
            &[0u8; 40],
            &[0xffu8; 40],
            &[0u8; 100],
            &[0xffu8; 100],
            b"\x4b\x41\x59\x41\x01\x00garbage_after_magic",
            b"\x00\xff\x80\x7f\xde\xad\xbe\xef\xca\xfe\xba\xbe",
        ];
        for bytes in malformed.iter() {
            let _ = decode_record(bytes, 0, u32::MAX); // must not panic
        }
    }
}