icydb-core 0.180.0

IcyDB — A schema-first typed query engine and persistence runtime for Internet Computer canisters
Documentation
use super::{
    FoldWatermark, JournalBatch, JournalRecord, JournalSequence, JournalTailStore,
    JournalTailVisit,
    codec::{
        JOURNAL_BATCH_FORMAT_VERSION_CURRENT, MAX_JOURNAL_BATCH_BYTES, RawJournalBatch,
        decode_journal_batch, encode_journal_batch,
    },
};
use crate::{
    db::data::{DecodedDataStoreKey, RawDataStoreKey},
    error::{ErrorClass, ErrorOrigin},
    testing::test_memory,
    traits::Storable,
    types::EntityTag,
};
use ic_memory::stable_structures::storable::Bound;
use std::borrow::Cow;

fn raw_data_store_key(fill: u64) -> RawDataStoreKey {
    DecodedDataStoreKey::try_from_typed_key(EntityTag::new(1), &fill)
        .expect("test key should encode")
        .to_raw()
        .expect("test key should materialize")
}

fn row_put_record(fill: u64) -> JournalRecord {
    let fill_byte = u8::try_from(fill).expect("test fill should fit u8");
    JournalRecord::row_put(
        "test::Entity",
        raw_data_store_key(fill),
        vec![fill_byte; 3],
        [0x11; 16],
    )
    .expect("row put record should build")
}

fn row_delete_record(fill: u64) -> JournalRecord {
    JournalRecord::row_delete("test::Entity", raw_data_store_key(fill), [0x22; 16])
        .expect("row delete record should build")
}

fn schema_put_record(fill: u8) -> JournalRecord {
    JournalRecord::schema_put("test::Store", vec![fill; 8]).expect("schema put record should build")
}

fn batch(sequence: u64) -> JournalBatch {
    let sequence_byte = u8::try_from(sequence).expect("test sequence should fit u8");
    JournalBatch::new(
        [sequence_byte; 16],
        [0xAA; 16],
        JournalSequence::new(sequence),
        vec![
            row_put_record(sequence),
            row_delete_record(sequence + 100),
            schema_put_record(sequence_byte),
        ],
    )
    .expect("journal batch should build")
}

#[test]
fn journal_batch_codec_round_trips_logical_row_and_schema_records() {
    let batch = batch(1);
    let encoded = encode_journal_batch(&batch).expect("journal batch should encode");
    let decoded = decode_journal_batch(&encoded).expect("journal batch should decode");

    assert_eq!(decoded, batch);
    assert_eq!(decoded.records().len(), 3);
}

#[test]
fn journal_batch_decode_rejects_future_version() {
    let mut encoded = encode_journal_batch(&batch(1)).expect("journal batch should encode");
    encoded[4] = JOURNAL_BATCH_FORMAT_VERSION_CURRENT.saturating_add(1);

    let err =
        decode_journal_batch(&encoded).expect_err("future journal batch versions must fail closed");

    assert_eq!(err.class, ErrorClass::IncompatiblePersistedFormat);
    assert_eq!(err.origin, ErrorOrigin::Serialize);
}

#[test]
fn journal_batch_decode_rejects_trailing_bytes() {
    let mut encoded = encode_journal_batch(&batch(1)).expect("journal batch should encode");
    encoded.push(0xFF);

    let err = decode_journal_batch(&encoded).expect_err("trailing journal batch bytes should fail");

    assert_eq!(err.class, ErrorClass::Corruption);
    assert_eq!(err.origin, ErrorOrigin::Store);
}

#[test]
fn journal_batch_codec_rejects_oversized_row_payload() {
    let err = JournalRecord::row_put(
        "test::Entity",
        raw_data_store_key(7),
        vec![0u8; crate::db::codec::MAX_ROW_BYTES as usize + 1],
        [0x33; 16],
    )
    .expect_err("oversized row payload should fail");

    assert_eq!(err.class, ErrorClass::Corruption);
    assert_eq!(err.origin, ErrorOrigin::Store);
}

#[test]
fn journal_sequence_uses_big_endian_storable_order() {
    let low = JournalSequence::new(9);
    let high = JournalSequence::new(10);
    let low_bytes = low.to_bytes().into_owned();

    assert!(low.to_bytes() < high.to_bytes());
    assert_eq!(JournalSequence::from_bytes(Cow::Owned(low_bytes)), low);
}

#[test]
fn journal_tail_store_visits_batches_in_sequence_order_after_watermark() {
    let mut store = JournalTailStore::init(test_memory(210));
    store
        .append_batch(&batch(1))
        .expect("first batch should append");
    store
        .append_batch(&batch(2))
        .expect("second batch should append");

    let mut visited = Vec::new();
    store
        .visit_batches_after(JournalSequence::new(0), |batch| {
            visited.push(batch.journal_sequence().get());
            Ok(JournalTailVisit::Continue)
        })
        .expect("journal tail should visit in order");

    assert_eq!(visited, vec![1, 2]);
    assert_eq!(store.len(), 2);
}

