use std::collections::HashSet;
use std::io::{Read, Seek, SeekFrom, Write};
use std::time::{Duration, Instant};
use claw_core::{ClawConfig, ClawEngine, ClawError, ListOptions, MemoryRecord, MemoryType};
use tempfile::TempDir;
/// Opens a `ClawEngine` backed by `integration.db` inside `dir`.
///
/// The configuration is built with `cache_size_mb(1)`. When `with_snapshot` is
/// true, the snapshot directory is set to `snapshots` under the same temp dir.
///
/// # Panics
///
/// Panics if the configuration cannot be built or the engine fails to open.
async fn make_engine(dir: &TempDir, with_snapshot: bool) -> ClawEngine {
    let root = dir.path();
    let db_path = root.join("integration.db");
    let base = ClawConfig::builder().db_path(&db_path).cache_size_mb(1);
    let builder = if with_snapshot {
        base.snapshot_dir(root.join("snapshots"))
    } else {
        base
    };
    let config = builder.build().expect("config");
    ClawEngine::open(config).await.expect("engine")
}
/// Inserts 100 records, snapshots, deletes every record, then restores the
/// snapshot and checks that 100 records are listed again.
#[tokio::test]
async fn snapshot_restore_round_trip() {
    let dir = TempDir::new().expect("tempdir");
    let mut engine = make_engine(&dir, true).await;

    for i in 0..100 {
        let content = format!("snapshot-record-{i}");
        let tags = vec![String::from("snap")];
        let record = MemoryRecord::new(content, MemoryType::Semantic, tags, None);
        engine.insert_memory(&record).await.expect("insert");
    }

    let snapshot_path = engine.snapshot().await.expect("snapshot");

    // Wipe the live data so that only the snapshot still holds the records.
    let existing = engine.list_memories(None).await.expect("list");
    for stale in existing {
        engine.delete_memory(stale.id).await.expect("delete");
    }
    let after_delete = engine.list_memories(None).await.expect("empty");
    assert_eq!(after_delete.len(), 0);

    engine.restore(&snapshot_path).await.expect("restore");

    let restored = engine.list_memories(None).await.expect("restored");
    assert_eq!(restored.len(), 100);
}
/// Verifies that restoring from a tampered snapshot is rejected.
///
/// Takes a snapshot, flips the lowest bit of the snapshot file's first byte in
/// place, and expects `restore` to fail with `ClawError::SnapshotCorrupt`.
#[tokio::test]
async fn snapshot_corrupt_detection() {
    let dir = TempDir::new().expect("tempdir");
    let mut engine = make_engine(&dir, true).await;
    let record = MemoryRecord::new(
        "for-corruption-test",
        MemoryType::Semantic,
        vec!["snap".to_string()],
        None,
    );
    engine.insert_memory(&record).await.expect("insert");
    let snapshot_path = engine.snapshot().await.expect("snapshot");

    // Corrupt the file inside its own scope so the read/write handle is closed
    // before `restore` opens the snapshot; the test must not hold the file
    // open while the engine is working on it.
    {
        let mut file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .open(&snapshot_path)
            .expect("open snapshot");
        let mut first_byte = [0_u8; 1];
        file.read_exact(&mut first_byte).expect("read first byte");
        // Reading advanced the cursor; rewind so the write overwrites byte 0.
        file.seek(SeekFrom::Start(0)).expect("seek start");
        first_byte[0] ^= 0x01;
        file.write_all(&first_byte).expect("write flipped byte");
        file.flush().expect("flush");
    }

    let err = engine.restore(&snapshot_path).await.expect_err("must fail");
    assert!(
        matches!(err, ClawError::SnapshotCorrupt(_)),
        "expected ClawError::SnapshotCorrupt, got: {err:?}"
    );
}
/// Stages five inserts in a transaction, rolls it back, and checks that a
/// full-text search for the staged marker returns nothing.
#[tokio::test]
async fn fts_rollback_consistency() {
    let dir = TempDir::new().expect("tempdir");
    let engine = make_engine(&dir, false).await;

    let mut tx = engine.transaction().await.expect("tx");
    for i in 0..5 {
        let content = format!("rollbackmarker value {i}");
        let tags = vec![String::from("fts")];
        let staged = MemoryRecord::new(content, MemoryType::Semantic, tags, None);
        tx.insert_memory(&staged).await.expect("stage insert");
    }
    tx.rollback().await.expect("rollback");

    let hits = engine
        .fts_search("rollbackmarker")
        .await
        .expect("fts search");
    assert_eq!(hits.len(), 0);
}
/// Stages five inserts in a transaction, commits it, and checks that a
/// full-text search for the marker returns all five records.
#[tokio::test]
async fn fts_commit_consistency() {
    let dir = TempDir::new().expect("tempdir");
    let engine = make_engine(&dir, false).await;

    let mut tx = engine.transaction().await.expect("tx");
    for i in 0..5 {
        let content = format!("commitmarker value {i}");
        let tags = vec![String::from("fts")];
        let staged = MemoryRecord::new(content, MemoryType::Semantic, tags, None);
        tx.insert_memory(&staged).await.expect("stage insert");
    }
    tx.commit().await.expect("commit");

    let hits = engine.fts_search("commitmarker").await.expect("fts search");
    assert_eq!(hits.len(), 5);
}
/// Checks that tag search returns exactly the tagged records and that the
/// equivalent SQL query is planned against the tag index.
///
/// Inserts 1000 records, every fourth of which carries the `needle` tag, then:
/// 1. asserts the paginated tag search returns exactly those records, and
/// 2. runs `EXPLAIN QUERY PLAN` on a tag-join query and asserts that the plan
///    mentions `idx_memory_tags_tag`.
#[tokio::test]
async fn tag_search_indexed() {
    let dir = TempDir::new().expect("tempdir");
    let engine = make_engine(&dir, false).await;

    // Count the tagged records while inserting so the expectation below is
    // derived from the fixture rather than hard-coded.
    let mut expected_needles = 0_usize;
    for i in 0..1000 {
        let is_needle = i % 4 == 0;
        if is_needle {
            expected_needles += 1;
        }
        let tags = vec![
            "t1".to_string(),
            "t2".to_string(),
            if is_needle {
                "needle".to_string()
            } else {
                "other".to_string()
            },
        ];
        let record = MemoryRecord::new(format!("tag-record-{i}"), MemoryType::Semantic, tags, None);
        engine.insert_memory(&record).await.expect("insert");
    }

    // The limit (1000) exceeds the number of tagged records, so a single page
    // must contain every one of them.
    let results = engine
        .search_by_tag_paginated("needle", 1_000, 0)
        .await
        .expect("tag search");
    assert_eq!(
        results.len(),
        expected_needles,
        "tag search must return every record tagged `needle`"
    );
    assert!(results.iter().all(|r| r.tags.iter().any(|t| t == "needle")));

    let plan_rows: Vec<(i64, i64, i64, String)> = sqlx::query_as(
        "EXPLAIN QUERY PLAN \
         SELECT m.id, m.content, m.memory_type, m.tags, m.ttl_seconds, m.created_at, m.updated_at \
         FROM memories m \
         JOIN memory_tags t ON m.id = t.memory_id \
         WHERE t.tag = ? \
         ORDER BY m.created_at DESC LIMIT ? OFFSET ?",
    )
    .bind("needle")
    .bind(100_i64)
    .bind(0_i64)
    .fetch_all(engine.pool())
    .await
    .expect("query plan");

    // A single substring check is sufficient: any plan line of the form
    // "... INDEX idx_memory_tags_tag ..." also contains the bare index name.
    let uses_tag_index = plan_rows
        .iter()
        .any(|(_, _, _, detail)| detail.contains("idx_memory_tags_tag"));
    assert!(
        uses_tag_index,
        "query plan does not mention idx_memory_tags_tag: {plan_rows:?}"
    );
}
/// Walks 200 records in pages of 50 via the cursor returned with each page and
/// checks that exactly four non-empty pages are seen with no repeated id.
#[tokio::test]
async fn pagination_cursor() {
    let dir = TempDir::new().expect("tempdir");
    let engine = make_engine(&dir, false).await;

    for i in 0..200 {
        let content = format!("page-record-{i}");
        let tags = vec![String::from("page")];
        let record = MemoryRecord::new(content, MemoryType::Semantic, tags, None);
        engine.insert_memory(&record).await.expect("insert");
    }

    let mut seen_ids = HashSet::new();
    let mut next = None;
    let mut page_count = 0_u32;
    loop {
        // `next` is overwritten from the fetched page before it is read again,
        // so the cursor can be moved into the request.
        let options = ListOptions {
            limit: 50,
            cursor: next.take(),
        };
        let page = engine
            .get_memories_by_type(MemoryType::Semantic, Some(options))
            .await
            .expect("page");

        // An empty page ends the walk and is not counted.
        if page.items.is_empty() {
            break;
        }
        page_count += 1;

        for item in &page.items {
            assert!(seen_ids.insert(item.id), "duplicate id found: {}", item.id);
        }

        next = page.next_cursor;
        if next.is_none() {
            break;
        }
    }

    assert_eq!(page_count, 4);
    assert_eq!(seen_ids.len(), 200);
}
/// Checks that a read after `restore` returns the snapshotted content rather
/// than a newer value that was read just before the restore.
#[tokio::test]
async fn cache_invalidated_after_restore() {
    let dir = TempDir::new().expect("tempdir");
    let mut engine = make_engine(&dir, true).await;

    let tags = vec![String::from("restore")];
    let original = MemoryRecord::new("old value", MemoryType::Semantic, tags, None);
    let id = original.id;
    engine.insert_memory(&original).await.expect("insert");

    // Snapshot while the record still holds "old value".
    let snapshot_path = engine.snapshot().await.expect("snapshot");

    // Update the record and read it back so the newer value has been served
    // once before the restore happens.
    engine
        .update_memory(id, "new value")
        .await
        .expect("update newer");
    let before_restore = engine.get_memory(id).await.expect("cache new");
    assert_eq!(before_restore.content, "new value");

    engine.restore(&snapshot_path).await.expect("restore");

    let after_restore = engine.get_memory(id).await.expect("get restored");
    assert_eq!(after_restore.content, "old value");
}
/// Wall-clock guard for full-text search and snapshotting on 10,000 records.
///
/// Runs on a manually built current-thread Tokio runtime. After inserting
/// 10,000 records, one of which contains the `needlequery` marker, it asserts
/// that the FTS lookup finds exactly that record in under 50 ms and that
/// taking a snapshot finishes in under 500 ms. Both budgets are measured with
/// `Instant`, so they depend on the machine running the test.
#[test]
fn performance_guard_fts_and_snapshot() {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    runtime.block_on(async {
        let dir = TempDir::new().expect("tempdir");
        let engine = make_engine(&dir, true).await;

        for i in 0..10_000 {
            // Exactly one record carries the unique search term.
            let marker = match i {
                4_242 => "needlequery",
                _ => "common",
            };
            let content = format!("perf-record-{i} {marker}");
            let tags = vec![String::from("perf")];
            let record = MemoryRecord::new(content, MemoryType::Semantic, tags, None);
            engine.insert_memory(&record).await.expect("insert");
        }

        let fts_budget = Duration::from_millis(50);
        let fts_timer = Instant::now();
        let fts_results = engine.fts_search("needlequery").await.expect("fts");
        let fts_elapsed = fts_timer.elapsed();
        assert_eq!(fts_results.len(), 1);
        assert!(
            fts_elapsed < fts_budget,
            "fts_search on 10k exceeded 50ms: {:?}",
            fts_elapsed
        );

        let snapshot_budget = Duration::from_millis(500);
        let snapshot_timer = Instant::now();
        let _snapshot_path = engine.snapshot().await.expect("snapshot");
        let snapshot_elapsed = snapshot_timer.elapsed();
        assert!(
            snapshot_elapsed < snapshot_budget,
            "snapshot on 10k exceeded 500ms: {:?}",
            snapshot_elapsed
        );
    });
}