1use crate::api::engine_config::VibeStoreBackend;
2use crate::api::engine_error::{VibeEngineError, VibeEngineErrorCode};
3use crate::log::log_def::DESC;
4use crate::log_e;
5use crate::store::db::sql_def::{start_worker_loop, DbError, DbKvOp, DbWorker};
6use crate::store::db::tables::key_val::{
7 VibeKvValue, VibeTableKeyVal, DEFAULT_BUCKET, EXPIRES_AT_NEVER,
8};
9use crate::utils::date_util::exec_time_end;
10use crate::utils::global_ref::DB_CHANNEL_BUFFER_SIZE;
11use futures::channel::mpsc;
12use futures::SinkExt;
13use std::future::Future;
14use std::path::PathBuf;
15use std::pin::Pin;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use tokio::sync::oneshot::Receiver;
19use tokio::sync::{oneshot, Mutex, RwLock};
20
/// A boxed, pinned unit of work that is queued on the database worker channel.
///
/// On every target except `wasm32` the future must also be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type TaskType = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
/// `wasm32` variant of the task type: identical, but without the `Send` bound.
// NOTE(review): presumably because futures on wasm32 may hold non-`Send` handles — confirm.
#[cfg(target_arch = "wasm32")]
type TaskType = Pin<Box<dyn Future<Output = ()> + 'static>>;
25
/// Handle used to run key/value database operations through a worker loop.
///
/// Every field is wrapped in an `Arc`, so cloning the client yields another
/// handle onto the same worker, task channel, user id and closed flag.
#[derive(Clone)]
pub struct VibeDbClient {
    /// The database worker; tasks take a read or write lock on it while they run.
    db_worker: Arc<RwLock<DbWorker>>,
    /// Sending half of the task channel whose receiver is handed to `start_worker_loop`.
    task_tx: Arc<Mutex<mpsc::Sender<TaskType>>>,
    /// User id recorded by a successful `try_open`; `None` until then.
    user_id: Arc<RwLock<Option<String>>>,
    /// Set to `true` by the first call to `close`, making later calls no-ops.
    is_closed: Arc<AtomicBool>,
}
34
35impl VibeDbClient {
36 pub fn new() -> Self {
42 Self::with_backend(VibeStoreBackend::default())
43 }
44
45 pub fn with_backend(backend: VibeStoreBackend) -> Self {
60 let (task_tx, task_rx) = mpsc::channel(DB_CHANNEL_BUFFER_SIZE);
61
62 let _worker_handle = start_worker_loop(task_rx);
63 Self {
64 db_worker: Arc::new(RwLock::new(DbWorker::with_backend(backend))),
65 task_tx: Arc::new(tokio::sync::Mutex::new(task_tx)),
66 user_id: Arc::new(RwLock::new(None)),
67 is_closed: Arc::new(AtomicBool::new(false)),
68 }
69 }
70
71 async fn execute<T>(
72 &self,
73 task: TaskType,
74 resp_rx: Receiver<Result<T, DbError>>,
75 ) -> Result<T, VibeEngineError> {
76 let mut sender = self.task_tx.lock().await;
77 if let Err(_) = sender.send(task).await {
78 return Err(VibeEngineError::from_code(
79 VibeEngineErrorCode::DatabaseThreadError,
80 ));
81 }
82
83 match resp_rx.await {
84 Ok(ret) => ret.map_err(VibeEngineError::from),
85 Err(_) => Err(VibeEngineError::from_code(
86 VibeEngineErrorCode::DatabaseThreadError,
87 )),
88 }
89 }
90
91 #[allow(dead_code)]
92 async fn execute_exec_time<T>(
93 &self,
94 task: TaskType,
95 resp_rx: Receiver<Result<T, DbError>>,
96 method_name: &str,
97 time: i64,
98 ) -> Result<T, VibeEngineError> {
99 let mut sender = self.task_tx.lock().await;
100 if let Err(_) = sender.send(task).await {
101 return Err(VibeEngineError::from_code(
102 VibeEngineErrorCode::DatabaseThreadError,
103 ));
104 }
105
106 match resp_rx.await {
107 Ok(ret) => {
108 exec_time_end(method_name, time);
109 ret.map_err(VibeEngineError::from)
110 }
111 Err(_) => {
112 exec_time_end(method_name, time);
113 Err(VibeEngineError::from_code(
114 VibeEngineErrorCode::DatabaseThreadError,
115 ))
116 }
117 }
118 }
119
120 #[allow(dead_code)]
121 async fn execute_trans<T, R, FN, Fut>(
122 &self,
123 task: TaskType,
124 resp_rx: Receiver<Result<T, DbError>>,
125 result_converter: FN,
126 ) -> Result<R, VibeEngineError>
127 where
128 FN: FnOnce(Result<T, DbError>) -> Fut,
129 Fut: Future<Output = Result<R, VibeEngineError>>,
130 {
131 let mut sender = self.task_tx.lock().await;
132 if let Err(_) = sender.send(task).await {
133 return Err(VibeEngineError::from_code(
134 VibeEngineErrorCode::DatabaseThreadError,
135 ));
136 }
137
138 match resp_rx.await {
139 Ok(db_result) => result_converter(db_result).await,
140 Err(_) => Err(VibeEngineError::from_code(
141 VibeEngineErrorCode::DatabaseThreadError,
142 )),
143 }
144 }
145}
146
147impl VibeDbClient {
148 pub async fn close(&self) -> Result<(), VibeEngineError> {
154 if self.is_closed.swap(true, Ordering::SeqCst) {
155 return Ok(());
156 }
157 let mut sender = self.task_tx.lock().await;
158 let ret = sender.close().await;
159 if let Err(error) = ret {
160 log_e!(
161 "close",
162 DESC,
163 format!("close sender error: {}", error.to_string())
164 );
165 }
166 let db_lock = self.db_worker.write();
167 db_lock.await.close().await?;
168 Ok(())
169 }
170
171 pub async fn try_open(
177 &self,
178 store_path: PathBuf,
179 user_id: String,
180 is_encrypt: bool,
181 ) -> Result<(), VibeEngineError> {
182 let (resp_tx, resp_rx) = oneshot::channel();
183 let db_worker_clone = self.db_worker.clone();
184 let opened_user_id = user_id.clone();
185
186 let task = Box::pin(async move {
187 let mut db_worker = db_worker_clone.write().await;
188 let result = db_worker.try_open(store_path, user_id, is_encrypt).await;
189 let _ = resp_tx.send(result);
190 });
191
192 self.execute(task, resp_rx).await?;
193 *self.user_id.write().await = Some(opened_user_id);
194 Ok(())
195 }
196}
197
198impl VibeDbClient {
202 pub async fn set(&self, key: String, value: VibeKvValue) -> Result<(), VibeEngineError> {
208 self.set_in_bucket(DEFAULT_BUCKET.to_string(), key, value, EXPIRES_AT_NEVER)
209 .await
210 }
211
212 pub async fn set_str(&self, key: String, value: String) -> Result<(), VibeEngineError> {
218 self.set(key, VibeKvValue::String(value)).await
219 }
220
221 pub async fn set_bool(&self, key: String, value: bool) -> Result<(), VibeEngineError> {
227 self.set(key, VibeKvValue::Bool(value)).await
228 }
229
230 pub async fn set_i32(&self, key: String, value: i32) -> Result<(), VibeEngineError> {
236 self.set(key, VibeKvValue::I32(value)).await
237 }
238
239 pub async fn get(&self, key: String) -> Result<Option<VibeKvValue>, VibeEngineError> {
245 self.get_in_bucket(DEFAULT_BUCKET.to_string(), key).await
246 }
247
248 pub async fn get_str(&self, key: String) -> Result<Option<String>, VibeEngineError> {
254 match self.get(key).await? {
255 Some(VibeKvValue::String(value)) => Ok(Some(value)),
256 _ => Ok(None),
257 }
258 }
259
260 pub async fn get_bool(&self, key: String) -> Result<Option<bool>, VibeEngineError> {
266 match self.get(key).await? {
267 Some(VibeKvValue::Bool(value)) => Ok(Some(value)),
268 _ => Ok(None),
269 }
270 }
271
272 pub async fn get_i32(&self, key: String) -> Result<Option<i32>, VibeEngineError> {
278 match self.get(key).await? {
279 Some(VibeKvValue::I32(value)) => Ok(Some(value)),
280 _ => Ok(None),
281 }
282 }
283
284 pub async fn remove(&self, key: String) -> Result<bool, VibeEngineError> {
290 self.remove_in_bucket(DEFAULT_BUCKET.to_string(), key).await
291 }
292
293 pub async fn contains(&self, key: String) -> Result<bool, VibeEngineError> {
299 self.contains_in_bucket(DEFAULT_BUCKET.to_string(), key)
300 .await
301 }
302
303 pub async fn list_keys(&self) -> Result<Vec<String>, VibeEngineError> {
309 self.list_keys_in_bucket(DEFAULT_BUCKET.to_string()).await
310 }
311
312 pub async fn current_user_id(&self) -> Result<String, VibeEngineError> {
318 self.user_id
319 .read()
320 .await
321 .clone()
322 .ok_or_else(|| VibeEngineError::from_error_code(VibeEngineErrorCode::DatabaseNotOpened))
323 }
324}
325
326impl VibeDbClient {
330 pub async fn set_in_bucket(
336 &self,
337 bucket: String,
338 key: String,
339 value: VibeKvValue,
340 expires_at_ms: i64,
341 ) -> Result<(), VibeEngineError> {
342 validate_key(&key)?;
343 let user_id = self.current_user_id().await?;
344 let row = VibeTableKeyVal::new_in_bucket(&user_id, &bucket, &key, value, expires_at_ms);
345 self.insert_or_replace_key_val(row).await
346 }
347
348 pub async fn get_in_bucket(
354 &self,
355 bucket: String,
356 key: String,
357 ) -> Result<Option<VibeKvValue>, VibeEngineError> {
358 validate_key(&key)?;
359 let user_id = self.current_user_id().await?;
360 let row = self.get_key_val(user_id, bucket, key).await?;
361 let now = crate::platform::now();
362 Ok(row.and_then(|r| if r.is_expired(now) { None } else { r.value() }))
363 }
364
365 pub async fn remove_in_bucket(
371 &self,
372 bucket: String,
373 key: String,
374 ) -> Result<bool, VibeEngineError> {
375 validate_key(&key)?;
376 let user_id = self.current_user_id().await?;
377 self.remove_key_val(user_id, bucket, key).await
378 }
379
380 pub async fn contains_in_bucket(
386 &self,
387 bucket: String,
388 key: String,
389 ) -> Result<bool, VibeEngineError> {
390 validate_key(&key)?;
391 let user_id = self.current_user_id().await?;
392 let row = self.get_key_val(user_id, bucket, key).await?;
394 let now = crate::platform::now();
395 Ok(row.map(|r| !r.is_expired(now)).unwrap_or(false))
396 }
397
398 pub async fn list_keys_in_bucket(
404 &self,
405 bucket: String,
406 ) -> Result<Vec<String>, VibeEngineError> {
407 let user_id = self.current_user_id().await?;
408 self.list_key_vals(user_id, bucket).await
409 }
410
411 pub async fn get_many_in_bucket(
417 &self,
418 bucket: String,
419 keys: Vec<String>,
420 ) -> Result<Vec<(String, VibeKvValue)>, VibeEngineError> {
421 if keys.is_empty() {
422 return Ok(Vec::new());
423 }
424 for k in &keys {
425 validate_key(k)?;
426 }
427 let user_id = self.current_user_id().await?;
428 let rows = self.get_key_val_vec(user_id, bucket, keys).await?;
429 let now = crate::platform::now();
430 Ok(rows
431 .into_iter()
432 .filter(|r| !r.is_expired(now))
433 .filter_map(|r| {
434 let key = r.key.clone();
435 r.value().map(|v| (key, v))
436 })
437 .collect())
438 }
439
440 pub async fn set_many_in_bucket(
446 &self,
447 bucket: String,
448 items: Vec<(String, VibeKvValue, i64 )>,
449 ) -> Result<(), VibeEngineError> {
450 for (k, _, _) in &items {
451 validate_key(k)?;
452 }
453 let user_id = self.current_user_id().await?;
454 let ops: Vec<DbKvOp> = items
455 .into_iter()
456 .map(|(k, v, expires)| {
457 DbKvOp::Set(VibeTableKeyVal::new_in_bucket(
458 &user_id, &bucket, &k, v, expires,
459 ))
460 })
461 .collect();
462 self.transaction_ops(ops).await
463 }
464
465 pub async fn remove_many_in_bucket(
471 &self,
472 bucket: String,
473 keys: Vec<String>,
474 ) -> Result<(), VibeEngineError> {
475 for k in &keys {
476 validate_key(k)?;
477 }
478 let user_id = self.current_user_id().await?;
479 let ops: Vec<DbKvOp> = keys
480 .into_iter()
481 .map(|k| DbKvOp::Remove {
482 user_id: user_id.clone(),
483 bucket: bucket.clone(),
484 key: k,
485 })
486 .collect();
487 self.transaction_ops(ops).await
488 }
489
490 pub async fn transaction_ops(&self, ops: Vec<DbKvOp>) -> Result<(), VibeEngineError> {
496 if ops.is_empty() {
497 return Ok(());
498 }
499 let (resp_tx, resp_rx) = oneshot::channel();
500 let db_worker_clone = self.db_worker.clone();
501 let task = Box::pin(async move {
502 let db_worker = db_worker_clone.read().await;
503 let result = db_worker.transaction(ops).await;
504 let _ = resp_tx.send(result);
505 });
506 self.execute(task, resp_rx).await
507 }
508
509 pub async fn purge_expired(&self) -> Result<usize, VibeEngineError> {
515 let now = crate::platform::now();
516 let (resp_tx, resp_rx) = oneshot::channel();
517 let db_worker_clone = self.db_worker.clone();
518 let task = Box::pin(async move {
519 let db_worker = db_worker_clone.read().await;
520 let result = db_worker.purge_expired(now).await;
521 let _ = resp_tx.send(result);
522 });
523 self.execute(task, resp_rx).await
524 }
525}
526
527impl VibeDbClient {
531 pub async fn insert_or_replace_key_val(
537 &self,
538 table: VibeTableKeyVal,
539 ) -> Result<(), VibeEngineError> {
540 let (resp_tx, resp_rx) = oneshot::channel();
541 let db_worker_clone = self.db_worker.clone();
542
543 let task = Box::pin(async move {
544 let db_worker = db_worker_clone.read().await;
545 let result = db_worker.insert_or_replace_key_val(table).await;
546 let _ = resp_tx.send(result);
547 });
548
549 self.execute(task, resp_rx).await
550 }
551
552 pub async fn get_key_val(
558 &self,
559 user_id: String,
560 bucket: String,
561 key: String,
562 ) -> Result<Option<VibeTableKeyVal>, VibeEngineError> {
563 let (resp_tx, resp_rx) = oneshot::channel();
564 let db_worker_clone = self.db_worker.clone();
565
566 let task = Box::pin(async move {
567 let db_worker = db_worker_clone.read().await;
568 let result = db_worker.get_key_val(user_id, bucket, key).await;
569 let _ = resp_tx.send(result);
570 });
571
572 self.execute(task, resp_rx).await
573 }
574
575 pub async fn get_key_val_vec(
581 &self,
582 user_id: String,
583 bucket: String,
584 keys: Vec<String>,
585 ) -> Result<Vec<VibeTableKeyVal>, VibeEngineError> {
586 let (resp_tx, resp_rx) = oneshot::channel();
587 let db_worker_clone = self.db_worker.clone();
588
589 let task = Box::pin(async move {
590 let db_worker = db_worker_clone.read().await;
591 let result = db_worker.get_key_val_vec(user_id, bucket, keys).await;
592 let _ = resp_tx.send(result);
593 });
594
595 self.execute(task, resp_rx).await
596 }
597
598 pub async fn remove_key_val(
604 &self,
605 user_id: String,
606 bucket: String,
607 key: String,
608 ) -> Result<bool, VibeEngineError> {
609 let (resp_tx, resp_rx) = oneshot::channel();
610 let db_worker_clone = self.db_worker.clone();
611
612 let task = Box::pin(async move {
613 let db_worker = db_worker_clone.read().await;
614 let result = db_worker.remove_key_val(user_id, bucket, key).await;
615 let _ = resp_tx.send(result);
616 });
617
618 self.execute(task, resp_rx).await
619 }
620
621 pub async fn contains_key_val(
627 &self,
628 user_id: String,
629 bucket: String,
630 key: String,
631 ) -> Result<bool, VibeEngineError> {
632 let (resp_tx, resp_rx) = oneshot::channel();
633 let db_worker_clone = self.db_worker.clone();
634
635 let task = Box::pin(async move {
636 let db_worker = db_worker_clone.read().await;
637 let result = db_worker.contains_key_val(user_id, bucket, key).await;
638 let _ = resp_tx.send(result);
639 });
640
641 self.execute(task, resp_rx).await
642 }
643
644 pub async fn list_key_vals(
650 &self,
651 user_id: String,
652 bucket: String,
653 ) -> Result<Vec<String>, VibeEngineError> {
654 let (resp_tx, resp_rx) = oneshot::channel();
655 let db_worker_clone = self.db_worker.clone();
656
657 let task = Box::pin(async move {
658 let db_worker = db_worker_clone.read().await;
659 let result = db_worker.list_key_vals(user_id, bucket).await;
660 let _ = resp_tx.send(result);
661 });
662
663 self.execute(task, resp_rx).await
664 }
665}
666
667fn validate_key(key: &str) -> Result<(), VibeEngineError> {
668 if key.trim().is_empty() {
669 return Err(VibeEngineError::from_error_code(
670 VibeEngineErrorCode::ParameterEmpty,
671 ));
672 }
673 Ok(())
674}
675
// Unit tests live in a separate file under the crate root and are textually
// included here, so they can reach this module's private items via `super::*`.
#[cfg(test)]
mod strict_tests {
    use super::*;
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/test/unit/store/db_client_tests.rs"
    ));
}