Skip to main content

sochdb_storage/
batch_wal.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! Batched WAL with Vectored I/O
//!
//! This module implements batched WAL writing using vectored I/O (writev)
//! to minimize syscall overhead and improve write throughput.
//!
//! ## Problem Analysis
//!
//! Current WAL writes one entry per syscall:
//! - Syscall overhead: ~200-400 cycles
//! - Context switch potential: ~1000 cycles
//! - For 100K rows × 4 cols: 400K syscalls
//!
//! ## Solution
//!
//! Batch WAL entries into single vectored write:
//! - Single syscall per batch (up to 1000 entries)
//! - Vectored I/O (writev) eliminates intermediate copies
//!
//! ## Math
//!
//! Syscall amortization for N=100K entries, k=4 columns, B=1000 batch size:
//!
//! ```text
//! T_unbatched = N × k × S = 100K × 4 × 300 = 120M cycles
//! T_batched = ⌈N × k / B⌉ × S = ⌈400K / 1000⌉ × 300 = 120K cycles
//! ```
//!
//! **Speedup: 1000× (syscall portion only)**
//!
//! ## Performance
//!
//! Expected throughput: 10-20× improvement for bulk inserts
48
49use std::fs::{File, OpenOptions};
50use std::io::{self, IoSlice, Write};
51use std::path::Path;
52use std::sync::atomic::{AtomicU64, Ordering};
53
54use crate::txn_wal::TxnWalEntry;
55use parking_lot::Mutex;
56use sochdb_core::{Result, SochDBError};
57
/// Size in bytes of the batch header, whose fields are laid out as
/// `[magic: u32][version: u16][entry_count: u16][total_bytes: u32][checksum: u32]`.
const BATCH_HEADER_SIZE: usize = 4 + 2 + 2 + 4 + 4;
/// Magic number identifying a batch header: the ASCII bytes "BATC" read as a
/// big-endian `u32` (`0x42415443`).
const BATCH_MAGIC: u32 = u32::from_be_bytes(*b"BATC");
/// Batch format version stored in the header.
const BATCH_VERSION: u16 = 1;

/// Default cap on the number of entries collected into one batch.
pub const DEFAULT_MAX_BATCH_SIZE: usize = 1_000;

