Skip to main content

sochdb_kernel/
wal.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Write-Ahead Logging (WAL)
19//!
20//! Minimal WAL implementation for the kernel.
21//! Provides durability guarantees for transactions.
22
23use crate::error::{KernelError, KernelResult, WalErrorKind};
24use crate::kernel_api::PageId;
25use crate::transaction::TransactionId;
26use bytes::{BufMut, Bytes, BytesMut};
27use parking_lot::{Mutex, RwLock};
28use std::collections::HashMap;
29use std::fs::{File, OpenOptions};
30use std::io::{Read, Seek, SeekFrom, Write};
31use std::path::{Path, PathBuf};
32use std::sync::atomic::{AtomicU64, Ordering};
33
/// Log Sequence Number: the identifier carried by every WAL record.
///
/// Comparison, ordering and hashing are those of the wrapped `u64`; the
/// derived `Default` is `LogSequenceNumber(0)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct LogSequenceNumber(pub u64);

impl LogSequenceNumber {
    /// Sentinel for "no LSN"; wraps `u64::MAX`.
    pub const INVALID: Self = Self(u64::MAX);

    /// Wrap a raw `u64` as an LSN.
    pub fn new(value: u64) -> Self {
        LogSequenceNumber(value)
    }

    /// Return the wrapped `u64`.
    pub fn value(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }

    /// Return `true` unless this LSN is the `INVALID` sentinel.
    pub fn is_valid(&self) -> bool {
        *self != Self::INVALID
    }
}

