use anyhow::{Context, Result};
use colored::Colorize;
use redb::{Database, ReadableTable, TableHandle};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::path::Path;
use trusty_common::memory_core::palace::{Drawer, Palace, PalaceId};
use trusty_common::memory_core::store::kg::Triple;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KuzuEntity {
pub id: String,
pub name: String,
#[serde(default)]
pub entity_type: String,
#[serde(default)]
pub observations: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KuzuRelation {
pub from: String,
pub to: String,
pub relation_type: String,
}
pub fn entity_uuid(entity_id: &str, palace_name: &str) -> Uuid {
let mut hasher = Sha256::new();
hasher.update(entity_id.as_bytes());
hasher.update(b"\x00");
hasher.update(palace_name.as_bytes());
let digest = hasher.finalize();
let mut bytes = [0u8; 16];
bytes.copy_from_slice(&digest[..16]);
bytes[6] = (bytes[6] & 0x0f) | 0x40;
bytes[8] = (bytes[8] & 0x3f) | 0x80;
Uuid::from_bytes(bytes)
}
pub fn entity_to_drawer(entity: &KuzuEntity, palace_name: &str) -> Drawer {
let id = entity_uuid(&entity.id, palace_name);
let room_id = Uuid::nil(); let content = if entity.observations.is_empty() {
entity.name.clone()
} else {
format!("{}: {}", entity.name, entity.observations.join("\n"))
};
let mut tags = vec!["source:kuzu".to_string()];
if !entity.entity_type.is_empty() {
tags.push(format!("type:{}", entity.entity_type.to_lowercase()));
}
let mut drawer = Drawer::new(room_id, &content);
drawer.id = id;
drawer.tags = tags;
drawer.importance = 0.5;
drawer
}
pub fn relation_to_triple(relation: &KuzuRelation) -> Triple {
Triple {
subject: format!("entity:{}", relation.from),
predicate: relation.relation_type.clone(),
object: format!("entity:{}", relation.to),
valid_from: chrono::Utc::now(),
valid_to: None,
confidence: 0.8,
provenance: Some("kuzu-migrate".to_string()),
}
}
pub const ENTITIES_TABLE: &str = "entities";
pub const RELATIONS_TABLE: &str = "relations";
pub fn discover_schema(path: &Path) -> Result<Vec<String>> {
let db = Database::open(path)
.with_context(|| format!("open kuzu-memory store.redb at {}", path.display()))?;
let rtx = db
.begin_read()
.context("begin read txn for schema discovery")?;
let tables = rtx
.list_tables()
.context("list tables for schema discovery")?;
Ok(tables.map(|t| t.name().to_string()).collect())
}
pub fn read_entities(db: &Database) -> Result<Vec<KuzuEntity>> {
use redb::TableDefinition;
const TABLE: TableDefinition<&str, &str> = TableDefinition::new(ENTITIES_TABLE);
let rtx = db.begin_read().context("begin read txn for entities")?;
let table = match rtx.open_table(TABLE) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => {
tracing::warn!("kuzu-migrate: entities table not found — skipping");
return Ok(Vec::new());
}
Err(e) => return Err(e).context("open entities table"),
};
let mut out = Vec::new();
for entry in table.iter().context("iterate entities")? {
let (k, v) = entry.context("read entity row")?;
let entity_id = k.value();
match serde_json::from_str::<KuzuEntity>(v.value()) {
Ok(mut entity) => {
if entity.id.is_empty() {
entity.id = entity_id.to_string();
}
out.push(entity);
}
Err(e) => {
tracing::warn!(id = %entity_id, "kuzu-migrate: skip malformed entity: {e}");
}
}
}
Ok(out)
}
pub fn read_relations(db: &Database) -> Result<Vec<KuzuRelation>> {
use redb::TableDefinition;
const TABLE: TableDefinition<&str, &str> = TableDefinition::new(RELATIONS_TABLE);
let rtx = db.begin_read().context("begin read txn for relations")?;
let table = match rtx.open_table(TABLE) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => {
tracing::warn!("kuzu-migrate: relations table not found — skipping");
return Ok(Vec::new());
}
Err(e) => return Err(e).context("open relations table"),
};
let mut out = Vec::new();
for entry in table.iter().context("iterate relations")? {
let (k, v) = entry.context("read relation row")?;
let rid = k.value();
match serde_json::from_str::<KuzuRelation>(v.value()) {
Ok(relation) => out.push(relation),
Err(e) => {
tracing::warn!(id = %rid, "kuzu-migrate: skip malformed relation: {e}");
}
}
}
Ok(out)
}
pub fn handle_kuzu_data_migrate(
from: &Path,
palace_name: &str,
dry_run: bool,
limit: Option<usize>,
) -> Result<()> {
if dry_run {
println!("{} Dry run — no data will be written.\n", "·".dimmed());
}
println!("🔍 Opening source store: {}", from.display());
let tables = discover_schema(from)?;
if tables.is_empty() {
println!(
"{} No tables found in source store — nothing to import.",
"·".dimmed()
);
return Ok(());
}
println!("{} Source schema: {}", "·".dimmed(), tables.join(", "));
let source_db =
Database::open(from).with_context(|| format!("open source store at {}", from.display()))?;
let entities = read_entities(&source_db)?;
let relations = read_relations(&source_db)?;
let entity_limit = limit.unwrap_or(entities.len()).min(entities.len());
println!(
"{} Found {} entities, {} relations (importing {} entities).",
"·".dimmed(),
entities.len(),
relations.len(),
entity_limit
);
if dry_run {
print_dry_run_plan(&entities[..entity_limit], &relations, palace_name);
return Ok(());
}
let data_dir = trusty_common::resolve_data_dir("trusty-memory")
.context("resolve trusty-memory data directory")?;
let data_root = crate::resolve_palace_registry_dir(data_dir);
let registry = trusty_common::memory_core::PalaceRegistry::new();
let palace_id = PalaceId::new(palace_name);
if registry.open_palace(&data_root, &palace_id).is_err() {
println!(" Creating target palace '{palace_name}'…");
let palace = Palace {
id: palace_id.clone(),
name: palace_name.to_string(),
description: Some(format!(
"Imported from kuzu-memory store.redb at {}",
from.display()
)),
created_at: chrono::Utc::now(),
data_dir: data_root.join(palace_name),
};
registry
.create_palace(&data_root, palace)
.context("create target palace")?;
}
let handle = registry
.open_palace(&data_root, &palace_id)
.context("open target palace")?;
let mut drawers_written = 0usize;
let mut drawers_skipped = 0usize;
for entity in &entities[..entity_limit] {
let drawer = entity_to_drawer(entity, palace_name);
let exists = {
let d = handle.drawers.read();
d.iter().any(|x| x.id == drawer.id)
};
if exists {
drawers_skipped += 1;
continue;
}
match handle.kg.upsert_drawer_sync(&drawer) {
Ok(()) => drawers_written += 1,
Err(e) => {
tracing::warn!(entity_id = %entity.id, "kuzu-migrate: upsert drawer failed: {e:#}");
}
}
}
let mut triples_written = 0usize;
let mut triples_skipped = 0usize;
let store = handle.kg.store();
for relation in &relations {
let triple = relation_to_triple(relation);
let exists = store
.query_active(&triple.subject)
.map(|v| v.iter().any(|t| t.predicate == triple.predicate))
.unwrap_or(false);
if exists {
triples_skipped += 1;
continue;
}
match handle.kg.assert_sync(&triple) {
Ok(()) => triples_written += 1,
Err(e) => {
tracing::warn!(
from = %relation.from, to = %relation.to,
"kuzu-migrate: assert triple failed: {e:#}"
);
}
}
}
println!();
println!(
"{} Import complete: {} drawers written ({} already existed), \
{} triples written ({} already existed).",
"✓".green(),
drawers_written,
drawers_skipped,
triples_written,
triples_skipped
);
Ok(())
}
fn print_dry_run_plan(entities: &[KuzuEntity], relations: &[KuzuRelation], palace_name: &str) {
println!("\nPlanned operations:");
println!(" {} drawers to create:", entities.len());
for entity in entities.iter().take(10) {
let drawer_id = entity_uuid(&entity.id, palace_name);
println!(
" drawer:{drawer_id} ← entity:{} ({:?})",
entity.id, entity.name
);
}
if entities.len() > 10 {
println!(" … and {} more", entities.len() - 10);
}
println!(" {} triples to assert:", relations.len());
for rel in relations.iter().take(10) {
let triple = relation_to_triple(rel);
println!(
" ({}, {}, {})",
triple.subject, triple.predicate, triple.object
);
}
if relations.len() > 10 {
println!(" … and {} more", relations.len() - 10);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn entity_to_drawer_maps_name_and_observations() {
let entity = KuzuEntity {
id: "ent-001".to_string(),
name: "Alice".to_string(),
entity_type: "person".to_string(),
observations: vec!["works at Acme".to_string(), "knows Rust".to_string()],
};
let drawer = entity_to_drawer(&entity, "test-palace");
assert!(
drawer.content.contains("Alice"),
"content must include name"
);
assert!(
drawer.content.contains("works at Acme"),
"content must include first observation"
);
assert!(
drawer.content.contains("knows Rust"),
"content must include second observation"
);
assert!(drawer.tags.contains(&"source:kuzu".to_string()));
assert!(drawer.tags.contains(&"type:person".to_string()));
let drawer2 = entity_to_drawer(&entity, "test-palace");
assert_eq!(drawer.id, drawer2.id, "drawer id must be deterministic");
}
#[test]
fn entity_to_drawer_empty_observations() {
let entity = KuzuEntity {
id: "ent-002".to_string(),
name: "Bob".to_string(),
entity_type: String::new(),
observations: vec![],
};
let drawer = entity_to_drawer(&entity, "palace");
assert_eq!(drawer.content, "Bob");
assert!(!drawer.tags.iter().any(|t| t.starts_with("type:")));
}
#[test]
fn relation_to_triple_maps_fields() {
let relation = KuzuRelation {
from: "alice".to_string(),
to: "acme-corp".to_string(),
relation_type: "works_at".to_string(),
};
let triple = relation_to_triple(&relation);
assert_eq!(triple.subject, "entity:alice");
assert_eq!(triple.predicate, "works_at");
assert_eq!(triple.object, "entity:acme-corp");
assert_eq!(triple.provenance.as_deref(), Some("kuzu-migrate"));
assert!(triple.valid_to.is_none());
}
#[test]
fn entity_uuid_is_deterministic() {
let a = entity_uuid("ent-abc", "my-palace");
let b = entity_uuid("ent-abc", "my-palace");
let c = entity_uuid("ent-xyz", "my-palace");
let d = entity_uuid("ent-abc", "other-palace");
assert_eq!(a, b);
assert_ne!(a, c);
assert_ne!(a, d);
}
#[test]
fn discover_schema_on_empty_db_returns_empty() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("empty.redb");
drop(Database::create(&path).expect("create empty db"));
let tables = discover_schema(&path).expect("should not fail on empty db");
assert!(tables.is_empty());
}
#[test]
fn read_entities_on_empty_db_returns_empty() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("empty.redb");
let db = Database::create(&path).expect("create db");
let entities = read_entities(&db).expect("should not fail on empty db");
assert!(entities.is_empty());
}
#[test]
fn read_relations_on_empty_db_returns_empty() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("empty.redb");
let db = Database::create(&path).expect("create db");
let relations = read_relations(&db).expect("should not fail on empty db");
assert!(relations.is_empty());
}
#[test]
fn read_fixture_store_returns_entities_and_relations() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("fixture.redb");
write_fixture_store(&path, 2, 1).expect("write fixture");
let db = Database::open(&path).expect("open fixture");
let entities = read_entities(&db).expect("read entities");
let relations = read_relations(&db).expect("read relations");
assert_eq!(entities.len(), 2, "expected 2 entities");
assert_eq!(relations.len(), 1, "expected 1 relation");
let drawer = entity_to_drawer(&entities[0], "test");
assert!(drawer.tags.contains(&"source:kuzu".to_string()));
let triple = relation_to_triple(&relations[0]);
assert_eq!(triple.predicate, "test_rel");
}
#[test]
fn dry_run_returns_ok_without_writing() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("fixture.redb");
write_fixture_store(&path, 1, 0).expect("write fixture");
let result = handle_kuzu_data_migrate(&path, "test-palace", true, None);
assert!(result.is_ok(), "dry run must succeed: {result:?}");
}
pub(crate) fn write_fixture_store(
path: &Path,
n_entities: usize,
n_relations: usize,
) -> Result<()> {
use redb::TableDefinition;
const ENTITIES: TableDefinition<&str, &str> = TableDefinition::new("entities");
const RELATIONS: TableDefinition<&str, &str> = TableDefinition::new("relations");
let db = Database::create(path).context("create fixture db")?;
let wtx = db.begin_write().context("begin write txn")?;
{
let mut entities = wtx.open_table(ENTITIES).context("open entities table")?;
for i in 0..n_entities {
let id = format!("ent-{i:03}");
let entity = KuzuEntity {
id: id.clone(),
name: format!("Entity {i}"),
entity_type: "test_type".to_string(),
observations: vec![format!("observation {i}")],
};
let json = serde_json::to_string(&entity).context("serialize entity")?;
entities
.insert(id.as_str(), json.as_str())
.context("insert entity")?;
}
}
{
let mut relations = wtx.open_table(RELATIONS).context("open relations table")?;
for i in 0..n_relations.min(if n_entities >= 2 { n_relations } else { 0 }) {
let rel_id = format!("rel-{i:03}");
let from = format!("ent-{:03}", i);
let to = format!("ent-{:03}", i + 1);
let relation = KuzuRelation {
from,
to,
relation_type: "test_rel".to_string(),
};
let json = serde_json::to_string(&relation).context("serialize relation")?;
relations
.insert(rel_id.as_str(), json.as_str())
.context("insert relation")?;
}
}
wtx.commit().context("commit fixture")?;
Ok(())
}
}