kaya-wal 0.1.22

Write-ahead log: CRC32C-protected record codec, append-only writer, crash-safe recovery, and inspector for KayaDB
Documentation
use std::sync::Arc;

use kaya_core::{DurabilityMode, KayaError, Lsn, Result, SequenceNumber, WalConfig};
use kaya_io::{Disk, RelativePath};

use crate::{encode_record, WalPayload, WalRecord};

/// Numeric identifier of a WAL segment file.
///
/// [`segment_path`] renders the wrapped value as 16 zero-padded lowercase hex
/// digits to form the segment's file name, and [`parse_segment_id`] recovers
/// the raw `u64` from such a name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SegmentId(pub u64);

/// Outcome of a successful [`WalWriter::append`] call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendResult {
    /// LSN that was assigned to the appended record.
    pub lsn: Lsn,
    /// Sequence number that was assigned to the appended record.
    pub sequence: SequenceNumber,
    /// Segment that was active when the record was written.
    pub segment_id: SegmentId,
    /// Value returned by `Disk::append` for this write.
    // NOTE(review): presumably the byte offset of the record's first byte
    // within the segment file — confirm against the `Disk` contract.
    pub offset: u64,
    /// Length of the encoded record in bytes.
    pub encoded_len: u32,
    /// `true` when the segment file was fsynced before returning
    /// (`DurabilityMode::Strict`); `false` for `DurabilityMode::Relaxed`.
    pub durable: bool,
}

/// Append-only writer that assigns LSNs / sequence numbers to records and
/// writes them to the active segment file under `wal/`, rotating to a new
/// segment when the configured size limit would be exceeded.
#[derive(Debug)]
pub struct WalWriter<D: Disk> {
    /// Storage backend used for every list/append/fsync operation.
    disk: Arc<D>,
    /// Limits consulted on append (`max_record_bytes`, `segment_max_bytes`).
    config: WalConfig,
    /// Identifier of the segment currently receiving appends.
    active_segment_id: SegmentId,
    /// Path of the active segment; always derived from `active_segment_id`.
    active_path: RelativePath,
    /// Tracked length of the active segment in bytes (0 for a segment that
    /// has not been written yet).
    active_len: u64,
    /// LSN that will be assigned to the next appended record.
    next_lsn: Lsn,
    /// Sequence number that will be assigned to the next appended record.
    next_sequence: SequenceNumber,
}

impl<D: Disk> WalWriter<D> {
    /// Opens the writer with the LSN and sequence counters starting at
    /// `Lsn::FIRST` and `SequenceNumber::FIRST`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`WalWriter::open_at`].
    pub async fn open(config: WalConfig, disk: Arc<D>) -> Result<Self> {
        Self::open_at(config, disk, Lsn::FIRST, SequenceNumber::FIRST).await
    }

    /// Opens the writer, resuming on the highest-numbered segment found in
    /// `wal/` (or segment 1 when none exists) and assigning `next_lsn` /
    /// `next_sequence` to the first record appended.
    ///
    /// The writer does not scan segment contents; the caller is responsible
    /// for supplying counters that continue the existing log.
    ///
    /// # Errors
    ///
    /// Returns an error if the `wal` directory cannot be listed, if the
    /// segment path cannot be built, or if querying the active segment's
    /// length fails with anything other than `KayaError::NotFound` (a missing
    /// file is treated as an empty segment).
    pub async fn open_at(
        config: WalConfig,
        disk: Arc<D>,
        next_lsn: Lsn,
        next_sequence: SequenceNumber,
    ) -> Result<Self> {
        let wal_dir = RelativePath::new("wal")?;
        let segments = disk.list_dir(&wal_dir).await?;
        // Entries whose names do not parse as segment files are ignored.
        let active_segment_id = segments
            .iter()
            .filter_map(|entry| parse_segment_id(&entry.path))
            .max()
            .map(SegmentId)
            .unwrap_or(SegmentId(1));
        let active_path = segment_path(active_segment_id)?;
        let active_len = match disk.file_len(&active_path).await {
            Ok(len) => len,
            // The first segment is created lazily by the first append.
            Err(KayaError::NotFound) => 0,
            Err(error) => return Err(error),
        };

        Ok(Self {
            disk,
            config,
            active_segment_id,
            active_path,
            active_len,
            next_lsn,
            next_sequence,
        })
    }

