icydb-core 0.171.0

IcyDB — A schema-first typed query engine and persistence runtime for Internet Computer canisters
Documentation
use super::*;
use crate::{db::StoreSnapshotStorageMode, metrics::sink::MutationCommitClass};

fn mutation_commit_classes_for_entity(
    events: &[MetricsEvent],
    entity_path: &'static str,
) -> Vec<MutationCommitClass> {
    events
        .iter()
        .filter_map(|event| match event {
            MetricsEvent::MutationCommitPlan {
                entity_path: path,
                class,
            } if *path == entity_path => Some(*class),
            _ => None,
        })
        .collect()
}

fn capture_mutation_commit_classes<R>(
    entity_path: &'static str,
    run: impl FnOnce() -> R,
) -> (R, Vec<MutationCommitClass>) {
    let sink = SessionMetricsCaptureSink::default();
    let output = with_metrics_sink(&sink, run);
    let classes = mutation_commit_classes_for_entity(&sink.into_events(), entity_path);

    (output, classes)
}

fn public_projection_rows<E>(session: &DbSession<SessionSqlCanister>, sql: &str) -> Vec<Vec<Value>>
where
    E: PersistedRow<Canister = SessionSqlCanister> + EntityValue,
{
    let result = session
        .execute_sql_query::<E>(sql)
        .unwrap_or_else(|err| panic!("public SQL query should succeed: {sql}: {err}"));

    let SqlStatementResult::Projection { rows, .. } = result else {
        panic!("public SQL query should emit projection rows: {sql}");
    };

    rows.into_iter()
        .map(|row| row.into_iter().map(runtime_output).collect())
        .collect()
}

fn public_explain_text<E>(session: &DbSession<SessionSqlCanister>, sql: &str) -> String
where
    E: PersistedRow<Canister = SessionSqlCanister> + EntityValue,
{
    let result = session
        .execute_sql_query::<E>(sql)
        .unwrap_or_else(|err| panic!("public EXPLAIN query should succeed: {sql}: {err}"));

    let SqlStatementResult::Explain(explain) = result else {
        panic!("public EXPLAIN query should emit explain text: {sql}");
    };

    explain
}

fn seed_heap_session_entities(session: &DbSession<SessionSqlCanister>) {
    for (id, name, age) in [(1, "Atlas", 20), (2, "Beryl", 30), (3, "Cato", 40)] {
        session
            .insert(HeapSessionSqlEntity {
                id,
                name: name.to_string(),
                age,
            })
            .expect("heap typed insert should succeed while live");
    }
}

fn heap_snapshot_counts(
    session: &DbSession<SessionSqlCanister>,
) -> (u64, u64, u64, Option<u32>, Option<String>) {
    let report = session
        .storage_report_default()
        .expect("heap storage report should build");
    let data = report
        .storage_data()
        .iter()
        .find(|snapshot| snapshot.path() == HeapSessionSqlStore::PATH)
        .expect("heap data snapshot should be present");
    let index = report
        .storage_index()
        .iter()
        .find(|snapshot| snapshot.path() == HeapSessionSqlStore::PATH)
        .expect("heap index snapshot should be present");
    let schema = report
        .schema_storage()
        .iter()
        .find(|snapshot| snapshot.path() == HeapSessionSqlStore::PATH)
        .expect("heap schema snapshot should be present");

    assert_eq!(data.storage(), StoreSnapshotStorageMode::Heap);
    assert_eq!(data.memory_id(), None);
    assert_eq!(index.storage(), StoreSnapshotStorageMode::Heap);
    assert_eq!(index.memory_id(), None);
    assert_eq!(schema.storage(), StoreSnapshotStorageMode::Heap);
    assert_eq!(schema.memory_id(), None);

    (
        data.entries(),
        index.entries(),
        schema.entity_count(),
        schema.schema_version(),
        schema.schema_fingerprint().map(ToOwned::to_owned),
    )
}

