Skip to main content

vibe_ready/store/db/
db_client.rs

1use crate::api::engine_config::VibeStoreBackend;
2use crate::api::engine_error::{VibeEngineError, VibeEngineErrorCode};
3use crate::log::log_def::DESC;
4use crate::log_e;
5use crate::store::db::sql_def::{start_worker_loop, DbError, DbKvOp, DbWorker};
6use crate::store::db::tables::key_val::{
7    VibeKvValue, VibeTableKeyVal, DEFAULT_BUCKET, EXPIRES_AT_NEVER,
8};
9use crate::utils::date_util::exec_time_end;
10use crate::utils::global_ref::DB_CHANNEL_BUFFER_SIZE;
11use futures::channel::mpsc;
12use futures::SinkExt;
13use std::future::Future;
14use std::path::PathBuf;
15use std::pin::Pin;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use tokio::sync::oneshot::Receiver;
19use tokio::sync::{oneshot, Mutex, RwLock};
20
21#[cfg(target_arch = "wasm32")]
22type TaskType = Pin<Box<dyn Future<Output = ()> + 'static>>;
23#[cfg(not(target_arch = "wasm32"))]
24type TaskType = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
25
26#[derive(Clone)]
27/// Client used by the SDK to coordinate database worker operations.
28pub struct VibeDbClient {
29    db_worker: Arc<RwLock<DbWorker>>,
30    task_tx: Arc<Mutex<mpsc::Sender<TaskType>>>,
31    user_id: Arc<RwLock<Option<String>>>,
32    is_closed: Arc<AtomicBool>,
33}
34
35impl VibeDbClient {
36    /// Creates a database client using the default store backend.
37    ///
38    /// # Returns
39    ///
40    /// A [`VibeDbClient`] with its worker loop initialized.
41    pub fn new() -> Self {
42        Self::with_backend(VibeStoreBackend::default())
43    }
44
45    /// Creates a database client using a specific store backend.
46    ///
47    /// # Returns
48    ///
49    /// A [`VibeDbClient`] with its worker loop initialized for `backend`.
50    ///
51    /// # Examples
52    ///
53    /// ```
54    /// use vibe_ready::{VibeDbClient, VibeStoreBackend};
55    ///
56    /// let client = VibeDbClient::with_backend(VibeStoreBackend::Noop);
57    /// drop(client);
58    /// ```
59    pub fn with_backend(backend: VibeStoreBackend) -> Self {
60        let (task_tx, task_rx) = mpsc::channel(DB_CHANNEL_BUFFER_SIZE);
61
62        let _worker_handle = start_worker_loop(task_rx);
63        Self {
64            db_worker: Arc::new(RwLock::new(DbWorker::with_backend(backend))),
65            task_tx: Arc::new(tokio::sync::Mutex::new(task_tx)),
66            user_id: Arc::new(RwLock::new(None)),
67            is_closed: Arc::new(AtomicBool::new(false)),
68        }
69    }
70
71    async fn execute<T>(
72        &self,
73        task: TaskType,
74        resp_rx: Receiver<Result<T, DbError>>,
75    ) -> Result<T, VibeEngineError> {
76        let mut sender = self.task_tx.lock().await;
77        if let Err(_) = sender.send(task).await {
78            return Err(VibeEngineError::from_code(
79                VibeEngineErrorCode::DatabaseThreadError,
80            ));
81        }
82
83        match resp_rx.await {
84            Ok(ret) => ret.map_err(VibeEngineError::from),
85            Err(_) => Err(VibeEngineError::from_code(
86                VibeEngineErrorCode::DatabaseThreadError,
87            )),
88        }
89    }
90
91    #[allow(dead_code)]
92    async fn execute_exec_time<T>(
93        &self,
94        task: TaskType,
95        resp_rx: Receiver<Result<T, DbError>>,
96        method_name: &str,
97        time: i64,
98    ) -> Result<T, VibeEngineError> {
99        let mut sender = self.task_tx.lock().await;
100        if let Err(_) = sender.send(task).await {
101            return Err(VibeEngineError::from_code(
102                VibeEngineErrorCode::DatabaseThreadError,
103            ));
104        }
105
106        match resp_rx.await {
107            Ok(ret) => {
108                exec_time_end(method_name, time);
109                ret.map_err(VibeEngineError::from)
110            }
111            Err(_) => {
112                exec_time_end(method_name, time);
113                Err(VibeEngineError::from_code(
114                    VibeEngineErrorCode::DatabaseThreadError,
115                ))
116            }
117        }
118    }
119
120    #[allow(dead_code)]
121    async fn execute_trans<T, R, FN, Fut>(
122        &self,
123        task: TaskType,
124        resp_rx: Receiver<Result<T, DbError>>,
125        result_converter: FN,
126    ) -> Result<R, VibeEngineError>
127    where
128        FN: FnOnce(Result<T, DbError>) -> Fut,
129        Fut: Future<Output = Result<R, VibeEngineError>>,
130    {
131        let mut sender = self.task_tx.lock().await;
132        if let Err(_) = sender.send(task).await {
133            return Err(VibeEngineError::from_code(
134                VibeEngineErrorCode::DatabaseThreadError,
135            ));
136        }
137
138        match resp_rx.await {
139            Ok(db_result) => result_converter(db_result).await,
140            Err(_) => Err(VibeEngineError::from_code(
141                VibeEngineErrorCode::DatabaseThreadError,
142            )),
143        }
144    }
145}
146
147impl VibeDbClient {
148    /// Closes the worker channel and database backend.
149    ///
150    /// # Returns
151    ///
152    /// `Ok(())` when closed or already closed, otherwise [`VibeEngineError`].
153    pub async fn close(&self) -> Result<(), VibeEngineError> {
154        if self.is_closed.swap(true, Ordering::SeqCst) {
155            return Ok(());
156        }
157        let mut sender = self.task_tx.lock().await;
158        let ret = sender.close().await;
159        if let Err(error) = ret {
160            log_e!(
161                "close",
162                DESC,
163                format!("close sender error: {}", error.to_string())
164            );
165        }
166        let db_lock = self.db_worker.write();
167        db_lock.await.close().await?;
168        Ok(())
169    }
170
171    /// Opens the configured backend at `store_path` for `user_id`.
172    ///
173    /// # Returns
174    ///
175    /// `Ok(())` when the backend is ready, or [`VibeEngineError`] on open failure.
176    pub async fn try_open(
177        &self,
178        store_path: PathBuf,
179        user_id: String,
180        is_encrypt: bool,
181    ) -> Result<(), VibeEngineError> {
182        let (resp_tx, resp_rx) = oneshot::channel();
183        let db_worker_clone = self.db_worker.clone();
184        let opened_user_id = user_id.clone();
185
186        let task = Box::pin(async move {
187            let mut db_worker = db_worker_clone.write().await;
188            let result = db_worker.try_open(store_path, user_id, is_encrypt).await;
189            let _ = resp_tx.send(result);
190        });
191
192        self.execute(task, resp_rx).await?;
193        *self.user_id.write().await = Some(opened_user_id);
194        Ok(())
195    }
196}
197
198// ---------------------------------------------------------------------------
199// Backward compatible API: routes to the default bucket.
200// ---------------------------------------------------------------------------
201impl VibeDbClient {
202    /// Sets a value in the default bucket.
203    ///
204    /// # Returns
205    ///
206    /// `Ok(())` when the value is written.
207    pub async fn set(&self, key: String, value: VibeKvValue) -> Result<(), VibeEngineError> {
208        self.set_in_bucket(DEFAULT_BUCKET.to_string(), key, value, EXPIRES_AT_NEVER)
209            .await
210    }
211
212    /// Sets a string value in the default bucket.
213    ///
214    /// # Returns
215    ///
216    /// `Ok(())` when the value is written.
217    pub async fn set_str(&self, key: String, value: String) -> Result<(), VibeEngineError> {
218        self.set(key, VibeKvValue::String(value)).await
219    }
220
221    /// Sets a boolean value in the default bucket.
222    ///
223    /// # Returns
224    ///
225    /// `Ok(())` when the value is written.
226    pub async fn set_bool(&self, key: String, value: bool) -> Result<(), VibeEngineError> {
227        self.set(key, VibeKvValue::Bool(value)).await
228    }
229
230    /// Sets an `i32` value in the default bucket.
231    ///
232    /// # Returns
233    ///
234    /// `Ok(())` when the value is written.
235    pub async fn set_i32(&self, key: String, value: i32) -> Result<(), VibeEngineError> {
236        self.set(key, VibeKvValue::I32(value)).await
237    }
238
239    /// Gets a value from the default bucket.
240    ///
241    /// # Returns
242    ///
243    /// `Ok(Some(value))`, `Ok(None)`, or [`VibeEngineError`].
244    pub async fn get(&self, key: String) -> Result<Option<VibeKvValue>, VibeEngineError> {
245        self.get_in_bucket(DEFAULT_BUCKET.to_string(), key).await
246    }
247
248    /// Gets a string value from the default bucket.
249    ///
250    /// # Returns
251    ///
252    /// `Ok(Some(String))` only when the value is a string.
253    pub async fn get_str(&self, key: String) -> Result<Option<String>, VibeEngineError> {
254        match self.get(key).await? {
255            Some(VibeKvValue::String(value)) => Ok(Some(value)),
256            _ => Ok(None),
257        }
258    }
259
260    /// Gets a boolean value from the default bucket.
261    ///
262    /// # Returns
263    ///
264    /// `Ok(Some(bool))` only when the value is a boolean.
265    pub async fn get_bool(&self, key: String) -> Result<Option<bool>, VibeEngineError> {
266        match self.get(key).await? {
267            Some(VibeKvValue::Bool(value)) => Ok(Some(value)),
268            _ => Ok(None),
269        }
270    }
271
272    /// Gets an `i32` value from the default bucket.
273    ///
274    /// # Returns
275    ///
276    /// `Ok(Some(i32))` only when the value is an `i32`.
277    pub async fn get_i32(&self, key: String) -> Result<Option<i32>, VibeEngineError> {
278        match self.get(key).await? {
279            Some(VibeKvValue::I32(value)) => Ok(Some(value)),
280            _ => Ok(None),
281        }
282    }
283
284    /// Removes a key from the default bucket.
285    ///
286    /// # Returns
287    ///
288    /// `Ok(true)` when the key existed and was removed.
289    pub async fn remove(&self, key: String) -> Result<bool, VibeEngineError> {
290        self.remove_in_bucket(DEFAULT_BUCKET.to_string(), key).await
291    }
292
293    /// Checks whether a key exists in the default bucket.
294    ///
295    /// # Returns
296    ///
297    /// `Ok(true)` when the key exists and is not expired.
298    pub async fn contains(&self, key: String) -> Result<bool, VibeEngineError> {
299        self.contains_in_bucket(DEFAULT_BUCKET.to_string(), key)
300            .await
301    }
302
303    /// Lists keys in the default bucket.
304    ///
305    /// # Returns
306    ///
307    /// A vector of key names.
308    pub async fn list_keys(&self) -> Result<Vec<String>, VibeEngineError> {
309        self.list_keys_in_bucket(DEFAULT_BUCKET.to_string()).await
310    }
311
312    /// Returns the user id associated with the open backend.
313    ///
314    /// # Returns
315    ///
316    /// `Ok(String)` after [`VibeDbClient::try_open`], otherwise a database-not-open error.
317    pub async fn current_user_id(&self) -> Result<String, VibeEngineError> {
318        self.user_id
319            .read()
320            .await
321            .clone()
322            .ok_or_else(|| VibeEngineError::from_error_code(VibeEngineErrorCode::DatabaseNotOpened))
323    }
324}
325
326// ---------------------------------------------------------------------------
327// Bucket-aware API.
328// ---------------------------------------------------------------------------
329impl VibeDbClient {
330    /// Sets a value in a named bucket.
331    ///
332    /// # Returns
333    ///
334    /// `Ok(())` when the value is written.
335    pub async fn set_in_bucket(
336        &self,
337        bucket: String,
338        key: String,
339        value: VibeKvValue,
340        expires_at_ms: i64,
341    ) -> Result<(), VibeEngineError> {
342        validate_key(&key)?;
343        let user_id = self.current_user_id().await?;
344        let row = VibeTableKeyVal::new_in_bucket(&user_id, &bucket, &key, value, expires_at_ms);
345        self.insert_or_replace_key_val(row).await
346    }
347
348    /// Gets a value from a named bucket.
349    ///
350    /// # Returns
351    ///
352    /// `Ok(Some(value))`, `Ok(None)` for missing/expired keys, or an error.
353    pub async fn get_in_bucket(
354        &self,
355        bucket: String,
356        key: String,
357    ) -> Result<Option<VibeKvValue>, VibeEngineError> {
358        validate_key(&key)?;
359        let user_id = self.current_user_id().await?;
360        let row = self.get_key_val(user_id, bucket, key).await?;
361        let now = crate::platform::now();
362        Ok(row.and_then(|r| if r.is_expired(now) { None } else { r.value() }))
363    }
364
365    /// Removes a key from a named bucket.
366    ///
367    /// # Returns
368    ///
369    /// `Ok(true)` when the key existed and was removed.
370    pub async fn remove_in_bucket(
371        &self,
372        bucket: String,
373        key: String,
374    ) -> Result<bool, VibeEngineError> {
375        validate_key(&key)?;
376        let user_id = self.current_user_id().await?;
377        self.remove_key_val(user_id, bucket, key).await
378    }
379
380    /// Checks whether a key exists in a named bucket.
381    ///
382    /// # Returns
383    ///
384    /// `Ok(true)` when the key exists and is not expired.
385    pub async fn contains_in_bucket(
386        &self,
387        bucket: String,
388        key: String,
389    ) -> Result<bool, VibeEngineError> {
390        validate_key(&key)?;
391        let user_id = self.current_user_id().await?;
392        // Treat expired keys as missing to keep semantics in sync with `get`.
393        let row = self.get_key_val(user_id, bucket, key).await?;
394        let now = crate::platform::now();
395        Ok(row.map(|r| !r.is_expired(now)).unwrap_or(false))
396    }
397
398    /// Lists keys in a named bucket.
399    ///
400    /// # Returns
401    ///
402    /// A vector of key names.
403    pub async fn list_keys_in_bucket(
404        &self,
405        bucket: String,
406    ) -> Result<Vec<String>, VibeEngineError> {
407        let user_id = self.current_user_id().await?;
408        self.list_key_vals(user_id, bucket).await
409    }
410
411    /// Reads multiple keys from a named bucket.
412    ///
413    /// # Returns
414    ///
415    /// A vector of `(key, value)` pairs for existing, non-expired keys.
416    pub async fn get_many_in_bucket(
417        &self,
418        bucket: String,
419        keys: Vec<String>,
420    ) -> Result<Vec<(String, VibeKvValue)>, VibeEngineError> {
421        if keys.is_empty() {
422            return Ok(Vec::new());
423        }
424        for k in &keys {
425            validate_key(k)?;
426        }
427        let user_id = self.current_user_id().await?;
428        let rows = self.get_key_val_vec(user_id, bucket, keys).await?;
429        let now = crate::platform::now();
430        Ok(rows
431            .into_iter()
432            .filter(|r| !r.is_expired(now))
433            .filter_map(|r| {
434                let key = r.key.clone();
435                r.value().map(|v| (key, v))
436            })
437            .collect())
438    }
439
440    /// Writes multiple values to a named bucket in one transaction.
441    ///
442    /// # Returns
443    ///
444    /// `Ok(())` when the batch commit succeeds.
445    pub async fn set_many_in_bucket(
446        &self,
447        bucket: String,
448        items: Vec<(String, VibeKvValue, i64 /* expires_at_ms */)>,
449    ) -> Result<(), VibeEngineError> {
450        for (k, _, _) in &items {
451            validate_key(k)?;
452        }
453        let user_id = self.current_user_id().await?;
454        let ops: Vec<DbKvOp> = items
455            .into_iter()
456            .map(|(k, v, expires)| {
457                DbKvOp::Set(VibeTableKeyVal::new_in_bucket(
458                    &user_id, &bucket, &k, v, expires,
459                ))
460            })
461            .collect();
462        self.transaction_ops(ops).await
463    }
464
465    /// Removes multiple keys from a named bucket in one transaction.
466    ///
467    /// # Returns
468    ///
469    /// `Ok(())` when the batch commit succeeds.
470    pub async fn remove_many_in_bucket(
471        &self,
472        bucket: String,
473        keys: Vec<String>,
474    ) -> Result<(), VibeEngineError> {
475        for k in &keys {
476            validate_key(k)?;
477        }
478        let user_id = self.current_user_id().await?;
479        let ops: Vec<DbKvOp> = keys
480            .into_iter()
481            .map(|k| DbKvOp::Remove {
482                user_id: user_id.clone(),
483                bucket: bucket.clone(),
484                key: k,
485            })
486            .collect();
487        self.transaction_ops(ops).await
488    }
489
490    /// Apply pre-built ops (already user-scoped) inside one DB transaction.
491    ///
492    /// # Returns
493    ///
494    /// `Ok(())` when every operation commits atomically.
495    pub async fn transaction_ops(&self, ops: Vec<DbKvOp>) -> Result<(), VibeEngineError> {
496        if ops.is_empty() {
497            return Ok(());
498        }
499        let (resp_tx, resp_rx) = oneshot::channel();
500        let db_worker_clone = self.db_worker.clone();
501        let task = Box::pin(async move {
502            let db_worker = db_worker_clone.read().await;
503            let result = db_worker.transaction(ops).await;
504            let _ = resp_tx.send(result);
505        });
506        self.execute(task, resp_rx).await
507    }
508
509    /// Purges expired rows from the backend.
510    ///
511    /// # Returns
512    ///
513    /// Number of rows removed by the backend.
514    pub async fn purge_expired(&self) -> Result<usize, VibeEngineError> {
515        let now = crate::platform::now();
516        let (resp_tx, resp_rx) = oneshot::channel();
517        let db_worker_clone = self.db_worker.clone();
518        let task = Box::pin(async move {
519            let db_worker = db_worker_clone.read().await;
520            let result = db_worker.purge_expired(now).await;
521            let _ = resp_tx.send(result);
522        });
523        self.execute(task, resp_rx).await
524    }
525}
526
527// ---------------------------------------------------------------------------
528// Lower level row APIs (kept for advanced integrations).
529// ---------------------------------------------------------------------------
530impl VibeDbClient {
531    /// Inserts or replaces a low-level key-value row.
532    ///
533    /// # Returns
534    ///
535    /// `Ok(())` when the row is written.
536    pub async fn insert_or_replace_key_val(
537        &self,
538        table: VibeTableKeyVal,
539    ) -> Result<(), VibeEngineError> {
540        let (resp_tx, resp_rx) = oneshot::channel();
541        let db_worker_clone = self.db_worker.clone();
542
543        let task = Box::pin(async move {
544            let db_worker = db_worker_clone.read().await;
545            let result = db_worker.insert_or_replace_key_val(table).await;
546            let _ = resp_tx.send(result);
547        });
548
549        self.execute(task, resp_rx).await
550    }
551
552    /// Gets a low-level key-value row.
553    ///
554    /// # Returns
555    ///
556    /// `Ok(Some(row))`, `Ok(None)`, or [`VibeEngineError`].
557    pub async fn get_key_val(
558        &self,
559        user_id: String,
560        bucket: String,
561        key: String,
562    ) -> Result<Option<VibeTableKeyVal>, VibeEngineError> {
563        let (resp_tx, resp_rx) = oneshot::channel();
564        let db_worker_clone = self.db_worker.clone();
565
566        let task = Box::pin(async move {
567            let db_worker = db_worker_clone.read().await;
568            let result = db_worker.get_key_val(user_id, bucket, key).await;
569            let _ = resp_tx.send(result);
570        });
571
572        self.execute(task, resp_rx).await
573    }
574
575    /// Gets multiple low-level key-value rows.
576    ///
577    /// # Returns
578    ///
579    /// A vector of matching rows.
580    pub async fn get_key_val_vec(
581        &self,
582        user_id: String,
583        bucket: String,
584        keys: Vec<String>,
585    ) -> Result<Vec<VibeTableKeyVal>, VibeEngineError> {
586        let (resp_tx, resp_rx) = oneshot::channel();
587        let db_worker_clone = self.db_worker.clone();
588
589        let task = Box::pin(async move {
590            let db_worker = db_worker_clone.read().await;
591            let result = db_worker.get_key_val_vec(user_id, bucket, keys).await;
592            let _ = resp_tx.send(result);
593        });
594
595        self.execute(task, resp_rx).await
596    }
597
598    /// Removes a low-level key-value row.
599    ///
600    /// # Returns
601    ///
602    /// `Ok(true)` when a row was removed.
603    pub async fn remove_key_val(
604        &self,
605        user_id: String,
606        bucket: String,
607        key: String,
608    ) -> Result<bool, VibeEngineError> {
609        let (resp_tx, resp_rx) = oneshot::channel();
610        let db_worker_clone = self.db_worker.clone();
611
612        let task = Box::pin(async move {
613            let db_worker = db_worker_clone.read().await;
614            let result = db_worker.remove_key_val(user_id, bucket, key).await;
615            let _ = resp_tx.send(result);
616        });
617
618        self.execute(task, resp_rx).await
619    }
620
621    /// Checks whether a low-level key-value row exists.
622    ///
623    /// # Returns
624    ///
625    /// `Ok(true)` when the row exists.
626    pub async fn contains_key_val(
627        &self,
628        user_id: String,
629        bucket: String,
630        key: String,
631    ) -> Result<bool, VibeEngineError> {
632        let (resp_tx, resp_rx) = oneshot::channel();
633        let db_worker_clone = self.db_worker.clone();
634
635        let task = Box::pin(async move {
636            let db_worker = db_worker_clone.read().await;
637            let result = db_worker.contains_key_val(user_id, bucket, key).await;
638            let _ = resp_tx.send(result);
639        });
640
641        self.execute(task, resp_rx).await
642    }
643
644    /// Lists low-level key names for a user and bucket.
645    ///
646    /// # Returns
647    ///
648    /// A vector of key names.
649    pub async fn list_key_vals(
650        &self,
651        user_id: String,
652        bucket: String,
653    ) -> Result<Vec<String>, VibeEngineError> {
654        let (resp_tx, resp_rx) = oneshot::channel();
655        let db_worker_clone = self.db_worker.clone();
656
657        let task = Box::pin(async move {
658            let db_worker = db_worker_clone.read().await;
659            let result = db_worker.list_key_vals(user_id, bucket).await;
660            let _ = resp_tx.send(result);
661        });
662
663        self.execute(task, resp_rx).await
664    }
665}
666
667fn validate_key(key: &str) -> Result<(), VibeEngineError> {
668    if key.trim().is_empty() {
669        return Err(VibeEngineError::from_error_code(
670            VibeEngineErrorCode::ParameterEmpty,
671        ));
672    }
673    Ok(())
674}
675
676#[cfg(test)]
677mod strict_tests {
678    use super::*;
679    include!(concat!(
680        env!("CARGO_MANIFEST_DIR"),
681        "/test/unit/store/db_client_tests.rs"
682    ));
683}