claw-core 0.1.2

Embedded local database engine for ClawDB — an agent-native cognitive database
Documentation
use std::collections::HashSet;
use std::io::{Read, Seek, SeekFrom, Write};
use std::time::{Duration, Instant};

use claw_core::{ClawConfig, ClawEngine, ClawError, ListOptions, MemoryRecord, MemoryType};
use tempfile::TempDir;

async fn make_engine(dir: &TempDir, with_snapshot: bool) -> ClawEngine {
    let db_path = dir.path().join("integration.db");
    let mut builder = ClawConfig::builder().db_path(&db_path).cache_size_mb(1);
    if with_snapshot {
        builder = builder.snapshot_dir(dir.path().join("snapshots"));
    }
    let config = builder.build().expect("config");
    ClawEngine::open(config).await.expect("engine")
}

#[tokio::test]
async fn snapshot_restore_round_trip() {
    let dir = TempDir::new().expect("tempdir");
    let mut engine = make_engine(&dir, true).await;

    for i in 0..100 {
        let record = MemoryRecord::new(
            format!("snapshot-record-{i}"),
            MemoryType::Semantic,
            vec!["snap".to_string()],
            None,
        );
        engine.insert_memory(&record).await.expect("insert");
    }

    let snapshot_path = engine.snapshot().await.expect("snapshot");

    let all = engine.list_memories(None).await.expect("list");
    for record in all {
        engine.delete_memory(record.id).await.expect("delete");
    }
    assert_eq!(engine.list_memories(None).await.expect("empty").len(), 0);

    engine.restore(&snapshot_path).await.expect("restore");
    let restored = engine.list_memories(None).await.expect("restored");
    assert_eq!(restored.len(), 100);
}

#[tokio::test]
async fn snapshot_corrupt_detection() {
    let dir = TempDir::new().expect("tempdir");
    let mut engine = make_engine(&dir, true).await;

    let record = MemoryRecord::new(
        "for-corruption-test",
        MemoryType::Semantic,
        vec!["snap".to_string()],
        None,
    );
    engine.insert_memory(&record).await.expect("insert");

    let snapshot_path = engine.snapshot().await.expect("snapshot");

    let mut file = std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .open(&snapshot_path)
        .expect("open snapshot");

    let mut first_byte = [0_u8; 1];
    file.read_exact(&mut first_byte).expect("read first byte");
    file.seek(SeekFrom::Start(0)).expect("seek start");
    first_byte[0] ^= 0x01;
    file.write_all(&first_byte).expect("write flipped byte");
    file.flush().expect("flush");

    let err = engine.restore(&snapshot_path).await.expect_err("must fail");
    assert!(matches!(err, ClawError::SnapshotCorrupt(_)));
}

#[tokio::test]
async fn fts_rollback_consistency() {
    let dir = TempDir::new().expect("tempdir");
    let engine = make_engine(&dir, false).await;

    let mut tx = engine.transaction().await.expect("tx");
    for i in 0..5 {
        let record = MemoryRecord::new(
            format!("rollbackmarker value {i}"),
            MemoryType::Semantic,
            vec!["fts".to_string()],
            None,
        );
        tx.insert_memory(&record).await.expect("stage insert");
    }
    tx.rollback().await.expect("rollback");

    let results = engine
        .fts_search("rollbackmarker")
        .await
        .expect("fts search");
    assert_eq!(results.len(), 0);
}

#[tokio::test]
async fn fts_commit_consistency() {
    let dir = TempDir::new().expect("tempdir");
    let engine = make_engine(&dir, false).await;

    let mut tx = engine.transaction().await.expect("tx");
    for i in 0..5 {
        let record = MemoryRecord::new(
            format!("commitmarker value {i}"),
            MemoryType::Semantic,
            vec!["fts".to_string()],
            None,
        );
        tx.insert_memory(&record).await.expect("stage insert");
    }
    tx.commit().await.expect("commit");

    let results = engine.fts_search("commitmarker").await.expect("fts search");
    assert_eq!(results.len(), 5);
}

