Skip to main content

actr_hyper/storage/
db.rs

1//! Actor isolated storage implementation
2//!
3//! Each Actor has an independent SQLite database file, with the path determined
4//! by Hyper's namespace resolver.
5//! All read/write operations are confined to this namespace; an Actor cannot access another Actor's data.
6
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Mutex};
9
10use tracing::{debug, error, info};
11
12use actr_platform_traits::{KvOp, KvStore, PlatformError};
13use async_trait::async_trait;
14
15use crate::error::{HyperError, HyperResult};
16
17/// Actor isolated storage handle
18///
19/// Each Actor has an independent SQLite database file, with the path determined
20/// by Hyper's namespace resolver.
21/// All read/write operations are confined to this namespace; an Actor cannot access another Actor's data.
22///
23/// The rusqlite connection is not Send; wrapped in `Arc<Mutex<Connection>>` for cross-thread sharing.
24/// All blocking I/O is offloaded to the blocking thread pool via `tokio::task::spawn_blocking`.
25#[derive(Clone)]
26pub struct ActorStore {
27    /// Shared SQLite connection (rusqlite is not Send, protected by Mutex)
28    conn: Arc<Mutex<rusqlite::Connection>>,
29    /// Database file path (for logging/debugging only)
30    namespace: PathBuf,
31}
32
33impl ActorStore {
34    /// Open or create an Actor's SQLite database
35    ///
36    /// Automatically creates the table on first call. Parent directory is created automatically if missing.
37    pub async fn open(db_path: &Path) -> HyperResult<Self> {
38        let db_path = db_path.to_path_buf();
39
40        // ensure parent directory exists
41        if let Some(parent) = db_path.parent() {
42            tokio::fs::create_dir_all(parent).await.map_err(|e| {
43                HyperError::Storage(format!(
44                    "failed to create storage directory `{}`: {e}",
45                    parent.display()
46                ))
47            })?;
48        }
49
50        let namespace = db_path.clone();
51
52        // rusqlite is a sync API, execute in blocking thread pool via spawn_blocking
53        let conn = tokio::task::spawn_blocking(move || -> HyperResult<rusqlite::Connection> {
54            let conn = rusqlite::Connection::open(&db_path).map_err(|e| {
55                error!(
56                    path = %db_path.display(),
57                    error = %e,
58                    "failed to open SQLite database"
59                );
60                HyperError::Storage(format!(
61                    "failed to open database `{}`: {e}",
62                    db_path.display()
63                ))
64            })?;
65
66            // enable WAL mode for improved concurrent read performance
67            conn.execute_batch("PRAGMA journal_mode=WAL;")
68                .map_err(|e| HyperError::Storage(format!("failed to set WAL mode: {e}")))?;
69
70            // create table (idempotent)
71            conn.execute_batch(
72                "CREATE TABLE IF NOT EXISTS kv_store (
73                    key        TEXT PRIMARY KEY NOT NULL,
74                    value      BLOB NOT NULL,
75                    updated_at INTEGER NOT NULL DEFAULT (unixepoch())
76                );",
77            )
78            .map_err(|e| {
79                HyperError::Storage(format!("failed to initialize kv_store table: {e}"))
80            })?;
81
82            Ok(conn)
83        })
84        .await
85        .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))??;
86
87        info!(
88            path = %namespace.display(),
89            "ActorStore ready"
90        );
91
92        Ok(Self {
93            conn: Arc::new(Mutex::new(conn)),
94            namespace,
95        })
96    }
97
98    /// Generic KV storage: write or update a key-value pair
99    pub async fn kv_set(&self, key: &str, value: &[u8]) -> HyperResult<()> {
100        let conn = Arc::clone(&self.conn);
101        let key = key.to_string();
102        let value = value.to_vec();
103        let ns = self.namespace.clone();
104
105        tokio::task::spawn_blocking(move || -> HyperResult<()> {
106            let conn = conn.lock().map_err(|e| {
107                HyperError::Storage(format!("failed to acquire database lock: {e}"))
108            })?;
109
110            conn.execute(
111                "INSERT INTO kv_store (key, value, updated_at)
112                 VALUES (?1, ?2, unixepoch())
113                 ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at",
114                rusqlite::params![key, value],
115            )
116            .map_err(|e| {
117                error!(
118                    namespace = %ns.display(),
119                    key = %key,
120                    error = %e,
121                    "kv_set write failed"
122                );
123                HyperError::Storage(format!("kv_set write `{key}` failed: {e}"))
124            })?;
125
126            debug!(namespace = %ns.display(), key = %key, "kv_set write succeeded");
127            Ok(())
128        })
129        .await
130        .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))??;
131
132        Ok(())
133    }
134
135    /// Generic KV storage: read a key's value, returns None if the key does not exist
136    pub async fn kv_get(&self, key: &str) -> HyperResult<Option<Vec<u8>>> {
137        let conn = Arc::clone(&self.conn);
138        let key = key.to_string();
139        let ns = self.namespace.clone();
140
141        tokio::task::spawn_blocking(move || -> HyperResult<Option<Vec<u8>>> {
142            let conn = conn.lock().map_err(|e| {
143                HyperError::Storage(format!("failed to acquire database lock: {e}"))
144            })?;
145
146            let result = conn.query_row(
147                "SELECT value FROM kv_store WHERE key = ?1",
148                rusqlite::params![key],
149                |row| row.get::<_, Vec<u8>>(0),
150            );
151
152            match result {
153                Ok(value) => {
154                    debug!(namespace = %ns.display(), key = %key, "kv_get hit");
155                    Ok(Some(value))
156                }
157                Err(rusqlite::Error::QueryReturnedNoRows) => {
158                    debug!(namespace = %ns.display(), key = %key, "kv_get miss");
159                    Ok(None)
160                }
161                Err(e) => {
162                    error!(
163                        namespace = %ns.display(),
164                        key = %key,
165                        error = %e,
166                        "kv_get read failed"
167                    );
168                    Err(HyperError::Storage(format!(
169                        "kv_get read `{key}` failed: {e}"
170                    )))
171                }
172            }
173        })
174        .await
175        .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))?
176    }
177
178    /// Generic KV storage: delete a key, returns whether a record was actually deleted
179    pub async fn kv_delete(&self, key: &str) -> HyperResult<bool> {
180        let conn = Arc::clone(&self.conn);
181        let key = key.to_string();
182        let ns = self.namespace.clone();
183
184        tokio::task::spawn_blocking(move || -> HyperResult<bool> {
185            let conn = conn.lock().map_err(|e| {
186                HyperError::Storage(format!("failed to acquire database lock: {e}"))
187            })?;
188
189            let affected = conn
190                .execute(
191                    "DELETE FROM kv_store WHERE key = ?1",
192                    rusqlite::params![key],
193                )
194                .map_err(|e| {
195                    error!(
196                        namespace = %ns.display(),
197                        key = %key,
198                        error = %e,
199                        "kv_delete failed"
200                    );
201                    HyperError::Storage(format!("kv_delete `{key}` failed: {e}"))
202                })?;
203
204            let deleted = affected > 0;
205            debug!(namespace = %ns.display(), key = %key, deleted, "kv_delete executed");
206            Ok(deleted)
207        })
208        .await
209        .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))?
210    }
211
212    /// List all keys, optionally filtered by prefix
213    pub async fn kv_list_keys(&self, prefix: Option<&str>) -> HyperResult<Vec<String>> {
214        let conn = Arc::clone(&self.conn);
215        let prefix = prefix.map(|s| s.to_string());
216        let ns = self.namespace.clone();
217
218        tokio::task::spawn_blocking(move || -> HyperResult<Vec<String>> {
219            let conn = conn.lock().map_err(|e| {
220                HyperError::Storage(format!("failed to acquire database lock: {e}"))
221            })?;
222
223            let keys = if let Some(ref pfx) = prefix {
224                // use LIKE pattern matching for prefix, escape wildcards
225                let pattern = format!("{}%", pfx.replace('%', "\\%").replace('_', "\\_"));
226                let mut stmt = conn
227                    .prepare("SELECT key FROM kv_store WHERE key LIKE ?1 ESCAPE '\\' ORDER BY key")
228                    .map_err(|e| HyperError::Storage(format!("failed to prepare SQL: {e}")))?;
229                let rows = stmt
230                    .query_map(rusqlite::params![pattern], |row| row.get::<_, String>(0))
231                    .map_err(|e| HyperError::Storage(format!("failed to query key list: {e}")))?;
232                rows.collect::<Result<Vec<_>, _>>()
233                    .map_err(|e| HyperError::Storage(format!("failed to read key row: {e}")))?
234            } else {
235                let mut stmt = conn
236                    .prepare("SELECT key FROM kv_store ORDER BY key")
237                    .map_err(|e| HyperError::Storage(format!("failed to prepare SQL: {e}")))?;
238                let rows = stmt
239                    .query_map([], |row| row.get::<_, String>(0))
240                    .map_err(|e| HyperError::Storage(format!("failed to query key list: {e}")))?;
241                rows.collect::<Result<Vec<_>, _>>()
242                    .map_err(|e| HyperError::Storage(format!("failed to read key row: {e}")))?
243            };
244
245            debug!(
246                namespace = %ns.display(),
247                count = keys.len(),
248                prefix = ?prefix,
249                "kv_list_keys query completed"
250            );
251            Ok(keys)
252        })
253        .await
254        .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))?
255    }
256
257    /// Batch operations (atomic transaction)
258    ///
259    /// All operations execute in a single transaction; if any step fails, all are rolled back.
260    pub async fn kv_batch(&self, ops: Vec<KvOp>) -> HyperResult<()> {
261        let conn = Arc::clone(&self.conn);
262        let ns = self.namespace.clone();
263
264        tokio::task::spawn_blocking(move || -> HyperResult<()> {
265            let mut conn = conn.lock().map_err(|e| {
266                HyperError::Storage(format!("failed to acquire database lock: {e}"))
267            })?;
268
269            let tx = conn.transaction().map_err(|e| {
270                HyperError::Storage(format!("failed to begin transaction: {e}"))
271            })?;
272
273            for op in &ops {
274                match op {
275                    KvOp::Set { key, value } => {
276                        tx.execute(
277                            "INSERT INTO kv_store (key, value, updated_at)
278                             VALUES (?1, ?2, unixepoch())
279                             ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at",
280                            rusqlite::params![key, value],
281                        )
282                        .map_err(|e| {
283                            error!(
284                                namespace = %ns.display(),
285                                key = %key,
286                                error = %e,
287                                "kv_batch set operation failed"
288                            );
289                            HyperError::Storage(format!("kv_batch set `{key}` failed: {e}"))
290                        })?;
291                        debug!(namespace = %ns.display(), key = %key, "kv_batch set");
292                    }
293                    KvOp::Delete { key } => {
294                        tx.execute(
295                            "DELETE FROM kv_store WHERE key = ?1",
296                            rusqlite::params![key],
297                        )
298                        .map_err(|e| {
299                            error!(
300                                namespace = %ns.display(),
301                                key = %key,
302                                error = %e,
303                                "kv_batch delete operation failed"
304                            );
305                            HyperError::Storage(format!("kv_batch delete `{key}` failed: {e}"))
306                        })?;
307                        debug!(namespace = %ns.display(), key = %key, "kv_batch delete");
308                    }
309                }
310            }
311
312            tx.commit().map_err(|e| {
313                error!(namespace = %ns.display(), error = %e, "kv_batch transaction commit failed");
314                HyperError::Storage(format!("kv_batch transaction commit failed: {e}"))
315            })?;
316
317            debug!(
318                namespace = %ns.display(),
319                ops_count = ops.len(),
320                "kv_batch transaction committed"
321            );
322            Ok(())
323        })
324        .await
325        .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))?
326    }
327}
328
329#[async_trait]
330impl KvStore for ActorStore {
331    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, PlatformError> {
332        self.kv_get(key)
333            .await
334            .map_err(|e| PlatformError::Storage(e.to_string()))
335    }
336
337    async fn set(&self, key: &str, value: &[u8]) -> Result<(), PlatformError> {
338        self.kv_set(key, value)
339            .await
340            .map_err(|e| PlatformError::Storage(e.to_string()))
341    }
342
343    async fn delete(&self, key: &str) -> Result<bool, PlatformError> {
344        self.kv_delete(key)
345            .await
346            .map_err(|e| PlatformError::Storage(e.to_string()))
347    }
348
349    async fn list_keys(&self, prefix: Option<&str>) -> Result<Vec<String>, PlatformError> {
350        self.kv_list_keys(prefix)
351            .await
352            .map_err(|e| PlatformError::Storage(e.to_string()))
353    }
354
355    async fn batch(&self, ops: Vec<KvOp>) -> Result<(), PlatformError> {
356        self.kv_batch(ops)
357            .await
358            .map_err(|e| PlatformError::Storage(e.to_string()))
359    }
360}
361
362#[cfg(test)]
363mod tests {
364    use super::*;
365    use tempfile::TempDir;
366
367    async fn open_test_store(dir: &TempDir) -> ActorStore {
368        let db_path = dir.path().join("test.db");
369        ActorStore::open(&db_path).await.unwrap()
370    }
371
372    #[tokio::test]
373    async fn kv_set_and_get() {
374        let dir = TempDir::new().unwrap();
375        let store = open_test_store(&dir).await;
376
377        store.kv_set("hello", b"world").await.unwrap();
378        let val = store.kv_get("hello").await.unwrap();
379        assert_eq!(val, Some(b"world".to_vec()));
380    }
381
382    #[tokio::test]
383    async fn kv_get_missing_returns_none() {
384        let dir = TempDir::new().unwrap();
385        let store = open_test_store(&dir).await;
386
387        let val = store.kv_get("nonexistent").await.unwrap();
388        assert_eq!(val, None);
389    }
390
391    #[tokio::test]
392    async fn kv_delete_removes_key() {
393        let dir = TempDir::new().unwrap();
394        let store = open_test_store(&dir).await;
395
396        store.kv_set("key1", b"value1").await.unwrap();
397        let deleted = store.kv_delete("key1").await.unwrap();
398        assert!(
399            deleted,
400            "should return true indicating a record was actually deleted"
401        );
402
403        let val = store.kv_get("key1").await.unwrap();
404        assert_eq!(val, None, "get should return None after deletion");
405    }
406
407    #[tokio::test]
408    async fn kv_delete_nonexistent_returns_false() {
409        let dir = TempDir::new().unwrap();
410        let store = open_test_store(&dir).await;
411
412        let deleted = store.kv_delete("ghost").await.unwrap();
413        assert!(!deleted, "deleting a non-existent key should return false");
414    }
415
416    #[tokio::test]
417    async fn kv_list_keys_returns_all() {
418        let dir = TempDir::new().unwrap();
419        let store = open_test_store(&dir).await;
420
421        store.kv_set("b", b"2").await.unwrap();
422        store.kv_set("a", b"1").await.unwrap();
423        store.kv_set("c", b"3").await.unwrap();
424
425        let keys = store.kv_list_keys(None).await.unwrap();
426        assert_eq!(
427            keys,
428            vec!["a", "b", "c"],
429            "should return all keys in lexicographic order"
430        );
431    }
432
433    #[tokio::test]
434    async fn kv_list_keys_prefix_filter() {
435        let dir = TempDir::new().unwrap();
436        let store = open_test_store(&dir).await;
437
438        store.kv_set("prefix:a", b"1").await.unwrap();
439        store.kv_set("prefix:b", b"2").await.unwrap();
440        store.kv_set("other:c", b"3").await.unwrap();
441
442        let keys = store.kv_list_keys(Some("prefix:")).await.unwrap();
443        assert_eq!(keys, vec!["prefix:a", "prefix:b"]);
444    }
445
446    #[tokio::test]
447    async fn kv_batch_atomic() {
448        let dir = TempDir::new().unwrap();
449        let store = open_test_store(&dir).await;
450
451        store.kv_set("existing", b"old").await.unwrap();
452
453        store
454            .kv_batch(vec![
455                KvOp::Set {
456                    key: "new_key".to_string(),
457                    value: b"new_value".to_vec(),
458                },
459                KvOp::Set {
460                    key: "existing".to_string(),
461                    value: b"updated".to_vec(),
462                },
463                KvOp::Delete {
464                    key: "existing".to_string(),
465                },
466            ])
467            .await
468            .unwrap();
469
470        // new_key should exist
471        let val = store.kv_get("new_key").await.unwrap();
472        assert_eq!(val, Some(b"new_value".to_vec()));
473
474        // existing was updated then deleted in the batch, should not exist
475        let val = store.kv_get("existing").await.unwrap();
476        assert_eq!(val, None);
477    }
478
479    #[tokio::test]
480    async fn data_persists_across_reopen() {
481        let dir = TempDir::new().unwrap();
482        let db_path = dir.path().join("persist.db");
483
484        {
485            let store = ActorStore::open(&db_path).await.unwrap();
486            store
487                .kv_set("persistent_key", b"persistent_value")
488                .await
489                .unwrap();
490        }
491
492        // reopen the same file
493        let store2 = ActorStore::open(&db_path).await.unwrap();
494        let val = store2.kv_get("persistent_key").await.unwrap();
495        assert_eq!(
496            val,
497            Some(b"persistent_value".to_vec()),
498            "data should persist after reopening the database"
499        );
500    }
501}