1use std::path::{Path, PathBuf};
8use std::sync::{Arc, Mutex};
9
10use tracing::{debug, error, info};
11
12use actr_platform_traits::{KvOp, KvStore, PlatformError};
13use async_trait::async_trait;
14
15use crate::error::{HyperError, HyperResult};
16
17#[derive(Clone)]
26pub struct ActorStore {
27 conn: Arc<Mutex<rusqlite::Connection>>,
29 namespace: PathBuf,
31}
32
33impl ActorStore {
34 pub async fn open(db_path: &Path) -> HyperResult<Self> {
38 let db_path = db_path.to_path_buf();
39
40 if let Some(parent) = db_path.parent() {
42 tokio::fs::create_dir_all(parent).await.map_err(|e| {
43 HyperError::Storage(format!(
44 "failed to create storage directory `{}`: {e}",
45 parent.display()
46 ))
47 })?;
48 }
49
50 let namespace = db_path.clone();
51
52 let conn = tokio::task::spawn_blocking(move || -> HyperResult<rusqlite::Connection> {
54 let conn = rusqlite::Connection::open(&db_path).map_err(|e| {
55 error!(
56 path = %db_path.display(),
57 error = %e,
58 "failed to open SQLite database"
59 );
60 HyperError::Storage(format!(
61 "failed to open database `{}`: {e}",
62 db_path.display()
63 ))
64 })?;
65
66 conn.execute_batch("PRAGMA journal_mode=WAL;")
68 .map_err(|e| HyperError::Storage(format!("failed to set WAL mode: {e}")))?;
69
70 conn.execute_batch(
72 "CREATE TABLE IF NOT EXISTS kv_store (
73 key TEXT PRIMARY KEY NOT NULL,
74 value BLOB NOT NULL,
75 updated_at INTEGER NOT NULL DEFAULT (unixepoch())
76 );",
77 )
78 .map_err(|e| {
79 HyperError::Storage(format!("failed to initialize kv_store table: {e}"))
80 })?;
81
82 Ok(conn)
83 })
84 .await
85 .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))??;
86
87 info!(
88 path = %namespace.display(),
89 "ActorStore ready"
90 );
91
92 Ok(Self {
93 conn: Arc::new(Mutex::new(conn)),
94 namespace,
95 })
96 }
97
98 pub async fn kv_set(&self, key: &str, value: &[u8]) -> HyperResult<()> {
100 let conn = Arc::clone(&self.conn);
101 let key = key.to_string();
102 let value = value.to_vec();
103 let ns = self.namespace.clone();
104
105 tokio::task::spawn_blocking(move || -> HyperResult<()> {
106 let conn = conn.lock().map_err(|e| {
107 HyperError::Storage(format!("failed to acquire database lock: {e}"))
108 })?;
109
110 conn.execute(
111 "INSERT INTO kv_store (key, value, updated_at)
112 VALUES (?1, ?2, unixepoch())
113 ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at",
114 rusqlite::params![key, value],
115 )
116 .map_err(|e| {
117 error!(
118 namespace = %ns.display(),
119 key = %key,
120 error = %e,
121 "kv_set write failed"
122 );
123 HyperError::Storage(format!("kv_set write `{key}` failed: {e}"))
124 })?;
125
126 debug!(namespace = %ns.display(), key = %key, "kv_set write succeeded");
127 Ok(())
128 })
129 .await
130 .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))??;
131
132 Ok(())
133 }
134
135 pub async fn kv_get(&self, key: &str) -> HyperResult<Option<Vec<u8>>> {
137 let conn = Arc::clone(&self.conn);
138 let key = key.to_string();
139 let ns = self.namespace.clone();
140
141 tokio::task::spawn_blocking(move || -> HyperResult<Option<Vec<u8>>> {
142 let conn = conn.lock().map_err(|e| {
143 HyperError::Storage(format!("failed to acquire database lock: {e}"))
144 })?;
145
146 let result = conn.query_row(
147 "SELECT value FROM kv_store WHERE key = ?1",
148 rusqlite::params![key],
149 |row| row.get::<_, Vec<u8>>(0),
150 );
151
152 match result {
153 Ok(value) => {
154 debug!(namespace = %ns.display(), key = %key, "kv_get hit");
155 Ok(Some(value))
156 }
157 Err(rusqlite::Error::QueryReturnedNoRows) => {
158 debug!(namespace = %ns.display(), key = %key, "kv_get miss");
159 Ok(None)
160 }
161 Err(e) => {
162 error!(
163 namespace = %ns.display(),
164 key = %key,
165 error = %e,
166 "kv_get read failed"
167 );
168 Err(HyperError::Storage(format!(
169 "kv_get read `{key}` failed: {e}"
170 )))
171 }
172 }
173 })
174 .await
175 .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))?
176 }
177
178 pub async fn kv_delete(&self, key: &str) -> HyperResult<bool> {
180 let conn = Arc::clone(&self.conn);
181 let key = key.to_string();
182 let ns = self.namespace.clone();
183
184 tokio::task::spawn_blocking(move || -> HyperResult<bool> {
185 let conn = conn.lock().map_err(|e| {
186 HyperError::Storage(format!("failed to acquire database lock: {e}"))
187 })?;
188
189 let affected = conn
190 .execute(
191 "DELETE FROM kv_store WHERE key = ?1",
192 rusqlite::params![key],
193 )
194 .map_err(|e| {
195 error!(
196 namespace = %ns.display(),
197 key = %key,
198 error = %e,
199 "kv_delete failed"
200 );
201 HyperError::Storage(format!("kv_delete `{key}` failed: {e}"))
202 })?;
203
204 let deleted = affected > 0;
205 debug!(namespace = %ns.display(), key = %key, deleted, "kv_delete executed");
206 Ok(deleted)
207 })
208 .await
209 .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))?
210 }
211
212 pub async fn kv_list_keys(&self, prefix: Option<&str>) -> HyperResult<Vec<String>> {
214 let conn = Arc::clone(&self.conn);
215 let prefix = prefix.map(|s| s.to_string());
216 let ns = self.namespace.clone();
217
218 tokio::task::spawn_blocking(move || -> HyperResult<Vec<String>> {
219 let conn = conn.lock().map_err(|e| {
220 HyperError::Storage(format!("failed to acquire database lock: {e}"))
221 })?;
222
223 let keys = if let Some(ref pfx) = prefix {
224 let pattern = format!("{}%", pfx.replace('%', "\\%").replace('_', "\\_"));
226 let mut stmt = conn
227 .prepare("SELECT key FROM kv_store WHERE key LIKE ?1 ESCAPE '\\' ORDER BY key")
228 .map_err(|e| HyperError::Storage(format!("failed to prepare SQL: {e}")))?;
229 let rows = stmt
230 .query_map(rusqlite::params![pattern], |row| row.get::<_, String>(0))
231 .map_err(|e| HyperError::Storage(format!("failed to query key list: {e}")))?;
232 rows.collect::<Result<Vec<_>, _>>()
233 .map_err(|e| HyperError::Storage(format!("failed to read key row: {e}")))?
234 } else {
235 let mut stmt = conn
236 .prepare("SELECT key FROM kv_store ORDER BY key")
237 .map_err(|e| HyperError::Storage(format!("failed to prepare SQL: {e}")))?;
238 let rows = stmt
239 .query_map([], |row| row.get::<_, String>(0))
240 .map_err(|e| HyperError::Storage(format!("failed to query key list: {e}")))?;
241 rows.collect::<Result<Vec<_>, _>>()
242 .map_err(|e| HyperError::Storage(format!("failed to read key row: {e}")))?
243 };
244
245 debug!(
246 namespace = %ns.display(),
247 count = keys.len(),
248 prefix = ?prefix,
249 "kv_list_keys query completed"
250 );
251 Ok(keys)
252 })
253 .await
254 .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))?
255 }
256
257 pub async fn kv_batch(&self, ops: Vec<KvOp>) -> HyperResult<()> {
261 let conn = Arc::clone(&self.conn);
262 let ns = self.namespace.clone();
263
264 tokio::task::spawn_blocking(move || -> HyperResult<()> {
265 let mut conn = conn.lock().map_err(|e| {
266 HyperError::Storage(format!("failed to acquire database lock: {e}"))
267 })?;
268
269 let tx = conn.transaction().map_err(|e| {
270 HyperError::Storage(format!("failed to begin transaction: {e}"))
271 })?;
272
273 for op in &ops {
274 match op {
275 KvOp::Set { key, value } => {
276 tx.execute(
277 "INSERT INTO kv_store (key, value, updated_at)
278 VALUES (?1, ?2, unixepoch())
279 ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at",
280 rusqlite::params![key, value],
281 )
282 .map_err(|e| {
283 error!(
284 namespace = %ns.display(),
285 key = %key,
286 error = %e,
287 "kv_batch set operation failed"
288 );
289 HyperError::Storage(format!("kv_batch set `{key}` failed: {e}"))
290 })?;
291 debug!(namespace = %ns.display(), key = %key, "kv_batch set");
292 }
293 KvOp::Delete { key } => {
294 tx.execute(
295 "DELETE FROM kv_store WHERE key = ?1",
296 rusqlite::params![key],
297 )
298 .map_err(|e| {
299 error!(
300 namespace = %ns.display(),
301 key = %key,
302 error = %e,
303 "kv_batch delete operation failed"
304 );
305 HyperError::Storage(format!("kv_batch delete `{key}` failed: {e}"))
306 })?;
307 debug!(namespace = %ns.display(), key = %key, "kv_batch delete");
308 }
309 }
310 }
311
312 tx.commit().map_err(|e| {
313 error!(namespace = %ns.display(), error = %e, "kv_batch transaction commit failed");
314 HyperError::Storage(format!("kv_batch transaction commit failed: {e}"))
315 })?;
316
317 debug!(
318 namespace = %ns.display(),
319 ops_count = ops.len(),
320 "kv_batch transaction committed"
321 );
322 Ok(())
323 })
324 .await
325 .map_err(|e| HyperError::Storage(format!("spawn_blocking task failed: {e}")))?
326 }
327}
328
329#[async_trait]
330impl KvStore for ActorStore {
331 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, PlatformError> {
332 self.kv_get(key)
333 .await
334 .map_err(|e| PlatformError::Storage(e.to_string()))
335 }
336
337 async fn set(&self, key: &str, value: &[u8]) -> Result<(), PlatformError> {
338 self.kv_set(key, value)
339 .await
340 .map_err(|e| PlatformError::Storage(e.to_string()))
341 }
342
343 async fn delete(&self, key: &str) -> Result<bool, PlatformError> {
344 self.kv_delete(key)
345 .await
346 .map_err(|e| PlatformError::Storage(e.to_string()))
347 }
348
349 async fn list_keys(&self, prefix: Option<&str>) -> Result<Vec<String>, PlatformError> {
350 self.kv_list_keys(prefix)
351 .await
352 .map_err(|e| PlatformError::Storage(e.to_string()))
353 }
354
355 async fn batch(&self, ops: Vec<KvOp>) -> Result<(), PlatformError> {
356 self.kv_batch(ops)
357 .await
358 .map_err(|e| PlatformError::Storage(e.to_string()))
359 }
360}
361
362#[cfg(test)]
363mod tests {
364 use super::*;
365 use tempfile::TempDir;
366
367 async fn open_test_store(dir: &TempDir) -> ActorStore {
368 let db_path = dir.path().join("test.db");
369 ActorStore::open(&db_path).await.unwrap()
370 }
371
372 #[tokio::test]
373 async fn kv_set_and_get() {
374 let dir = TempDir::new().unwrap();
375 let store = open_test_store(&dir).await;
376
377 store.kv_set("hello", b"world").await.unwrap();
378 let val = store.kv_get("hello").await.unwrap();
379 assert_eq!(val, Some(b"world".to_vec()));
380 }
381
382 #[tokio::test]
383 async fn kv_get_missing_returns_none() {
384 let dir = TempDir::new().unwrap();
385 let store = open_test_store(&dir).await;
386
387 let val = store.kv_get("nonexistent").await.unwrap();
388 assert_eq!(val, None);
389 }
390
391 #[tokio::test]
392 async fn kv_delete_removes_key() {
393 let dir = TempDir::new().unwrap();
394 let store = open_test_store(&dir).await;
395
396 store.kv_set("key1", b"value1").await.unwrap();
397 let deleted = store.kv_delete("key1").await.unwrap();
398 assert!(
399 deleted,
400 "should return true indicating a record was actually deleted"
401 );
402
403 let val = store.kv_get("key1").await.unwrap();
404 assert_eq!(val, None, "get should return None after deletion");
405 }
406
407 #[tokio::test]
408 async fn kv_delete_nonexistent_returns_false() {
409 let dir = TempDir::new().unwrap();
410 let store = open_test_store(&dir).await;
411
412 let deleted = store.kv_delete("ghost").await.unwrap();
413 assert!(!deleted, "deleting a non-existent key should return false");
414 }
415
416 #[tokio::test]
417 async fn kv_list_keys_returns_all() {
418 let dir = TempDir::new().unwrap();
419 let store = open_test_store(&dir).await;
420
421 store.kv_set("b", b"2").await.unwrap();
422 store.kv_set("a", b"1").await.unwrap();
423 store.kv_set("c", b"3").await.unwrap();
424
425 let keys = store.kv_list_keys(None).await.unwrap();
426 assert_eq!(
427 keys,
428 vec!["a", "b", "c"],
429 "should return all keys in lexicographic order"
430 );
431 }
432
433 #[tokio::test]
434 async fn kv_list_keys_prefix_filter() {
435 let dir = TempDir::new().unwrap();
436 let store = open_test_store(&dir).await;
437
438 store.kv_set("prefix:a", b"1").await.unwrap();
439 store.kv_set("prefix:b", b"2").await.unwrap();
440 store.kv_set("other:c", b"3").await.unwrap();
441
442 let keys = store.kv_list_keys(Some("prefix:")).await.unwrap();
443 assert_eq!(keys, vec!["prefix:a", "prefix:b"]);
444 }
445
446 #[tokio::test]
447 async fn kv_batch_atomic() {
448 let dir = TempDir::new().unwrap();
449 let store = open_test_store(&dir).await;
450
451 store.kv_set("existing", b"old").await.unwrap();
452
453 store
454 .kv_batch(vec![
455 KvOp::Set {
456 key: "new_key".to_string(),
457 value: b"new_value".to_vec(),
458 },
459 KvOp::Set {
460 key: "existing".to_string(),
461 value: b"updated".to_vec(),
462 },
463 KvOp::Delete {
464 key: "existing".to_string(),
465 },
466 ])
467 .await
468 .unwrap();
469
470 let val = store.kv_get("new_key").await.unwrap();
472 assert_eq!(val, Some(b"new_value".to_vec()));
473
474 let val = store.kv_get("existing").await.unwrap();
476 assert_eq!(val, None);
477 }
478
479 #[tokio::test]
480 async fn data_persists_across_reopen() {
481 let dir = TempDir::new().unwrap();
482 let db_path = dir.path().join("persist.db");
483
484 {
485 let store = ActorStore::open(&db_path).await.unwrap();
486 store
487 .kv_set("persistent_key", b"persistent_value")
488 .await
489 .unwrap();
490 }
491
492 let store2 = ActorStore::open(&db_path).await.unwrap();
494 let val = store2.kv_get("persistent_key").await.unwrap();
495 assert_eq!(
496 val,
497 Some(b"persistent_value".to_vec()),
498 "data should persist after reopening the database"
499 );
500 }
501}