#[tokio::test]
async fn tag_search_indexed() {
    let dir = TempDir::new().expect("tempdir");
    let engine = make_engine(&dir, false).await;

    for i in 0..1000 {
        let tags = vec![
            "t1".to_string(),
            "t2".to_string(),
            if i % 4 == 0 {
                "needle".to_string()
            } else {
                "other".to_string()
            },
        ];
        let record = MemoryRecord::new(format!("tag-record-{i}"), MemoryType::Semantic, tags, None);
        engine.insert_memory(&record).await.expect("insert");
    }

    let results = engine
        .search_by_tag_paginated("needle", 1_000, 0)
        .await
        .expect("tag search");
    assert!(!results.is_empty());
    assert!(results.iter().all(|r| r.tags.iter().any(|t| t == "needle")));

    let plan_rows: Vec<(i64, i64, i64, String)> = sqlx::query_as(
        "EXPLAIN QUERY PLAN \
         SELECT m.id, m.content, m.memory_type, m.tags, m.ttl_seconds, m.created_at, m.updated_at \
         FROM memories m \
         JOIN memory_tags t ON m.id = t.memory_id \
         WHERE t.tag = ? \
         ORDER BY m.created_at DESC LIMIT ? OFFSET ?",
    )
    .bind("needle")
    .bind(100_i64)
    .bind(0_i64)
    .fetch_all(engine.pool())
    .await
    .expect("query plan");

    assert!(plan_rows.iter().any(|(_, _, _, detail)| {
        detail.contains("idx_memory_tags_tag") || detail.contains("INDEX idx_memory_tags_tag")
    }));
}

#[tokio::test]
async fn pagination_cursor() {
    let dir = TempDir::new().expect("tempdir");
    let engine = make_engine(&dir, false).await;

    for i in 0..200 {
        let record = MemoryRecord::new(
            format!("page-record-{i}"),
            MemoryType::Semantic,
            vec!["page".to_string()],
            None,
        );
        engine.insert_memory(&record).await.expect("insert");
    }

    let mut seen = HashSet::new();
    let mut cursor = None;
    let mut pages = 0_u32;

    loop {
        let page = engine
            .get_memories_by_type(
                MemoryType::Semantic,
                Some(ListOptions {
                    limit: 50,
                    cursor: cursor.clone(),
                }),
            )
            .await
            .expect("page");

        if page.items.is_empty() {
            break;
        }

        pages += 1;
        for item in &page.items {
            assert!(seen.insert(item.id), "duplicate id found: {}", item.id);
        }

        cursor = page.next_cursor;
        if cursor.is_none() {
            break;
        }
    }

    assert_eq!(pages, 4);
    assert_eq!(seen.len(), 200);
}

#[tokio::test]
async fn cache_invalidated_after_restore() {
    let dir = TempDir::new().expect("tempdir");
    let mut engine = make_engine(&dir, true).await;

    let record = MemoryRecord::new(
        "old value",
        MemoryType::Semantic,
        vec!["restore".to_string()],
        None,
    );
    let id = record.id;
    engine.insert_memory(&record).await.expect("insert");

    let snapshot_path = engine.snapshot().await.expect("snapshot");

    engine
        .update_memory(id, "new value")
        .await
        .expect("update newer");
    let cached_new = engine.get_memory(id).await.expect("cache new");
    assert_eq!(cached_new.content, "new value");

    engine.restore(&snapshot_path).await.expect("restore");

    let restored = engine.get_memory(id).await.expect("get restored");
    assert_eq!(restored.content, "old value");
}

#[test]
fn performance_guard_fts_and_snapshot() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let dir = TempDir::new().expect("tempdir");
        let engine = make_engine(&dir, true).await;

        for i in 0..10_000 {
            let marker = if i == 4_242 { "needlequery" } else { "common" };
            let record = MemoryRecord::new(
                format!("perf-record-{i} {marker}"),
                MemoryType::Semantic,
                vec!["perf".to_string()],
                None,
            );
            engine.insert_memory(&record).await.expect("insert");
        }

        let fts_start = Instant::now();
        let fts_results = engine.fts_search("needlequery").await.expect("fts");
        let fts_elapsed = fts_start.elapsed();
        assert_eq!(fts_results.len(), 1);
        assert!(
            fts_elapsed < Duration::from_millis(50),
            "fts_search on 10k exceeded 50ms: {:?}",
            fts_elapsed
        );

        let snapshot_start = Instant::now();
        let _snapshot_path = engine.snapshot().await.expect("snapshot");
        let snapshot_elapsed = snapshot_start.elapsed();
        assert!(
            snapshot_elapsed < Duration::from_millis(500),
            "snapshot on 10k exceeded 500ms: {:?}",
            snapshot_elapsed
        );
    });
}