use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use deck_core::{DeckError, Message, Result, SessionId, Store};
use rusqlite::{params, Connection};
#[derive(Debug, Clone)]
pub struct SqliteStore {
conn: Arc<Mutex<Connection>>,
path: PathBuf,
}
impl SqliteStore {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_path_buf();
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let conn = Connection::open(&path).map_err(|e| DeckError::Store(format!("open: {e}")))?;
conn.execute_batch(
r"
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
created_at INTEGER NOT NULL DEFAULT (strftime('%s','now'))
);
CREATE TABLE IF NOT EXISTS messages (
session_id TEXT NOT NULL REFERENCES sessions(id),
seq INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
tool_calls TEXT NOT NULL DEFAULT '[]',
PRIMARY KEY (session_id, seq)
);
",
)
.map_err(|e| DeckError::Store(format!("init schema: {e}")))?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
path,
})
}
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
}
#[async_trait]
impl Store for SqliteStore {
async fn append(&self, session: SessionId, msg: &Message) -> Result<()> {
let role = msg.role.as_wire_str();
let tool_calls = serde_json::to_string(&msg.tool_calls)?;
let content = msg.content.clone();
let session_str = session.to_string();
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || -> Result<()> {
let mut conn = conn
.lock()
.map_err(|e| DeckError::Store(format!("mutex poisoned: {e}")))?;
let tx = conn
.transaction()
.map_err(|e| DeckError::Store(format!("begin tx: {e}")))?;
tx.execute(
"INSERT OR IGNORE INTO sessions(id) VALUES (?1)",
params![session_str],
)
.map_err(|e| DeckError::Store(format!("upsert session: {e}")))?;
let next: i64 = tx
.query_row(
"SELECT COALESCE(MAX(seq), -1) + 1 FROM messages WHERE session_id = ?1",
params![session_str],
|row| row.get(0),
)
.map_err(|e| DeckError::Store(format!("next seq: {e}")))?;
tx.execute(
"INSERT INTO messages(session_id, seq, role, content, tool_calls) VALUES (?1, ?2, ?3, ?4, ?5)",
params![session_str, next, role, content, tool_calls],
)
.map_err(|e| DeckError::Store(format!("insert message: {e}")))?;
tx.commit()
.map_err(|e| DeckError::Store(format!("commit: {e}")))?;
Ok(())
})
.await
.map_err(|e| DeckError::Store(format!("join: {e}")))?
}
async fn load(&self, session: SessionId) -> Result<Vec<Message>> {
let session_str = session.to_string();
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || -> Result<Vec<Message>> {
let conn = conn
.lock()
.map_err(|e| DeckError::Store(format!("mutex poisoned: {e}")))?;
let mut stmt = conn
.prepare(
"SELECT role, content, tool_calls FROM messages WHERE session_id = ?1 ORDER BY seq ASC",
)
.map_err(|e| DeckError::Store(format!("prepare: {e}")))?;
let rows = stmt
.query_map(params![session_str], |row| {
let role: String = row.get(0)?;
let content: String = row.get(1)?;
let tool_calls: String = row.get(2)?;
Ok((role, content, tool_calls))
})
.map_err(|e| DeckError::Store(format!("query: {e}")))?;
let mut out = Vec::new();
for r in rows {
let (role, content, tc) = r.map_err(|e| DeckError::Store(format!("row: {e}")))?;
let role = deck_core::Role::from_wire_str(&role).ok_or_else(|| {
DeckError::Store(format!("unknown role variant in store: {role}"))
})?;
let tool_calls = serde_json::from_str(&tc).map_err(|e| {
DeckError::Store(format!("tool_calls decode in store: {e}"))
})?;
out.push(Message {
role,
content,
tool_calls,
});
}
Ok(out)
})
.await
.map_err(|e| DeckError::Store(format!("join: {e}")))?
}
async fn list(&self) -> Result<Vec<SessionId>> {
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || -> Result<Vec<SessionId>> {
let conn = conn
.lock()
.map_err(|e| DeckError::Store(format!("mutex poisoned: {e}")))?;
let mut stmt = conn
.prepare("SELECT id FROM sessions ORDER BY created_at ASC")
.map_err(|e| DeckError::Store(format!("prepare: {e}")))?;
let rows = stmt
.query_map([], |row| row.get::<_, String>(0))
.map_err(|e| DeckError::Store(format!("query: {e}")))?;
let mut out = Vec::new();
for r in rows {
let s = r.map_err(|e| DeckError::Store(format!("row: {e}")))?;
let uuid = uuid::Uuid::parse_str(&s)
.map_err(|e| DeckError::Store(format!("uuid: {e}")))?;
out.push(SessionId(uuid));
}
Ok(out)
})
.await
.map_err(|e| DeckError::Store(format!("join: {e}")))?
}
}
#[cfg(test)]
mod tests {
use super::*;
use deck_core::Role;
use tempfile::TempDir;
#[tokio::test]
async fn append_and_load_roundtrip() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("sessions.db");
let store = SqliteStore::open(&path).expect("open");
let session = SessionId::new();
store
.append(
session,
&Message {
role: Role::User,
content: "hi".into(),
tool_calls: vec![],
},
)
.await
.unwrap();
let msgs = store.load(session).await.unwrap();
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].content, "hi");
}
}