Skip to main content

veilid_core/table_store/
table_db.rs

1use crate::*;
2
3cfg_if! {
4    if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
5        use keyvaluedb_web::*;
6        use keyvaluedb::*;
7    } else {
8        use keyvaluedb_sqlite::*;
9        use keyvaluedb::*;
10    }
11}
12
13impl_veilid_log_facility!("tstore");
14
15#[must_use]
16struct CryptInfo {
17    secret: SharedSecret,
18}
19impl CryptInfo {
20    pub fn new(secret: SharedSecret) -> Self {
21        Self { secret }
22    }
23}
24
25#[must_use]
26pub struct TableDBUnlockedInner {
27    registry: VeilidComponentRegistry,
28    table: String,
29    database: Database,
30    // Encryption and decryption key will be the same unless configured for an in-place migration
31    encrypt_info: Option<CryptInfo>,
32    decrypt_info: Option<CryptInfo>,
33}
34
35impl fmt::Debug for TableDBUnlockedInner {
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        write!(f, "TableDBUnlockedInner(table={})", self.table)
38    }
39}
40
41#[derive(Debug, Clone)]
42#[must_use]
43pub struct TableDB {
44    opened_column_count: u32,
45    unlocked_inner: Arc<TableDBUnlockedInner>,
46}
47
48impl VeilidComponentRegistryAccessor for TableDB {
49    fn registry(&self) -> VeilidComponentRegistry {
50        self.unlocked_inner.registry.clone()
51    }
52}
53
54impl TableDB {
55    pub(super) fn new(
56        table: String,
57        registry: VeilidComponentRegistry,
58        database: Database,
59        encryption_key: Option<SharedSecret>,
60        decryption_key: Option<SharedSecret>,
61        opened_column_count: u32,
62    ) -> Self {
63        let encrypt_info = encryption_key.map(CryptInfo::new);
64        let decrypt_info = decryption_key.map(CryptInfo::new);
65
66        let total_columns = database.num_columns().unwrap_or_log();
67
68        Self {
69            opened_column_count: if opened_column_count == 0 {
70                total_columns
71            } else {
72                opened_column_count
73            },
74            unlocked_inner: Arc::new(TableDBUnlockedInner {
75                registry,
76                table,
77                database,
78                encrypt_info,
79                decrypt_info,
80            }),
81        }
82    }
83
84    pub(super) fn new_from_unlocked_inner(
85        unlocked_inner: Arc<TableDBUnlockedInner>,
86        opened_column_count: u32,
87    ) -> Self {
88        let db = &unlocked_inner.database;
89        let total_columns = db.num_columns().unwrap_or_log();
90        Self {
91            opened_column_count: if opened_column_count == 0 {
92                total_columns
93            } else {
94                opened_column_count
95            },
96            unlocked_inner,
97        }
98    }
99
100    pub(super) fn unlocked_inner(&self) -> Arc<TableDBUnlockedInner> {
101        self.unlocked_inner.clone()
102    }
103
104    /// Get the internal name of the table
105    #[must_use]
106    pub fn table_name(&self) -> String {
107        self.unlocked_inner.table.clone()
108    }
109
110    /// Get the io stats for the table
111    #[cfg_attr(
112        feature = "instrument",
113        instrument(level = "trace", target = "tstore", skip_all)
114    )]
115    #[must_use]
116    pub fn io_stats(&self, kind: IoStatsKind) -> IoStats {
117        self.unlocked_inner.database.io_stats(kind)
118    }
119
120    /// Cleanup the database
121    pub async fn cleanup(&self) -> VeilidAPIResult<()> {
122        self.unlocked_inner
123            .database
124            .cleanup()
125            .measure_debug(
126                TimestampDuration::new_secs(1),
127                veilid_log_dbg!(self, "TableDB::cleanup {}", self.table_name()),
128            )
129            .await
130            .map_err(VeilidAPIError::internal)
131    }
132
133    /// Get the total number of columns in the TableDB.
134    /// Not the number of columns that were opened, rather the total number that could be opened.
135    #[cfg_attr(
136        feature = "instrument",
137        instrument(level = "trace", target = "tstore", skip_all)
138    )]
139    pub fn get_column_count(&self) -> VeilidAPIResult<u32> {
140        let db = &self.unlocked_inner.database;
141        db.num_columns().map_err(VeilidAPIError::from)
142    }
143
144    /// Estimate the storage size for a table entry
145    /// Overestimates size on disk because records are compressed in the tabledb
146    /// Rough guess for sqlite based on their file format. Other databases may vary.
147    pub fn estimate_storage_size(
148        &self,
149        _col: u32,
150        key: &[u8],
151        value: &[u8],
152    ) -> VeilidAPIResult<u64> {
153        let size =
154            // Count of fields byte
155            1 +
156            // Type of field byte
157            1 +
158            // Length of key times two because it uses hex encoding sometimes
159            key.len() * 2 +
160            // Length of key length
161            4 +
162            // Length of value
163            value.len() +
164            // Length of value length
165            4 +
166            // Extra padding for max length and whatever else
167            // XXX: at some point we should measure this on disk to figure out a better estimate :P
168            4;
169        size.try_into().map_err(VeilidAPIError::internal)
170    }
171
172    /// Estimate the storage size for a table entry if it is json encoded
173    pub fn estimate_storage_size_json<T>(
174        &self,
175        col: u32,
176        key: &[u8],
177        value: &T,
178    ) -> VeilidAPIResult<u64>
179    where
180        T: serde::Serialize,
181    {
182        let value_json = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
183        self.estimate_storage_size(col, key, &value_json)
184    }
185
186    /// Encrypt buffer using encrypt key and prepend nonce to output.
187    /// Keyed nonces are unique because keys must be unique.
188    /// Normally they must be sequential or random, but the critical.
189    /// requirement is that they are different for each encryption
190    /// but if the contents are guaranteed to be unique, then a nonce
191    /// can be generated from the hash of the contents and the encryption key itself.
192    #[cfg_attr(
193        feature = "instrument",
194        instrument(level = "trace", target = "tstore", skip_all)
195    )]
196    async fn maybe_encrypt(&self, data: &[u8], keyed_nonce: bool) -> Vec<u8> {
197        let data = compress_prepend_size(data);
198        if let Some(ei) = &self.unlocked_inner.encrypt_info {
199            let crypto = self.crypto();
200            let vcrypto = crypto.get_async(ei.secret.kind()).unwrap_or_log();
201            let mut out = unsafe { unaligned_u8_vec_uninit(vcrypto.nonce_length() + data.len()) };
202
203            if keyed_nonce {
204                // Key content nonce
205                let mut noncedata = Vec::with_capacity(data.len() + ei.secret.ref_value().len());
206                noncedata.extend_from_slice(&data);
207                noncedata.extend_from_slice(ei.secret.ref_value());
208                let noncehash = vcrypto.generate_hash(&noncedata).await.value();
209                // Key content nonce is first 'nonce_length' bytes of generated hash
210                out[0..vcrypto.nonce_length()]
211                    .copy_from_slice(&noncehash[0..vcrypto.nonce_length()]);
212            } else {
213                // Random nonce
214                random_bytes(&mut out[0..vcrypto.nonce_length()]);
215            }
216
217            let (nonce, encout) = out.split_at_mut(vcrypto.nonce_length());
218            vcrypto
219                .crypt_b2b_no_auth(&data, encout, &Nonce::new(nonce), &ei.secret)
220                .await
221                .unwrap_or_log();
222            out
223        } else {
224            data
225        }
226    }
227
228    /// Decrypt buffer using decrypt key with nonce prepended to input
229    #[cfg_attr(
230        feature = "instrument",
231        instrument(level = "trace", target = "tstore", skip_all)
232    )]
233    async fn maybe_decrypt(&self, data: &[u8]) -> std::io::Result<Vec<u8>> {
234        if let Some(di) = &self.unlocked_inner.decrypt_info {
235            let crypto = self.crypto();
236            let vcrypto = crypto.get_async(di.secret.kind()).unwrap_or_log();
237            assert!(data.len() >= vcrypto.nonce_length());
238            if data.len() == vcrypto.nonce_length() {
239                return Ok(Vec::new());
240            }
241
242            let mut out = unsafe { unaligned_u8_vec_uninit(data.len() - vcrypto.nonce_length()) };
243
244            vcrypto
245                .crypt_b2b_no_auth(
246                    &data[vcrypto.nonce_length()..],
247                    &mut out,
248                    &Nonce::new(&data[0..vcrypto.nonce_length()]),
249                    &di.secret,
250                )
251                .await
252                .unwrap_or_log();
253            decompress_size_prepended(&out, None).map_err(|e| std::io::Error::other(e.to_string()))
254        } else {
255            decompress_size_prepended(data, None).map_err(|e| std::io::Error::other(e.to_string()))
256        }
257    }
258
259    /// Get the list of keys in a column of the TableDB
260    #[cfg_attr(
261        feature = "instrument",
262        instrument(level = "trace", target = "tstore", skip_all)
263    )]
264    pub async fn get_keys(&self, col: u32) -> VeilidAPIResult<Vec<Vec<u8>>> {
265        if col >= self.opened_column_count {
266            apibail_generic!(
267                "Column exceeds opened column count {} >= {}",
268                col,
269                self.opened_column_count
270            );
271        }
272        let db = self.unlocked_inner.database.clone();
273        let out = Vec::new();
274        let (mut out, _) = db
275            .iter_keys(col, None, out, |out, ekey| {
276                //let key = self.maybe_decrypt(k).await?;
277                out.push(ekey.clone());
278                Ok(Option::<()>::None)
279            })
280            .await
281            .map_err(VeilidAPIError::from)?;
282
283        for k in &mut out {
284            *k = self.maybe_decrypt(k).await.map_err(VeilidAPIError::from)?;
285        }
286
287        Ok(out)
288    }
289
290    /// Get the number of keys in a column of the TableDB
291    #[cfg_attr(
292        feature = "instrument",
293        instrument(level = "trace", target = "tstore", skip_all)
294    )]
295    pub async fn get_key_count(&self, col: u32) -> VeilidAPIResult<u64> {
296        if col >= self.opened_column_count {
297            apibail_generic!(
298                "Column exceeds opened column count {} >= {}",
299                col,
300                self.opened_column_count
301            );
302        }
303        let db = self.unlocked_inner.database.clone();
304        let key_count = db.num_keys(col).await.map_err(VeilidAPIError::from)?;
305        Ok(key_count)
306    }
307
308    /// Start a TableDB write transaction. The transaction object must be committed or rolled back before dropping.
309    #[cfg_attr(
310        feature = "instrument",
311        instrument(level = "trace", target = "tstore", skip_all)
312    )]
313    #[must_use]
314    pub fn transact(&self) -> TableDBTransaction {
315        let dbt = self.unlocked_inner.database.transaction();
316        TableDBTransaction::new(self.clone(), dbt)
317    }
318
319    /// Store a key with a value in a column in the TableDB. Performs a single transaction immediately.
320    #[cfg_attr(
321        feature = "instrument",
322        instrument(level = "trace", target = "tstore", skip_all)
323    )]
324    pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
325        if col >= self.opened_column_count {
326            apibail_generic!(
327                "Column exceeds opened column count {} >= {}",
328                col,
329                self.opened_column_count
330            );
331        }
332        let db = self.unlocked_inner.database.clone();
333        let mut dbt = db.transaction();
334        dbt.put(
335            col,
336            self.maybe_encrypt(key, true).await,
337            self.maybe_encrypt(value, false).await,
338        );
339        db.write(dbt).await.map_err(VeilidAPIError::generic)
340    }
341
342    /// Store a key in json format with a value in a column in the TableDB. Performs a single transaction immediately.
343    #[cfg_attr(
344        feature = "instrument",
345        instrument(level = "trace", target = "tstore", skip_all)
346    )]
347    pub async fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
348    where
349        T: serde::Serialize,
350    {
351        let value = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
352        self.store(col, key, &value).await
353    }
354
355    /// Read a key from a column in the TableDB immediately.
356    #[cfg_attr(
357        feature = "instrument",
358        instrument(level = "trace", target = "tstore", skip_all)
359    )]
360    pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
361        if col >= self.opened_column_count {
362            apibail_generic!(
363                "Column exceeds opened column count {} >= {}",
364                col,
365                self.opened_column_count
366            );
367        }
368        let db = self.unlocked_inner.database.clone();
369        let key = self.maybe_encrypt(key, true).await;
370        match db.get(col, &key).await.map_err(VeilidAPIError::from)? {
371            Some(v) => Ok(Some(
372                self.maybe_decrypt(&v).await.map_err(VeilidAPIError::from)?,
373            )),
374            None => Ok(None),
375        }
376    }
377
378    /// Read an serde-json key from a column in the TableDB immediately
379    #[cfg_attr(
380        feature = "instrument",
381        instrument(level = "trace", target = "tstore", skip_all)
382    )]
383    pub async fn load_json<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
384    where
385        T: for<'de> serde::Deserialize<'de>,
386    {
387        let out = match self.load(col, key).await? {
388            Some(v) => Some(serde_json::from_slice(&v).map_err(VeilidAPIError::internal)?),
389            None => None,
390        };
391        Ok(out)
392    }
393
394    /// Delete key with from a column in the TableDB
395    #[cfg_attr(
396        feature = "instrument",
397        instrument(level = "trace", target = "tstore", skip_all)
398    )]
399    pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
400        if col >= self.opened_column_count {
401            apibail_generic!(
402                "Column exceeds opened column count {} >= {}",
403                col,
404                self.opened_column_count
405            );
406        }
407        let key = self.maybe_encrypt(key, true).await;
408
409        let db = self.unlocked_inner.database.clone();
410
411        match db.delete(col, &key).await.map_err(VeilidAPIError::from)? {
412            Some(v) => Ok(Some(
413                self.maybe_decrypt(&v).await.map_err(VeilidAPIError::from)?,
414            )),
415            None => Ok(None),
416        }
417    }
418
419    /// Delete serde-json key with from a column in the TableDB
420    #[cfg_attr(
421        feature = "instrument",
422        instrument(level = "trace", target = "tstore", skip_all)
423    )]
424    pub async fn delete_json<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
425    where
426        T: for<'de> serde::Deserialize<'de>,
427    {
428        let old_value = match self.delete(col, key).await? {
429            Some(v) => Some(serde_json::from_slice(&v).map_err(VeilidAPIError::internal)?),
430            None => None,
431        };
432        Ok(old_value)
433    }
434}
435
436////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
437
438struct TableDBTransactionInner {
439    registry: VeilidComponentRegistry,
440    dbt: Option<DBTransaction>,
441}
442
443impl fmt::Debug for TableDBTransactionInner {
444    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
445        write!(
446            f,
447            "TableDBTransactionInner({})",
448            match &self.dbt {
449                Some(dbt) => format!("len={}", dbt.ops.len()),
450                None => "".to_owned(),
451            }
452        )
453    }
454}
455
456impl Drop for TableDBTransactionInner {
457    fn drop(&mut self) {
458        if self.dbt.is_some() {
459            let registry = &self.registry;
460            veilid_log!(registry error "Dropped transaction without commit or rollback");
461        }
462    }
463}
464
465/// A TableDB transaction
466/// Atomically commits a group of writes or deletes to the TableDB
467#[derive(Debug, Clone)]
468pub struct TableDBTransaction {
469    db: TableDB,
470    inner: Arc<Mutex<TableDBTransactionInner>>,
471}
472
473impl VeilidComponentRegistryAccessor for TableDBTransaction {
474    fn registry(&self) -> VeilidComponentRegistry {
475        self.db.registry()
476    }
477}
478
479impl TableDBTransaction {
480    fn new(db: TableDB, dbt: DBTransaction) -> Self {
481        let registry = db.registry();
482        Self {
483            db,
484            inner: Arc::new(Mutex::new(TableDBTransactionInner {
485                registry,
486                dbt: Some(dbt),
487            })),
488        }
489    }
490
491    /// Commit the transaction. Performs all actions atomically.
492    #[cfg_attr(
493        feature = "instrument",
494        instrument(level = "trace", target = "tstore", skip_all)
495    )]
496    pub async fn commit(self) -> VeilidAPIResult<()> {
497        let dbt = {
498            let mut inner = self.inner.lock();
499            inner
500                .dbt
501                .take()
502                .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
503        };
504
505        let db = self.db.unlocked_inner.database.clone();
506        db.write(dbt).await.map_err(|e| {
507            veilid_log!(self error "commit failed, transaction lost: {:?}", e);
508            VeilidAPIError::generic(format!("commit failed, transaction lost: {}", e))
509        })
510    }
511
512    /// Rollback the transaction. Does nothing to the TableDB.
513    #[cfg_attr(
514        feature = "instrument",
515        instrument(level = "trace", target = "tstore", skip_all)
516    )]
517    pub fn rollback(self) {
518        let mut inner = self.inner.lock();
519        inner.dbt = None;
520    }
521
522    /// Store a key with a value in a column in the TableDB
523    #[cfg_attr(
524        feature = "instrument",
525        instrument(level = "trace", target = "tstore", skip_all)
526    )]
527    pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
528        if col >= self.db.opened_column_count {
529            apibail_generic!(
530                "Column exceeds opened column count {} >= {}",
531                col,
532                self.db.opened_column_count
533            );
534        }
535
536        let key = self.db.maybe_encrypt(key, true).await;
537        let value = self.db.maybe_encrypt(value, false).await;
538        let mut inner = self.inner.lock();
539        inner
540            .dbt
541            .as_mut()
542            .ok_or_else(|| VeilidAPIError::generic("store failed, transaction already completed"))?
543            .put_owned(col, key, value);
544        Ok(())
545    }
546
547    /// Store a key in json format with a value in a column in the TableDB
548    #[cfg_attr(
549        feature = "instrument",
550        instrument(level = "trace", target = "tstore", skip_all)
551    )]
552    pub async fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
553    where
554        T: serde::Serialize,
555    {
556        let value = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
557        self.store(col, key, &value).await
558    }
559
560    /// Delete key with from a column in the TableDB
561    #[cfg_attr(
562        feature = "instrument",
563        instrument(level = "trace", target = "tstore", skip_all)
564    )]
565    pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<()> {
566        if col >= self.db.opened_column_count {
567            apibail_generic!(
568                "Column exceeds opened column count {} >= {}",
569                col,
570                self.db.opened_column_count
571            );
572        }
573
574        let key = self.db.maybe_encrypt(key, true).await;
575        let mut inner = self.inner.lock();
576        inner
577            .dbt
578            .as_mut()
579            .ok_or_else(|| VeilidAPIError::generic("delete failed, transaction already completed"))?
580            .delete_owned(col, key);
581        Ok(())
582    }
583}