modelvault-core 0.14.0

Core engine for ModelVault — application-focused embedded storage with model schemas, validation, and migrations.
Documentation
//! Open and bootstrap: decode header, superblocks, manifest, and initial segment scan.

use std::collections::HashMap;
use std::path::PathBuf;

use crate::config::{OpenOptions, RecoveryMode};
use crate::error::{DbError, FormatError};
use crate::file_format::{decode_header, FileHeader, FILE_HEADER_SIZE};
use crate::manifest::decode_manifest_v0;
use crate::publish::append_manifest_and_publish;
use crate::segments::header::SegmentType;
use crate::segments::reader::read_segment_header_at;
use crate::segments::reader::read_segment_payload;
use crate::storage::Store;
use crate::superblock::{decode_superblock, Superblock, SUPERBLOCK_SIZE};

use super::recover;
use super::replay;
use super::Database;

#[rustfmt::skip]
pub(crate) fn init_superblocks(store: &mut impl Store, segment_start: u64) -> Result<(), DbError> {
    store.write_all_at(segment_start - 1, &[0u8])?; store.write_all_at(FILE_HEADER_SIZE as u64, &Superblock { generation: 1, ..Superblock::empty() }.encode())?; store.write_all_at((FILE_HEADER_SIZE + SUPERBLOCK_SIZE) as u64, &Superblock::empty().encode())?; Ok(())
}

#[rustfmt::skip]
pub(crate) fn read_and_select_superblock(store: &mut impl Store) -> Result<Superblock, DbError> {
    let mut a = [0u8; SUPERBLOCK_SIZE];
    let mut b = [0u8; SUPERBLOCK_SIZE];
    store.read_exact_at(FILE_HEADER_SIZE as u64, &mut a)?; store.read_exact_at((FILE_HEADER_SIZE + SUPERBLOCK_SIZE) as u64, &mut b)?;
    let sa = decode_superblock(&a).ok();
    let sb = decode_superblock(&b).ok();
    match (sa, sb) {
        (Some(sa), Some(sb)) => Ok(if sa.generation >= sb.generation { sa } else { sb }),
        (Some(sa), None) => Ok(sa),
        (None, Some(sb)) => Ok(sb),
        (None, None) => Err(DbError::Format(FormatError::BadSuperblockChecksum)),
    }
}

#[rustfmt::skip]
pub(crate) fn read_manifest(store: &mut impl Store, sb: &Superblock) -> Result<(), DbError> {
    let (_, header) = read_segment_header_at(store, sb.manifest_offset)?;
    if header.segment_type != SegmentType::Manifest {
        return Err(DbError::Format(FormatError::UnsupportedVersion { major: 0, minor: 0 }));
    }
    let mut payload = vec![0u8; header.payload_len as usize];
    store.read_exact_at(sb.manifest_offset + crate::segments::header::SEGMENT_HEADER_LEN as u64, &mut payload)?; let _m = decode_manifest_v0(&payload)?;
    Ok(())
}

#[rustfmt::skip]
fn try_load_checkpoint_state(
    store: &mut impl Store,
    sb: &Superblock,
    segment_start: u64,
) -> Result<
    Option<(
        u64,
        crate::catalog::Catalog,
        super::LatestMap,
        crate::index::IndexState,
    )>,
    DbError,
> {
    if sb.checkpoint_offset == 0 || sb.checkpoint_len == 0 { return Ok(None); }
    let file_len = store.len()?;
    if sb.checkpoint_offset < segment_start || sb.checkpoint_offset >= file_len { return Ok(None); }
    let (_, header) = read_segment_header_at(store, sb.checkpoint_offset)?;
    if header.segment_type != SegmentType::Checkpoint { return Ok(None); }
    let meta = crate::segments::reader::SegmentMeta { offset: sb.checkpoint_offset, header };
    let payload = read_segment_payload(store, &meta)?;
    let crc = crate::checksum::crc32c(&payload);
    if crc != header.payload_crc32c { return Err(DbError::Format(FormatError::BadSegmentPayloadChecksum)); }
    let checkpoint_end = sb.checkpoint_offset
        + crate::segments::header::SEGMENT_HEADER_LEN as u64
        + header.payload_len;
    let (replay_from, catalog, latest, indexes) = crate::checkpoint::state_from_checkpoint_payload(&payload)?;
    if replay_from < checkpoint_end {
        return Err(DbError::Format(FormatError::InvalidCheckpointPayload {
            message: format!(
                "replay_from_offset {replay_from} is before checkpoint segment end {checkpoint_end}"
            ),
        }));
    }
    Ok(Some((replay_from, catalog, latest, indexes)))
}