#[test]
fn journal_tail_store_skips_batches_at_or_below_watermark() {
    let mut store = JournalTailStore::init(test_memory(211));
    store.append_batch(&batch(1)).expect("batch should append");
    store.append_batch(&batch(2)).expect("batch should append");

    let mut visited = Vec::new();
    store
        .visit_batches_after(JournalSequence::new(1), |batch| {
            visited.push(batch.journal_sequence().get());
            Ok(JournalTailVisit::Continue)
        })
        .expect("journal tail should skip folded batch");

    assert_eq!(visited, vec![2]);
}

#[test]
fn journal_tail_store_persists_fold_watermark_without_counting_it_as_tail_batch() {
    let mut store = JournalTailStore::init(test_memory(216));
    store.append_batch(&batch(1)).expect("batch should append");
    store.append_batch(&batch(2)).expect("batch should append");

    store
        .persist_fold_watermark(FoldWatermark::new(JournalSequence::new(2), 1))
        .expect("fold watermark should persist");

    let watermark = store
        .fold_watermark()
        .expect("fold watermark should be readable");
    assert_eq!(watermark.highest_folded_journal_sequence().get(), 2);
    assert_eq!(watermark.fold_epoch(), 1);
    assert_eq!(store.len(), 2);
}

#[test]
fn journal_tail_store_cleanup_keeps_watermark_as_replay_boundary() {
    let mut store = JournalTailStore::init(test_memory(217));
    store.append_batch(&batch(1)).expect("batch should append");
    store.append_batch(&batch(2)).expect("batch should append");
    store
        .persist_fold_watermark(FoldWatermark::new(JournalSequence::new(2), 1))
        .expect("fold watermark should persist");

    store.clear_batches_through(JournalSequence::new(2));

    let mut visited = Vec::new();
    store
        .visit_batches_after(
            store
                .fold_watermark()
                .expect("fold watermark should be readable")
                .highest_folded_journal_sequence(),
            |batch| {
                visited.push(batch.journal_sequence().get());
                Ok(JournalTailVisit::Continue)
            },
        )
        .expect("folded tail should read as empty replay tail");

    assert_eq!(visited, Vec::<u64>::new());
    assert_eq!(store.len(), 0);
    assert_eq!(
        store
            .next_append_sequence()
            .expect("next append sequence should account for watermark")
            .get(),
        3,
    );
}

#[test]
fn journal_tail_store_treats_identical_duplicate_append_as_idempotent() {
    let mut store = JournalTailStore::init(test_memory(212));
    let batch = batch(1);
    store.append_batch(&batch).expect("batch should append");
    store
        .append_batch(&batch)
        .expect("same batch append should be idempotent");

    assert_eq!(store.len(), 1);
}

#[test]
fn journal_tail_store_rejects_batch_at_fold_watermark_control_sequence() {
    let mut store = JournalTailStore::init(test_memory(218));
    let control_sequence_batch = JournalBatch::new(
        [0x01; 16],
        [0xAA; 16],
        JournalSequence::new(0),
        vec![row_put_record(1)],
    )
    .expect("control-sequence batch shape should build before tail append rejects it");

    let err = store
        .append_batch(&control_sequence_batch)
        .expect_err("sequence zero is reserved for fold-watermark control");

    assert_eq!(err.class, ErrorClass::Corruption);
    assert_eq!(err.origin, ErrorOrigin::Store);
}

#[test]
fn journal_tail_store_rejects_sequence_gap_above_watermark() {
    let mut store = JournalTailStore::init(test_memory(213));
    store.append_batch(&batch(2)).expect("batch should append");

    let err = store
        .visit_batches_after(JournalSequence::new(0), |_| Ok(JournalTailVisit::Continue))
        .expect_err("sequence gap should fail closed");

    assert_eq!(err.class, ErrorClass::Corruption);
    assert_eq!(err.origin, ErrorOrigin::Store);
}

#[test]
fn journal_tail_store_rejects_duplicate_batch_id_at_different_sequence() {
    let mut store = JournalTailStore::init(test_memory(214));
    let first = batch(1);
    let duplicate_id = JournalBatch::new(
        first.batch_id(),
        [0xAA; 16],
        JournalSequence::new(2),
        vec![row_put_record(2)],
    )
    .expect("duplicate id batch shape should build");
    store
        .append_batch(&first)
        .expect("first batch should append");
    store
        .append_batch(&duplicate_id)
        .expect("duplicate id at different sequence is detected during ordered read");

    let err = store
        .visit_batches_after(JournalSequence::new(0), |_| Ok(JournalTailVisit::Continue))
        .expect_err("duplicate batch ids above watermark should fail closed");

    assert_eq!(err.class, ErrorClass::Corruption);
    assert_eq!(err.origin, ErrorOrigin::Store);
}

#[test]
fn journal_tail_store_is_empty_before_append() {
    let store = JournalTailStore::init(test_memory(215));

    assert!(store.is_empty());
}

#[test]
fn journal_batch_storable_bound_matches_runtime_batch_limit() {
    match RawJournalBatch::BOUND {
        Bound::Bounded { max_size, .. } => assert_eq!(max_size, MAX_JOURNAL_BATCH_BYTES),
        Bound::Unbounded => panic!("journal batches must remain bounded"),
    }
}