Skip to main content

deck_store/
sqlite.rs

1//! Plain `SQLite` store, designed to be wrapped by an `age`-encrypted file in
2//! Phase 2 (decrypt-to-tmpfs lifecycle). The schema is intentionally small:
3//! a single `messages` table keyed by `(session_id, seq)`.
4
5use std::path::{Path, PathBuf};
6use std::sync::{Arc, Mutex};
7
8use async_trait::async_trait;
9use deck_core::{DeckError, Message, Result, SessionId, Store};
10use rusqlite::{params, Connection};
11
12#[derive(Debug, Clone)]
13pub struct SqliteStore {
14    conn: Arc<Mutex<Connection>>,
15    path: PathBuf,
16}
17
18impl SqliteStore {
19    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
20        let path = path.as_ref().to_path_buf();
21        if let Some(parent) = path.parent() {
22            std::fs::create_dir_all(parent)?;
23        }
24        let conn = Connection::open(&path).map_err(|e| DeckError::Store(format!("open: {e}")))?;
25        conn.execute_batch(
26            r"
27            PRAGMA journal_mode = WAL;
28            PRAGMA foreign_keys = ON;
29            CREATE TABLE IF NOT EXISTS sessions (
30                id     TEXT PRIMARY KEY,
31                created_at INTEGER NOT NULL DEFAULT (strftime('%s','now'))
32            );
33            CREATE TABLE IF NOT EXISTS messages (
34                session_id TEXT NOT NULL REFERENCES sessions(id),
35                seq        INTEGER NOT NULL,
36                role       TEXT NOT NULL,
37                content    TEXT NOT NULL,
38                tool_calls TEXT NOT NULL DEFAULT '[]',
39                PRIMARY KEY (session_id, seq)
40            );
41            ",
42        )
43        .map_err(|e| DeckError::Store(format!("init schema: {e}")))?;
44        Ok(Self {
45            conn: Arc::new(Mutex::new(conn)),
46            path,
47        })
48    }
49
50    #[must_use]
51    pub fn path(&self) -> &Path {
52        &self.path
53    }
54}
55
56#[async_trait]
57impl Store for SqliteStore {
58    async fn append(&self, session: SessionId, msg: &Message) -> Result<()> {
59        let role = msg.role.as_wire_str();
60        let tool_calls = serde_json::to_string(&msg.tool_calls)?;
61        let content = msg.content.clone();
62        let session_str = session.to_string();
63        let conn = self.conn.clone();
64        tokio::task::spawn_blocking(move || -> Result<()> {
65            let mut conn = conn
66                .lock()
67                .map_err(|e| DeckError::Store(format!("mutex poisoned: {e}")))?;
68            // Atomic INSERT-session + SELECT MAX(seq) + INSERT-message so
69            // two concurrent appends on the same session can never collide
70            // on the (session_id, seq) primary key.
71            let tx = conn
72                .transaction()
73                .map_err(|e| DeckError::Store(format!("begin tx: {e}")))?;
74            tx.execute(
75                "INSERT OR IGNORE INTO sessions(id) VALUES (?1)",
76                params![session_str],
77            )
78            .map_err(|e| DeckError::Store(format!("upsert session: {e}")))?;
79            let next: i64 = tx
80                .query_row(
81                    "SELECT COALESCE(MAX(seq), -1) + 1 FROM messages WHERE session_id = ?1",
82                    params![session_str],
83                    |row| row.get(0),
84                )
85                .map_err(|e| DeckError::Store(format!("next seq: {e}")))?;
86            tx.execute(
87                "INSERT INTO messages(session_id, seq, role, content, tool_calls) VALUES (?1, ?2, ?3, ?4, ?5)",
88                params![session_str, next, role, content, tool_calls],
89            )
90            .map_err(|e| DeckError::Store(format!("insert message: {e}")))?;
91            tx.commit()
92                .map_err(|e| DeckError::Store(format!("commit: {e}")))?;
93            Ok(())
94        })
95        .await
96        .map_err(|e| DeckError::Store(format!("join: {e}")))?
97    }
98
99    async fn load(&self, session: SessionId) -> Result<Vec<Message>> {
100        let session_str = session.to_string();
101        let conn = self.conn.clone();
102        tokio::task::spawn_blocking(move || -> Result<Vec<Message>> {
103            let conn = conn
104                .lock()
105                .map_err(|e| DeckError::Store(format!("mutex poisoned: {e}")))?;
106            let mut stmt = conn
107                .prepare(
108                    "SELECT role, content, tool_calls FROM messages WHERE session_id = ?1 ORDER BY seq ASC",
109                )
110                .map_err(|e| DeckError::Store(format!("prepare: {e}")))?;
111            let rows = stmt
112                .query_map(params![session_str], |row| {
113                    let role: String = row.get(0)?;
114                    let content: String = row.get(1)?;
115                    let tool_calls: String = row.get(2)?;
116                    Ok((role, content, tool_calls))
117                })
118                .map_err(|e| DeckError::Store(format!("query: {e}")))?;
119            let mut out = Vec::new();
120            for r in rows {
121                let (role, content, tc) = r.map_err(|e| DeckError::Store(format!("row: {e}")))?;
122                let role = deck_core::Role::from_wire_str(&role).ok_or_else(|| {
123                    DeckError::Store(format!("unknown role variant in store: {role}"))
124                })?;
125                let tool_calls = serde_json::from_str(&tc).map_err(|e| {
126                    DeckError::Store(format!("tool_calls decode in store: {e}"))
127                })?;
128                out.push(Message {
129                    role,
130                    content,
131                    tool_calls,
132                });
133            }
134            Ok(out)
135        })
136        .await
137        .map_err(|e| DeckError::Store(format!("join: {e}")))?
138    }
139
140    async fn list(&self) -> Result<Vec<SessionId>> {
141        let conn = self.conn.clone();
142        tokio::task::spawn_blocking(move || -> Result<Vec<SessionId>> {
143            let conn = conn
144                .lock()
145                .map_err(|e| DeckError::Store(format!("mutex poisoned: {e}")))?;
146            let mut stmt = conn
147                .prepare("SELECT id FROM sessions ORDER BY created_at ASC")
148                .map_err(|e| DeckError::Store(format!("prepare: {e}")))?;
149            let rows = stmt
150                .query_map([], |row| row.get::<_, String>(0))
151                .map_err(|e| DeckError::Store(format!("query: {e}")))?;
152            let mut out = Vec::new();
153            for r in rows {
154                let s = r.map_err(|e| DeckError::Store(format!("row: {e}")))?;
155                let uuid = uuid::Uuid::parse_str(&s)
156                    .map_err(|e| DeckError::Store(format!("uuid: {e}")))?;
157                out.push(SessionId(uuid));
158            }
159            Ok(out)
160        })
161        .await
162        .map_err(|e| DeckError::Store(format!("join: {e}")))?
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169    use deck_core::Role;
170    use tempfile::TempDir;
171
172    #[tokio::test]
173    async fn append_and_load_roundtrip() {
174        let dir = TempDir::new().unwrap();
175        let path = dir.path().join("sessions.db");
176        let store = SqliteStore::open(&path).expect("open");
177        let session = SessionId::new();
178        store
179            .append(
180                session,
181                &Message {
182                    role: Role::User,
183                    content: "hi".into(),
184                    tool_calls: vec![],
185                },
186            )
187            .await
188            .unwrap();
189        let msgs = store.load(session).await.unwrap();
190        assert_eq!(msgs.len(), 1);
191        assert_eq!(msgs[0].content, "hi");
192    }
193}