use std::path::Path;
use crate::{Database, DbResultExt};
use roboticus_core::{Result, RoboticusError};
const SCHEMA_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER NOT NULL,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL,
scope_key TEXT NOT NULL DEFAULT 'agent',
status TEXT NOT NULL DEFAULT 'active' CHECK(status IN ('active', 'archived', 'expired')),
model TEXT,
nickname TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
metadata TEXT,
cross_channel_consent INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_sessions_scope ON sessions(agent_id, scope_key, status);
CREATE UNIQUE INDEX IF NOT EXISTS idx_sessions_active_scope_unique ON sessions(agent_id, scope_key) WHERE status = 'active';
CREATE INDEX IF NOT EXISTS idx_sessions_status_updated ON sessions(status, updated_at);
CREATE TABLE IF NOT EXISTS session_messages (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL REFERENCES sessions(id),
parent_id TEXT,
role TEXT NOT NULL CHECK(role IN ('user', 'assistant', 'system', 'tool')),
content TEXT NOT NULL,
usage_json TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_session_messages_session ON session_messages(session_id, created_at);
CREATE TABLE IF NOT EXISTS turns (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL REFERENCES sessions(id),
thinking TEXT,
tool_calls_json TEXT,
tokens_in INTEGER,
tokens_out INTEGER,
cost REAL,
model TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_turns_session ON turns(session_id, created_at);
CREATE TABLE IF NOT EXISTS tool_calls (
id TEXT PRIMARY KEY,
turn_id TEXT NOT NULL REFERENCES turns(id),
tool_name TEXT NOT NULL,
input TEXT NOT NULL,
output TEXT,
skill_id TEXT,
skill_name TEXT,
skill_hash TEXT,
status TEXT NOT NULL,
duration_ms INTEGER,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_tool_calls_turn ON tool_calls(turn_id);
CREATE TABLE IF NOT EXISTS policy_decisions (
id TEXT PRIMARY KEY,
turn_id TEXT,
tool_name TEXT NOT NULL,
decision TEXT NOT NULL CHECK(decision IN ('allow', 'deny')),
rule_name TEXT,
reason TEXT,
context_json TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_policy_decisions_session ON policy_decisions(turn_id);
CREATE INDEX IF NOT EXISTS idx_policy_decisions_created ON policy_decisions(created_at);
CREATE TABLE IF NOT EXISTS working_memory (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
entry_type TEXT NOT NULL CHECK(entry_type IN ('goal', 'note', 'turn_summary', 'decision', 'observation', 'fact')),
content TEXT NOT NULL,
importance INTEGER NOT NULL DEFAULT 5,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_working_memory_session ON working_memory(session_id);
CREATE TABLE IF NOT EXISTS episodic_memory (
id TEXT PRIMARY KEY,
classification TEXT NOT NULL,
content TEXT NOT NULL,
importance INTEGER NOT NULL DEFAULT 5,
owner_id TEXT,
memory_state TEXT NOT NULL DEFAULT 'active',
state_reason TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_episodic_importance ON episodic_memory(importance DESC, created_at DESC);
CREATE TABLE IF NOT EXISTS semantic_memory (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
confidence REAL NOT NULL DEFAULT 0.8,
memory_state TEXT NOT NULL DEFAULT 'active',
state_reason TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE(category, key)
);
CREATE TABLE IF NOT EXISTS procedural_memory (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
steps TEXT NOT NULL,
success_count INTEGER NOT NULL DEFAULT 0,
failure_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS relationship_memory (
id TEXT PRIMARY KEY,
entity_id TEXT NOT NULL UNIQUE,
entity_name TEXT,
trust_score REAL NOT NULL DEFAULT 0.5,
interaction_summary TEXT,
interaction_count INTEGER NOT NULL DEFAULT 1,
last_interaction TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE VIRTUAL TABLE IF NOT EXISTS memory_fts USING fts5(
content,
category,
source_table,
source_id
);
-- Keep FTS in sync with episodic_memory
CREATE TRIGGER IF NOT EXISTS episodic_ai AFTER INSERT ON episodic_memory BEGIN
INSERT INTO memory_fts(content, category, source_table, source_id)
VALUES (new.content, new.classification, 'episodic', new.id);
END;
CREATE TRIGGER IF NOT EXISTS episodic_ad AFTER DELETE ON episodic_memory BEGIN
DELETE FROM memory_fts WHERE source_table = 'episodic' AND source_id = old.id;
END;
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT,
status TEXT NOT NULL DEFAULT 'pending',
priority INTEGER NOT NULL DEFAULT 0,
source TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS cron_jobs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
enabled INTEGER NOT NULL DEFAULT 1,
schedule_kind TEXT NOT NULL,
schedule_expr TEXT,
schedule_every_ms INTEGER,
schedule_tz TEXT DEFAULT 'UTC',
agent_id TEXT NOT NULL,
session_target TEXT NOT NULL DEFAULT 'main',
payload_json TEXT NOT NULL,
delivery_mode TEXT DEFAULT 'none',
delivery_channel TEXT,
last_run_at TEXT,
last_status TEXT,
last_duration_ms INTEGER,
consecutive_errors INTEGER NOT NULL DEFAULT 0,
next_run_at TEXT,
last_error TEXT,
lease_holder TEXT,
lease_expires_at TEXT
);
CREATE TABLE IF NOT EXISTS cron_runs (
id TEXT PRIMARY KEY,
job_id TEXT NOT NULL REFERENCES cron_jobs(id),
status TEXT NOT NULL CHECK(status IN ('success', 'error')),
duration_ms INTEGER,
error TEXT,
output_text TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_cron_runs_job ON cron_runs(job_id, created_at);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
CREATE INDEX IF NOT EXISTS idx_cron_jobs_enabled ON cron_jobs(enabled, next_run_at);
CREATE TABLE IF NOT EXISTS transactions (
id TEXT PRIMARY KEY,
tx_type TEXT NOT NULL,
amount REAL NOT NULL,
currency TEXT NOT NULL DEFAULT 'USD',
counterparty TEXT,
tx_hash TEXT,
metadata_json TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_transactions_created ON transactions(created_at DESC);
CREATE TABLE IF NOT EXISTS service_requests (
id TEXT PRIMARY KEY,
service_id TEXT NOT NULL,
requester TEXT NOT NULL,
parameters_json TEXT NOT NULL,
status TEXT NOT NULL,
quoted_amount REAL NOT NULL,
currency TEXT NOT NULL DEFAULT 'USDC',
recipient TEXT NOT NULL,
quote_expires_at TEXT NOT NULL,
payment_tx_hash TEXT,
paid_amount REAL,
payment_verified_at TEXT,
fulfillment_output TEXT,
fulfilled_at TEXT,
failure_reason TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_service_requests_status ON service_requests(status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_service_requests_service ON service_requests(service_id, created_at DESC);
CREATE TABLE IF NOT EXISTS revenue_opportunities (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
strategy TEXT NOT NULL,
payload_json TEXT NOT NULL,
expected_revenue_usdc REAL NOT NULL,
status TEXT NOT NULL,
qualification_reason TEXT,
confidence_score REAL NOT NULL DEFAULT 0,
effort_score REAL NOT NULL DEFAULT 0,
risk_score REAL NOT NULL DEFAULT 0,
priority_score REAL NOT NULL DEFAULT 0,
recommended_approved INTEGER NOT NULL DEFAULT 0,
score_reason TEXT,
plan_json TEXT,
evidence_json TEXT,
request_id TEXT,
settlement_ref TEXT UNIQUE,
settled_amount_usdc REAL,
attributable_costs_usdc REAL NOT NULL DEFAULT 0,
net_profit_usdc REAL,
tax_rate REAL NOT NULL DEFAULT 0,
tax_amount_usdc REAL NOT NULL DEFAULT 0,
retained_earnings_usdc REAL,
tax_destination_wallet TEXT,
settled_at TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_revenue_opportunities_status ON revenue_opportunities(status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_revenue_opportunities_strategy ON revenue_opportunities(strategy, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_revenue_opportunities_request ON revenue_opportunities(request_id);
CREATE TABLE IF NOT EXISTS revenue_feedback (
id TEXT PRIMARY KEY,
opportunity_id TEXT NOT NULL,
strategy TEXT NOT NULL,
grade REAL NOT NULL,
source TEXT NOT NULL,
comment TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_revenue_feedback_opportunity ON revenue_feedback(opportunity_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_revenue_feedback_strategy ON revenue_feedback(strategy, created_at DESC);
CREATE TABLE IF NOT EXISTS inference_costs (
id TEXT PRIMARY KEY,
model TEXT NOT NULL,
provider TEXT NOT NULL,
tokens_in INTEGER NOT NULL,
tokens_out INTEGER NOT NULL,
cost REAL NOT NULL,
tier TEXT,
cached INTEGER NOT NULL DEFAULT 0,
latency_ms INTEGER,
quality_score REAL,
escalation INTEGER NOT NULL DEFAULT 0,
turn_id TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_inference_costs_time ON inference_costs(created_at DESC);
CREATE TABLE IF NOT EXISTS semantic_cache (
id TEXT PRIMARY KEY,
prompt_hash TEXT NOT NULL,
embedding BLOB,
response TEXT NOT NULL,
model TEXT NOT NULL,
tokens_saved INTEGER NOT NULL DEFAULT 0,
hit_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
expires_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_cache_hash ON semantic_cache(prompt_hash);
CREATE TABLE IF NOT EXISTS identity (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS os_personality_history (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
content_hash TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS metric_snapshots (
id TEXT PRIMARY KEY,
metrics_json TEXT NOT NULL,
alerts_json TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS discovered_agents (
id TEXT PRIMARY KEY,
did TEXT NOT NULL UNIQUE,
agent_card_json TEXT NOT NULL,
capabilities TEXT,
endpoint_url TEXT NOT NULL,
chain_id INTEGER NOT NULL DEFAULT 8453,
trust_score REAL NOT NULL DEFAULT 0.5,
last_verified_at TEXT,
expires_at TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_discovered_agents_did ON discovered_agents(did);
CREATE TABLE IF NOT EXISTS skills (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
kind TEXT NOT NULL CHECK(kind IN ('structured', 'instruction', 'scripted', 'builtin')),
description TEXT,
source_path TEXT NOT NULL,
content_hash TEXT NOT NULL,
triggers_json TEXT,
tool_chain_json TEXT,
policy_overrides_json TEXT,
script_path TEXT,
risk_level TEXT NOT NULL DEFAULT 'Caution' CHECK(risk_level IN ('Safe', 'Caution', 'Dangerous', 'Forbidden')),
enabled INTEGER NOT NULL DEFAULT 1,
usage_count INTEGER NOT NULL DEFAULT 0,
last_used_at TEXT,
last_loaded_at TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
version TEXT NOT NULL DEFAULT '0.0.0',
author TEXT NOT NULL DEFAULT 'local',
registry_source TEXT NOT NULL DEFAULT 'local'
);
CREATE INDEX IF NOT EXISTS idx_skills_kind ON skills(kind);
CREATE TABLE IF NOT EXISTS delivery_queue (
id TEXT PRIMARY KEY,
channel TEXT NOT NULL,
recipient_id TEXT NOT NULL,
content TEXT NOT NULL,
idempotency_key TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'pending' CHECK(status IN ('pending', 'in_flight', 'delivered', 'dead_letter')),
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 5,
next_retry_at TEXT NOT NULL DEFAULT (datetime('now')),
last_error TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_delivery_queue_status ON delivery_queue(status, next_retry_at);
CREATE INDEX IF NOT EXISTS idx_delivery_queue_idem ON delivery_queue(idempotency_key);
CREATE TABLE IF NOT EXISTS approval_requests (
id TEXT PRIMARY KEY,
tool_name TEXT NOT NULL,
tool_input TEXT NOT NULL,
session_id TEXT,
turn_id TEXT,
status TEXT NOT NULL DEFAULT 'pending' CHECK(status IN ('pending', 'approved', 'denied', 'timed_out')),
decided_by TEXT,
decided_at TEXT,
timeout_at TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_approvals_status ON approval_requests(status);
CREATE TABLE IF NOT EXISTS plugins (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
version TEXT NOT NULL,
description TEXT,
enabled INTEGER NOT NULL DEFAULT 1,
manifest_path TEXT NOT NULL,
permissions_json TEXT,
installed_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS embeddings (
id TEXT PRIMARY KEY,
source_table TEXT NOT NULL,
source_id TEXT NOT NULL,
content_preview TEXT NOT NULL,
embedding_blob BLOB,
dimensions INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_embeddings_source ON embeddings(source_table, source_id);
CREATE TABLE IF NOT EXISTS sub_agents (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
display_name TEXT,
model TEXT NOT NULL DEFAULT '',
fallback_models_json TEXT NOT NULL DEFAULT '[]',
role TEXT NOT NULL DEFAULT 'specialist',
description TEXT,
skills_json TEXT,
enabled INTEGER NOT NULL DEFAULT 1,
session_count INTEGER NOT NULL DEFAULT 0,
last_used_at TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS context_checkpoints (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL REFERENCES sessions(id),
system_prompt_hash TEXT NOT NULL,
memory_summary TEXT NOT NULL,
active_tasks TEXT,
conversation_digest TEXT,
turn_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_checkpoints_session ON context_checkpoints(session_id, created_at DESC);
CREATE TABLE IF NOT EXISTS hippocampus (
table_name TEXT PRIMARY KEY,
description TEXT NOT NULL,
columns_json TEXT NOT NULL,
created_by TEXT NOT NULL DEFAULT 'system',
agent_owned INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_hippocampus_agent ON hippocampus(created_by, agent_owned);
CREATE TABLE IF NOT EXISTS turn_feedback (
id TEXT PRIMARY KEY,
turn_id TEXT NOT NULL UNIQUE REFERENCES turns(id),
session_id TEXT NOT NULL REFERENCES sessions(id),
grade INTEGER NOT NULL CHECK (grade BETWEEN 1 AND 5),
source TEXT NOT NULL DEFAULT 'dashboard',
comment TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_turn_feedback_session ON turn_feedback(session_id);
CREATE TABLE IF NOT EXISTS context_snapshots (
turn_id TEXT PRIMARY KEY REFERENCES turns(id),
complexity_level TEXT NOT NULL CHECK(complexity_level IN ('L0', 'L1', 'L2', 'L3')),
token_budget INTEGER NOT NULL,
system_prompt_tokens INTEGER,
memory_tokens INTEGER,
history_tokens INTEGER,
history_depth INTEGER,
memory_tiers_json TEXT,
retrieved_memories_json TEXT,
model TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS model_selection_events (
id TEXT PRIMARY KEY,
turn_id TEXT NOT NULL,
session_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
channel TEXT NOT NULL,
selected_model TEXT NOT NULL,
strategy TEXT NOT NULL,
primary_model TEXT NOT NULL,
override_model TEXT,
complexity TEXT,
user_excerpt TEXT NOT NULL,
candidates_json TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
schema_version INTEGER NOT NULL DEFAULT 1,
attribution TEXT,
metascore_json TEXT,
features_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_model_selection_events_turn ON model_selection_events(turn_id);
CREATE INDEX IF NOT EXISTS idx_model_selection_events_created ON model_selection_events(created_at DESC);
CREATE TABLE IF NOT EXISTS shadow_routing_predictions (
id TEXT PRIMARY KEY,
turn_id TEXT NOT NULL,
production_model TEXT NOT NULL,
shadow_model TEXT,
production_complexity REAL,
shadow_complexity REAL,
agreed INTEGER NOT NULL DEFAULT 0,
detail_json TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_shadow_routing_created ON shadow_routing_predictions(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_shadow_routing_turn ON shadow_routing_predictions(turn_id);
CREATE TABLE IF NOT EXISTS abuse_events (
id TEXT PRIMARY KEY,
actor_id TEXT NOT NULL,
origin TEXT NOT NULL,
channel TEXT NOT NULL,
signal_type TEXT NOT NULL,
severity TEXT NOT NULL CHECK(severity IN ('low', 'medium', 'high')),
action_taken TEXT NOT NULL,
detail TEXT,
score REAL NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_abuse_events_actor ON abuse_events(actor_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_abuse_events_origin ON abuse_events(origin, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_abuse_events_created ON abuse_events(created_at DESC);
CREATE TABLE IF NOT EXISTS learned_skills (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
description TEXT NOT NULL DEFAULT '',
trigger_tools TEXT NOT NULL DEFAULT '[]',
steps_json TEXT NOT NULL DEFAULT '[]',
source_session_id TEXT,
success_count INTEGER NOT NULL DEFAULT 1,
failure_count INTEGER NOT NULL DEFAULT 0,
priority INTEGER NOT NULL DEFAULT 50,
skill_md_path TEXT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
);
CREATE INDEX IF NOT EXISTS idx_learned_skills_priority ON learned_skills(priority DESC);
CREATE TABLE IF NOT EXISTS hygiene_log (
id TEXT PRIMARY KEY,
sweep_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now')),
stale_procedural_days INTEGER NOT NULL,
dead_skill_priority_threshold INTEGER NOT NULL,
proc_total INTEGER NOT NULL DEFAULT 0,
proc_stale INTEGER NOT NULL DEFAULT 0,
proc_pruned INTEGER NOT NULL DEFAULT 0,
skills_total INTEGER NOT NULL DEFAULT 0,
skills_dead INTEGER NOT NULL DEFAULT 0,
skills_pruned INTEGER NOT NULL DEFAULT 0,
avg_skill_priority REAL NOT NULL DEFAULT 0.0
);
CREATE INDEX IF NOT EXISTS idx_hygiene_log_sweep ON hygiene_log(sweep_at DESC);
CREATE TABLE IF NOT EXISTS treasury_state (
id TEXT PRIMARY KEY DEFAULT 'singleton',
usdc_balance REAL NOT NULL DEFAULT 0.0,
native_balance REAL NOT NULL DEFAULT 0.0,
survival_tier TEXT NOT NULL DEFAULT 'Dead',
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS session_model_performance (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
turn_id TEXT NOT NULL,
model TEXT NOT NULL,
guard_fires INTEGER NOT NULL DEFAULT 0,
fallback_triggered INTEGER NOT NULL DEFAULT 0,
confidence_failures INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_smp_session ON session_model_performance(session_id);
CREATE TABLE IF NOT EXISTS consent_requests (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
origin_channel TEXT NOT NULL,
origin_recipient TEXT NOT NULL,
requesting_channel TEXT NOT NULL,
requesting_recipient TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
expires_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_consent_session ON consent_requests(session_id);
CREATE TABLE IF NOT EXISTS installed_themes (
id TEXT PRIMARY KEY,
installed_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"#;
const EMBEDDED_SCHEMA_VERSION: i64 = 27;
pub fn initialize_db(db: &Database) -> Result<()> {
{
let conn = db.conn();
conn.execute_batch(SCHEMA_SQL)
.map_err(|e| RoboticusError::Database(format!("schema init failed: {e}")))?;
let version_exists: bool = conn
.query_row("SELECT COUNT(*) FROM schema_version", [], |row| {
row.get::<_, i64>(0)
})
.map(|c| c > 0)
.db_err()?;
if !version_exists {
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
[EMBEDDED_SCHEMA_VERSION],
)
.db_err()?;
}
}
run_migrations(db)?;
ensure_optional_columns(db)?;
crate::hippocampus::bootstrap_hippocampus(db)?;
Ok(())
}
fn has_column(conn: &rusqlite::Connection, table: &str, column: &str) -> Result<bool> {
let mut stmt = conn
.prepare(&format!(
"PRAGMA table_info(\"{}\")",
table.replace('"', "\"\"")
))
.db_err()?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1)).db_err()?;
for col in rows {
if col.db_err()? == column {
return Ok(true);
}
}
Ok(false)
}
fn has_table(conn: &rusqlite::Connection, table: &str) -> Result<bool> {
conn.query_row(
"SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1)",
[table],
|row| row.get::<_, i64>(0),
)
.map(|exists| exists != 0)
.db_err()
}
fn ensure_optional_columns(db: &Database) -> Result<()> {
let conn = db.conn();
if has_table(&conn, "pipeline_traces")? {
if !has_column(&conn, "pipeline_traces", "session_id")? {
conn.execute(
"ALTER TABLE pipeline_traces ADD COLUMN session_id TEXT NOT NULL DEFAULT ''",
[],
)
.db_err()?;
}
if !has_column(&conn, "pipeline_traces", "react_trace_json")? {
conn.execute(
"ALTER TABLE pipeline_traces ADD COLUMN react_trace_json TEXT",
[],
)
.db_err()?;
}
if has_column(&conn, "pipeline_traces", "session_id")? {
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_pipeline_traces_session ON pipeline_traces(session_id)",
[],
)
.db_err()?;
}
if !has_column(&conn, "pipeline_traces", "inference_params_json")? {
conn.execute(
"ALTER TABLE pipeline_traces ADD COLUMN inference_params_json TEXT",
[],
)
.db_err()?;
}
}
if !has_column(&conn, "skills", "risk_level")? {
conn.execute(
"ALTER TABLE skills ADD COLUMN risk_level TEXT NOT NULL DEFAULT 'Caution'",
[],
)
.db_err()?;
}
if !has_column(&conn, "tool_calls", "skill_id")? {
conn.execute("ALTER TABLE tool_calls ADD COLUMN skill_id TEXT", [])
.db_err()?;
}
if !has_column(&conn, "tool_calls", "skill_name")? {
conn.execute("ALTER TABLE tool_calls ADD COLUMN skill_name TEXT", [])
.db_err()?;
}
if !has_column(&conn, "tool_calls", "skill_hash")? {
conn.execute("ALTER TABLE tool_calls ADD COLUMN skill_hash TEXT", [])
.db_err()?;
}
if !has_column(&conn, "skills", "usage_count")? {
conn.execute(
"ALTER TABLE skills ADD COLUMN usage_count INTEGER NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "skills", "last_used_at")? {
conn.execute("ALTER TABLE skills ADD COLUMN last_used_at TEXT", [])
.db_err()?;
}
if !has_column(&conn, "delivery_queue", "idempotency_key")? {
conn.execute(
"ALTER TABLE delivery_queue ADD COLUMN idempotency_key TEXT NOT NULL DEFAULT ''",
[],
)
.db_err()?;
conn.execute(
"UPDATE delivery_queue SET idempotency_key = id WHERE idempotency_key = ''",
[],
)
.db_err()?;
}
if !has_column(&conn, "inference_costs", "latency_ms")? {
conn.execute(
"ALTER TABLE inference_costs ADD COLUMN latency_ms INTEGER",
[],
)
.db_err()?;
}
if !has_column(&conn, "inference_costs", "quality_score")? {
conn.execute(
"ALTER TABLE inference_costs ADD COLUMN quality_score REAL",
[],
)
.db_err()?;
}
if !has_column(&conn, "inference_costs", "escalation")? {
conn.execute(
"ALTER TABLE inference_costs ADD COLUMN escalation INTEGER NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "hippocampus", "access_level")? {
conn.execute(
"ALTER TABLE hippocampus ADD COLUMN access_level TEXT NOT NULL DEFAULT 'internal'",
[],
)
.db_err()?;
}
if !has_column(&conn, "hippocampus", "row_count")? {
conn.execute(
"ALTER TABLE hippocampus ADD COLUMN row_count INTEGER NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "sub_agents", "fallback_models_json")? {
conn.execute(
"ALTER TABLE sub_agents ADD COLUMN fallback_models_json TEXT NOT NULL DEFAULT '[]'",
[],
)
.db_err()?;
}
if !has_column(&conn, "sub_agents", "last_used_at")? {
conn.execute("ALTER TABLE sub_agents ADD COLUMN last_used_at TEXT", [])
.db_err()?;
}
if !has_column(&conn, "model_selection_events", "schema_version")? {
conn.execute(
"ALTER TABLE model_selection_events ADD COLUMN schema_version INTEGER NOT NULL DEFAULT 1",
[],
)
.db_err()?;
}
if !has_column(&conn, "model_selection_events", "attribution")? {
conn.execute(
"ALTER TABLE model_selection_events ADD COLUMN attribution TEXT",
[],
)
.db_err()?;
}
if !has_column(&conn, "model_selection_events", "metascore_json")? {
conn.execute(
"ALTER TABLE model_selection_events ADD COLUMN metascore_json TEXT",
[],
)
.db_err()?;
}
if !has_column(&conn, "model_selection_events", "features_json")? {
conn.execute(
"ALTER TABLE model_selection_events ADD COLUMN features_json TEXT",
[],
)
.db_err()?;
}
if !has_column(&conn, "inference_costs", "turn_id")? {
conn.execute("ALTER TABLE inference_costs ADD COLUMN turn_id TEXT", [])
.db_err()?;
}
if has_column(&conn, "inference_costs", "turn_id")? {
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_inference_costs_turn ON inference_costs(turn_id)",
[],
)
.db_err()?;
}
if !has_column(&conn, "cron_runs", "output_text")? {
conn.execute("ALTER TABLE cron_runs ADD COLUMN output_text TEXT", [])
.db_err()?;
}
if !has_column(&conn, "episodic_memory", "owner_id")? {
conn.execute("ALTER TABLE episodic_memory ADD COLUMN owner_id TEXT", [])
.db_err()?;
}
if !has_column(&conn, "episodic_memory", "memory_state")? {
conn.execute(
"ALTER TABLE episodic_memory ADD COLUMN memory_state TEXT NOT NULL DEFAULT 'active'",
[],
)
.db_err()?;
}
if !has_column(&conn, "episodic_memory", "state_reason")? {
conn.execute(
"ALTER TABLE episodic_memory ADD COLUMN state_reason TEXT",
[],
)
.db_err()?;
}
if !has_column(&conn, "semantic_memory", "memory_state")? {
conn.execute(
"ALTER TABLE semantic_memory ADD COLUMN memory_state TEXT NOT NULL DEFAULT 'active'",
[],
)
.db_err()?;
}
if !has_column(&conn, "semantic_memory", "state_reason")? {
conn.execute(
"ALTER TABLE semantic_memory ADD COLUMN state_reason TEXT",
[],
)
.db_err()?;
}
conn.execute(
"UPDATE relationship_memory
SET interaction_count = 1
WHERE COALESCE(interaction_count, 0) <= 0",
[],
)
.db_err()?;
if !has_column(&conn, "revenue_opportunities", "attributable_costs_usdc")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN attributable_costs_usdc REAL NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "net_profit_usdc")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN net_profit_usdc REAL",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "tax_rate")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN tax_rate REAL NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "tax_amount_usdc")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN tax_amount_usdc REAL NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "retained_earnings_usdc")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN retained_earnings_usdc REAL",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "tax_destination_wallet")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN tax_destination_wallet TEXT",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "confidence_score")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN confidence_score REAL NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "effort_score")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN effort_score REAL NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "risk_score")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN risk_score REAL NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "priority_score")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN priority_score REAL NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "recommended_approved")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN recommended_approved INTEGER NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "score_reason")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN score_reason TEXT",
[],
)
.db_err()?;
}
if !has_column(&conn, "revenue_opportunities", "settled_at")? {
conn.execute(
"ALTER TABLE revenue_opportunities ADD COLUMN settled_at TEXT",
[],
)
.db_err()?;
}
if !has_column(&conn, "skills", "version")? {
conn.execute(
"ALTER TABLE skills ADD COLUMN version TEXT NOT NULL DEFAULT '0.0.0'",
[],
)
.db_err()?;
}
if !has_column(&conn, "skills", "author")? {
conn.execute(
"ALTER TABLE skills ADD COLUMN author TEXT NOT NULL DEFAULT 'local'",
[],
)
.db_err()?;
}
if !has_column(&conn, "skills", "registry_source")? {
conn.execute(
"ALTER TABLE skills ADD COLUMN registry_source TEXT NOT NULL DEFAULT 'local'",
[],
)
.db_err()?;
}
conn.execute_batch("DROP TABLE IF EXISTS proxy_stats;")
.db_err()?;
if has_column(&conn, "embeddings", "embedding_json")? {
let migrated: usize = conn
.execute(
"UPDATE embeddings SET embedding_blob = NULL \
WHERE (embedding_blob IS NULL OR length(embedding_blob) = 0) \
AND embedding_json = ''",
[],
)
.unwrap_or(0);
let _ = conn.execute_batch("ALTER TABLE embeddings DROP COLUMN embedding_json;");
if migrated > 0 {
tracing::info!(count = migrated, "DF-15: cleaned empty embedding rows");
}
}
if has_table(&conn, "session_messages")? && !has_column(&conn, "session_messages", "topic_tag")?
{
conn.execute("ALTER TABLE session_messages ADD COLUMN topic_tag TEXT", [])
.db_err()?;
}
if !has_table(&conn, "memory_index")? {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS memory_index (
id TEXT PRIMARY KEY,
summary TEXT NOT NULL,
source_table TEXT NOT NULL,
source_id TEXT NOT NULL,
category TEXT,
last_verified TEXT,
confidence REAL NOT NULL DEFAULT 1.0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_memory_index_source ON memory_index(source_table, source_id);
CREATE INDEX IF NOT EXISTS idx_memory_index_confidence ON memory_index(confidence DESC);",
)
.db_err()?;
}
if has_table(&conn, "sessions")? && !has_column(&conn, "sessions", "non_interactive")? {
conn.execute(
"ALTER TABLE sessions ADD COLUMN non_interactive INTEGER NOT NULL DEFAULT 0",
[],
)
.db_err()?;
}
let needs_backfill = has_table(&conn, "memory_index")?
&& conn
.query_row(
"SELECT COUNT(*) FROM memory_index WHERE id = '__backfill_v0113__'",
[],
|r| r.get::<_, i64>(0),
)
.unwrap_or(0)
== 0;
let pending_count: i64 = if needs_backfill {
[
"SELECT COUNT(*) FROM episodic_memory WHERE id NOT IN (SELECT source_id FROM memory_index WHERE source_table = 'episodic_memory')",
"SELECT COUNT(*) FROM semantic_memory WHERE id NOT IN (SELECT source_id FROM memory_index WHERE source_table = 'semantic_memory')",
"SELECT COUNT(*) FROM procedural_memory WHERE id NOT IN (SELECT source_id FROM memory_index WHERE source_table = 'procedural_memory')",
"SELECT COUNT(*) FROM relationship_memory WHERE id NOT IN (SELECT source_id FROM memory_index WHERE source_table = 'relationship_memory')",
"SELECT COUNT(*) FROM learned_skills WHERE id NOT IN (SELECT source_id FROM memory_index WHERE source_table = 'learned_skills')",
]
.iter()
.map(|q| conn.query_row(q, [], |r| r.get::<_, i64>(0)).unwrap_or(0))
.sum()
} else {
0
};
drop(conn);
if needs_backfill {
if pending_count > 0 {
let start = std::time::Instant::now();
eprint!(" [migration] Indexing {pending_count} memories (one-time upgrade)...");
let count =
crate::memory_index::backfill_missing_index_entries(db, 0).unwrap_or_default();
let elapsed = start.elapsed();
eprintln!(" done ({count} indexed, {:.1}s)", elapsed.as_secs_f64());
if count > 0 {
tracing::info!(
count,
"backfilled memory_index entries for existing memories"
);
}
}
let conn = db.conn();
conn.execute(
"INSERT OR IGNORE INTO memory_index \
(id, summary, source_table, source_id, confidence) \
VALUES ('__backfill_v0113__', 'migration sentinel', 'system', 'backfill', 0.0)",
[],
)
.db_err()?;
drop(conn);
}
Ok(())
}
fn migrations_dir() -> Option<std::path::PathBuf> {
std::env::current_dir()
.ok()
.map(|p| p.join("migrations"))
.filter(|p| p.is_dir())
.or_else(|| {
let p = Path::new(env!("CARGO_MANIFEST_DIR")).join("migrations");
if p.is_dir() { Some(p) } else { None }
})
}
pub fn run_migrations(db: &Database) -> Result<()> {
let dir = match migrations_dir() {
Some(d) => d,
None => return Ok(()),
};
let mut entries: Vec<std::path::PathBuf> = std::fs::read_dir(&dir)
.map_err(|e| RoboticusError::Database(format!("read migrations dir: {e}")))?
.filter_map(|e| {
e.inspect_err(|e| tracing::warn!("skipping unreadable migration entry: {e}"))
.ok()
.map(|e| e.path())
})
.filter(|p| p.extension().and_then(|e| e.to_str()) == Some("sql"))
.collect();
entries.sort_by(|a, b| {
let va = version_from_name(a.file_name().and_then(|n| n.to_str()).unwrap_or(""));
let vb = version_from_name(b.file_name().and_then(|n| n.to_str()).unwrap_or(""));
va.cmp(&vb)
});
let conn = db.conn();
let max_version: i64 = conn
.query_row(
"SELECT COALESCE(MAX(version), 0) FROM schema_version",
[],
|row| row.get(0),
)
.db_err()?;
for path in entries {
let version = version_from_name(path.file_name().and_then(|n| n.to_str()).unwrap_or(""));
if version <= max_version {
continue;
}
let sql = std::fs::read_to_string(&path)
.map_err(|e| RoboticusError::Database(format!("read migration {:?}: {e}", path)))?;
let tx = conn.unchecked_transaction().map_err(|e| {
RoboticusError::Database(format!("begin tx for migration {version}: {e}"))
})?;
if version == 13 {
apply_migration_13_idempotent(&tx)
.map_err(|e| RoboticusError::Database(format!("migration {version}: {e}")))?;
} else if version == 22 {
apply_migration_22_idempotent(&tx)
.map_err(|e| RoboticusError::Database(format!("migration {version}: {e}")))?;
} else if version == 31 {
apply_migration_31_idempotent(&tx)
.map_err(|e| RoboticusError::Database(format!("migration {version}: {e}")))?;
} else if version == 32 {
if !has_column(&tx, "approval_requests", "turn_id")? {
tx.execute_batch(sql.trim())
.map_err(|e| RoboticusError::Database(format!("migration {version}: {e}")))?;
}
} else if version == 33 {
if has_table(&tx, "pipeline_traces")?
&& !has_column(&tx, "pipeline_traces", "inference_params_json")?
{
tx.execute_batch(sql.trim())
.map_err(|e| RoboticusError::Database(format!("migration {version}: {e}")))?;
}
} else {
tx.execute_batch(sql.trim())
.map_err(|e| RoboticusError::Database(format!("migration {version}: {e}")))?;
}
tx.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
[version],
)
.db_err()?;
tx.commit()
.map_err(|e| RoboticusError::Database(format!("commit migration {version}: {e}")))?;
}
Ok(())
}
fn apply_migration_13_idempotent(conn: &rusqlite::Transaction<'_>) -> Result<()> {
if !has_column(conn, "model_selection_events", "schema_version")? {
conn.execute(
"ALTER TABLE model_selection_events ADD COLUMN schema_version INTEGER NOT NULL DEFAULT 1",
[],
)
.db_err()?;
}
if !has_column(conn, "model_selection_events", "attribution")? {
conn.execute(
"ALTER TABLE model_selection_events ADD COLUMN attribution TEXT",
[],
)
.db_err()?;
}
if !has_column(conn, "model_selection_events", "metascore_json")? {
conn.execute(
"ALTER TABLE model_selection_events ADD COLUMN metascore_json TEXT",
[],
)
.db_err()?;
}
if !has_column(conn, "model_selection_events", "features_json")? {
conn.execute(
"ALTER TABLE model_selection_events ADD COLUMN features_json TEXT",
[],
)
.db_err()?;
}
if !has_column(conn, "inference_costs", "turn_id")? {
conn.execute("ALTER TABLE inference_costs ADD COLUMN turn_id TEXT", [])
.db_err()?;
}
if has_column(conn, "inference_costs", "turn_id")? {
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_inference_costs_turn ON inference_costs(turn_id)",
[],
)
.db_err()?;
}
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS shadow_routing_predictions (
id TEXT PRIMARY KEY,
turn_id TEXT NOT NULL,
production_model TEXT NOT NULL,
shadow_model TEXT,
production_complexity REAL,
shadow_complexity REAL,
agreed INTEGER NOT NULL DEFAULT 0,
detail_json TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_shadow_routing_created ON shadow_routing_predictions(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_shadow_routing_turn ON shadow_routing_predictions(turn_id);
"#,
)
.db_err()?;
Ok(())
}
fn apply_migration_22_idempotent(conn: &rusqlite::Transaction<'_>) -> Result<()> {
if !has_column(conn, "skills", "version")? {
conn.execute(
"ALTER TABLE skills ADD COLUMN version TEXT NOT NULL DEFAULT '0.0.0'",
[],
)
.db_err()?;
}
if !has_column(conn, "skills", "author")? {
conn.execute(
"ALTER TABLE skills ADD COLUMN author TEXT NOT NULL DEFAULT 'local'",
[],
)
.db_err()?;
}
if !has_column(conn, "skills", "registry_source")? {
conn.execute(
"ALTER TABLE skills ADD COLUMN registry_source TEXT NOT NULL DEFAULT 'local'",
[],
)
.db_err()?;
}
Ok(())
}
fn apply_migration_31_idempotent(conn: &rusqlite::Transaction<'_>) -> Result<()> {
if !has_table(conn, "pipeline_traces")? {
return Ok(());
}
if !has_column(conn, "pipeline_traces", "created_at")? {
conn.execute(
"ALTER TABLE pipeline_traces ADD COLUMN created_at TEXT NOT NULL DEFAULT ''",
[],
)
.db_err()?;
conn.execute(
"UPDATE pipeline_traces SET created_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE created_at = ''",
[],
)
.db_err()?;
}
Ok(())
}
fn version_from_name(name: &str) -> i64 {
name.find('_')
.and_then(|i| name.get(..i))
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0)
}
#[cfg(test)]
pub(crate) fn table_count(db: &Database) -> Result<usize> {
let conn = db.conn();
let count: usize = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type IN ('table', 'view') AND name NOT LIKE 'sqlite_%' AND name NOT LIKE '%_data' AND name NOT LIKE '%_idx' AND name NOT LIKE '%_content' AND name NOT LIKE '%_docsize' AND name NOT LIKE '%_config'",
[],
|row| row.get(0),
)
.db_err()?;
Ok(count)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn schema_creates_all_tables() {
let db = Database::new(":memory:").unwrap();
let count = table_count(&db).unwrap();
assert!(
count == 44 || count == 45,
"expected 44 or 45 user-defined tables, got {count}"
);
}
#[test]
fn schema_idempotent() {
let db = Database::new(":memory:").unwrap();
initialize_db(&db).unwrap();
initialize_db(&db).unwrap();
let count = table_count(&db).unwrap();
assert!(count == 44 || count == 45);
}
#[test]
fn schema_version_inserted() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
let version: i64 = conn
.query_row(
"SELECT version FROM schema_version ORDER BY applied_at DESC LIMIT 1",
[],
|row| row.get(0),
)
.unwrap();
assert!(
version >= EMBEDDED_SCHEMA_VERSION,
"embedded schema seeds at version {EMBEDDED_SCHEMA_VERSION}"
);
}
#[test]
fn wal_mode_enabled() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
let mode: String = conn
.query_row("PRAGMA journal_mode", [], |row| row.get(0))
.unwrap();
assert!(mode == "wal" || mode == "memory");
}
#[test]
fn version_from_name_parses_correctly() {
assert_eq!(super::version_from_name("001_initial.sql"), 1);
assert_eq!(super::version_from_name("002_add_indexes.sql"), 2);
assert_eq!(super::version_from_name("010_foo.sql"), 10);
assert_eq!(super::version_from_name("no_underscore.sql"), 0);
}
#[test]
fn run_migrations_applies_in_order() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
let versions: Vec<i64> = conn
.prepare("SELECT version FROM schema_version ORDER BY version")
.unwrap()
.query_map([], |row| row.get(0))
.unwrap()
.collect::<std::result::Result<Vec<_>, _>>()
.unwrap();
assert!(
!versions.is_empty(),
"schema_version should have at least one entry"
);
assert!(
versions[0] >= EMBEDDED_SCHEMA_VERSION,
"embedded schema seeds at version {EMBEDDED_SCHEMA_VERSION}"
);
for w in versions.windows(2) {
assert!(w[1] > w[0], "versions must be strictly increasing");
}
}
#[test]
fn version_from_name_edge_cases() {
assert_eq!(super::version_from_name(""), 0);
assert_eq!(super::version_from_name("_no_number.sql"), 0);
assert_eq!(super::version_from_name("abc_nonnumeric.sql"), 0);
assert_eq!(super::version_from_name("999_big.sql"), 999);
}
#[test]
fn initialize_db_creates_version_row() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM schema_version WHERE version >= ?1",
[EMBEDDED_SCHEMA_VERSION],
|row| row.get(0),
)
.unwrap();
assert!(
count >= 1,
"embedded schema should seed at least version {EMBEDDED_SCHEMA_VERSION}"
);
}
#[test]
fn run_migrations_no_dir_is_noop() {
let db = Database::new(":memory:").unwrap();
run_migrations(&db).unwrap();
}
#[test]
fn migrations_dir_returns_option() {
let result = migrations_dir();
if let Some(path) = result {
assert!(path.is_dir());
}
}
#[test]
fn fts_table_exists() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
let exists: bool = conn
.prepare(
"SELECT COUNT(*) FROM sqlite_master WHERE name = 'memory_fts' AND type = 'table'",
)
.unwrap()
.query_row([], |row| {
let count: i64 = row.get(0)?;
Ok(count > 0)
})
.unwrap();
assert!(exists, "memory_fts FTS5 table should exist");
}
#[test]
fn has_column_returns_true_for_existing() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
assert!(has_column(&conn, "sessions", "id").unwrap());
assert!(has_column(&conn, "sessions", "agent_id").unwrap());
assert!(has_column(&conn, "sessions", "status").unwrap());
}
#[test]
fn has_column_returns_false_for_missing() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
assert!(!has_column(&conn, "sessions", "nonexistent_col").unwrap());
}
#[test]
fn has_column_returns_false_for_nonexistent_table() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
assert!(!has_column(&conn, "no_such_table", "id").unwrap());
}
#[test]
fn has_table_distinguishes_present_and_missing_tables() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
assert!(has_table(&conn, "sessions").unwrap());
assert!(!has_table(&conn, "no_such_table").unwrap());
}
#[test]
fn has_column_with_quotes_in_table_name() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
assert!(!has_column(&conn, "tab\"le", "id").unwrap());
}
#[test]
fn ensure_optional_columns_idempotent() {
let db = Database::new(":memory:").unwrap();
ensure_optional_columns(&db).unwrap();
let conn = db.conn();
assert!(has_column(&conn, "skills", "risk_level").unwrap());
assert!(has_column(&conn, "tool_calls", "skill_id").unwrap());
assert!(has_column(&conn, "tool_calls", "skill_name").unwrap());
assert!(has_column(&conn, "tool_calls", "skill_hash").unwrap());
assert!(has_column(&conn, "delivery_queue", "idempotency_key").unwrap());
assert!(has_column(&conn, "skills", "version").unwrap());
assert!(has_column(&conn, "skills", "author").unwrap());
assert!(has_column(&conn, "skills", "registry_source").unwrap());
}
#[test]
fn table_count_is_consistent() {
let db = Database::new(":memory:").unwrap();
let c1 = table_count(&db).unwrap();
let c2 = table_count(&db).unwrap();
assert_eq!(c1, c2, "table_count should be deterministic");
}
#[test]
fn schema_indexes_created() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
let idx_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type = 'index' AND name LIKE 'idx_%'",
[],
|row| row.get(0),
)
.unwrap();
assert!(
idx_count >= 10,
"expected at least 10 custom indexes, got {idx_count}"
);
}
#[test]
fn schema_triggers_created() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
let trigger_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type = 'trigger'",
[],
|row| row.get(0),
)
.unwrap();
assert!(
trigger_count >= 2,
"expected at least 2 triggers (episodic_ai, episodic_ad), got {trigger_count}"
);
}
#[test]
fn episodic_trigger_populates_fts() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
conn.execute(
"INSERT INTO episodic_memory (id, classification, content, importance) VALUES ('e1', 'fact', 'Paris is the capital of France', 5)",
[],
)
.unwrap();
let fts_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM memory_fts WHERE source_table = 'episodic' AND source_id = 'e1'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
fts_count, 1,
"FTS insert trigger should fire on episodic insert"
);
}
#[test]
fn episodic_delete_trigger_removes_fts() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
conn.execute(
"INSERT INTO episodic_memory (id, classification, content, importance) VALUES ('e2', 'fact', 'test content', 5)",
[],
)
.unwrap();
conn.execute("DELETE FROM episodic_memory WHERE id = 'e2'", [])
.unwrap();
let fts_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM memory_fts WHERE source_table = 'episodic' AND source_id = 'e2'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
fts_count, 0,
"FTS delete trigger should fire on episodic delete"
);
}
#[test]
fn fts_search_returns_results() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
conn.execute(
"INSERT INTO episodic_memory (id, classification, content) VALUES ('e3', 'fact', 'Rust is a systems programming language')",
[],
)
.unwrap();
let found: i64 = conn
.query_row(
"SELECT COUNT(*) FROM memory_fts WHERE memory_fts MATCH 'Rust'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(found, 1);
}
#[test]
fn foreign_keys_enabled() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
let fk: i64 = conn
.query_row("PRAGMA foreign_keys", [], |row| row.get(0))
.unwrap();
assert_eq!(fk, 1, "foreign_keys pragma should be ON");
}
#[test]
fn version_from_name_leading_zeros() {
assert_eq!(version_from_name("0001_migration.sql"), 1);
assert_eq!(version_from_name("0100_big.sql"), 100);
}
#[test]
fn schema_version_no_duplicates_on_reinit() {
let db = Database::new(":memory:").unwrap();
initialize_db(&db).unwrap();
let conn = db.conn();
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM schema_version WHERE version = ?1",
[EMBEDDED_SCHEMA_VERSION],
|row| row.get(0),
)
.unwrap();
assert_eq!(
count, 1,
"reinitialize should not duplicate the seed version row"
);
}
#[test]
fn ensure_optional_columns_adds_risk_level_when_missing() {
let conn = rusqlite::Connection::open_in_memory().unwrap();
conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
conn.execute_batch(SCHEMA_SQL).unwrap();
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
[EMBEDDED_SCHEMA_VERSION],
)
.unwrap();
let db = Database::new(":memory:").unwrap();
ensure_optional_columns(&db).unwrap();
ensure_optional_columns(&db).unwrap();
let conn = db.conn();
assert!(has_column(&conn, "skills", "risk_level").unwrap());
assert!(has_column(&conn, "tool_calls", "skill_id").unwrap());
assert!(has_column(&conn, "tool_calls", "skill_name").unwrap());
assert!(has_column(&conn, "tool_calls", "skill_hash").unwrap());
assert!(has_column(&conn, "delivery_queue", "idempotency_key").unwrap());
assert!(has_column(&conn, "skills", "version").unwrap());
assert!(has_column(&conn, "skills", "author").unwrap());
assert!(has_column(&conn, "skills", "registry_source").unwrap());
}
#[test]
fn ensure_optional_columns_upgrades_legacy_pipeline_traces_shape() {
let db = Database::new(":memory:").unwrap();
{
let conn = db.conn();
conn.execute_batch(
"DROP TABLE IF EXISTS pipeline_traces;
CREATE TABLE pipeline_traces (
id TEXT PRIMARY KEY,
turn_id TEXT NOT NULL,
channel TEXT NOT NULL,
total_ms INTEGER NOT NULL,
stages_json TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_pipeline_traces_turn ON pipeline_traces(turn_id);
CREATE INDEX idx_pipeline_traces_created ON pipeline_traces(created_at DESC);",
)
.unwrap();
}
ensure_optional_columns(&db).unwrap();
let conn = db.conn();
assert!(has_column(&conn, "pipeline_traces", "session_id").unwrap());
assert!(has_column(&conn, "pipeline_traces", "react_trace_json").unwrap());
let idx_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type = 'index' AND name = 'idx_pipeline_traces_session'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(idx_count, 1);
}
#[test]
fn run_migrations_multiple_times_is_idempotent() {
let db = Database::new(":memory:").unwrap();
run_migrations(&db).unwrap();
run_migrations(&db).unwrap();
let conn = db.conn();
let max_version: i64 = conn
.query_row(
"SELECT COALESCE(MAX(version), 0) FROM schema_version",
[],
|row| row.get(0),
)
.unwrap();
assert!(max_version >= EMBEDDED_SCHEMA_VERSION);
}
#[test]
fn embedded_schema_does_not_fail_when_inference_costs_lacks_turn_id() {
let conn = rusqlite::Connection::open_in_memory().unwrap();
conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER NOT NULL,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
);
INSERT INTO schema_version(version) VALUES (12);
CREATE TABLE IF NOT EXISTS inference_costs (
id TEXT PRIMARY KEY,
model TEXT NOT NULL,
provider TEXT NOT NULL,
tokens_in INTEGER NOT NULL,
tokens_out INTEGER NOT NULL,
cost REAL NOT NULL,
tier TEXT,
cached INTEGER NOT NULL DEFAULT 0,
latency_ms INTEGER,
quality_score REAL,
escalation INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_inference_costs_time ON inference_costs(created_at DESC);
"#,
)
.unwrap();
conn.execute_batch(SCHEMA_SQL).unwrap();
}
#[test]
fn migration_13_is_idempotent_when_columns_already_exist() {
let conn = rusqlite::Connection::open_in_memory().unwrap();
conn.execute_batch(SCHEMA_SQL).unwrap();
let tx = conn.unchecked_transaction().unwrap();
apply_migration_13_idempotent(&tx).unwrap();
apply_migration_13_idempotent(&tx).unwrap();
let idx_count: i64 = tx
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name='idx_inference_costs_turn'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(idx_count, 1);
}
#[test]
fn version_from_name_no_underscore_returns_zero() {
assert_eq!(version_from_name("noseparator"), 0);
assert_eq!(version_from_name("noseparator.sql"), 0);
}
#[test]
fn version_from_name_various_formats() {
assert_eq!(version_from_name("42_answer.sql"), 42);
assert_eq!(version_from_name("0_zero.sql"), 0);
assert_eq!(version_from_name("9999_huge.sql"), 9999);
}
#[test]
fn initialize_db_then_query_all_tables() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
conn.execute(
"INSERT INTO sessions (id, agent_id, scope_key) VALUES ('s1', 'a1', 'agent')",
[],
)
.unwrap();
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM sessions", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 1);
conn.execute(
"INSERT INTO working_memory (id, session_id, entry_type, content) VALUES ('w1', 's1', 'note', 'test')",
[],
).unwrap();
conn.execute(
"INSERT INTO episodic_memory (id, classification, content) VALUES ('e1', 'event', 'something happened')",
[],
).unwrap();
let fts_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM memory_fts WHERE source_table = 'episodic'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(fts_count, 1);
}
}