Skip to main content

synaptic_sqlite/
store.rs

1use std::sync::{Arc, Mutex};
2
3use async_trait::async_trait;
4use rusqlite::Connection;
5use serde_json::Value;
6use synaptic_core::{encode_namespace, now_iso, Item, SynapticError};
7
/// Connection settings consumed by [`SqliteStore`].
#[derive(Debug, Clone)]
pub struct SqliteStoreConfig {
    /// Location of the SQLite database file. The special value `":memory:"`
    /// selects an in-memory database.
    pub path: String,
}

impl SqliteStoreConfig {
    /// Build a configuration from anything that converts into a `String` path.
    pub fn new(path: impl Into<String>) -> Self {
        let path = path.into();
        Self { path }
    }

    /// Build a configuration whose path is `":memory:"`.
    pub fn in_memory() -> Self {
        Self::new(":memory:")
    }
}
28
29/// SQLite-backed implementation of the [`Store`](synaptic_core::Store) trait.
30///
31/// Uses a main table for key-value storage and an FTS5 virtual table for
32/// full-text search. The FTS5 table is manually synchronized on `put()` and
33/// `delete()` operations.
34pub struct SqliteStore {
35    conn: Arc<Mutex<Connection>>,
36}
37
38impl SqliteStore {
39    /// Create a new `SqliteStore` from the given configuration.
40    ///
41    /// Opens (or creates) the SQLite database and initializes the store
42    /// and FTS5 tables if they do not already exist.
43    pub fn new(config: SqliteStoreConfig) -> Result<Self, SynapticError> {
44        let conn = Connection::open(&config.path)
45            .map_err(|e| SynapticError::Store(format!("SQLite open error: {e}")))?;
46
47        conn.execute_batch(
48            "CREATE TABLE IF NOT EXISTS synaptic_store (
49                namespace TEXT NOT NULL,
50                key       TEXT NOT NULL,
51                value     TEXT NOT NULL,
52                created_at TEXT NOT NULL,
53                updated_at TEXT NOT NULL,
54                PRIMARY KEY (namespace, key)
55            );
56            CREATE VIRTUAL TABLE IF NOT EXISTS synaptic_store_fts USING fts5(
57                key, value, namespace UNINDEXED
58            );",
59        )
60        .map_err(|e| SynapticError::Store(format!("SQLite create table error: {e}")))?;
61
62        Ok(Self {
63            conn: Arc::new(Mutex::new(conn)),
64        })
65    }
66}
67
68type RowTuple = (String, String, String, String, String);
69
70/// Helper to query rows from a prepared statement.
71fn collect_rows(
72    stmt: &mut rusqlite::Statement<'_>,
73    params: &[&dyn rusqlite::types::ToSql],
74) -> Result<Vec<RowTuple>, SynapticError> {
75    let rows = stmt
76        .query_map(params, |row| {
77            Ok((
78                row.get::<_, String>(0)?,
79                row.get::<_, String>(1)?,
80                row.get::<_, String>(2)?,
81                row.get::<_, String>(3)?,
82                row.get::<_, String>(4)?,
83            ))
84        })
85        .map_err(|e| SynapticError::Store(format!("SQLite query error: {e}")))?
86        .collect::<Result<Vec<_>, _>>()
87        .map_err(|e| SynapticError::Store(format!("SQLite row error: {e}")))?;
88    Ok(rows)
89}
90
91fn rows_to_items(rows: Vec<RowTuple>) -> Vec<Item> {
92    rows.into_iter()
93        .map(|(ns_str, k, val_str, created_at, updated_at)| {
94            let value: Value = serde_json::from_str(&val_str).unwrap_or(Value::Null);
95            Item {
96                namespace: ns_str.split(':').map(String::from).collect(),
97                key: k,
98                value,
99                created_at,
100                updated_at,
101                score: None,
102            }
103        })
104        .collect()
105}
106
107#[async_trait]
108impl synaptic_core::Store for SqliteStore {
109    async fn get(&self, namespace: &[&str], key: &str) -> Result<Option<Item>, SynapticError> {
110        let conn = self.conn.clone();
111        let ns = encode_namespace(namespace);
112        let key = key.to_string();
113
114        tokio::task::spawn_blocking(move || {
115            let conn = conn
116                .lock()
117                .map_err(|e| SynapticError::Store(format!("lock error: {e}")))?;
118
119            let mut stmt = conn
120                .prepare(
121                    "SELECT namespace, key, value, created_at, updated_at
122                     FROM synaptic_store WHERE namespace = ?1 AND key = ?2",
123                )
124                .map_err(|e| SynapticError::Store(format!("SQLite prepare error: {e}")))?;
125
126            let result = stmt.query_row(rusqlite::params![ns, key], |row| {
127                Ok((
128                    row.get::<_, String>(0)?,
129                    row.get::<_, String>(1)?,
130                    row.get::<_, String>(2)?,
131                    row.get::<_, String>(3)?,
132                    row.get::<_, String>(4)?,
133                ))
134            });
135
136            match result {
137                Ok((ns_str, k, val_str, created_at, updated_at)) => {
138                    let value: Value = serde_json::from_str(&val_str).map_err(|e| {
139                        SynapticError::Store(format!("JSON deserialize error: {e}"))
140                    })?;
141                    Ok(Some(Item {
142                        namespace: ns_str.split(':').map(String::from).collect(),
143                        key: k,
144                        value,
145                        created_at,
146                        updated_at,
147                        score: None,
148                    }))
149                }
150                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
151                Err(e) => Err(SynapticError::Store(format!("SQLite query error: {e}"))),
152            }
153        })
154        .await
155        .map_err(|e| SynapticError::Store(format!("spawn_blocking error: {e}")))?
156    }
157
158    async fn search(
159        &self,
160        namespace: &[&str],
161        query: Option<&str>,
162        limit: usize,
163    ) -> Result<Vec<Item>, SynapticError> {
164        let conn = self.conn.clone();
165        let ns = encode_namespace(namespace);
166        let query = query.map(String::from);
167        let limit = limit as i64;
168
169        tokio::task::spawn_blocking(move || {
170            let conn = conn
171                .lock()
172                .map_err(|e| SynapticError::Store(format!("lock error: {e}")))?;
173
174            let rows: Vec<RowTuple> = match &query {
175                Some(q) => {
176                    // Try FTS5 MATCH first, fall back to LIKE on failure.
177                    let fts_result: Result<Vec<RowTuple>, rusqlite::Error> = (|| {
178                        let mut stmt = conn.prepare(
179                            "SELECT s.namespace, s.key, s.value, s.created_at, s.updated_at
180                             FROM synaptic_store s
181                             JOIN synaptic_store_fts f ON s.namespace = f.namespace AND s.key = f.key
182                             WHERE f.namespace = ?1 AND synaptic_store_fts MATCH ?2
183                             LIMIT ?3",
184                        )?;
185                        let rows = stmt
186                            .query_map(rusqlite::params![ns, q, limit], |row| {
187                                Ok((
188                                    row.get::<_, String>(0)?,
189                                    row.get::<_, String>(1)?,
190                                    row.get::<_, String>(2)?,
191                                    row.get::<_, String>(3)?,
192                                    row.get::<_, String>(4)?,
193                                ))
194                            })?
195                            .collect::<Result<Vec<_>, _>>()?;
196                        Ok(rows)
197                    })();
198
199                    match fts_result {
200                        Ok(rows) => rows,
201                        Err(_) => {
202                            // Fall back to LIKE
203                            let like_pattern = format!("%{q}%");
204                            let mut stmt = conn
205                                .prepare(
206                                    "SELECT namespace, key, value, created_at, updated_at
207                                     FROM synaptic_store
208                                     WHERE namespace = ?1 AND (key LIKE ?2 OR value LIKE ?2)
209                                     LIMIT ?3",
210                                )
211                                .map_err(|e| {
212                                    SynapticError::Store(format!("SQLite prepare error: {e}"))
213                                })?;
214                            collect_rows(
215                                &mut stmt,
216                                &[
217                                    &ns as &dyn rusqlite::types::ToSql,
218                                    &like_pattern,
219                                    &limit,
220                                ],
221                            )?
222                        }
223                    }
224                }
225                None => {
226                    let mut stmt = conn
227                        .prepare(
228                            "SELECT namespace, key, value, created_at, updated_at
229                             FROM synaptic_store WHERE namespace = ?1 LIMIT ?2",
230                        )
231                        .map_err(|e| SynapticError::Store(format!("SQLite prepare error: {e}")))?;
232                    collect_rows(
233                        &mut stmt,
234                        &[&ns as &dyn rusqlite::types::ToSql, &limit],
235                    )?
236                }
237            };
238
239            Ok(rows_to_items(rows))
240        })
241        .await
242        .map_err(|e| SynapticError::Store(format!("spawn_blocking error: {e}")))?
243    }
244
245    async fn put(&self, namespace: &[&str], key: &str, value: Value) -> Result<(), SynapticError> {
246        let conn = self.conn.clone();
247        let ns = encode_namespace(namespace);
248        let key = key.to_string();
249        let value_str = serde_json::to_string(&value)
250            .map_err(|e| SynapticError::Store(format!("JSON serialize error: {e}")))?;
251
252        tokio::task::spawn_blocking(move || {
253            let conn = conn
254                .lock()
255                .map_err(|e| SynapticError::Store(format!("lock error: {e}")))?;
256
257            // Check existing to preserve created_at
258            let existing_created_at: Option<String> = conn
259                .prepare("SELECT created_at FROM synaptic_store WHERE namespace = ?1 AND key = ?2")
260                .and_then(|mut stmt| stmt.query_row(rusqlite::params![ns, key], |row| row.get(0)))
261                .ok();
262
263            let now = now_iso();
264            let created_at = existing_created_at.unwrap_or_else(|| now.clone());
265
266            conn.execute(
267                "INSERT OR REPLACE INTO synaptic_store (namespace, key, value, created_at, updated_at)
268                 VALUES (?1, ?2, ?3, ?4, ?5)",
269                rusqlite::params![ns, key, value_str, created_at, now],
270            )
271            .map_err(|e| SynapticError::Store(format!("SQLite insert error: {e}")))?;
272
273            // Sync FTS: delete old entry then insert new
274            conn.execute(
275                "DELETE FROM synaptic_store_fts WHERE namespace = ?1 AND key = ?2",
276                rusqlite::params![ns, key],
277            )
278            .map_err(|e| SynapticError::Store(format!("SQLite FTS delete error: {e}")))?;
279
280            conn.execute(
281                "INSERT INTO synaptic_store_fts (key, value, namespace) VALUES (?1, ?2, ?3)",
282                rusqlite::params![key, value_str, ns],
283            )
284            .map_err(|e| SynapticError::Store(format!("SQLite FTS insert error: {e}")))?;
285
286            Ok(())
287        })
288        .await
289        .map_err(|e| SynapticError::Store(format!("spawn_blocking error: {e}")))?
290    }
291
292    async fn delete(&self, namespace: &[&str], key: &str) -> Result<(), SynapticError> {
293        let conn = self.conn.clone();
294        let ns = encode_namespace(namespace);
295        let key = key.to_string();
296
297        tokio::task::spawn_blocking(move || {
298            let conn = conn
299                .lock()
300                .map_err(|e| SynapticError::Store(format!("lock error: {e}")))?;
301
302            conn.execute(
303                "DELETE FROM synaptic_store WHERE namespace = ?1 AND key = ?2",
304                rusqlite::params![ns, key],
305            )
306            .map_err(|e| SynapticError::Store(format!("SQLite delete error: {e}")))?;
307
308            conn.execute(
309                "DELETE FROM synaptic_store_fts WHERE namespace = ?1 AND key = ?2",
310                rusqlite::params![ns, key],
311            )
312            .map_err(|e| SynapticError::Store(format!("SQLite FTS delete error: {e}")))?;
313
314            Ok(())
315        })
316        .await
317        .map_err(|e| SynapticError::Store(format!("spawn_blocking error: {e}")))?
318    }
319
320    async fn list_namespaces(&self, prefix: &[&str]) -> Result<Vec<Vec<String>>, SynapticError> {
321        let conn = self.conn.clone();
322        let prefix_str = if prefix.is_empty() {
323            String::new()
324        } else {
325            prefix.join(":")
326        };
327
328        tokio::task::spawn_blocking(move || {
329            let conn = conn
330                .lock()
331                .map_err(|e| SynapticError::Store(format!("lock error: {e}")))?;
332
333            let raw_namespaces: Vec<String> = if prefix_str.is_empty() {
334                let mut stmt = conn
335                    .prepare("SELECT DISTINCT namespace FROM synaptic_store")
336                    .map_err(|e| SynapticError::Store(format!("SQLite prepare error: {e}")))?;
337                let v: Vec<String> = stmt
338                    .query_map([], |row| row.get::<_, String>(0))
339                    .map_err(|e| SynapticError::Store(format!("SQLite query error: {e}")))?
340                    .filter_map(|r| r.ok())
341                    .collect();
342                v
343            } else {
344                let like_pattern = format!("{prefix_str}%");
345                let mut stmt = conn
346                    .prepare(
347                        "SELECT DISTINCT namespace FROM synaptic_store WHERE namespace LIKE ?1",
348                    )
349                    .map_err(|e| SynapticError::Store(format!("SQLite prepare error: {e}")))?;
350                let v: Vec<String> = stmt
351                    .query_map(rusqlite::params![like_pattern], |row| {
352                        row.get::<_, String>(0)
353                    })
354                    .map_err(|e| SynapticError::Store(format!("SQLite query error: {e}")))?
355                    .filter_map(|r| r.ok())
356                    .collect();
357                v
358            };
359
360            let namespaces: Vec<Vec<String>> = raw_namespaces
361                .into_iter()
362                .map(|ns| ns.split(':').map(String::from).collect())
363                .collect();
364
365            Ok(namespaces)
366        })
367        .await
368        .map_err(|e| SynapticError::Store(format!("spawn_blocking error: {e}")))?
369    }
370}