#[test]
fn heap_backed_session_write_read_and_index_query_round_trip_while_live() {
    reset_heap_session_sql_store();
    let session = heap_sql_session();
    seed_heap_session_entities(&session);

    let loaded = session
        .load::<HeapSessionSqlEntity>()
        .order_term(crate::db::asc("id"))
        .execute()
        .and_then(crate::db::LoadQueryResult::into_rows)
        .expect("heap fluent load should read live rows")
        .entities();
    assert_eq!(
        loaded
            .iter()
            .map(|entity| (entity.id, entity.name.as_str(), entity.age))
            .collect::<Vec<_>>(),
        vec![(1, "Atlas", 20), (2, "Beryl", 30), (3, "Cato", 40)],
    );

    let rows = public_projection_rows::<HeapSessionSqlEntity>(
        &session,
        "SELECT name, age FROM HeapSessionSqlEntity \
         WHERE name >= 'B' AND name < 'D' \
         ORDER BY name ASC",
    );
    assert_eq!(
        rows,
        vec![
            vec![Value::Text("Beryl".to_string()), Value::Nat64(30)],
            vec![Value::Text("Cato".to_string()), Value::Nat64(40)],
        ],
    );

    let explain = public_explain_text::<HeapSessionSqlEntity>(
        &session,
        "EXPLAIN EXECUTION SELECT name \
         FROM HeapSessionSqlEntity \
         WHERE name >= 'B' AND name < 'D' \
         ORDER BY name ASC",
    );
    assert!(
        explain.contains("access_strategy=IndexRange(name)"),
        "heap indexed query should keep the secondary-index route: {explain}",
    );
    assert!(
        !explain.contains("access=FullScan"),
        "heap indexed query should not collapse to a full scan: {explain}",
    );

    let (data_entries, index_entries, schema_entities, schema_version, schema_fingerprint) =
        heap_snapshot_counts(&session);
    assert_eq!(data_entries, 3);
    assert_eq!(index_entries, 3);
    assert_eq!(schema_entities, 1);
    assert!(schema_version.is_some());
    assert!(schema_fingerprint.is_some());
}

#[test]
fn heap_backed_session_reinit_loses_rows_and_indexes_but_reconciles_live_schema() {
    reset_heap_session_sql_store();
    let session = heap_sql_session();
    seed_heap_session_entities(&session);
    assert_eq!(heap_snapshot_counts(&session).0, 3);

    reinitialize_heap_session_sql_store();
    let session = heap_sql_session();

    let loaded = session
        .load::<HeapSessionSqlEntity>()
        .order_term(crate::db::asc("id"))
        .execute()
        .and_then(crate::db::LoadQueryResult::into_rows)
        .expect("heap fluent load should read after reinit")
        .entities();
    assert_eq!(
        loaded,
        Vec::<HeapSessionSqlEntity>::new(),
        "heap rows must not be recovered from stable commit state after store reinit",
    );

    let rows = public_projection_rows::<HeapSessionSqlEntity>(
        &session,
        "SELECT name, age FROM HeapSessionSqlEntity \
         WHERE name >= 'A' \
         ORDER BY name ASC",
    );
    assert_eq!(
        rows,
        Vec::<Vec<Value>>::new(),
        "heap SQL query should observe an empty live store after reinit",
    );

    let (data_entries, index_entries, schema_entities, schema_version, schema_fingerprint) =
        heap_snapshot_counts(&session);
    assert_eq!(data_entries, 0);
    assert_eq!(index_entries, 0);
    assert_eq!(
        schema_entities, 1,
        "heap schema metadata is rebuilt for live validation/diagnostics, not recovered as rows",
    );
    assert!(schema_version.is_some());
    assert!(schema_fingerprint.is_some());
}

#[test]
fn stable_source_strong_relation_to_heap_target_rejects_at_runtime_boundary() {
    reset_mixed_heap_relation_stores();
    let session = mixed_heap_relation_sql_session();
    seed_heap_session_entities(&session);

    let err = session
        .insert(StableSessionSqlSourceToHeapTargetEntity {
            id: 10,
            target_id: 1,
        })
        .expect_err("stable source strong relation to heap target should fail closed");
    assert_eq!(err.class(), ErrorClass::Unsupported);

    let persisted = session
        .load::<StableSessionSqlSourceToHeapTargetEntity>()
        .execute()
        .and_then(crate::db::LoadQueryResult::into_rows)
        .expect("post-rejection stable-source load should succeed")
        .entities();
    assert_eq!(
        persisted,
        Vec::<StableSessionSqlSourceToHeapTargetEntity>::new(),
        "stable-source relation rejection must not persist the row",
    );
}