/// Default cap on the payload bytes collected into one batch (64 KiB).
pub const DEFAULT_MAX_BATCH_BYTES: usize = 64 << 10;
69
/// Running counters describing the work done by a batched WAL writer.
///
/// A default-constructed value has every counter at zero.
#[derive(Clone, Debug, Default)]
pub struct BatchedWalStats {
    /// Number of WAL entries written so far.
    pub entries_written: u64,
    /// Number of batches written so far.
    pub batches_written: u64,
    /// Number of bytes written so far.
    pub bytes_written: u64,
    /// Number of sync operations performed so far.
    pub syncs_performed: u64,
    /// Mean number of entries per written batch.
    pub avg_batch_size: f64,
}
84
85/// Batched WAL writer with vectored I/O
86///
87/// Accumulates WAL entries and writes them in batches using writev()
88/// for optimal I/O performance.
89pub struct BatchedWalWriter {
90    /// File handle
91    file: File,
92    /// Pending entries (pre-serialized)
93    pending: Vec<Vec<u8>>,
94    /// Total pending bytes
95    pending_bytes: usize,
96    /// Maximum batch size (entries)
97    max_batch_size: usize,
98    /// Maximum batch bytes
99    max_batch_bytes: usize,
100    /// Batch header buffer (reused)
101    header_buf: Vec<u8>,
102    /// Statistics
103    stats: BatchedWalStats,
104}
105
106impl BatchedWalWriter {
107    /// Create a new batched WAL writer
108    ///
109    /// # Arguments
110    /// * `path` - Path to WAL file
111    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
112        Self::with_config(path, DEFAULT_MAX_BATCH_SIZE, DEFAULT_MAX_BATCH_BYTES)
113    }
114
115    /// Create with custom configuration
116    ///
117    /// # Arguments
118    /// * `path` - Path to WAL file
119    /// * `max_batch_size` - Maximum entries per batch
120    /// * `max_batch_bytes` - Maximum bytes per batch
121    pub fn with_config<P: AsRef<Path>>(
122        path: P,
123        max_batch_size: usize,
124        max_batch_bytes: usize,
125    ) -> Result<Self> {
126        let file = OpenOptions::new()
127            .create(true)
128            .append(true)
129            .open(path.as_ref())
130            .map_err(SochDBError::Io)?;
131
132        Ok(Self {
133            file,
134            pending: Vec::with_capacity(max_batch_size),
135            pending_bytes: 0,
136            max_batch_size,
137            max_batch_bytes,
138            header_buf: vec![0u8; BATCH_HEADER_SIZE],
139            stats: BatchedWalStats::default(),
140        })
141    }
142
143    /// Create from an existing file handle
144    pub fn from_file(file: File) -> Self {
145        Self {
146            file,
147            pending: Vec::with_capacity(DEFAULT_MAX_BATCH_SIZE),
148            pending_bytes: 0,
149            max_batch_size: DEFAULT_MAX_BATCH_SIZE,
150            max_batch_bytes: DEFAULT_MAX_BATCH_BYTES,
151            header_buf: vec![0u8; BATCH_HEADER_SIZE],
152            stats: BatchedWalStats::default(),
153        }
154    }
155
156    /// Add entry to pending batch
157    ///
158    /// Entry will be serialized and added to the pending batch.
159    /// Automatic flush occurs if batch limits are reached.
160    pub fn append(&mut self, entry: &TxnWalEntry) -> Result<()> {
161        let serialized = entry.to_bytes();
162        self.pending_bytes += serialized.len();
163        self.pending.push(serialized);
164
165        // Auto-flush if limits reached
166        if self.pending.len() >= self.max_batch_size || self.pending_bytes >= self.max_batch_bytes {
167            self.flush()?;
168        }
169
170        Ok(())
171    }
172
173    /// Add pre-serialized entry bytes
174    #[inline]
175    pub fn append_bytes(&mut self, bytes: Vec<u8>) -> Result<()> {
176        self.pending_bytes += bytes.len();
177        self.pending.push(bytes);
178
179        if self.pending.len() >= self.max_batch_size || self.pending_bytes >= self.max_batch_bytes {
180            self.flush()?;
181        }
182
183        Ok(())
184    }
185
186    /// Flush pending entries with vectored I/O
187    ///
188    /// Returns the number of entries written.
189    pub fn flush(&mut self) -> Result<usize> {
190        if self.pending.is_empty() {
191            return Ok(0);
192        }
193
194        let count = self.pending.len();
195
196        // Build batch header
197        self.header_buf[0..4].copy_from_slice(&BATCH_MAGIC.to_le_bytes());
198        self.header_buf[4..6].copy_from_slice(&BATCH_VERSION.to_le_bytes());
199        self.header_buf[6..8].copy_from_slice(&(count as u16).to_le_bytes());
200        self.header_buf[8..12].copy_from_slice(&(self.pending_bytes as u32).to_le_bytes());
201
202        // Compute checksum over header (excluding checksum field)
203        let checksum = crc32fast::hash(&self.header_buf[..12]);
204        self.header_buf[12..16].copy_from_slice(&checksum.to_le_bytes());
205
206        // Build iovec array for writev()
207        let mut iovecs: Vec<IoSlice> = Vec::with_capacity(1 + self.pending.len());
208        iovecs.push(IoSlice::new(&self.header_buf));
209        for entry in &self.pending {
210            iovecs.push(IoSlice::new(entry));
211        }
212
213        // Single vectored write - NO INTERMEDIATE COPIES
214        let expected = BATCH_HEADER_SIZE + self.pending_bytes;
215        let written = self.file.write_vectored(&iovecs).map_err(SochDBError::Io)?;
216
217        if written != expected {
218            return Err(SochDBError::Io(io::Error::new(
219                io::ErrorKind::WriteZero,
220                format!("Incomplete batch write: {} < {}", written, expected),
221            )));
222        }
223
224        // Update stats
225        self.stats.entries_written += count as u64;
226        self.stats.batches_written += 1;
227        self.stats.bytes_written += written as u64;
228        self.stats.avg_batch_size =
229            self.stats.entries_written as f64 / self.stats.batches_written as f64;
230
231        // Clear pending
232        self.pending.clear();
233        self.pending_bytes = 0;
234
235        Ok(count)
236    }
237
238    /// Sync to disk (fsync)
239    pub fn sync(&mut self) -> Result<()> {
240        // Flush any pending entries first
241        if !self.pending.is_empty() {
242            self.flush()?;
243        }
244
245        self.file.sync_data().map_err(SochDBError::Io)?;
246
247        self.stats.syncs_performed += 1;
248        Ok(())
249    }
250
251    /// Get statistics
252    pub fn stats(&self) -> BatchedWalStats {
253        self.stats.clone()
254    }
255
256    /// Get pending entry count
257    #[inline]
258    pub fn pending_count(&self) -> usize {
259        self.pending.len()
260    }
261
262    /// Get pending bytes
263    #[inline]
264    pub fn pending_bytes(&self) -> usize {
265        self.pending_bytes
266    }
267}
268
269impl Drop for BatchedWalWriter {
270    fn drop(&mut self) {
271        // Best effort flush on drop
272        let _ = self.flush();
273    }
274}
275
276/// Batch entry accumulator for a single transaction
277///
278/// Collects all writes for a transaction and commits them as a single batch.
279pub struct BatchAccumulator {
280    /// Transaction ID
281    txn_id: u64,
282    /// Accumulated entries
283    entries: Vec<TxnWalEntry>,
284}
285
286impl BatchAccumulator {
287    /// Create a new batch accumulator for a transaction
288    pub fn new(txn_id: u64) -> Self {
289        Self {
290            txn_id,
291            entries: Vec::with_capacity(16),
292        }
293    }
294
295    /// Add a write to the batch (does not hit WAL yet)
296    pub fn write(&mut self, key: Vec<u8>, value: Vec<u8>) {
297        self.entries
298            .push(TxnWalEntry::data(self.txn_id, key, value));
299    }
300
301    /// Add a delete to the batch
302    pub fn delete(&mut self, key: Vec<u8>) {
303        // Delete is represented as a data entry with empty value
304        // The storage layer interprets empty value as tombstone
305        self.entries
306            .push(TxnWalEntry::data(self.txn_id, key, Vec::new()));
307    }
308
309    /// Get entry count
310    #[inline]
311    pub fn len(&self) -> usize {
312        self.entries.len()
313    }
314
315    /// Check if empty
316    #[inline]
317    pub fn is_empty(&self) -> bool {
318        self.entries.is_empty()
319    }
320
321    /// Commit batch - writes all entries to WAL in single batch
322    ///
323    /// # Arguments
324    /// * `writer` - Batched WAL writer
325    ///
326    /// # Returns
327    /// The number of entries written
328    pub fn commit(mut self, writer: &mut BatchedWalWriter) -> Result<usize> {
329        // Add commit marker
330        self.entries.push(TxnWalEntry::txn_commit(self.txn_id));
331
332        let count = self.entries.len();
333
334        // Append all entries to WAL
335        for entry in &self.entries {
336            writer.append(entry)?;
337        }
338
339        // Force flush and sync on commit
340        writer.flush()?;
341        writer.sync()?;
342
343        Ok(count)
344    }
345
346    /// Abort the batch (discard all pending writes)
347    pub fn abort(self) {
348        // Just drop the entries - nothing written to WAL
349    }
350
351    /// Get the transaction ID
352    #[inline]
353    pub fn txn_id(&self) -> u64 {
354        self.txn_id
355    }
356}
357
358/// Thread-safe batched WAL writer
359///
360/// Wraps BatchedWalWriter with a mutex for concurrent access.
361pub struct ConcurrentBatchedWal {
362    inner: Mutex<BatchedWalWriter>,
363    /// Next transaction ID
364    next_txn_id: AtomicU64,
365}
366
367impl ConcurrentBatchedWal {
368    /// Create a new concurrent batched WAL
369    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
370        Ok(Self {
371            inner: Mutex::new(BatchedWalWriter::new(path)?),
372            next_txn_id: AtomicU64::new(1),
373        })
374    }
375
376    /// Begin a new transaction batch
377    pub fn begin(&self) -> BatchAccumulator {
378        let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
379        BatchAccumulator::new(txn_id)
380    }
381
382    /// Commit a transaction batch
383    pub fn commit(&self, batch: BatchAccumulator) -> Result<usize> {
384        let mut writer = self.inner.lock();
385        batch.commit(&mut writer)
386    }
387
388    /// Append a single entry
389    pub fn append(&self, entry: &TxnWalEntry) -> Result<()> {
390        self.inner.lock().append(entry)
391    }
392
393    /// Force flush
394    pub fn flush(&self) -> Result<usize> {
395        self.inner.lock().flush()
396    }
397
398    /// Force sync
399    pub fn sync(&self) -> Result<()> {
400        self.inner.lock().sync()
401    }
402
403    /// Get statistics
404    pub fn stats(&self) -> BatchedWalStats {
405        self.inner.lock().stats()
406    }
407}
408
/// Sequential reader for a batched WAL file.
///
/// Used to read the batches back, for example during crash recovery.
pub struct BatchedWalReader {
    /// The WAL file being read.
    file: File,
    /// Total size in bytes of the batches returned so far.
    position: u64,
}
416
417impl BatchedWalReader {
418    /// Open a batched WAL file for reading
419    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
420        let file = File::open(path.as_ref()).map_err(SochDBError::Io)?;
421
422        Ok(Self { file, position: 0 })
423    }
424
425    /// Read the next batch of entries
426    ///
427    /// Returns None if EOF or error
428    pub fn read_batch(&mut self) -> Result<Option<Vec<TxnWalEntry>>> {
429        use std::io::Read;
430
431        // Read batch header
432        let mut header = [0u8; BATCH_HEADER_SIZE];
433        match self.file.read_exact(&mut header) {
434            Ok(_) => {}
435            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
436            Err(e) => return Err(SochDBError::Io(e)),
437        }
438
439        // Validate magic
440        let magic = u32::from_le_bytes(header[0..4].try_into().unwrap());
441        if magic != BATCH_MAGIC {
442            return Err(SochDBError::Internal("Invalid batch magic".into()));
443        }
444
445        // Read batch metadata
446        let _version = u16::from_le_bytes(header[4..6].try_into().unwrap());
447        let entry_count = u16::from_le_bytes(header[6..8].try_into().unwrap()) as usize;
448        let total_bytes = u32::from_le_bytes(header[8..12].try_into().unwrap()) as usize;
449        let stored_checksum = u32::from_le_bytes(header[12..16].try_into().unwrap());
450
451        // Validate checksum
452        let computed_checksum = crc32fast::hash(&header[..12]);
453        if stored_checksum != computed_checksum {
454            return Err(SochDBError::Internal(
455                "Batch header checksum mismatch".into(),
456            ));
457        }
458
459        // Read all entry data
460        let mut data = vec![0u8; total_bytes];
461        self.file.read_exact(&mut data).map_err(SochDBError::Io)?;
462
463        // Parse individual entries
464        let mut entries = Vec::with_capacity(entry_count);
465        let mut cursor = std::io::Cursor::new(&data);
466
467        for _ in 0..entry_count {
468            let entry = TxnWalEntry::from_reader(&mut cursor)?;
469            entries.push(entry);
470        }
471
472        self.position += BATCH_HEADER_SIZE as u64 + total_bytes as u64;
473
474        Ok(Some(entries))
475    }
476
477    /// Get current file position
478    pub fn position(&self) -> u64 {
479        self.position
480    }
481}
482
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Entries flushed by the writer come back from the reader as one batch, in order.
    #[test]
    fn test_batch_write_and_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // Write ten entries and release the writer before reading.
        let mut writer = BatchedWalWriter::new(&path).unwrap();
        for i in 0..10 {
            let key = format!("key{}", i).into_bytes();
            let value = format!("value{}", i).into_bytes();
            writer.append(&TxnWalEntry::data(1, key, value)).unwrap();
        }
        writer.flush().unwrap();
        drop(writer);

        // Read them back as a single batch.
        let mut reader = BatchedWalReader::open(&path).unwrap();
        let batch = reader.read_batch().unwrap().unwrap();
        assert_eq!(batch.len(), 10);

        for (i, entry) in batch.iter().enumerate() {
            assert_eq!(entry.key, format!("key{}", i).into_bytes());
            assert_eq!(entry.value, format!("value{}", i).into_bytes());
        }
    }

    /// Reaching the configured entry limit triggers a flush on its own.
    #[test]
    fn test_auto_flush_on_limit() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.wal");

        let mut writer = BatchedWalWriter::with_config(&path, 5, 1024 * 1024).unwrap();

        // Four entries stay below the limit of five.
        for i in 0u8..4 {
            writer
                .append(&TxnWalEntry::data(1, vec![i], vec![i]))
                .unwrap();
        }
        assert_eq!(writer.pending_count(), 4);

        // The fifth entry hits the limit and empties the queue.
        writer
            .append(&TxnWalEntry::data(1, vec![4], vec![4]))
            .unwrap();
        assert_eq!(writer.pending_count(), 0); // Flushed

        let stats = writer.stats();
        assert_eq!(stats.batches_written, 1);
        assert_eq!(stats.entries_written, 5);
    }

    /// Committing an accumulator writes its entries plus a commit marker.
    #[test]
    fn test_batch_accumulator() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.wal");

        let wal = ConcurrentBatchedWal::new(&path).unwrap();

        // Stage three writes in a fresh transaction.
        let mut batch = wal.begin();
        batch.write(b"key1".to_vec(), b"value1".to_vec());
        batch.write(b"key2".to_vec(), b"value2".to_vec());
        batch.write(b"key3".to_vec(), b"value3".to_vec());
        assert_eq!(batch.len(), 3);

        // Commit and check the reported count.
        let count = wal.commit(batch).unwrap();
        assert_eq!(count, 4); // 3 writes + 1 commit marker

        // The writer statistics agree.
        assert_eq!(wal.stats().entries_written, 4);
    }

    /// Aborting an accumulator leaves the WAL untouched.
    #[test]
    fn test_batch_abort() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.wal");

        let wal = ConcurrentBatchedWal::new(&path).unwrap();
        let wal_stats_before = wal.stats();

        // Stage two writes, then throw the transaction away.
        let mut batch = wal.begin();
        batch.write(b"key1".to_vec(), b"value1".to_vec());
        batch.write(b"key2".to_vec(), b"value2".to_vec());
        batch.abort();

        // Nothing may have been written.
        let stats = wal.stats();
        assert_eq!(stats.entries_written, wal_stats_before.entries_written);
    }
}