dbx_core/engine/
async_api.rs1use crate::engine::database::Database;
2use crate::error::DbxResult;
3use std::sync::Arc;
4use tokio::task::spawn_blocking;
5
6#[derive(Clone)]
8pub struct DatabaseAsync {
9 inner: Arc<Database>,
10}
11
12impl DatabaseAsync {
13 pub fn new(db: Arc<Database>) -> Self {
15 Self { inner: db }
16 }
17
18 pub fn inner(&self) -> Arc<Database> {
20 self.inner.clone()
21 }
22
23 pub async fn insert(&self, table: String, key: Vec<u8>, value: Vec<u8>) -> DbxResult<()> {
25 let db = self.inner.clone();
26 spawn_blocking(move || db.insert(&table, &key, &value))
27 .await
28 .unwrap_or_else(|e| {
29 Err(crate::error::DbxError::InvalidOperation {
30 message: "Async insert thread panicked".to_string(),
31 context: e.to_string(),
32 })
33 })
34 }
35
36 pub async fn get(&self, table: String, key: Vec<u8>) -> DbxResult<Option<Vec<u8>>> {
38 let db = self.inner.clone();
39 spawn_blocking(move || db.get(&table, &key))
40 .await
41 .unwrap_or_else(|e| {
42 Err(crate::error::DbxError::InvalidOperation {
43 message: "Async get thread panicked".to_string(),
44 context: e.to_string(),
45 })
46 })
47 }
48
49 pub async fn delete(&self, table: String, key: Vec<u8>) -> DbxResult<()> {
51 let db = self.inner.clone();
52 spawn_blocking(move || db.delete(&table, &key).map(|_| ()))
53 .await
54 .unwrap_or_else(|e| {
55 Err(crate::error::DbxError::InvalidOperation {
56 message: "Async delete thread panicked".to_string(),
57 context: e.to_string(),
58 })
59 })
60 }
61
62 pub async fn insert_if_not_exists(
64 &self,
65 table: String,
66 key: Vec<u8>,
67 value: Vec<u8>,
68 ) -> DbxResult<bool> {
69 let db = self.inner.clone();
70 spawn_blocking(move || db.insert_if_not_exists(&table, &key, &value))
71 .await
72 .unwrap_or_else(|e| {
73 Err(crate::error::DbxError::InvalidOperation {
74 message: "Async CAS thread panicked".to_string(),
75 context: e.to_string(),
76 })
77 })
78 }
79
80 pub async fn compare_and_swap(
82 &self,
83 table: String,
84 key: Vec<u8>,
85 expected: Vec<u8>,
86 new_value: Vec<u8>,
87 ) -> DbxResult<bool> {
88 let db = self.inner.clone();
89 spawn_blocking(move || db.compare_and_swap(&table, &key, &expected, &new_value))
90 .await
91 .unwrap_or_else(|e| {
92 Err(crate::error::DbxError::InvalidOperation {
93 message: "Async CAS thread panicked".to_string(),
94 context: e.to_string(),
95 })
96 })
97 }
98}
99
100use crate::grid::dlm::DistributedLockManager;
102use std::time::Duration;
103
104#[derive(Clone)]
108pub struct GridDatabaseAsync {
109 inner: Arc<Database>,
110 dlm: Arc<DistributedLockManager>,
111}
112
113impl GridDatabaseAsync {
114 pub fn new(db: Arc<Database>, dlm: Arc<DistributedLockManager>) -> Self {
115 Self { inner: db, dlm }
116 }
117
118 pub async fn insert_with_lock(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
120 let fencing_token = self
121 .dlm
122 .acquire(table, key, 5000, Duration::from_secs(3))
123 .await
124 .map_err(|e| crate::error::DbxError::InvalidOperation {
125 message: "DLM acquire failed (insert)".to_string(),
126 context: format!("{:?}", e),
127 })?;
128
129 let db = self.inner.clone();
130 let t = table.to_string();
131 let k = key.to_vec();
132 let v = value.to_vec();
133
134 let result = spawn_blocking(move || db.insert(&t, &k, &v))
135 .await
136 .unwrap_or_else(|e| {
137 Err(crate::error::DbxError::InvalidOperation {
138 message: "Grid async insert thread panicked".to_string(),
139 context: e.to_string(),
140 })
141 });
142
143 self.dlm.release(table, key, fencing_token).await;
145
146 result
147 }
148
149 pub async fn compare_and_swap_with_lock(
151 &self,
152 table: &str,
153 key: &[u8],
154 expected: &[u8],
155 new_value: &[u8],
156 ) -> DbxResult<bool> {
157 let fencing_token = self
158 .dlm
159 .acquire(table, key, 5000, Duration::from_secs(3))
160 .await
161 .map_err(|e| crate::error::DbxError::InvalidOperation {
162 message: "DLM acquire failed (CAS)".to_string(),
163 context: format!("{:?}", e),
164 })?;
165
166 let db = self.inner.clone();
167 let t = table.to_string();
168 let k = key.to_vec();
169 let ex = expected.to_vec();
170 let nv = new_value.to_vec();
171
172 let result = spawn_blocking(move || db.compare_and_swap(&t, &k, &ex, &nv))
173 .await
174 .unwrap_or_else(|e| {
175 Err(crate::error::DbxError::InvalidOperation {
176 message: "Grid async CAS thread panicked".to_string(),
177 context: e.to_string(),
178 })
179 });
180
181 self.dlm.release(table, key, fencing_token).await;
182
183 result
184 }
185}