1use std::sync::{Arc, Mutex};
2
3use async_trait::async_trait;
4use rusqlite::Connection;
5use serde_json::Value;
6use synaptic_core::{encode_namespace, now_iso, Item, SynapticError};
7
/// Settings used to open a [`SqliteStore`].
#[derive(Debug, Clone)]
pub struct SqliteStoreConfig {
    /// Database location; passed verbatim to `Connection::open` by `SqliteStore::new`.
    pub path: String,
}

impl SqliteStoreConfig {
    /// Builds a configuration that points at `path`.
    ///
    /// Anything convertible into a `String` is accepted (`&str`, `String`, ...).
    pub fn new(path: impl Into<String>) -> Self {
        let path = path.into();
        Self { path }
    }

    /// Builds a configuration whose path is the literal `":memory:"`
    /// (the name SQLite uses for an in-memory database).
    pub fn in_memory() -> Self {
        Self::new(":memory:")
    }
}
28
29pub struct SqliteStore {
35 conn: Arc<Mutex<Connection>>,
36}
37
38impl SqliteStore {
39 pub fn new(config: SqliteStoreConfig) -> Result<Self, SynapticError> {
44 let conn = Connection::open(&config.path)
45 .map_err(|e| SynapticError::Store(format!("SQLite open error: {e}")))?;
46
47 conn.execute_batch(
48 "CREATE TABLE IF NOT EXISTS synaptic_store (
49 namespace TEXT NOT NULL,
50 key TEXT NOT NULL,
51 value TEXT NOT NULL,
52 created_at TEXT NOT NULL,
53 updated_at TEXT NOT NULL,
54 PRIMARY KEY (namespace, key)
55 );
56 CREATE VIRTUAL TABLE IF NOT EXISTS synaptic_store_fts USING fts5(
57 key, value, namespace UNINDEXED
58 );",
59 )
60 .map_err(|e| SynapticError::Store(format!("SQLite create table error: {e}")))?;
61
62 Ok(Self {
63 conn: Arc::new(Mutex::new(conn)),
64 })
65 }
66}
67
68type RowTuple = (String, String, String, String, String);
69
70fn collect_rows(
72 stmt: &mut rusqlite::Statement<'_>,
73 params: &[&dyn rusqlite::types::ToSql],
74) -> Result<Vec<RowTuple>, SynapticError> {
75 let rows = stmt
76 .query_map(params, |row| {
77 Ok((
78 row.get::<_, String>(0)?,
79 row.get::<_, String>(1)?,
80 row.get::<_, String>(2)?,
81 row.get::<_, String>(3)?,
82 row.get::<_, String>(4)?,
83 ))
84 })
85 .map_err(|e| SynapticError::Store(format!("SQLite query error: {e}")))?
86 .collect::<Result<Vec<_>, _>>()
87 .map_err(|e| SynapticError::Store(format!("SQLite row error: {e}")))?;
88 Ok(rows)
89}
90
91fn rows_to_items(rows: Vec<RowTuple>) -> Vec<Item> {
92 rows.into_iter()
93 .map(|(ns_str, k, val_str, created_at, updated_at)| {
94 let value: Value = serde_json::from_str(&val_str).unwrap_or(Value::Null);
95 Item {
96 namespace: ns_str.split(':').map(String::from).collect(),
97 key: k,
98 value,
99 created_at,
100 updated_at,
101 score: None,
102 }
103 })
104 .collect()
105}
106
107#[async_trait]
108impl synaptic_core::Store for SqliteStore {
109 async fn get(&self, namespace: &[&str], key: &str) -> Result<Option<Item>, SynapticError> {
110 let conn = self.conn.clone();
111 let ns = encode_namespace(namespace);
112 let key = key.to_string();
113
114 tokio::task::spawn_blocking(move || {
115 let conn = conn
116 .lock()
117 .map_err(|e| SynapticError::Store(format!("lock error: {e}")))?;
118
119 let mut stmt = conn
120 .prepare(
121 "SELECT namespace, key, value, created_at, updated_at
122 FROM synaptic_store WHERE namespace = ?1 AND key = ?2",
123 )
124 .map_err(|e| SynapticError::Store(format!("SQLite prepare error: {e}")))?;
125
126 let result = stmt.query_row(rusqlite::params![ns, key], |row| {
127 Ok((
128 row.get::<_, String>(0)?,
129 row.get::<_, String>(1)?,
130 row.get::<_, String>(2)?,
131 row.get::<_, String>(3)?,
132 row.get::<_, String>(4)?,
133 ))
134 });
135
136 match result {
137 Ok((ns_str, k, val_str, created_at, updated_at)) => {
138 let value: Value = serde_json::from_str(&val_str).map_err(|e| {
139 SynapticError::Store(format!("JSON deserialize error: {e}"))
140 })?;
141 Ok(Some(Item {
142 namespace: ns_str.split(':').map(String::from).collect(),
143 key: k,
144 value,
145 created_at,
146 updated_at,
147 score: None,
148 }))
149 }
150 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
151 Err(e) => Err(SynapticError::Store(format!("SQLite query error: {e}"))),
152 }
153 })
154 .await
155 .map_err(|e| SynapticError::Store(format!("spawn_blocking error: {e}")))?
156 }
157
158 async fn search(
159 &self,
160 namespace: &[&str],
161 query: Option<&str>,
162 limit: usize,
163 ) -> Result<Vec<Item>, SynapticError> {
164 let conn = self.conn.clone();
165 let ns = encode_namespace(namespace);
166 let query = query.map(String::from);
167 let limit = limit as i64;
168
169 tokio::task::spawn_blocking(move || {
170 let conn = conn
171 .lock()
172 .map_err(|e| SynapticError::Store(format!("lock error: {e}")))?;
173
174 let rows: Vec<RowTuple> = match &query {
175 Some(q) => {
176 let fts_result: Result<Vec<RowTuple>, rusqlite::Error> = (|| {
178 let mut stmt = conn.prepare(
179 "SELECT s.namespace, s.key, s.value, s.created_at, s.updated_at
180 FROM synaptic_store s
181 JOIN synaptic_store_fts f ON s.namespace = f.namespace AND s.key = f.key
182 WHERE f.namespace = ?1 AND synaptic_store_fts MATCH ?2
183 LIMIT ?3",
184 )?;
185 let rows = stmt
186 .query_map(rusqlite::params![ns, q, limit], |row| {
187 Ok((
188 row.get::<_, String>(0)?,
189 row.get::<_, String>(1)?,
190 row.get::<_, String>(2)?,
191 row.get::<_, String>(3)?,
192 row.get::<_, String>(4)?,
193 ))
194 })?
195 .collect::<Result<Vec<_>, _>>()?;
196 Ok(rows)
197 })();
198
199 match fts_result {
200 Ok(rows) => rows,
201 Err(_) => {
202 let like_pattern = format!("%{q}%");
204 let mut stmt = conn
205 .prepare(
206 "SELECT namespace, key, value, created_at, updated_at
207 FROM synaptic_store
208 WHERE namespace = ?1 AND (key LIKE ?2 OR value LIKE ?2)
209 LIMIT ?3",
210 )
211 .map_err(|e| {
212 SynapticError::Store(format!("SQLite prepare error: {e}"))
213 })?;
214 collect_rows(
215 &mut stmt,
216 &[
217 &ns as &dyn rusqlite::types::ToSql,
218 &like_pattern,
219 &limit,
220 ],
221 )?
222 }
223 }
224 }
225 None => {
226 let mut stmt = conn
227 .prepare(
228 "SELECT namespace, key, value, created_at, updated_at
229 FROM synaptic_store WHERE namespace = ?1 LIMIT ?2",
230 )
231 .map_err(|e| SynapticError::Store(format!("SQLite prepare error: {e}")))?;
232 collect_rows(
233 &mut stmt,
234 &[&ns as &dyn rusqlite::types::ToSql, &limit],
235 )?
236 }
237 };
238
239 Ok(rows_to_items(rows))
240 })
241 .await
242 .map_err(|e| SynapticError::Store(format!("spawn_blocking error: {e}")))?
243 }
244
245 async fn put(&self, namespace: &[&str], key: &str, value: Value) -> Result<(), SynapticError> {
246 let conn = self.conn.clone();
247 let ns = encode_namespace(namespace);
248 let key = key.to_string();
249 let value_str = serde_json::to_string(&value)
250 .map_err(|e| SynapticError::Store(format!("JSON serialize error: {e}")))?;
251
252 tokio::task::spawn_blocking(move || {
253 let conn = conn
254 .lock()
255 .map_err(|e| SynapticError::Store(format!("lock error: {e}")))?;
256
257 let existing_created_at: Option<String> = conn
259 .prepare("SELECT created_at FROM synaptic_store WHERE namespace = ?1 AND key = ?2")
260 .and_then(|mut stmt| stmt.query_row(rusqlite::params![ns, key], |row| row.get(0)))
261 .ok();
262
263 let now = now_iso();
264 let created_at = existing_created_at.unwrap_or_else(|| now.clone());
265
266 conn.execute(
267 "INSERT OR REPLACE INTO synaptic_store (namespace, key, value, created_at, updated_at)
268 VALUES (?1, ?2, ?3, ?4, ?5)",
269 rusqlite::params![ns, key, value_str, created_at, now],
270 )
271 .map_err(|e| SynapticError::Store(format!("SQLite insert error: {e}")))?;
272
273 conn.execute(
275 "DELETE FROM synaptic_store_fts WHERE namespace = ?1 AND key = ?2",
276 rusqlite::params![ns, key],
277 )
278 .map_err(|e| SynapticError::Store(format!("SQLite FTS delete error: {e}")))?;
279
280 conn.execute(
281 "INSERT INTO synaptic_store_fts (key, value, namespace) VALUES (?1, ?2, ?3)",
282 rusqlite::params![key, value_str, ns],
283 )
284 .map_err(|e| SynapticError::Store(format!("SQLite FTS insert error: {e}")))?;
285
286 Ok(())
287 })
288 .await
289 .map_err(|e| SynapticError::Store(format!("spawn_blocking error: {e}")))?
290 }
291
292 async fn delete(&self, namespace: &[&str], key: &str) -> Result<(), SynapticError> {
293 let conn = self.conn.clone();
294 let ns = encode_namespace(namespace);
295 let key = key.to_string();
296
297 tokio::task::spawn_blocking(move || {
298 let conn = conn
299 .lock()
300 .map_err(|e| SynapticError::Store(format!("lock error: {e}")))?;
301
302 conn.execute(
303 "DELETE FROM synaptic_store WHERE namespace = ?1 AND key = ?2",
304 rusqlite::params![ns, key],
305 )
306 .map_err(|e| SynapticError::Store(format!("SQLite delete error: {e}")))?;
307
308 conn.execute(
309 "DELETE FROM synaptic_store_fts WHERE namespace = ?1 AND key = ?2",
310 rusqlite::params![ns, key],
311 )
312 .map_err(|e| SynapticError::Store(format!("SQLite FTS delete error: {e}")))?;
313
314 Ok(())
315 })
316 .await
317 .map_err(|e| SynapticError::Store(format!("spawn_blocking error: {e}")))?
318 }
319
320 async fn list_namespaces(&self, prefix: &[&str]) -> Result<Vec<Vec<String>>, SynapticError> {
321 let conn = self.conn.clone();
322 let prefix_str = if prefix.is_empty() {
323 String::new()
324 } else {
325 prefix.join(":")
326 };
327
328 tokio::task::spawn_blocking(move || {
329 let conn = conn
330 .lock()
331 .map_err(|e| SynapticError::Store(format!("lock error: {e}")))?;
332
333 let raw_namespaces: Vec<String> = if prefix_str.is_empty() {
334 let mut stmt = conn
335 .prepare("SELECT DISTINCT namespace FROM synaptic_store")
336 .map_err(|e| SynapticError::Store(format!("SQLite prepare error: {e}")))?;
337 let v: Vec<String> = stmt
338 .query_map([], |row| row.get::<_, String>(0))
339 .map_err(|e| SynapticError::Store(format!("SQLite query error: {e}")))?
340 .filter_map(|r| r.ok())
341 .collect();
342 v
343 } else {
344 let like_pattern = format!("{prefix_str}%");
345 let mut stmt = conn
346 .prepare(
347 "SELECT DISTINCT namespace FROM synaptic_store WHERE namespace LIKE ?1",
348 )
349 .map_err(|e| SynapticError::Store(format!("SQLite prepare error: {e}")))?;
350 let v: Vec<String> = stmt
351 .query_map(rusqlite::params![like_pattern], |row| {
352 row.get::<_, String>(0)
353 })
354 .map_err(|e| SynapticError::Store(format!("SQLite query error: {e}")))?
355 .filter_map(|r| r.ok())
356 .collect();
357 v
358 };
359
360 let namespaces: Vec<Vec<String>> = raw_namespaces
361 .into_iter()
362 .map(|ns| ns.split(':').map(String::from).collect())
363 .collect();
364
365 Ok(namespaces)
366 })
367 .await
368 .map_err(|e| SynapticError::Store(format!("spawn_blocking error: {e}")))?
369 }
370}