kstone_core/wal_ring.rs

//! Ring buffer WAL for Phase 1.3+
//!
//! Implements a circular write-ahead log using the WAL region from the file layout.
//! Supports wrap-around, group commit, and compaction.
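//!
//! A minimal usage sketch (illustrative, not compiled; it assumes the crate
//! root re-exports `Key`, `Value`, and `Record` as the tests below do, and
//! that this module is reachable as `kstone_core::wal_ring`):
//!
//! ```ignore
//! use std::collections::HashMap;
//! use kstone_core::{Key, Value, Record, layout::Region, wal_ring::WalRing};
//!
//! let region = Region::new(0, 64 * 1024);
//! let wal = WalRing::create("db.wal", region)?;
//!
//! let key = Key::new(b"user#1".to_vec());
//! let mut item = HashMap::new();
//! item.insert("name".to_string(), Value::string("alice"));
//! let lsn = wal.append(Record::put(key, item, 1))?;
//! wal.flush()?; // group commit: one fsync covers all pending appends
//! ```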
use bytes::{BytesMut, BufMut};
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::convert::TryInto;
use std::fs::{File, OpenOptions};
use std::io::{Read, Write, Seek, SeekFrom};
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::{Error, Result, Record, Lsn, layout::Region, types::checksum};

/// WAL record header size: lsn(8) + len(4)
const RECORD_HEADER_SIZE: usize = 12;

/// WAL entry in the ring buffer
/// Format: [lsn(8) | len(4) | data(bincode) | crc32c(4)]
struct WalEntry {
    lsn: Lsn,
    record: Record,
}
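
// Worked size example for the layout above (illustrative): a record whose
// bincode payload is 100 bytes occupies 8 (lsn) + 4 (len) + 100 (data)
// + 4 (crc32c) = 116 bytes of ring space.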

/// Ring buffer WAL
pub struct WalRing {
    inner: Arc<Mutex<WalRingInner>>,
}

struct WalRingInner {
    file: File,
    region: Region,

    // Ring buffer state
    write_offset: u64,      // Current write position (relative to region start)
    checkpoint_lsn: Lsn,    // Oldest LSN still needed
    next_lsn: Lsn,          // Next LSN to assign

    // Batching state
    pending: VecDeque<WalEntry>,
    last_flush: Instant,
    batch_timeout: Duration,
}

impl WalRing {
    /// Create a new ring buffer WAL
    pub fn create(path: impl AsRef<Path>, region: Region) -> Result<Self> {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(path)?;

        // Initialize ring buffer with zeros
        file.seek(SeekFrom::Start(region.offset))?;
        let zeros = vec![0u8; region.size as usize];
        file.write_all(&zeros)?;
        file.sync_all()?;

        Ok(Self {
            inner: Arc::new(Mutex::new(WalRingInner {
                file,
                region,
                write_offset: 0,
                checkpoint_lsn: 0,
                next_lsn: 1,
                pending: VecDeque::new(),
                last_flush: Instant::now(),
                batch_timeout: Duration::from_millis(10), // 10ms default
            })),
        })
    }

    /// Open existing ring buffer WAL and recover
    pub fn open(path: impl AsRef<Path>, region: Region) -> Result<Self> {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(path)?;

        // Recover all records from the ring
        let records = Self::recover(&mut file, &region)?;

        // Highest LSN seen; 0 if the ring held no records
        let max_lsn = records.iter().map(|(lsn, _)| *lsn).max().unwrap_or(0);

        Ok(Self {
            inner: Arc::new(Mutex::new(WalRingInner {
                file,
                region,
                write_offset: 0, // Reset to start of ring; the next write overwrites old data
                checkpoint_lsn: 0,
                next_lsn: max_lsn + 1,
                pending: VecDeque::new(),
                last_flush: Instant::now(),
                batch_timeout: Duration::from_millis(10),
            })),
        })
    }

    /// Append a record to the WAL (buffered)
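    ///
    /// The record only becomes durable once a flush runs, either explicitly or
    /// via the batch timeout on a later append. A typical call site, as a sketch:
    ///
    /// ```ignore
    /// let lsn = wal.append(record)?;
    /// wal.flush()?; // make the append durable before acknowledging the write
    /// ```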
    pub fn append(&self, record: Record) -> Result<Lsn> {
        let mut inner = self.inner.lock();

        let lsn = inner.next_lsn;
        inner.next_lsn += 1;

        inner.pending.push_back(WalEntry { lsn, record });

        // Auto-flush if batch timeout exceeded
        if inner.last_flush.elapsed() >= inner.batch_timeout {
            Self::flush_inner(&mut inner)?;
        }

        Ok(lsn)
    }

    /// Flush pending records to disk (group commit)
    pub fn flush(&self) -> Result<()> {
        let mut inner = self.inner.lock();
        Self::flush_inner(&mut inner)
    }

    /// Internal flush implementation
    fn flush_inner(inner: &mut WalRingInner) -> Result<()> {
        if inner.pending.is_empty() {
            return Ok(());
        }

        // Serialize all pending records
        let mut buf = BytesMut::new();

        for entry in &inner.pending {
            let data = bincode::serialize(&entry.record)
                .map_err(|e| Error::Internal(format!("Serialize error: {}", e)))?;

            // Record: [lsn(8) | len(4) | data | crc32c(4)]
            buf.put_u64_le(entry.lsn);
            buf.put_u32_le(data.len() as u32);
            buf.put_slice(&data);

            let crc = checksum::compute(&data);
            buf.put_u32_le(crc);
        }

        let total_size = buf.len() as u64;

        // A batch larger than the ring can never fit, even after wrapping
        if total_size > inner.region.size {
            return Err(Error::Internal(format!(
                "WAL batch ({} bytes) exceeds ring capacity ({} bytes)",
                total_size, inner.region.size
            )));
        }

        // Check if we need to wrap around
        if inner.write_offset + total_size > inner.region.size {
            // Wrap to beginning
            inner.write_offset = 0;
        }
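        // Worked example (illustrative): with region.size = 1024, a
        // write_offset of 1000, and a 100-byte batch, 1000 + 100 > 1024, so
        // the batch is instead written at offset 0, over the oldest data.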

        // Write to file
        let file_offset = inner.region.offset + inner.write_offset;
        inner.file.seek(SeekFrom::Start(file_offset))?;
        inner.file.write_all(&buf)?;
        inner.file.sync_all()?;

        // Update state
        inner.write_offset += total_size;
        inner.pending.clear();
        inner.last_flush = Instant::now();

        Ok(())
    }

    /// Recover all records from the ring buffer
    fn recover(file: &mut File, region: &Region) -> Result<Vec<(Lsn, Record)>> {
        let mut records = Vec::new();

        // Read entire ring buffer
        file.seek(SeekFrom::Start(region.offset))?;
        let mut ring_data = vec![0u8; region.size as usize];

        // Read what's available (the file might be smaller than the ring size);
        // loop because a single read() may return fewer bytes than requested
        let mut bytes_read = 0;
        while bytes_read < ring_data.len() {
            let n = file.read(&mut ring_data[bytes_read..])?;
            if n == 0 {
                break;
            }
            bytes_read += n;
        }
        if bytes_read == 0 {
            return Ok(records); // Empty file
        }

        let mut offset = 0usize;

        // Scan for valid records
        while offset + RECORD_HEADER_SIZE + 4 <= ring_data.len() {
            // Try to parse record header
            let lsn = u64::from_le_bytes(
                ring_data[offset..offset + 8].try_into().unwrap(),
            );

            // LSN of 0 indicates empty space (uninitialized or wrapped-over)
            if lsn == 0 {
                break;
            }

            let len = u32::from_le_bytes(
                ring_data[offset + 8..offset + 12].try_into().unwrap(),
            ) as usize;

            // Check if we have enough space for data + CRC
            if offset + RECORD_HEADER_SIZE + len + 4 > ring_data.len() {
                break; // Incomplete record at end
            }

            // Extract data and CRC
            let data_start = offset + RECORD_HEADER_SIZE;
            let data_end = data_start + len;
            let data = &ring_data[data_start..data_end];

            let crc_offset = data_end;
            let expected_crc = u32::from_le_bytes(
                ring_data[crc_offset..crc_offset + 4].try_into().unwrap(),
            );

            // Verify checksum
            if checksum::verify(data, expected_crc) {
                // Valid record
                match bincode::deserialize::<Record>(data) {
                    Ok(record) => {
                        records.push((lsn, record));
                        offset = crc_offset + 4;
                    }
                    Err(_) => {
                        // Corrupted record; stop scanning
                        break;
                    }
                }
            } else {
                // Invalid checksum, likely end of valid data
                break;
            }
        }

        // Sort by LSN (in case of wrap-around)
        records.sort_by_key(|(lsn, _)| *lsn);

        Ok(records)
    }

    /// Read all records from the WAL
    pub fn read_all(&self) -> Result<Vec<(Lsn, Record)>> {
        // Clone the file handle and copy the region under a single lock
        let inner = self.inner.lock();
        let mut file = inner.file.try_clone()?;
        let region = inner.region;
        drop(inner);

        Self::recover(&mut file, &region)
    }

    /// Set checkpoint LSN (for compaction)
    pub fn set_checkpoint(&self, lsn: Lsn) -> Result<()> {
        let mut inner = self.inner.lock();
        inner.checkpoint_lsn = lsn;
        Ok(())
    }

    /// Compact the WAL by removing entries before the checkpoint.
    /// Note: in a ring buffer, this just means they may be overwritten on the next wrap.
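    ///
    /// A checkpointing caller might drive it like this (sketch; `flushed_lsn`
    /// is a hypothetical LSN the caller knows is durable elsewhere):
    ///
    /// ```ignore
    /// wal.set_checkpoint(flushed_lsn)?;
    /// wal.compact()?; // space is reclaimed lazily on the next wrap
    /// ```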
    pub fn compact(&self) -> Result<()> {
        // In a ring buffer WAL, compaction is implicit:
        // - Records before checkpoint_lsn can be overwritten on wrap-around
        // - No explicit truncation needed
        Ok(())
    }

    /// Get next LSN
    pub fn next_lsn(&self) -> Lsn {
        let inner = self.inner.lock();
        inner.next_lsn
    }

    /// Set batch timeout for group commit
    pub fn set_batch_timeout(&self, timeout: Duration) {
        let mut inner = self.inner.lock();
        inner.batch_timeout = timeout;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Key, Value, layout::Region};
    use tempfile::NamedTempFile;
    use std::collections::HashMap;

    #[test]
    fn test_wal_ring_create_and_append() {
        let tmp = NamedTempFile::new().unwrap();
        let region = Region::new(0, 64 * 1024); // 64KB ring

        let wal = WalRing::create(tmp.path(), region).unwrap();

        let key = Key::new(b"test".to_vec());
        let mut item = HashMap::new();
        item.insert("value".to_string(), Value::string("hello"));
        let record = Record::put(key, item, 1);

        let lsn = wal.append(record).unwrap();
        assert_eq!(lsn, 1);

        wal.flush().unwrap();

        let records = wal.read_all().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].0, 1);
    }

    #[test]
    fn test_wal_ring_recovery() {
        let tmp = NamedTempFile::new().unwrap();
        let region = Region::new(0, 64 * 1024);

        // Write some records
        {
            let wal = WalRing::create(tmp.path(), region).unwrap();

            for i in 0..5 {
                let key = Key::new(format!("key{}", i).into_bytes());
                let item = HashMap::new();
                let record = Record::put(key, item, i);
                wal.append(record).unwrap();
            }

            wal.flush().unwrap();
        }

        // Reopen and verify recovery
        let wal = WalRing::open(tmp.path(), region).unwrap();
        assert_eq!(wal.next_lsn(), 6);

        let records = wal.read_all().unwrap();
        assert_eq!(records.len(), 5);
    }

    #[test]
    fn test_wal_ring_group_commit() {
        let tmp = NamedTempFile::new().unwrap();
        let region = Region::new(0, 64 * 1024);

        let wal = WalRing::create(tmp.path(), region).unwrap();

        // Append multiple records without flushing
        for i in 0..10 {
            let key = Key::new(format!("key{}", i).into_bytes());
            let item = HashMap::new();
            let record = Record::put(key, item, i);
            wal.append(record).unwrap();
        }

        // Single flush commits all
        wal.flush().unwrap();

        let records = wal.read_all().unwrap();
        assert_eq!(records.len(), 10);
    }

    #[test]
    fn test_wal_ring_wrap_around() {
        let tmp = NamedTempFile::new().unwrap();
        let region = Region::new(0, 1024); // Small ring to force wrap

        let wal = WalRing::create(tmp.path(), region).unwrap();

        // Write enough to cause wrap-around
        for i in 0..50 {
            let key = Key::new(format!("key{}", i).into_bytes());
            let mut item = HashMap::new();
            item.insert("data".to_string(), Value::string("x".repeat(50)));
            let record = Record::put(key, item, i);
            wal.append(record).unwrap();
            wal.flush().unwrap();
        }

        // Should still be able to read (though older records may be overwritten)
        let records = wal.read_all().unwrap();
        assert!(!records.is_empty());
    }

    #[test]
    fn test_wal_ring_checkpoint() {
        let tmp = NamedTempFile::new().unwrap();
        let region = Region::new(0, 64 * 1024);

        let wal = WalRing::create(tmp.path(), region).unwrap();

        for i in 0..10 {
            let key = Key::new(format!("key{}", i).into_bytes());
            let item = HashMap::new();
            let record = Record::put(key, item, i);
            wal.append(record).unwrap();
        }

        wal.flush().unwrap();

        // Set checkpoint
        wal.set_checkpoint(5).unwrap();
        wal.compact().unwrap(); // No-op for ring buffer, but should succeed

        let records = wal.read_all().unwrap();
        assert_eq!(records.len(), 10); // Compact doesn't remove in ring buffer
    }
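
    // Added sketch: exercises the auto-flush path in `append`, which flushes
    // the pending batch whenever an append observes that `batch_timeout` has
    // already elapsed. A zero timeout makes that true for every append.
    #[test]
    fn test_wal_ring_batch_timeout_autoflush() {
        let tmp = NamedTempFile::new().unwrap();
        let region = Region::new(0, 64 * 1024);

        let wal = WalRing::create(tmp.path(), region).unwrap();
        wal.set_batch_timeout(Duration::from_millis(0));

        for i in 0..3 {
            let key = Key::new(format!("key{}", i).into_bytes());
            let record = Record::put(key, HashMap::new(), i);
            wal.append(record).unwrap();
        }

        // Every append should already have hit the disk; this flush is a no-op
        wal.flush().unwrap();

        let records = wal.read_all().unwrap();
        assert_eq!(records.len(), 3);
    }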
}