kaya-wal 0.1.43

Write-ahead log: CRC32C-protected record codec, append-only writer, crash-safe recovery, and inspector for KayaDB
Documentation
use std::sync::Arc;

use kaya_core::{KayaError, Lsn, Result, WalConfig};
use kaya_io::{Disk, RelativePath};

use crate::codec::DecodeRecordResult;
use crate::writer::parse_segment_id;
use crate::{decode_record, SegmentId, WalRecord, WalWarning};

/// A WAL record that decoded successfully during [`recover_wal`], together with
/// the location it was read from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecoveredRecord {
    /// Segment file the record was decoded from.
    pub segment_id: SegmentId,
    /// Byte offset of the record's first byte, measured from the start of its
    /// segment file.
    pub offset: u64,
    /// The decoded record itself.
    pub record: WalRecord,
}

/// Outcome of a [`recover_wal`] pass over the WAL directory.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct WalRecoveryReport {
    /// Every accepted record, in ascending segment-id order and, within a
    /// segment, in file order.
    pub records: Vec<RecoveredRecord>,
    /// LSN of the last accepted record; `None` when no record was accepted.
    pub last_lsn: Option<Lsn>,
    /// Sum, over all scanned segments, of the byte length of the accepted
    /// record prefix of each segment.
    pub valid_bytes: u64,
    /// Number of bytes removed from a segment tail by recovery.
    pub truncated_bytes: u64,
    /// Non-fatal problems found during recovery, in the order they were hit.
    pub warnings: Vec<WalWarning>,
}

/// Scans the `wal` directory, decodes every segment in ascending segment-id
/// order, and repairs the log tail in place.
///
/// Records are accepted while they decode cleanly and their LSNs form a
/// contiguous sequence: after the first record, each record's LSN must equal
/// the previous record's `lsn.next()`. Scanning stops at the first record that
/// is incomplete, invalid, or out of sequence. At that point:
///
/// * the segment containing it is truncated to the end of its last accepted
///   record (reported via [`WalWarning::TailTruncated`]);
/// * later segments are not read or modified, only counted in a
///   [`WalWarning::TrailingSegmentsIgnored`] warning.
///
/// Directory entries that `parse_segment_id` does not recognise are ignored.
/// Empty segments, and segments that disappear between listing and sizing
/// (`KayaError::NotFound` from `file_len`), are skipped.
///
/// # Errors
///
/// Returns any error from building the directory path, listing the directory,
/// sizing (other than `NotFound`), reading, or truncating a segment.
pub async fn recover_wal<D: Disk>(config: WalConfig, disk: Arc<D>) -> Result<WalRecoveryReport> {
    let wal_dir = RelativePath::new("wal")?;
    let mut segments = disk
        .list_dir(&wal_dir)
        .await?
        .into_iter()
        .filter_map(|entry| parse_segment_id(&entry.path).map(|id| (SegmentId(id), entry.path)))
        .collect::<Vec<_>>();
    segments.sort_by_key(|(segment_id, _)| segment_id.0);

    let mut report = WalRecoveryReport::default();
    // LSN the next record must carry; `None` until the first record is accepted.
    let mut expected_lsn = None::<u64>;
    // Index (into `segments`) of the segment where scanning stopped, if any.
    let mut stop_index = None::<usize>;

    for (index, (segment_id, path)) in segments.iter().enumerate() {
        let len = match disk.file_len(path).await {
            Ok(len) => len,
            // The segment vanished after listing; treat it like an empty one.
            Err(KayaError::NotFound) => 0,
            Err(error) => return Err(error),
        };
        if len == 0 {
            continue;
        }

        // A single `read_at` may return fewer bytes than requested. Keep reading
        // until the buffer is full or a read reports no more data, so the whole
        // segment is validated instead of silently skipping its unread tail.
        let mut bytes = vec![0_u8; len as usize];
        let mut filled = 0_usize;
        while filled < bytes.len() {
            let read = disk.read_at(path, filled as u64, &mut bytes[filled..]).await?;
            if read == 0 {
                break;
            }
            filled += read;
        }
        bytes.truncate(filled);

        let mut offset = 0_usize;
        while offset < bytes.len() {
            match decode_record(&bytes[offset..], offset as u64, config.max_record_bytes) {
                DecodeRecordResult::Complete { record, bytes_read } => {
                    if let Some(expected) = expected_lsn {
                        if record.lsn.get() != expected {
                            // A gap or rewind in the LSN sequence: keep everything
                            // before this record and stop here.
                            report.warnings.push(WalWarning::NonMonotonicLsn {
                                offset: offset as u64,
                                expected,
                                found: record.lsn.get(),
                            });
                            stop_index = Some(index);
                            break;
                        }
                    }
                    expected_lsn = Some(record.lsn.next().get());
                    report.last_lsn = Some(record.lsn);
                    report.records.push(RecoveredRecord {
                        segment_id: *segment_id,
                        offset: offset as u64,
                        record,
                    });
                    offset += bytes_read;
                }
                DecodeRecordResult::Incomplete { warning }
                | DecodeRecordResult::Invalid { warning } => {
                    report.warnings.push(warning);
                    stop_index = Some(index);
                    break;
                }
            }
        }

        // `offset` now marks the end of the last accepted record in this segment.
        report.valid_bytes += offset as u64;

        if stop_index == Some(index) {
            let valid_len = offset as u64;
            if len > valid_len {
                // Drop the bad tail so the segment ends on a record boundary.
                disk.truncate(path, valid_len).await?;
                let truncated = len - valid_len;
                report.truncated_bytes += truncated;
                report.warnings.push(WalWarning::TailTruncated {
                    path: path.as_str().to_owned(),
                    valid_len,
                    truncated_bytes: truncated,
                });
            }
            break;
        }
    }

    if let Some(index) = stop_index {
        // Segments after the stop point are neither read nor modified.
        let ignored = segments.len().saturating_sub(index + 1);
        if ignored > 0 {
            report
                .warnings
                .push(WalWarning::TrailingSegmentsIgnored { count: ignored });
        }
    }

    Ok(report)
}