1use crate::*;
2
3cfg_if! {
4 if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
5 use keyvaluedb_web::*;
6 use keyvaluedb::*;
7 } else {
8 use keyvaluedb_sqlite::*;
9 use keyvaluedb::*;
10 }
11}
12
13impl_veilid_log_facility!("tstore");
14
/// Key material for one direction of table crypto (either encryption or decryption).
#[must_use]
struct CryptInfo {
    /// Shared secret; its `kind()` selects the crypto system and the secret itself is
    /// passed to the cipher.
    secret: SharedSecret,
}
19impl CryptInfo {
20 pub fn new(secret: SharedSecret) -> Self {
21 Self { secret }
22 }
23}
24
/// State shared (behind an `Arc`) by every `TableDB` handle onto the same table.
#[must_use]
pub struct TableDBUnlockedInner {
    /// Component registry handed back by `TableDB::registry()`.
    registry: VeilidComponentRegistry,
    /// Table name, as reported by `TableDB::table_name()`.
    table: String,
    /// Underlying key/value database handle.
    database: Database,
    /// When present, keys and values are encrypted with this secret before being written.
    encrypt_info: Option<CryptInfo>,
    /// When present, stored data is decrypted with this secret after being read.
    decrypt_info: Option<CryptInfo>,
}
34
35impl fmt::Debug for TableDBUnlockedInner {
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 write!(f, "TableDBUnlockedInner(table={})", self.table)
38 }
39}
40
/// Cloneable handle onto a single table of the table store.
#[derive(Debug, Clone)]
#[must_use]
pub struct TableDB {
    /// Number of columns this handle may access; operations on a column index greater
    /// than or equal to this value are rejected. The constructors replace a requested
    /// value of `0` with the database's total column count.
    opened_column_count: u32,
    /// State shared between all handles onto the same table.
    unlocked_inner: Arc<TableDBUnlockedInner>,
}
47
48impl VeilidComponentRegistryAccessor for TableDB {
49 fn registry(&self) -> VeilidComponentRegistry {
50 self.unlocked_inner.registry.clone()
51 }
52}
53
54impl TableDB {
55 pub(super) fn new(
56 table: String,
57 registry: VeilidComponentRegistry,
58 database: Database,
59 encryption_key: Option<SharedSecret>,
60 decryption_key: Option<SharedSecret>,
61 opened_column_count: u32,
62 ) -> Self {
63 let encrypt_info = encryption_key.map(CryptInfo::new);
64 let decrypt_info = decryption_key.map(CryptInfo::new);
65
66 let total_columns = database.num_columns().unwrap_or_log();
67
68 Self {
69 opened_column_count: if opened_column_count == 0 {
70 total_columns
71 } else {
72 opened_column_count
73 },
74 unlocked_inner: Arc::new(TableDBUnlockedInner {
75 registry,
76 table,
77 database,
78 encrypt_info,
79 decrypt_info,
80 }),
81 }
82 }
83
84 pub(super) fn new_from_unlocked_inner(
85 unlocked_inner: Arc<TableDBUnlockedInner>,
86 opened_column_count: u32,
87 ) -> Self {
88 let db = &unlocked_inner.database;
89 let total_columns = db.num_columns().unwrap_or_log();
90 Self {
91 opened_column_count: if opened_column_count == 0 {
92 total_columns
93 } else {
94 opened_column_count
95 },
96 unlocked_inner,
97 }
98 }
99
100 pub(super) fn unlocked_inner(&self) -> Arc<TableDBUnlockedInner> {
101 self.unlocked_inner.clone()
102 }
103
104 #[must_use]
106 pub fn table_name(&self) -> String {
107 self.unlocked_inner.table.clone()
108 }
109
110 #[cfg_attr(
112 feature = "instrument",
113 instrument(level = "trace", target = "tstore", skip_all)
114 )]
115 #[must_use]
116 pub fn io_stats(&self, kind: IoStatsKind) -> IoStats {
117 self.unlocked_inner.database.io_stats(kind)
118 }
119
120 pub async fn cleanup(&self) -> VeilidAPIResult<()> {
122 self.unlocked_inner
123 .database
124 .cleanup()
125 .measure_debug(
126 TimestampDuration::new_secs(1),
127 veilid_log_dbg!(self, "TableDB::cleanup {}", self.table_name()),
128 )
129 .await
130 .map_err(VeilidAPIError::internal)
131 }
132
133 #[cfg_attr(
136 feature = "instrument",
137 instrument(level = "trace", target = "tstore", skip_all)
138 )]
139 pub fn get_column_count(&self) -> VeilidAPIResult<u32> {
140 let db = &self.unlocked_inner.database;
141 db.num_columns().map_err(VeilidAPIError::from)
142 }
143
144 pub fn estimate_storage_size(
148 &self,
149 _col: u32,
150 key: &[u8],
151 value: &[u8],
152 ) -> VeilidAPIResult<u64> {
153 let size =
154 1 +
156 1 +
158 key.len() * 2 +
160 4 +
162 value.len() +
164 4 +
166 4;
169 size.try_into().map_err(VeilidAPIError::internal)
170 }
171
172 pub fn estimate_storage_size_json<T>(
174 &self,
175 col: u32,
176 key: &[u8],
177 value: &T,
178 ) -> VeilidAPIResult<u64>
179 where
180 T: serde::Serialize,
181 {
182 let value_json = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
183 self.estimate_storage_size(col, key, &value_json)
184 }
185
186 #[cfg_attr(
193 feature = "instrument",
194 instrument(level = "trace", target = "tstore", skip_all)
195 )]
196 async fn maybe_encrypt(&self, data: &[u8], keyed_nonce: bool) -> Vec<u8> {
197 let data = compress_prepend_size(data);
198 if let Some(ei) = &self.unlocked_inner.encrypt_info {
199 let crypto = self.crypto();
200 let vcrypto = crypto.get_async(ei.secret.kind()).unwrap_or_log();
201 let mut out = unsafe { unaligned_u8_vec_uninit(vcrypto.nonce_length() + data.len()) };
202
203 if keyed_nonce {
204 let mut noncedata = Vec::with_capacity(data.len() + ei.secret.ref_value().len());
206 noncedata.extend_from_slice(&data);
207 noncedata.extend_from_slice(ei.secret.ref_value());
208 let noncehash = vcrypto.generate_hash(&noncedata).await.value();
209 out[0..vcrypto.nonce_length()]
211 .copy_from_slice(&noncehash[0..vcrypto.nonce_length()]);
212 } else {
213 random_bytes(&mut out[0..vcrypto.nonce_length()]);
215 }
216
217 let (nonce, encout) = out.split_at_mut(vcrypto.nonce_length());
218 vcrypto
219 .crypt_b2b_no_auth(&data, encout, &Nonce::new(nonce), &ei.secret)
220 .await
221 .unwrap_or_log();
222 out
223 } else {
224 data
225 }
226 }
227
228 #[cfg_attr(
230 feature = "instrument",
231 instrument(level = "trace", target = "tstore", skip_all)
232 )]
233 async fn maybe_decrypt(&self, data: &[u8]) -> std::io::Result<Vec<u8>> {
234 if let Some(di) = &self.unlocked_inner.decrypt_info {
235 let crypto = self.crypto();
236 let vcrypto = crypto.get_async(di.secret.kind()).unwrap_or_log();
237 assert!(data.len() >= vcrypto.nonce_length());
238 if data.len() == vcrypto.nonce_length() {
239 return Ok(Vec::new());
240 }
241
242 let mut out = unsafe { unaligned_u8_vec_uninit(data.len() - vcrypto.nonce_length()) };
243
244 vcrypto
245 .crypt_b2b_no_auth(
246 &data[vcrypto.nonce_length()..],
247 &mut out,
248 &Nonce::new(&data[0..vcrypto.nonce_length()]),
249 &di.secret,
250 )
251 .await
252 .unwrap_or_log();
253 decompress_size_prepended(&out, None).map_err(|e| std::io::Error::other(e.to_string()))
254 } else {
255 decompress_size_prepended(data, None).map_err(|e| std::io::Error::other(e.to_string()))
256 }
257 }
258
259 #[cfg_attr(
261 feature = "instrument",
262 instrument(level = "trace", target = "tstore", skip_all)
263 )]
264 pub async fn get_keys(&self, col: u32) -> VeilidAPIResult<Vec<Vec<u8>>> {
265 if col >= self.opened_column_count {
266 apibail_generic!(
267 "Column exceeds opened column count {} >= {}",
268 col,
269 self.opened_column_count
270 );
271 }
272 let db = self.unlocked_inner.database.clone();
273 let out = Vec::new();
274 let (mut out, _) = db
275 .iter_keys(col, None, out, |out, ekey| {
276 out.push(ekey.clone());
278 Ok(Option::<()>::None)
279 })
280 .await
281 .map_err(VeilidAPIError::from)?;
282
283 for k in &mut out {
284 *k = self.maybe_decrypt(k).await.map_err(VeilidAPIError::from)?;
285 }
286
287 Ok(out)
288 }
289
290 #[cfg_attr(
292 feature = "instrument",
293 instrument(level = "trace", target = "tstore", skip_all)
294 )]
295 pub async fn get_key_count(&self, col: u32) -> VeilidAPIResult<u64> {
296 if col >= self.opened_column_count {
297 apibail_generic!(
298 "Column exceeds opened column count {} >= {}",
299 col,
300 self.opened_column_count
301 );
302 }
303 let db = self.unlocked_inner.database.clone();
304 let key_count = db.num_keys(col).await.map_err(VeilidAPIError::from)?;
305 Ok(key_count)
306 }
307
308 #[cfg_attr(
310 feature = "instrument",
311 instrument(level = "trace", target = "tstore", skip_all)
312 )]
313 #[must_use]
314 pub fn transact(&self) -> TableDBTransaction {
315 let dbt = self.unlocked_inner.database.transaction();
316 TableDBTransaction::new(self.clone(), dbt)
317 }
318
319 #[cfg_attr(
321 feature = "instrument",
322 instrument(level = "trace", target = "tstore", skip_all)
323 )]
324 pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
325 if col >= self.opened_column_count {
326 apibail_generic!(
327 "Column exceeds opened column count {} >= {}",
328 col,
329 self.opened_column_count
330 );
331 }
332 let db = self.unlocked_inner.database.clone();
333 let mut dbt = db.transaction();
334 dbt.put(
335 col,
336 self.maybe_encrypt(key, true).await,
337 self.maybe_encrypt(value, false).await,
338 );
339 db.write(dbt).await.map_err(VeilidAPIError::generic)
340 }
341
342 #[cfg_attr(
344 feature = "instrument",
345 instrument(level = "trace", target = "tstore", skip_all)
346 )]
347 pub async fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
348 where
349 T: serde::Serialize,
350 {
351 let value = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
352 self.store(col, key, &value).await
353 }
354
355 #[cfg_attr(
357 feature = "instrument",
358 instrument(level = "trace", target = "tstore", skip_all)
359 )]
360 pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
361 if col >= self.opened_column_count {
362 apibail_generic!(
363 "Column exceeds opened column count {} >= {}",
364 col,
365 self.opened_column_count
366 );
367 }
368 let db = self.unlocked_inner.database.clone();
369 let key = self.maybe_encrypt(key, true).await;
370 match db.get(col, &key).await.map_err(VeilidAPIError::from)? {
371 Some(v) => Ok(Some(
372 self.maybe_decrypt(&v).await.map_err(VeilidAPIError::from)?,
373 )),
374 None => Ok(None),
375 }
376 }
377
378 #[cfg_attr(
380 feature = "instrument",
381 instrument(level = "trace", target = "tstore", skip_all)
382 )]
383 pub async fn load_json<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
384 where
385 T: for<'de> serde::Deserialize<'de>,
386 {
387 let out = match self.load(col, key).await? {
388 Some(v) => Some(serde_json::from_slice(&v).map_err(VeilidAPIError::internal)?),
389 None => None,
390 };
391 Ok(out)
392 }
393
394 #[cfg_attr(
396 feature = "instrument",
397 instrument(level = "trace", target = "tstore", skip_all)
398 )]
399 pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
400 if col >= self.opened_column_count {
401 apibail_generic!(
402 "Column exceeds opened column count {} >= {}",
403 col,
404 self.opened_column_count
405 );
406 }
407 let key = self.maybe_encrypt(key, true).await;
408
409 let db = self.unlocked_inner.database.clone();
410
411 match db.delete(col, &key).await.map_err(VeilidAPIError::from)? {
412 Some(v) => Ok(Some(
413 self.maybe_decrypt(&v).await.map_err(VeilidAPIError::from)?,
414 )),
415 None => Ok(None),
416 }
417 }
418
419 #[cfg_attr(
421 feature = "instrument",
422 instrument(level = "trace", target = "tstore", skip_all)
423 )]
424 pub async fn delete_json<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
425 where
426 T: for<'de> serde::Deserialize<'de>,
427 {
428 let old_value = match self.delete(col, key).await? {
429 Some(v) => Some(serde_json::from_slice(&v).map_err(VeilidAPIError::internal)?),
430 None => None,
431 };
432 Ok(old_value)
433 }
434}
435
/// Mutable state of a `TableDBTransaction`, kept behind a mutex.
struct TableDBTransactionInner {
    /// Registry used to log an error if the transaction is dropped unfinished.
    registry: VeilidComponentRegistry,
    /// Pending database transaction; `None` once committed or rolled back.
    dbt: Option<DBTransaction>,
}
442
443impl fmt::Debug for TableDBTransactionInner {
444 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
445 write!(
446 f,
447 "TableDBTransactionInner({})",
448 match &self.dbt {
449 Some(dbt) => format!("len={}", dbt.ops.len()),
450 None => "".to_owned(),
451 }
452 )
453 }
454}
455
456impl Drop for TableDBTransactionInner {
457 fn drop(&mut self) {
458 if self.dbt.is_some() {
459 let registry = &self.registry;
460 veilid_log!(registry error "Dropped transaction without commit or rollback");
461 }
462 }
463}
464
/// Cloneable handle to a batch of pending writes against a `TableDB`.
///
/// Clones share the same pending transaction state through the `Arc`.
#[derive(Debug, Clone)]
pub struct TableDBTransaction {
    /// Table handle used for column checks, encryption and the final write.
    db: TableDB,
    /// Shared, mutex-protected pending transaction state.
    inner: Arc<Mutex<TableDBTransactionInner>>,
}
472
473impl VeilidComponentRegistryAccessor for TableDBTransaction {
474 fn registry(&self) -> VeilidComponentRegistry {
475 self.db.registry()
476 }
477}
478
479impl TableDBTransaction {
480 fn new(db: TableDB, dbt: DBTransaction) -> Self {
481 let registry = db.registry();
482 Self {
483 db,
484 inner: Arc::new(Mutex::new(TableDBTransactionInner {
485 registry,
486 dbt: Some(dbt),
487 })),
488 }
489 }
490
491 #[cfg_attr(
493 feature = "instrument",
494 instrument(level = "trace", target = "tstore", skip_all)
495 )]
496 pub async fn commit(self) -> VeilidAPIResult<()> {
497 let dbt = {
498 let mut inner = self.inner.lock();
499 inner
500 .dbt
501 .take()
502 .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
503 };
504
505 let db = self.db.unlocked_inner.database.clone();
506 db.write(dbt).await.map_err(|e| {
507 veilid_log!(self error "commit failed, transaction lost: {:?}", e);
508 VeilidAPIError::generic(format!("commit failed, transaction lost: {}", e))
509 })
510 }
511
512 #[cfg_attr(
514 feature = "instrument",
515 instrument(level = "trace", target = "tstore", skip_all)
516 )]
517 pub fn rollback(self) {
518 let mut inner = self.inner.lock();
519 inner.dbt = None;
520 }
521
522 #[cfg_attr(
524 feature = "instrument",
525 instrument(level = "trace", target = "tstore", skip_all)
526 )]
527 pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
528 if col >= self.db.opened_column_count {
529 apibail_generic!(
530 "Column exceeds opened column count {} >= {}",
531 col,
532 self.db.opened_column_count
533 );
534 }
535
536 let key = self.db.maybe_encrypt(key, true).await;
537 let value = self.db.maybe_encrypt(value, false).await;
538 let mut inner = self.inner.lock();
539 inner
540 .dbt
541 .as_mut()
542 .ok_or_else(|| VeilidAPIError::generic("store failed, transaction already completed"))?
543 .put_owned(col, key, value);
544 Ok(())
545 }
546
547 #[cfg_attr(
549 feature = "instrument",
550 instrument(level = "trace", target = "tstore", skip_all)
551 )]
552 pub async fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
553 where
554 T: serde::Serialize,
555 {
556 let value = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
557 self.store(col, key, &value).await
558 }
559
560 #[cfg_attr(
562 feature = "instrument",
563 instrument(level = "trace", target = "tstore", skip_all)
564 )]
565 pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<()> {
566 if col >= self.db.opened_column_count {
567 apibail_generic!(
568 "Column exceeds opened column count {} >= {}",
569 col,
570 self.db.opened_column_count
571 );
572 }
573
574 let key = self.db.maybe_encrypt(key, true).await;
575 let mut inner = self.inner.lock();
576 inner
577 .dbt
578 .as_mut()
579 .ok_or_else(|| VeilidAPIError::generic("delete failed, transaction already completed"))?
580 .delete_owned(col, key);
581 Ok(())
582 }
583}