aedb 0.1.11

Embedded Rust storage engine with transactional commits, WAL durability, and snapshot-consistent reads
Documentation
use crate::catalog::Catalog;
use crate::commit::apply::apply_mutation;
use crate::commit::tx::{IdempotencyKey, IdempotencyRecord, WalCommitPayload};
use crate::error::AedbError;
use crate::recovery::scanner::validated_hash_chain_prefix_len;
use crate::storage::keyspace::Keyspace;
use crate::wal::frame::{FrameError, FrameReader};
use crate::wal::segment::SEGMENT_HEADER_SIZE;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::PathBuf;
use tracing::warn;

/// Replays WAL frames from `segments` into `keyspace`, `catalog` and `idempotency`.
///
/// Only the leading segments accepted by `validated_hash_chain_prefix_len` are read.
/// Within each segment, at most the number of bytes listed for it in
/// `segment_replay_byte_limits` (capped at the file's size) is consumed; a segment with
/// no entry is read in full. Segments whose replayable size does not extend past the
/// segment header are skipped.
///
/// Frames with `commit_seq <= from_seq_exclusive`, or with `commit_seq` above
/// `to_seq_inclusive` when it is set, are ignored. A truncated frame ends the current
/// segment silently. A corrupted frame ends the current segment with a warning in
/// permissive mode.
///
/// # Returns
///
/// The highest commit sequence that was applied, or `from_seq_exclusive` when no frame
/// was applied.
///
/// # Errors
///
/// * Any I/O failure while opening or reading a segment.
/// * `AedbError::Validation` when an in-range frame's `commit_seq` is not greater than
///   the last applied one (in both strict and permissive mode).
/// * With `strict_recovery` set: `AedbError::Validation` on frame corruption, and any
///   error returned while applying a frame. In permissive mode a frame that fails to
///   apply is logged and skipped instead.
#[allow(clippy::too_many_arguments)]
pub fn replay_segments(
    segments: &[PathBuf],
    from_seq_exclusive: u64,
    to_seq_inclusive: Option<u64>,
    segment_replay_byte_limits: Option<&HashMap<PathBuf, u64>>,
    hash_chain_required: bool,
    strict_recovery: bool,
    keyspace: &mut Keyspace,
    catalog: &mut Catalog,
    idempotency: &mut HashMap<IdempotencyKey, IdempotencyRecord>,
) -> Result<u64, AedbError> {
    let trusted_count =
        validated_hash_chain_prefix_len(segments, hash_chain_required, strict_recovery)?;
    let header_len = SEGMENT_HEADER_SIZE as u64;

    let mut last_applied_seq = from_seq_exclusive;
    let mut max_seq = from_seq_exclusive;

    for segment in &segments[..trusted_count] {
        let file = File::open(segment)?;
        let file_len = file.metadata()?.len();

        // A caller-supplied limit can only shrink the readable region, never grow it.
        let replay_len = match segment_replay_byte_limits.and_then(|limits| limits.get(segment)) {
            Some(&limit) => limit.min(file_len),
            None => file_len,
        };
        debug_assert!(replay_len <= file_len);
        if replay_len <= header_len {
            // Nothing beyond the header is replayable in this segment.
            continue;
        }

        let mut reader = BufReader::with_capacity(64 * 1024, file);
        // The header bytes are consumed only to position the reader at the first frame.
        let mut header = [0u8; SEGMENT_HEADER_SIZE];
        reader.read_exact(&mut header)?;
        let mut frames = FrameReader::new(reader.take(replay_len.saturating_sub(header_len)));

        loop {
            // Resolve the next frame, or decide how this segment's scan ends.
            let frame = match frames.next_frame() {
                Ok(Some(frame)) => frame,
                Ok(None) | Err(FrameError::Truncation) => break,
                Err(FrameError::Corruption) if strict_recovery => {
                    return Err(AedbError::Validation(
                        "wal frame corruption detected during replay".into(),
                    ));
                }
                Err(FrameError::Corruption) => {
                    warn!(
                        segment = %segment.display(),
                        "recovery: encountered wal frame corruption; skipping remainder of segment in permissive mode"
                    );
                    break;
                }
                Err(FrameError::Io(e)) => return Err(AedbError::Io(std::io::Error::other(e))),
            };

            let seq = frame.commit_seq;

            // Ignore frames outside the requested (from, to] window.
            let above_window = to_seq_inclusive.is_some_and(|upper| seq > upper);
            if seq <= from_seq_exclusive || above_window {
                continue;
            }

            // In-window frames must strictly advance past the last applied commit.
            if seq <= last_applied_seq {
                return Err(AedbError::Validation(
                    "non-monotonic wal commit_seq during replay".into(),
                ));
            }

            let outcome = apply_payload(
                frame.payload_type,
                &frame.payload,
                seq,
                frame.timestamp_micros,
                keyspace,
                catalog,
                idempotency,
            );
            if let Err(err) = outcome {
                if strict_recovery {
                    return Err(err);
                }
                warn!(
                    segment = %segment.display(),
                    commit_seq = seq,
                    error = ?err,
                    "recovery: skipping invalid wal frame in permissive mode"
                );
                continue;
            }

            last_applied_seq = seq;
            max_seq = max_seq.max(seq);
        }
    }

    Ok(max_seq)
}

/// Applies a single WAL frame payload to the in-memory state.
///
/// Payload types `0x01`, `0x02` and `0x04` are all decoded as a `WalCommitPayload`.
/// Its mutations are applied in order via `apply_mutation`, and then, if the payload
/// carries an idempotency key, a record for `commit_seq` / `timestamp_micros` is stored
/// under that key (replacing any existing entry).
///
/// # Errors
///
/// Returns `AedbError::Decode` for any other payload type or when the payload cannot be
/// decoded, and propagates the first error raised while applying a mutation. Mutations
/// applied before a failing one are not rolled back here.
fn apply_payload(
    payload_type: u8,
    payload: &[u8],
    commit_seq: u64,
    timestamp_micros: u64,
    keyspace: &mut Keyspace,
    catalog: &mut Catalog,
    idempotency: &mut HashMap<IdempotencyKey, IdempotencyRecord>,
) -> Result<(), AedbError> {
    // Reject unrecognised payload types before attempting to decode anything.
    if !matches!(payload_type, 0x01 | 0x02 | 0x04) {
        return Err(AedbError::Decode(format!(
            "unknown payload type: {payload_type}"
        )));
    }

    let decoded = decode_wal_payload(payload)?;

    decoded
        .mutations
        .into_iter()
        .try_for_each(|mutation| -> Result<(), AedbError> {
            apply_mutation(catalog, keyspace, mutation, commit_seq, None, None)?;
            Ok(())
        })?;

    // The idempotency record is written only after every mutation has succeeded.
    if let Some(key) = decoded.idempotency_key {
        let record = IdempotencyRecord {
            commit_seq,
            recorded_at_micros: timestamp_micros,
            request_fingerprint: decoded.request_fingerprint,
        };
        idempotency.insert(key, record);
    }

    Ok(())
}

/// Decodes a MessagePack-encoded WAL commit payload.
///
/// # Errors
///
/// Returns `AedbError::Decode` carrying the decoder's message when `payload` cannot be
/// deserialized into a `WalCommitPayload`.
fn decode_wal_payload(payload: &[u8]) -> Result<WalCommitPayload, AedbError> {
    match rmp_serde::from_slice::<WalCommitPayload>(payload) {
        Ok(decoded) => Ok(decoded),
        Err(decode_err) => Err(AedbError::Decode(decode_err.to_string())),
    }
}