impl std::fmt::Display for LogSequenceNumber {
    /// Render the LSN as `LSN(<raw value>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let raw = self.value();
        write!(f, "LSN({})", raw)
    }
}
63
/// Kernel-level WAL record kinds.
///
/// Each variant's discriminant is the byte stored on disk (1 through 11), so
/// the values are part of the WAL file format. `to_canonical()` and
/// `from_canonical()` translate to and from
/// `sochdb_core::txn::WalRecordType`, the canonical superset.
///
/// On-disk bytes (DO NOT CHANGE without a migration):
///   Begin=1, Commit=2, Abort=3, Update=4, Insert=5, Delete=6,
///   Clr=7, CheckpointBegin=8, CheckpointEnd=9, AllocPage=10, FreePage=11
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalRecordType {
    /// Start of a transaction (canonical: TxnBegin).
    Begin = 1,
    /// Transaction committed (canonical: TxnCommit).
    Commit = 2,
    /// Transaction aborted (canonical: TxnAbort).
    Abort = 3,
    /// Update carrying undo information (canonical: PageUpdate).
    Update = 4,
    /// Insert of new data (canonical: Data).
    Insert = 5,
    /// Removal of data (canonical: Delete).
    Delete = 6,
    /// Compensation log record written during rollback
    /// (canonical: CompensationLogRecord).
    Clr = 7,
    /// Start of a checkpoint (canonical: Checkpoint).
    CheckpointBegin = 8,
    /// End of a checkpoint (canonical: CheckpointEnd).
    CheckpointEnd = 9,
    /// A page was allocated (no canonical equivalent yet).
    AllocPage = 10,
    /// A page was freed (no canonical equivalent yet).
    FreePage = 11,
}
99
100impl WalRecordType {
101    /// Convert to the canonical `sochdb_core::txn::WalRecordType`.
102    /// Returns `None` for variants without a canonical equivalent (AllocPage, FreePage).
103    pub fn to_canonical(self) -> Option<sochdb_core::txn::WalRecordType> {
104        use sochdb_core::txn::WalRecordType as C;
105        match self {
106            Self::Begin => Some(C::TxnBegin),
107            Self::Commit => Some(C::TxnCommit),
108            Self::Abort => Some(C::TxnAbort),
109            Self::Update => Some(C::PageUpdate),
110            Self::Insert => Some(C::Data),
111            Self::Delete => Some(C::Delete),
112            Self::Clr => Some(C::CompensationLogRecord),
113            Self::CheckpointBegin => Some(C::Checkpoint),
114            Self::CheckpointEnd => Some(C::CheckpointEnd),
115            Self::AllocPage | Self::FreePage => None,
116        }
117    }
118
119    /// Convert from the canonical `sochdb_core::txn::WalRecordType`.
120    pub fn from_canonical(rt: sochdb_core::txn::WalRecordType) -> Option<Self> {
121        use sochdb_core::txn::WalRecordType as C;
122        match rt {
123            C::TxnBegin => Some(Self::Begin),
124            C::TxnCommit => Some(Self::Commit),
125            C::TxnAbort => Some(Self::Abort),
126            C::PageUpdate => Some(Self::Update),
127            C::Data => Some(Self::Insert),
128            C::Delete => Some(Self::Delete),
129            C::CompensationLogRecord => Some(Self::Clr),
130            C::Checkpoint => Some(Self::CheckpointBegin),
131            C::CheckpointEnd => Some(Self::CheckpointEnd),
132            _ => None,
133        }
134    }
135}
136
137impl TryFrom<u8> for WalRecordType {
138    type Error = KernelError;
139
140    fn try_from(value: u8) -> Result<Self, Self::Error> {
141        match value {
142            1 => Ok(Self::Begin),
143            2 => Ok(Self::Commit),
144            3 => Ok(Self::Abort),
145            4 => Ok(Self::Update),
146            5 => Ok(Self::Insert),
147            6 => Ok(Self::Delete),
148            7 => Ok(Self::Clr),
149            8 => Ok(Self::CheckpointBegin),
150            9 => Ok(Self::CheckpointEnd),
151            10 => Ok(Self::AllocPage),
152            11 => Ok(Self::FreePage),
153            _ => Err(KernelError::Wal {
154                kind: WalErrorKind::Corrupted,
155            }),
156        }
157    }
158}
159
160/// WAL record
161#[derive(Debug, Clone)]
162pub struct WalRecord {
163    /// Log sequence number
164    pub lsn: LogSequenceNumber,
165    /// Previous LSN for this transaction (for undo chain)
166    pub prev_lsn: LogSequenceNumber,
167    /// Transaction ID
168    pub txn_id: TransactionId,
169    /// Record type
170    pub record_type: WalRecordType,
171    /// Page ID (for page-level operations)
172    pub page_id: Option<PageId>,
173    /// Redo data
174    pub redo_data: Bytes,
175    /// Undo data (for compensation)
176    pub undo_data: Bytes,
177    /// Checksum
178    pub checksum: u32,
179}
180
181impl WalRecord {
182    /// Record header size: lsn(8) + prev_lsn(8) + txn_id(8) + type(1) + page_id(8) + redo_len(4) + undo_len(4) + checksum(4)
183    const HEADER_SIZE: usize = 45;
184
185    /// Create a new WAL record
186    pub fn new(
187        lsn: LogSequenceNumber,
188        prev_lsn: LogSequenceNumber,
189        txn_id: TransactionId,
190        record_type: WalRecordType,
191        page_id: Option<PageId>,
192        redo_data: Bytes,
193        undo_data: Bytes,
194    ) -> Self {
195        let mut record = Self {
196            lsn,
197            prev_lsn,
198            txn_id,
199            record_type,
200            page_id,
201            redo_data,
202            undo_data,
203            checksum: 0,
204        };
205        record.checksum = record.compute_checksum();
206        record
207    }
208
209    /// Serialize to bytes
210    pub fn serialize(&self) -> Bytes {
211        let mut buf = BytesMut::with_capacity(
212            Self::HEADER_SIZE + self.redo_data.len() + self.undo_data.len(),
213        );
214
215        buf.put_u64_le(self.lsn.0);
216        buf.put_u64_le(self.prev_lsn.0);
217        buf.put_u64_le(self.txn_id);
218        buf.put_u8(self.record_type as u8);
219        buf.put_u64_le(self.page_id.unwrap_or(0));
220        buf.put_u32_le(self.redo_data.len() as u32);
221        buf.put_u32_le(self.undo_data.len() as u32);
222        buf.put_slice(&self.redo_data);
223        buf.put_slice(&self.undo_data);
224        buf.put_u32_le(self.checksum);
225
226        buf.freeze()
227    }
228
229    /// Deserialize from bytes
230    pub fn deserialize(data: &[u8]) -> KernelResult<Self> {
231        if data.len() < Self::HEADER_SIZE {
232            return Err(KernelError::Wal {
233                kind: WalErrorKind::Corrupted,
234            });
235        }
236
237        let lsn = LogSequenceNumber(u64::from_le_bytes(data[0..8].try_into().unwrap()));
238        let prev_lsn = LogSequenceNumber(u64::from_le_bytes(data[8..16].try_into().unwrap()));
239        let txn_id = u64::from_le_bytes(data[16..24].try_into().unwrap());
240        let record_type = WalRecordType::try_from(data[24])?;
241        let page_id_raw = u64::from_le_bytes(data[25..33].try_into().unwrap());
242        let page_id = if page_id_raw == 0 {
243            None
244        } else {
245            Some(page_id_raw)
246        };
247        let redo_len = u32::from_le_bytes(data[33..37].try_into().unwrap()) as usize;
248        let undo_len = u32::from_le_bytes(data[37..41].try_into().unwrap()) as usize;
249
250        let expected_len = Self::HEADER_SIZE + redo_len + undo_len;
251        if data.len() < expected_len {
252            return Err(KernelError::Wal {
253                kind: WalErrorKind::Corrupted,
254            });
255        }
256
257        let redo_start = 41;
258        let redo_data = Bytes::copy_from_slice(&data[redo_start..redo_start + redo_len]);
259        let undo_start = redo_start + redo_len;
260        let undo_data = Bytes::copy_from_slice(&data[undo_start..undo_start + undo_len]);
261        let checksum_start = undo_start + undo_len;
262        let checksum =
263            u32::from_le_bytes(data[checksum_start..checksum_start + 4].try_into().unwrap());
264
265        let record = Self {
266            lsn,
267            prev_lsn,
268            txn_id,
269            record_type,
270            page_id,
271            redo_data,
272            undo_data,
273            checksum,
274        };
275
276        // Verify checksum
277        let computed = record.compute_checksum();
278        if computed != checksum {
279            return Err(KernelError::Wal {
280                kind: WalErrorKind::ChecksumMismatch {
281                    expected: checksum,
282                    actual: computed,
283                },
284            });
285        }
286
287        Ok(record)
288    }
289
290    /// Compute checksum for the record
291    fn compute_checksum(&self) -> u32 {
292        let mut hasher = crc32fast::Hasher::new();
293        hasher.update(&self.lsn.0.to_le_bytes());
294        hasher.update(&self.prev_lsn.0.to_le_bytes());
295        hasher.update(&self.txn_id.to_le_bytes());
296        hasher.update(&[self.record_type as u8]);
297        hasher.update(&self.page_id.unwrap_or(0).to_le_bytes());
298        hasher.update(&self.redo_data);
299        hasher.update(&self.undo_data);
300        hasher.finalize()
301    }
302
303    /// Get serialized size
304    pub fn size(&self) -> usize {
305        Self::HEADER_SIZE + self.redo_data.len() + self.undo_data.len()
306    }
307}
308
309/// WAL Manager
310///
311/// Manages write-ahead log for durability.
312pub struct WalManager {
313    /// WAL file path
314    path: PathBuf,
315    /// WAL file handle
316    file: Mutex<File>,
317    /// Next LSN to allocate
318    next_lsn: AtomicU64,
319    /// Durable LSN (everything up to this is fsynced)
320    durable_lsn: AtomicU64,
321    /// Per-transaction last LSN (for undo chain)
322    txn_last_lsn: RwLock<HashMap<TransactionId, LogSequenceNumber>>,
323    /// Last checkpoint LSN
324    checkpoint_lsn: AtomicU64,
325    /// Buffer for batching writes
326    write_buffer: Mutex<BytesMut>,
327    /// Buffer threshold for auto-flush (bytes)
328    buffer_threshold: usize,
329}
330
331impl WalManager {
332    /// Default buffer threshold: 64KB
333    const DEFAULT_BUFFER_THRESHOLD: usize = 64 * 1024;
334
335    /// Open or create a WAL file
336    pub fn open(path: impl AsRef<Path>) -> KernelResult<Self> {
337        let path = path.as_ref().to_path_buf();
338
339        let file = OpenOptions::new()
340            .read(true)
341            .write(true)
342            .create(true)
343            .truncate(false)
344            .open(&path)?;
345
346        let file_len = file.metadata()?.len();
347        // LSN starts at 0 for empty file, otherwise at end of file for new writes
348        let next_lsn = file_len;
349
350        Ok(Self {
351            path,
352            file: Mutex::new(file),
353            next_lsn: AtomicU64::new(next_lsn),
354            durable_lsn: AtomicU64::new(if file_len > 0 { file_len } else { 0 }),
355            txn_last_lsn: RwLock::new(HashMap::new()),
356            checkpoint_lsn: AtomicU64::new(0),
357            write_buffer: Mutex::new(BytesMut::with_capacity(Self::DEFAULT_BUFFER_THRESHOLD)),
358            buffer_threshold: Self::DEFAULT_BUFFER_THRESHOLD,
359        })
360    }
361
362    /// Append a record to the WAL
363    ///
364    /// Returns the LSN of the appended record.
365    pub fn append(&self, record: &mut WalRecord) -> KernelResult<LogSequenceNumber> {
366        // Allocate LSN
367        let lsn = LogSequenceNumber(
368            self.next_lsn
369                .fetch_add(record.size() as u64, Ordering::SeqCst),
370        );
371        record.lsn = lsn;
372
373        // Set prev_lsn from transaction's last LSN
374        if let Some(&prev) = self.txn_last_lsn.read().get(&record.txn_id) {
375            record.prev_lsn = prev;
376        }
377
378        // Update checksum with final LSN
379        record.checksum = record.compute_checksum();
380
381        // Serialize
382        let data = record.serialize();
383
384        // Buffer the write
385        let mut buffer = self.write_buffer.lock();
386        buffer.extend_from_slice(&data);
387
388        // Update transaction's last LSN
389        self.txn_last_lsn.write().insert(record.txn_id, lsn);
390
391        // Auto-flush if buffer exceeds threshold
392        if buffer.len() >= self.buffer_threshold {
393            drop(buffer);
394            self.flush()?;
395        }
396
397        Ok(lsn)
398    }
399
400    /// Flush buffered writes to disk
401    pub fn flush(&self) -> KernelResult<()> {
402        let mut buffer = self.write_buffer.lock();
403        if buffer.is_empty() {
404            return Ok(());
405        }
406
407        let data = buffer.split().freeze();
408        let mut file = self.file.lock();
409
410        // Seek to end and write
411        file.seek(SeekFrom::End(0))?;
412        file.write_all(&data)?;
413
414        Ok(())
415    }
416
417    /// Sync WAL to durable storage (fsync)
418    pub fn sync(&self) -> KernelResult<LogSequenceNumber> {
419        // First flush any buffered writes
420        self.flush()?;
421
422        // Then fsync
423        let file = self.file.lock();
424        file.sync_all()?;
425
426        // Update durable LSN
427        let current_lsn = self.next_lsn.load(Ordering::SeqCst);
428        self.durable_lsn.store(current_lsn, Ordering::SeqCst);
429
430        Ok(LogSequenceNumber(current_lsn))
431    }
432
433    /// Get the current durable LSN
434    pub fn durable_lsn(&self) -> LogSequenceNumber {
435        LogSequenceNumber(self.durable_lsn.load(Ordering::SeqCst))
436    }
437
438    /// Get the next LSN that will be allocated
439    pub fn next_lsn(&self) -> LogSequenceNumber {
440        LogSequenceNumber(self.next_lsn.load(Ordering::SeqCst))
441    }
442
443    /// Log a transaction begin
444    pub fn log_begin(&self, txn_id: TransactionId) -> KernelResult<LogSequenceNumber> {
445        let mut record = WalRecord::new(
446            LogSequenceNumber::INVALID,
447            LogSequenceNumber::INVALID,
448            txn_id,
449            WalRecordType::Begin,
450            None,
451            Bytes::new(),
452            Bytes::new(),
453        );
454        self.append(&mut record)
455    }
456
457    /// Log a transaction commit
458    pub fn log_commit(&self, txn_id: TransactionId) -> KernelResult<LogSequenceNumber> {
459        let prev_lsn = self
460            .txn_last_lsn
461            .read()
462            .get(&txn_id)
463            .copied()
464            .unwrap_or(LogSequenceNumber::INVALID);
465        let mut record = WalRecord::new(
466            LogSequenceNumber::INVALID,
467            prev_lsn,
468            txn_id,
469            WalRecordType::Commit,
470            None,
471            Bytes::new(),
472            Bytes::new(),
473        );
474        let lsn = self.append(&mut record)?;
475
476        // Sync on commit for durability
477        self.sync()?;
478
479        // Clean up transaction state
480        self.txn_last_lsn.write().remove(&txn_id);
481
482        Ok(lsn)
483    }
484
485    /// Log a transaction abort
486    pub fn log_abort(&self, txn_id: TransactionId) -> KernelResult<LogSequenceNumber> {
487        let prev_lsn = self
488            .txn_last_lsn
489            .read()
490            .get(&txn_id)
491            .copied()
492            .unwrap_or(LogSequenceNumber::INVALID);
493        let mut record = WalRecord::new(
494            LogSequenceNumber::INVALID,
495            prev_lsn,
496            txn_id,
497            WalRecordType::Abort,
498            None,
499            Bytes::new(),
500            Bytes::new(),
501        );
502        let lsn = self.append(&mut record)?;
503
504        // Clean up transaction state
505        self.txn_last_lsn.write().remove(&txn_id);
506
507        Ok(lsn)
508    }
509
510    /// Log an update operation
511    pub fn log_update(
512        &self,
513        txn_id: TransactionId,
514        page_id: PageId,
515        redo_data: Bytes,
516        undo_data: Bytes,
517    ) -> KernelResult<LogSequenceNumber> {
518        let prev_lsn = self
519            .txn_last_lsn
520            .read()
521            .get(&txn_id)
522            .copied()
523            .unwrap_or(LogSequenceNumber::INVALID);
524        let mut record = WalRecord::new(
525            LogSequenceNumber::INVALID,
526            prev_lsn,
527            txn_id,
528            WalRecordType::Update,
529            Some(page_id),
530            redo_data,
531            undo_data,
532        );
533        self.append(&mut record)
534    }
535
536    /// Log a checkpoint begin
537    pub fn log_checkpoint_begin(&self) -> KernelResult<LogSequenceNumber> {
538        let mut record = WalRecord::new(
539            LogSequenceNumber::INVALID,
540            LogSequenceNumber::INVALID,
541            0, // System transaction
542            WalRecordType::CheckpointBegin,
543            None,
544            Bytes::new(),
545            Bytes::new(),
546        );
547        self.append(&mut record)
548    }
549
550    /// Log a checkpoint end with active transactions
551    pub fn log_checkpoint_end(
552        &self,
553        active_txns: &[TransactionId],
554    ) -> KernelResult<LogSequenceNumber> {
555        // Serialize active transaction list
556        let mut redo_data = BytesMut::with_capacity(active_txns.len() * 8);
557        for &txn_id in active_txns {
558            redo_data.put_u64_le(txn_id);
559        }
560
561        let mut record = WalRecord::new(
562            LogSequenceNumber::INVALID,
563            LogSequenceNumber::INVALID,
564            0, // System transaction
565            WalRecordType::CheckpointEnd,
566            None,
567            redo_data.freeze(),
568            Bytes::new(),
569        );
570        let lsn = self.append(&mut record)?;
571
572        // Sync checkpoint
573        self.sync()?;
574
575        // Update checkpoint LSN
576        self.checkpoint_lsn.store(lsn.0, Ordering::SeqCst);
577
578        Ok(lsn)
579    }
580
581    /// Get last checkpoint LSN
582    pub fn checkpoint_lsn(&self) -> Option<LogSequenceNumber> {
583        let lsn = self.checkpoint_lsn.load(Ordering::SeqCst);
584        if lsn == 0 {
585            None
586        } else {
587            Some(LogSequenceNumber(lsn))
588        }
589    }
590
591    /// Read all records from a given LSN
592    pub fn read_from(&self, start_lsn: LogSequenceNumber) -> KernelResult<Vec<WalRecord>> {
593        // Flush any pending writes first
594        self.flush()?;
595
596        let mut file = self.file.lock();
597        let file_len = file.metadata()?.len();
598
599        if start_lsn.0 >= file_len {
600            return Ok(Vec::new());
601        }
602
603        file.seek(SeekFrom::Start(start_lsn.0))?;
604
605        let mut buffer = vec![0u8; (file_len - start_lsn.0) as usize];
606        file.read_exact(&mut buffer)?;
607
608        let mut records = Vec::new();
609        let mut offset = 0;
610
611        while offset < buffer.len() {
612            match WalRecord::deserialize(&buffer[offset..]) {
613                Ok(record) => {
614                    let size = record.size();
615                    records.push(record);
616                    offset += size;
617                }
618                Err(_) => {
619                    // End of valid records (possibly torn write)
620                    break;
621                }
622            }
623        }
624
625        Ok(records)
626    }
627
628    /// Get WAL file path
629    pub fn path(&self) -> &Path {
630        &self.path
631    }
632
633    /// Truncate WAL up to a given LSN (for space reclamation after checkpoint)
634    pub fn truncate_before(&self, _lsn: LogSequenceNumber) -> KernelResult<()> {
635        // In production, this would copy records after LSN to a new file
636        // and rename. For simplicity, we skip this.
637        Ok(())
638    }
639}
640
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A record keeps every field across a serialize/deserialize round trip.
    #[test]
    fn test_wal_record_serialize_deserialize() {
        let original = WalRecord::new(
            LogSequenceNumber(100),
            LogSequenceNumber(50),
            1,
            WalRecordType::Update,
            Some(42),
            Bytes::from_static(b"redo data"),
            Bytes::from_static(b"undo data"),
        );

        let encoded = original.serialize();
        let decoded = WalRecord::deserialize(&encoded).unwrap();

        assert_eq!(original.lsn, decoded.lsn);
        assert_eq!(original.prev_lsn, decoded.prev_lsn);
        assert_eq!(original.txn_id, decoded.txn_id);
        assert_eq!(original.record_type, decoded.record_type);
        assert_eq!(original.page_id, decoded.page_id);
        assert_eq!(original.redo_data, decoded.redo_data);
        assert_eq!(original.undo_data, decoded.undo_data);
    }

    /// Appended records get increasing LSNs and `sync` covers them.
    #[test]
    fn test_wal_manager_append_sync() {
        let dir = tempdir().unwrap();
        let wal = WalManager::open(dir.path().join("test.wal")).unwrap();

        // Begin record.
        let begin_lsn = wal.log_begin(1).unwrap();
        assert!(begin_lsn.is_valid());

        // Update record for the same transaction.
        let redo = Bytes::from_static(b"new value");
        let undo = Bytes::from_static(b"old value");
        let update_lsn = wal.log_update(1, 100, redo, undo).unwrap();
        assert!(update_lsn > begin_lsn);

        // The durable point must cover the update.
        let durable = wal.sync().unwrap();
        assert!(durable >= update_lsn);
    }

    /// Records written by one manager are readable after reopening the file.
    #[test]
    fn test_wal_recovery() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // First session: write a complete transaction, then drop the manager.
        let first_lsn = {
            let writer = WalManager::open(&wal_path).unwrap();
            let begin_lsn = writer.log_begin(1).unwrap();
            writer
                .log_update(1, 100, Bytes::from_static(b"data"), Bytes::new())
                .unwrap();
            writer.log_commit(1).unwrap();
            begin_lsn
        };

        // Second session: reopen and read the transaction back.
        let reader = WalManager::open(&wal_path).unwrap();
        let records = reader.read_from(first_lsn).unwrap();

        assert!(
            records.len() >= 3,
            "Expected at least 3 records, got {}",
            records.len()
        );
        let expected = [
            WalRecordType::Begin,
            WalRecordType::Update,
            WalRecordType::Commit,
        ];
        for (record, want) in records.iter().zip(expected.iter()) {
            assert_eq!(record.record_type, *want);
        }
    }
}
727}