use anyhow::{Context, Result};
use rusqlite::{Connection, OptionalExtension, params};
use std::path::{Path, PathBuf};
const MIGRATIONS: &[(i64, &str)] = &[
(
1,
r#"
CREATE TABLE IF NOT EXISTS orchestrations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
status TEXT NOT NULL,
input_json TEXT,
output_json TEXT,
error TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_orchestrations_status
ON orchestrations(status);
CREATE INDEX IF NOT EXISTS idx_orchestrations_created_at
ON orchestrations(created_at DESC);
"#,
),
(
2,
r#"
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
orchestration_id TEXT NOT NULL REFERENCES orchestrations(id) ON DELETE CASCADE,
sequence INTEGER NOT NULL,
event_type TEXT NOT NULL,
event_data TEXT NOT NULL,
timestamp TEXT NOT NULL,
UNIQUE(orchestration_id, sequence)
);
CREATE INDEX IF NOT EXISTS idx_events_orchestration_sequence
ON events(orchestration_id, sequence);
"#,
),
(
3,
r#"
CREATE TABLE IF NOT EXISTS orchestration_definitions (
name TEXT PRIMARY KEY,
definition_json TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
"#,
),
(
4,
r#"
CREATE TABLE IF NOT EXISTS stores (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
kind TEXT NOT NULL,
sandbox TEXT,
config_json TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_stores_name ON stores(name);
CREATE INDEX IF NOT EXISTS idx_stores_kind ON stores(kind);
"#,
),
(
5,
r#"
CREATE TABLE IF NOT EXISTS objects (
id TEXT PRIMARY KEY,
class TEXT NOT NULL,
object_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'hibernating',
sandbox TEXT,
storage_json TEXT NOT NULL DEFAULT '{}',
idle_timeout_seconds INTEGER NOT NULL DEFAULT 300,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(class, object_id)
);
CREATE INDEX IF NOT EXISTS idx_objects_class ON objects(class);
CREATE INDEX IF NOT EXISTS idx_objects_status ON objects(status);
CREATE INDEX IF NOT EXISTS idx_objects_class_object_id ON objects(class, object_id);
"#,
),
(
6,
r#"
CREATE TABLE IF NOT EXISTS schedules (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
cron TEXT,
fire_at TEXT,
method TEXT NOT NULL,
args_json TEXT NOT NULL DEFAULT '{}',
target_class TEXT,
target_object_id TEXT,
target_orchestration TEXT,
status TEXT NOT NULL DEFAULT 'active',
last_fired_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_schedules_status ON schedules(status);
CREATE INDEX IF NOT EXISTS idx_schedules_fire_at ON schedules(fire_at);
"#,
),
];
#[derive(Debug, Clone)]
pub struct DurableStorage {
db_path: PathBuf,
}
impl DurableStorage {
pub fn new(db_path: PathBuf) -> Result<Self> {
let storage = Self { db_path };
storage.bootstrap()?;
Ok(storage)
}
pub fn open_default() -> Result<Self> {
Self::new(Self::default_db_path())
}
pub fn default_db_path() -> PathBuf {
let base = dirs::home_dir()
.map(|home| home.join(".local/share/agentkernel"))
.unwrap_or_else(|| PathBuf::from("/tmp/agentkernel"));
base.join("durable").join("orchestrations.db")
}
#[allow(dead_code)]
pub fn db_path(&self) -> &Path {
&self.db_path
}
pub fn open_connection(&self) -> Result<Connection> {
let conn = Connection::open(&self.db_path)
.with_context(|| format!("failed to open sqlite db at {}", self.db_path.display()))?;
Self::apply_pragmas(&conn)?;
Ok(conn)
}
fn apply_pragmas(conn: &Connection) -> Result<()> {
conn.execute_batch(
r#"
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA wal_autocheckpoint = 1000;
PRAGMA busy_timeout = 5000;
PRAGMA foreign_keys = ON;
"#,
)
.context("failed to apply durable sqlite pragmas")
}
fn bootstrap(&self) -> Result<()> {
if let Some(parent) = self.db_path.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!("failed to create storage dir {}", parent.to_string_lossy())
})?;
}
let mut conn = self.open_connection()?;
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TEXT NOT NULL
);
"#,
)
.context("failed to create sqlite migration metadata")?;
let tx = conn
.transaction()
.context("failed to start migration transaction")?;
for (version, sql) in MIGRATIONS {
let existing: Option<i64> = tx
.query_row(
"SELECT version FROM schema_migrations WHERE version = ?1",
[*version],
|row| row.get(0),
)
.optional()
.context("failed to read migration state")?;
if existing.is_some() {
continue;
}
tx.execute_batch(sql)
.with_context(|| format!("failed applying migration {}", version))?;
tx.execute(
"INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
params![*version, chrono::Utc::now().to_rfc3339()],
)
.with_context(|| format!("failed recording migration {}", version))?;
}
tx.commit().context("failed to commit migrations")?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_storage_bootstrap_creates_schema() {
let temp = tempfile::TempDir::new().unwrap();
let db_path = temp.path().join("durable.db");
let storage = DurableStorage::new(db_path.clone()).unwrap();
assert_eq!(storage.db_path(), db_path.as_path());
assert!(db_path.exists());
let conn = storage.open_connection().unwrap();
let table_exists: i64 = conn
.query_row(
"SELECT COUNT(1) FROM sqlite_master WHERE type = 'table' AND name = 'orchestrations'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(table_exists, 1);
let events_exists: i64 = conn
.query_row(
"SELECT COUNT(1) FROM sqlite_master WHERE type = 'table' AND name = 'events'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(events_exists, 1);
let migration_count: i64 = conn
.query_row("SELECT COUNT(1) FROM schema_migrations", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(migration_count, 6);
}
}