#[test]
fn stable_source_weak_relation_to_heap_target_remains_non_enforcing() {
    reset_mixed_heap_relation_stores();
    let session = mixed_heap_relation_sql_session();

    session
        .insert(StableSessionSqlWeakSourceToHeapTargetEntity {
            id: 11,
            target_id: 9_999,
        })
        .expect("weak stable-source relation to heap target should not take strong policy");

    let persisted = session
        .load::<StableSessionSqlWeakSourceToHeapTargetEntity>()
        .execute()
        .and_then(crate::db::LoadQueryResult::into_rows)
        .expect("weak stable-source relation load should succeed")
        .entities();
    assert_eq!(
        persisted,
        vec![StableSessionSqlWeakSourceToHeapTargetEntity {
            id: 11,
            target_id: 9_999,
        }],
        "non-strong relation behavior should stay independent of target durability",
    );
}

#[test]
fn heap_source_strong_relation_to_heap_target_keeps_live_validation_semantics() {
    reset_heap_session_sql_store();
    let session = heap_sql_session();
    seed_heap_session_entities(&session);

    session
        .insert(HeapSessionSqlSourceToHeapTargetEntity {
            id: 20,
            target_id: 1,
        })
        .expect("heap source relation to heap target should validate while live");

    let persisted = session
        .load::<HeapSessionSqlSourceToHeapTargetEntity>()
        .execute()
        .and_then(crate::db::LoadQueryResult::into_rows)
        .expect("heap-source relation load should succeed")
        .entities();
    assert_eq!(
        persisted,
        vec![HeapSessionSqlSourceToHeapTargetEntity {
            id: 20,
            target_id: 1,
        }],
    );
}

#[test]
fn public_writes_emit_durable_live_and_mixed_commit_classifications() {
    reset_session_sql_store();
    let session = sql_session();
    let (stable_result, stable_classes) =
        capture_mutation_commit_classes(SessionSqlWriteEntity::PATH, || {
            session.insert(SessionSqlWriteEntity {
                id: 170_200,
                name: "stable".to_string(),
                age: 41,
            })
        });
    stable_result.expect("stable public insert should succeed");
    assert_eq!(stable_classes, vec![MutationCommitClass::DurableOnly]);

    reset_heap_session_sql_store();
    let session = heap_sql_session();
    let (heap_result, heap_classes) =
        capture_mutation_commit_classes(HeapSessionSqlEntity::PATH, || {
            session.insert(HeapSessionSqlEntity {
                id: 170_201,
                name: "heap".to_string(),
                age: 42,
            })
        });
    heap_result.expect("heap public insert should succeed");
    assert_eq!(heap_classes, vec![MutationCommitClass::LiveOnly]);

    reset_mixed_heap_relation_stores();
    let session = mixed_heap_relation_sql_session();
    session
        .insert(SessionSqlSelfRelationEntity {
            id: 170_202,
            parent: None,
        })
        .expect("stable relation target should seed");
    let (mixed_result, mixed_classes) =
        capture_mutation_commit_classes(HeapSessionSqlSourceToStableTargetEntity::PATH, || {
            session.insert(HeapSessionSqlSourceToStableTargetEntity {
                id: 170_203,
                target_id: 170_202,
            })
        });
    mixed_result.expect("heap source to stable target should validate while live");
    assert_eq!(
        mixed_classes,
        vec![MutationCommitClass::MixedDurableAndLive]
    );
}

#[test]
fn failed_heap_source_to_stable_target_write_leaves_no_heap_side_effects() {
    reset_mixed_heap_relation_stores();
    let session = mixed_heap_relation_sql_session();

    let (result, classes) =
        capture_mutation_commit_classes(HeapSessionSqlSourceToStableTargetEntity::PATH, || {
            session.insert(HeapSessionSqlSourceToStableTargetEntity {
                id: 170_204,
                target_id: 170_205,
            })
        });
    result.expect_err("missing stable target should reject before heap write apply");
    assert_eq!(
        classes,
        Vec::<MutationCommitClass>::new(),
        "failed preflight must not advertise a commit classification"
    );

    let persisted = session
        .load::<HeapSessionSqlSourceToStableTargetEntity>()
        .execute()
        .and_then(crate::db::LoadQueryResult::into_rows)
        .expect("heap source load after failed write should succeed")
        .entities();
    assert_eq!(
        persisted,
        Vec::<HeapSessionSqlSourceToStableTargetEntity>::new(),
        "failed mixed write must not leave heap source side effects"
    );
}