1mod batch;
2mod codec;
3mod inspect;
4mod recovery;
5mod writer;
6
7pub use codec::{
8 decode_record, encode_record, DecodeRecordResult, WalPayload, WalRecord, WalRecordType,
9 WalWarning, WAL_HEADER_LEN, WAL_MAGIC, WAL_VERSION,
10};
11pub use inspect::{inspect_wal_path, WalInspection, WalInspectionRow};
12pub use recovery::{recover_wal, RecoveredRecord, WalRecoveryReport};
13pub use writer::{AppendResult, SegmentId, WalWriter};
14
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use kaya_core::{DurabilityMode, WalBatchConfig, WalConfig};
    use kaya_io::SimDisk;

    use super::*;

    /// Drives `f` to completion on a freshly built current-thread tokio runtime.
    ///
    /// The builder is used with its defaults (no `enable_all` call), so this is
    /// only suitable for futures that do not need the runtime's optional drivers.
    fn block_on<F: std::future::Future>(f: F) -> F::Output {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        runtime.block_on(f)
    }

    /// Drives `f` to completion on a freshly built multi-thread tokio runtime
    /// with every driver enabled (`enable_all`), as needed by the tests that
    /// spawn tasks and sleep.
    fn block_on_multi<F: std::future::Future>(f: F) -> F::Output {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(f)
    }

    /// Builds a `WalConfig` that is all defaults except for
    /// `batch.batch_max_records`, which is set to `records`.
    fn batch_config(records: usize) -> WalConfig {
        let batch = WalBatchConfig {
            batch_max_records: records,
            ..WalBatchConfig::default()
        };
        WalConfig {
            batch,
            ..WalConfig::default()
        }
    }

    /// Appends one `Put { key, value }` record with the requested durability
    /// mode, panicking if the append fails. The append result is discarded.
    async fn append_put(
        writer: &WalWriter<SimDisk>,
        key: Vec<u8>,
        value: Vec<u8>,
        mode: DurabilityMode,
    ) {
        writer
            .append(WalPayload::Put { key, value }, mode)
            .await
            .unwrap();
    }

    /// Appends a `Strict` put whose key is `[key]` and whose value is `[key, key]`.
    async fn strict_put(writer: &WalWriter<SimDisk>, key: u8) {
        append_put(writer, vec![key], vec![key, key], DurabilityMode::Strict).await;
    }

    /// Spawns one tokio task per key in `keys`; each task owns a clone of
    /// `writer` and performs a single [`strict_put`]. All tasks are spawned
    /// before the handles are returned, in key order.
    ///
    /// Must be called from inside a tokio runtime context.
    fn spawn_strict_puts(
        writer: &WalWriter<SimDisk>,
        keys: std::ops::Range<u8>,
    ) -> Vec<tokio::task::JoinHandle<()>> {
        keys.map(|key| {
            let writer = writer.clone();
            tokio::spawn(async move {
                strict_put(&writer, key).await;
            })
        })
        .collect()
    }

    /// Five `Strict` appends followed by a simulated crash: recovery must
    /// return every one of those LSNs and no additional records.
    #[test]
    fn wal_durable_prefix_survives_crash() {
        block_on(async {
            let config = WalConfig::default();
            let disk = Arc::new(SimDisk::new());
            let writer = WalWriter::open(config.clone(), disk.clone()).await.unwrap();

            let mut strict_lsns = Vec::new();
            for byte in 0_u8..5 {
                let payload = WalPayload::Put {
                    key: vec![byte],
                    value: vec![byte, byte],
                };
                let appended = writer
                    .append(payload, DurabilityMode::Strict)
                    .await
                    .unwrap();
                strict_lsns.push(appended.lsn);
            }

            disk.crash();

            let report = recover_wal(config, disk).await.unwrap();
            let recovered_lsns: Vec<_> = report
                .records
                .iter()
                .map(|entry| entry.record.lsn)
                .collect();
            strict_lsns.iter().for_each(|lsn| {
                assert!(
                    recovered_lsns.contains(lsn),
                    "strictly written LSN {lsn} missing after crash"
                );
            });
            assert_eq!(
                report.records.len(),
                strict_lsns.len(),
                "expected exactly the strict records"
            );
        });
    }

    /// Three `Relaxed` appends followed by a simulated crash: the test expects
    /// recovery to find no records at all.
    #[test]
    fn wal_relaxed_writes_lost_after_crash() {
        block_on(async {
            let config = WalConfig::default();
            let disk = Arc::new(SimDisk::new());
            let writer = WalWriter::open(config.clone(), disk.clone()).await.unwrap();

            for byte in 0_u8..3 {
                append_put(&writer, vec![byte], vec![byte], DurabilityMode::Relaxed).await;
            }

            disk.crash();

            let report = recover_wal(config, disk).await.unwrap();
            assert_eq!(
                report.records.len(),
                0,
                "relaxed-only records should be lost after crash"
            );
        });
    }

    /// Running recovery twice over the same crashed disk must yield the same
    /// number of records with pairwise-equal LSNs and payloads.
    #[test]
    fn wal_recovery_is_idempotent() {
        block_on(async {
            let config = WalConfig::default();
            let disk = Arc::new(SimDisk::new());
            let writer = WalWriter::open(config.clone(), disk.clone()).await.unwrap();

            for byte in 0_u8..3 {
                append_put(&writer, vec![byte], vec![byte], DurabilityMode::Strict).await;
            }

            disk.crash();

            let first_pass = recover_wal(config.clone(), disk.clone()).await.unwrap();
            let second_pass = recover_wal(config, disk).await.unwrap();

            assert_eq!(
                first_pass.records.len(),
                second_pass.records.len(),
                "recovery must be idempotent"
            );
            let paired = first_pass.records.iter().zip(second_pass.records.iter());
            for (earlier, later) in paired {
                assert_eq!(earlier.record.lsn, later.record.lsn);
                assert_eq!(earlier.record.payload, later.record.payload);
            }
        });
    }

    /// One `Strict` put (`k1`) followed by one `Relaxed` put (`k2`), then a
    /// crash: recovery is expected to return only the `k1` record.
    #[test]
    fn wal_partial_tail_truncated_after_crash() {
        block_on(async {
            let config = WalConfig::default();
            let disk = Arc::new(SimDisk::new());
            let writer = WalWriter::open(config.clone(), disk.clone()).await.unwrap();

            append_put(
                &writer,
                b"k1".to_vec(),
                b"v1".to_vec(),
                DurabilityMode::Strict,
            )
            .await;
            append_put(
                &writer,
                b"k2".to_vec(),
                b"v2".to_vec(),
                DurabilityMode::Relaxed,
            )
            .await;

            disk.crash();

            let report = recover_wal(config, disk).await.unwrap();
            assert_eq!(report.records.len(), 1, "only the strict record survives");

            let expected = WalPayload::Put {
                key: b"k1".to_vec(),
                value: b"v1".to_vec(),
            };
            assert_eq!(report.records[0].record.payload, expected);
        });
    }

    /// Four `Strict` appends with `segment_max_bytes` set to 64, then a crash:
    /// recovery must return the same LSNs in the order they were written.
    ///
    /// NOTE(review): the 64-byte limit is presumably small enough to force the
    /// writer to roll over into several segments; the test does not assert
    /// the segment count itself.
    #[test]
    fn wal_multi_segment_recovery() {
        block_on(async {
            let config = WalConfig {
                segment_max_bytes: 64,
                ..WalConfig::default()
            };
            let disk = Arc::new(SimDisk::new());
            let writer = WalWriter::open(config.clone(), disk.clone()).await.unwrap();

            let mut lsns = Vec::new();
            for byte in 0_u8..4 {
                let payload = WalPayload::Put {
                    key: vec![byte],
                    value: vec![byte],
                };
                let appended = writer
                    .append(payload, DurabilityMode::Strict)
                    .await
                    .unwrap();
                lsns.push(appended.lsn);
            }

            disk.crash();

            let report = recover_wal(config, disk).await.unwrap();
            assert_eq!(report.records.len(), lsns.len());
            for (expected, recovered) in lsns.iter().zip(report.records.iter()) {
                assert_eq!(*expected, recovered.record.lsn);
            }
        });
    }

    /// With the default config, four sequential `Strict` puts must produce
    /// exactly four fsyncs counted under the `"wal"` filter.
    ///
    /// NOTE(review): the test name implies batching is off by default —
    /// confirm against `WalConfig::default()`.
    #[test]
    fn wal_batch_disabled_one_fsync_per_strict_append() {
        block_on(async {
            let disk = Arc::new(SimDisk::new());
            let writer = WalWriter::open(WalConfig::default(), disk.clone())
                .await
                .unwrap();

            for key in 0_u8..4 {
                strict_put(&writer, key).await;
            }

            assert_eq!(disk.fsync_file_count(Some("wal")), 4);
        });
    }

    /// With `batch_max_records = 4`, four concurrently spawned `Strict` puts
    /// must be covered by a single fsync counted under the `"wal"` filter.
    #[test]
    fn wal_batch_enabled_group_commit_single_fsync() {
        block_on_multi(async {
            let disk = Arc::new(SimDisk::new());
            let writer = WalWriter::open(batch_config(4), disk.clone())
                .await
                .unwrap();

            for handle in spawn_strict_puts(&writer, 0..4) {
                handle.await.unwrap();
            }

            assert_eq!(disk.fsync_file_count(Some("wal")), 1);
        });
    }

    /// A first group of four `Strict` puts (keys 0..4) is awaited to
    /// completion, then three more (keys 10..13) are spawned and the disk is
    /// crashed 50 ms later. Recovery must return exactly the first four keys.
    ///
    /// NOTE(review): this presumably relies on the three-record group staying
    /// unflushed for at least 50 ms because the batch of 4 never fills —
    /// confirm against the batch flush timeout.
    #[test]
    fn wal_batch_crash_mid_batch_keeps_durable_prefix_only() {
        block_on_multi(async {
            let config = batch_config(4);
            let disk = Arc::new(SimDisk::new());
            let writer = WalWriter::open(config.clone(), disk.clone()).await.unwrap();

            // First group: wait for every append to finish before moving on.
            for handle in spawn_strict_puts(&writer, 0..4) {
                handle.await.unwrap();
            }

            // Second group: left in flight when the crash happens.
            let pending = spawn_strict_puts(&writer, 10..13);
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            disk.crash();
            pending.into_iter().for_each(|handle| handle.abort());

            let report = recover_wal(config, disk).await.unwrap();
            assert_eq!(
                report.records.len(),
                4,
                "only the first batch should survive"
            );

            // Completion order across tasks is not fixed, so compare sorted keys.
            let mut recovered_keys: Vec<u8> = report
                .records
                .iter()
                .filter_map(|entry| {
                    if let WalPayload::Put { key, .. } = &entry.record.payload {
                        key.first().copied()
                    } else {
                        None
                    }
                })
                .collect();
            recovered_keys.sort_unstable();
            assert_eq!(recovered_keys, vec![0, 1, 2, 3]);
        });
    }

    /// Feeds a fixed set of empty, all-zero, all-`0xff`, and garbage buffers to
    /// `decode_record`; the result is ignored, so the test only fails if the
    /// decoder panics.
    #[test]
    fn fuzz_wal_decoder_no_panic() {
        let cases: &[&[u8]] = &[
            b"",
            &[0u8; 1],
            &[0u8; 39],
            &[0u8; 40],
            &[0xffu8; 40],
            &[0u8; 100],
            &[0xffu8; 100],
            b"\x4b\x41\x59\x41\x01\x00garbage_after_magic",
            b"\x00\xff\x80\x7f\xde\xad\xbe\xef\xca\xfe\xba\xbe",
        ];
        cases.iter().for_each(|input| {
            let _ = decode_record(input, 0, u32::MAX);
        });
    }
}