    /// Encodes `payload` as the next WAL record and appends it to the active
    /// segment, rotating first if the record would push a non-empty segment
    /// past `segment_max_bytes`.
    ///
    /// With `DurabilityMode::Strict` the segment file is fsynced before
    /// returning and the result reports `durable: true`; with
    /// `DurabilityMode::Relaxed` no fsync is issued.
    ///
    /// # Errors
    ///
    /// Returns an invalid-argument error if the encoded record does not fit
    /// into `u32` or exceeds `max_record_bytes`, and propagates any error
    /// from encoding, rotation, the disk append, or the fsync. On error the
    /// LSN and sequence counters are not advanced.
    pub async fn append(
        &mut self,
        payload: WalPayload,
        mode: DurabilityMode,
    ) -> Result<AppendResult> {
        let record = WalRecord::new(self.next_lsn, self.next_sequence, payload);
        let encoded = encode_record(&record)?;
        let encoded_len = u32::try_from(encoded.len()).map_err(|_| {
            KayaError::invalid_argument("encoded WAL record length does not fit into u32")
        })?;
        if encoded_len > self.config.max_record_bytes {
            return Err(KayaError::invalid_argument(format!(
                "encoded WAL record exceeds configured max: {encoded_len} > {}",
                self.config.max_record_bytes
            )));
        }
        let record_bytes = u64::from(encoded_len);

        // An empty segment always accepts the record, even an oversized one,
        // so a single large record cannot trigger endless rotation.
        // `saturating_add` keeps the comparison well-defined near `u64::MAX`.
        if self.active_len > 0
            && self.active_len.saturating_add(record_bytes) > self.config.segment_max_bytes
        {
            self.rotate().await?;
        }

        let offset = self.disk.append(&self.active_path, &encoded).await?;
        // The bytes are in the file from this point on, so track the new
        // length immediately; otherwise a failing fsync below would leave
        // `active_len` stale for the next rotation check.
        self.active_len = offset.saturating_add(record_bytes);

        // NOTE(review): if the fsync fails, the record has already been
        // appended but the LSN/sequence are not advanced, so a retry writes a
        // second record with the same LSN. Confirm recovery tolerates this or
        // treat the writer as poisoned after a failed fsync.
        let durable = match mode {
            DurabilityMode::Strict => {
                self.disk.fsync_file(&self.active_path).await?;
                true
            }
            DurabilityMode::Relaxed => false,
        };

        let result = AppendResult {
            lsn: self.next_lsn,
            sequence: self.next_sequence,
            segment_id: self.active_segment_id,
            offset,
            encoded_len,
            durable,
        };
        self.next_lsn = self.next_lsn.next();
        self.next_sequence = self.next_sequence.next();
        Ok(result)
    }

    /// Switches the writer to the next segment id.
    ///
    /// All fallible steps run before any field is modified, so a failed
    /// rotation leaves the writer on the previous segment with a consistent
    /// id/path/length and the rotation is retried by the next append.
    ///
    /// # Errors
    ///
    /// Returns an invalid-argument error if the segment id would overflow
    /// `u64`, and propagates failures from building the new path or from
    /// fsyncing the `wal` directory.
    async fn rotate(&mut self) -> Result<()> {
        let next_raw = self
            .active_segment_id
            .0
            .checked_add(1)
            .ok_or_else(|| KayaError::invalid_argument("WAL segment id overflow"))?;
        let next_id = SegmentId(next_raw);
        let next_path = segment_path(next_id)?;

        // NOTE(review): the new segment file is not created here (the next
        // append creates it), so this directory fsync cannot cover the new
        // entry — confirm `Disk::append`/`fsync_file` make the file's
        // directory entry durable.
        self.disk.fsync_dir(&RelativePath::new("wal")?).await?;

        self.active_segment_id = next_id;
        self.active_path = next_path;
        self.active_len = 0;
        Ok(())
    }
}

pub fn segment_path(segment_id: SegmentId) -> Result<RelativePath> {
    RelativePath::new(format!("wal/{:016x}.wal", segment_id.0))
}

/// Extracts the raw segment id from a segment file path.
///
/// Only the canonical name produced by [`segment_path`] is accepted: exactly
/// 16 lowercase hexadecimal digits followed by `.wal`. `u64::from_str_radix`
/// alone would also accept a leading `+`, uppercase digits, and shorter
/// names (e.g. `+1.wal`, `FF.wal`, `1.wal`); those would yield an id whose
/// `segment_path` differs from the file that was actually found.
///
/// Returns `None` when the path has no file name, lacks the `.wal` suffix,
/// or the stem is not in canonical form.
pub fn parse_segment_id(path: &RelativePath) -> Option<u64> {
    let name = path.file_name()?;
    let hex = name.strip_suffix(".wal")?;
    let is_canonical = hex.len() == 16
        && hex
            .bytes()
            .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'));
    if !is_canonical {
        return None;
    }
    u64::from_str_radix(hex, 16).ok()
}