1use std::path::{Path, PathBuf};
6use std::sync::{Arc, Mutex};
7
8use async_trait::async_trait;
9use deck_core::{DeckError, Message, Result, SessionId, Store};
10use rusqlite::{params, Connection};
11
12#[derive(Debug, Clone)]
13pub struct SqliteStore {
14 conn: Arc<Mutex<Connection>>,
15 path: PathBuf,
16}
17
18impl SqliteStore {
19 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
20 let path = path.as_ref().to_path_buf();
21 if let Some(parent) = path.parent() {
22 std::fs::create_dir_all(parent)?;
23 }
24 let conn = Connection::open(&path).map_err(|e| DeckError::Store(format!("open: {e}")))?;
25 conn.execute_batch(
26 r"
27 PRAGMA journal_mode = WAL;
28 PRAGMA foreign_keys = ON;
29 CREATE TABLE IF NOT EXISTS sessions (
30 id TEXT PRIMARY KEY,
31 created_at INTEGER NOT NULL DEFAULT (strftime('%s','now'))
32 );
33 CREATE TABLE IF NOT EXISTS messages (
34 session_id TEXT NOT NULL REFERENCES sessions(id),
35 seq INTEGER NOT NULL,
36 role TEXT NOT NULL,
37 content TEXT NOT NULL,
38 tool_calls TEXT NOT NULL DEFAULT '[]',
39 PRIMARY KEY (session_id, seq)
40 );
41 ",
42 )
43 .map_err(|e| DeckError::Store(format!("init schema: {e}")))?;
44 Ok(Self {
45 conn: Arc::new(Mutex::new(conn)),
46 path,
47 })
48 }
49
50 #[must_use]
51 pub fn path(&self) -> &Path {
52 &self.path
53 }
54}
55
56#[async_trait]
57impl Store for SqliteStore {
58 async fn append(&self, session: SessionId, msg: &Message) -> Result<()> {
59 let role = msg.role.as_wire_str();
60 let tool_calls = serde_json::to_string(&msg.tool_calls)?;
61 let content = msg.content.clone();
62 let session_str = session.to_string();
63 let conn = self.conn.clone();
64 tokio::task::spawn_blocking(move || -> Result<()> {
65 let mut conn = conn
66 .lock()
67 .map_err(|e| DeckError::Store(format!("mutex poisoned: {e}")))?;
68 let tx = conn
72 .transaction()
73 .map_err(|e| DeckError::Store(format!("begin tx: {e}")))?;
74 tx.execute(
75 "INSERT OR IGNORE INTO sessions(id) VALUES (?1)",
76 params![session_str],
77 )
78 .map_err(|e| DeckError::Store(format!("upsert session: {e}")))?;
79 let next: i64 = tx
80 .query_row(
81 "SELECT COALESCE(MAX(seq), -1) + 1 FROM messages WHERE session_id = ?1",
82 params![session_str],
83 |row| row.get(0),
84 )
85 .map_err(|e| DeckError::Store(format!("next seq: {e}")))?;
86 tx.execute(
87 "INSERT INTO messages(session_id, seq, role, content, tool_calls) VALUES (?1, ?2, ?3, ?4, ?5)",
88 params![session_str, next, role, content, tool_calls],
89 )
90 .map_err(|e| DeckError::Store(format!("insert message: {e}")))?;
91 tx.commit()
92 .map_err(|e| DeckError::Store(format!("commit: {e}")))?;
93 Ok(())
94 })
95 .await
96 .map_err(|e| DeckError::Store(format!("join: {e}")))?
97 }
98
99 async fn load(&self, session: SessionId) -> Result<Vec<Message>> {
100 let session_str = session.to_string();
101 let conn = self.conn.clone();
102 tokio::task::spawn_blocking(move || -> Result<Vec<Message>> {
103 let conn = conn
104 .lock()
105 .map_err(|e| DeckError::Store(format!("mutex poisoned: {e}")))?;
106 let mut stmt = conn
107 .prepare(
108 "SELECT role, content, tool_calls FROM messages WHERE session_id = ?1 ORDER BY seq ASC",
109 )
110 .map_err(|e| DeckError::Store(format!("prepare: {e}")))?;
111 let rows = stmt
112 .query_map(params![session_str], |row| {
113 let role: String = row.get(0)?;
114 let content: String = row.get(1)?;
115 let tool_calls: String = row.get(2)?;
116 Ok((role, content, tool_calls))
117 })
118 .map_err(|e| DeckError::Store(format!("query: {e}")))?;
119 let mut out = Vec::new();
120 for r in rows {
121 let (role, content, tc) = r.map_err(|e| DeckError::Store(format!("row: {e}")))?;
122 let role = deck_core::Role::from_wire_str(&role).ok_or_else(|| {
123 DeckError::Store(format!("unknown role variant in store: {role}"))
124 })?;
125 let tool_calls = serde_json::from_str(&tc).map_err(|e| {
126 DeckError::Store(format!("tool_calls decode in store: {e}"))
127 })?;
128 out.push(Message {
129 role,
130 content,
131 tool_calls,
132 });
133 }
134 Ok(out)
135 })
136 .await
137 .map_err(|e| DeckError::Store(format!("join: {e}")))?
138 }
139
140 async fn list(&self) -> Result<Vec<SessionId>> {
141 let conn = self.conn.clone();
142 tokio::task::spawn_blocking(move || -> Result<Vec<SessionId>> {
143 let conn = conn
144 .lock()
145 .map_err(|e| DeckError::Store(format!("mutex poisoned: {e}")))?;
146 let mut stmt = conn
147 .prepare("SELECT id FROM sessions ORDER BY created_at ASC")
148 .map_err(|e| DeckError::Store(format!("prepare: {e}")))?;
149 let rows = stmt
150 .query_map([], |row| row.get::<_, String>(0))
151 .map_err(|e| DeckError::Store(format!("query: {e}")))?;
152 let mut out = Vec::new();
153 for r in rows {
154 let s = r.map_err(|e| DeckError::Store(format!("row: {e}")))?;
155 let uuid = uuid::Uuid::parse_str(&s)
156 .map_err(|e| DeckError::Store(format!("uuid: {e}")))?;
157 out.push(SessionId(uuid));
158 }
159 Ok(out)
160 })
161 .await
162 .map_err(|e| DeckError::Store(format!("join: {e}")))?
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169 use deck_core::Role;
170 use tempfile::TempDir;
171
172 #[tokio::test]
173 async fn append_and_load_roundtrip() {
174 let dir = TempDir::new().unwrap();
175 let path = dir.path().join("sessions.db");
176 let store = SqliteStore::open(&path).expect("open");
177 let session = SessionId::new();
178 store
179 .append(
180 session,
181 &Message {
182 role: Role::User,
183 content: "hi".into(),
184 tool_calls: vec![],
185 },
186 )
187 .await
188 .unwrap();
189 let msgs = store.load(session).await.unwrap();
190 assert_eq!(msgs.len(), 1);
191 assert_eq!(msgs[0].content, "hi");
192 }
193}