#[rustfmt::skip]
pub(crate) fn open_with_store<S: Store>(
    path: PathBuf,
    mut store: S,
    opts: OpenOptions,
) -> Result<Database<S>, DbError> {
    #[cfg(feature = "tracing")] tracing::info!(path = %path.display(), "open_with_store_begin");
    let len = store.len()?;
    let segment_start = (FILE_HEADER_SIZE + 2 * SUPERBLOCK_SIZE) as u64;
    let format_minor;
    if len == 0 {
        let header = FileHeader::new_v0_8();
        format_minor = header.format_minor;
        store.write_all_at(0, &header.encode())?;
        init_superblocks(&mut store, segment_start)?;
        let _ = append_manifest_and_publish(&mut store, segment_start)?;
        store.sync()?;
    } else if len < FILE_HEADER_SIZE as u64 {
        return Err(DbError::Format(FormatError::TruncatedHeader { got: len as usize, expected: FILE_HEADER_SIZE }));
    } else {
        let mut buf = [0u8; FILE_HEADER_SIZE];
        store.read_exact_at(0, &mut buf)?;
        let header = decode_header(&buf)?;
        if header.format_minor == 2 {
            if len == FILE_HEADER_SIZE as u64 {
                let upgraded = FileHeader::new_v0_3();
                store.write_all_at(0, &upgraded.encode())?;
                init_superblocks(&mut store, segment_start)?;
                let _ = append_manifest_and_publish(&mut store, segment_start)?;
                store.sync()?;
                format_minor = crate::file_format::FORMAT_MINOR_V3;
            } else {
                return Err(DbError::Format(FormatError::UnsupportedVersion { major: header.format_major, minor: header.format_minor }));
            }
        } else {
            if len < segment_start {
                return Err(DbError::Format(FormatError::TruncatedSuperblock { got: len as usize, expected: segment_start as usize }));
            }
            let selected = read_and_select_superblock(&mut store)?;
            if selected.manifest_offset != 0 {
                if let Err(e) = read_manifest(&mut store, &selected) {
                    match opts.recovery {
                        RecoveryMode::Strict => return Err(e),
                        RecoveryMode::AutoTruncate => {}
                    }
                }
            }
            format_minor = header.format_minor;
            let (truncate_to, reason) = recover::truncate_end_for_recovery(&mut store, segment_start, format_minor)?;
            match opts.recovery {
                RecoveryMode::Strict => {
                    if let Some(reason) = reason {
                        let flen = store.len()?;
                        if truncate_to < flen { return Err(DbError::Format(FormatError::UncleanLogTail { safe_end: truncate_to, reason })); }
                    }
                }
                RecoveryMode::AutoTruncate => {
                    let flen = store.len()?;
                    if truncate_to < flen { store.truncate(truncate_to)?; store.sync()?; }
                }
            }
        }
    }
    let flen = store.len()?;
    let (mut catalog, mut latest, mut indexes, replay_from) = if flen == 0 {
        (crate::catalog::Catalog::default(), HashMap::new(), crate::index::IndexState::default(), segment_start)
    } else {
        let selected = read_and_select_superblock(&mut store)?;
        match try_load_checkpoint_state(&mut store, &selected, segment_start) {
            Ok(Some((from, cat, lat, idx))) => (cat, lat, idx, from),
            Ok(None) => {
                let (cat, lat, idx) = replay::load_catalog_latest_and_indexes(&mut store, segment_start, format_minor)?;
                (cat, lat, idx, store.len()?)
            }
            Err(e @ DbError::Format(FormatError::InvalidCheckpointPayload { .. })) => return Err(e),
            Err(e) => match opts.recovery {
                RecoveryMode::Strict => return Err(e),
                RecoveryMode::AutoTruncate => {
                    let (cat, lat, idx) = replay::load_catalog_latest_and_indexes(&mut store, segment_start, format_minor)?;
                    (cat, lat, idx, store.len()?)
                }
            },
        }
    };
    if flen != 0 && replay_from < store.len()? && replay_from >= segment_start {
        replay::replay_tail_into(&mut store, replay_from, format_minor, &mut catalog, &mut latest, &mut indexes)?;
    }
    #[cfg(feature = "tracing")] tracing::info!(path = %path.display(), format_minor = format_minor, "open_with_store_ok");
    Ok(Database {
        path,
        store,
        catalog,
        segment_start,
        format_minor,
        latest,
        indexes,
        txn_seq: 0,
        txn_staging: None,
        #[cfg(test)]
        test_poison_planned_replace_row: None,
        #[cfg(test)]
        test_poison_delete_encode_scalar: None,
    })
}

#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_open_tests.rs"
    ));
}