Skip to main content

veilid_core/table_store/
mod.rs

1use super::*;
2
3mod table_db;
4mod tasks;
5
6pub use table_db::*;
7
8#[cfg(any(test, feature = "test-util"))]
9#[doc(hidden)]
10pub mod tests_table_store;
11
12#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
13mod wasm;
14#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
15use wasm::*;
16#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
17mod native;
18#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
19use native::*;
20
21use keyvaluedb::*;
22use weak_table::WeakValueHashMap;
23
24impl_veilid_log_facility!("tstore");
25
/// Key under which the serialized table name map is stored in column 0 of the
/// `__veilid_all_tables` database.
const ALL_TABLE_NAMES: &[u8] = b"all_table_names";
/// Interval value, in seconds, handed to the `flush_tables_task` tick task.
const FLUSH_TABLES_INTERVAL_SECS: u32 = 60;
/// Interval value, in seconds, handed to the `cleanup_tables_task` tick task.
const CLEANUP_TABLES_INTERVAL_SECS: u32 = 10 * 60;
29
30/// Description of column
31#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
32#[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), derive(Tsify))]
33#[cfg_attr(feature = "json-camel-case", serde(rename_all = "camelCase"))]
34#[must_use]
35pub struct ColumnInfo {
36    pub key_count: AlignedU64,
37}
38
39/// IO Stats for table
40#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
41#[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), derive(Tsify))]
42#[cfg_attr(feature = "json-camel-case", serde(rename_all = "camelCase"))]
43#[must_use]
44pub struct IOStatsInfo {
45    /// Number of transaction.
46    pub transactions: AlignedU64,
47    /// Number of read operations.
48    pub reads: AlignedU64,
49    /// Number of reads resulted in a read from cache.
50    pub cache_reads: AlignedU64,
51    /// Number of write operations.
52    pub writes: AlignedU64,
53    /// Number of bytes read
54    pub bytes_read: ByteCount,
55    /// Number of bytes read from cache
56    pub cache_read_bytes: ByteCount,
57    /// Number of bytes write
58    pub bytes_written: ByteCount,
59    /// Start of the statistic period.
60    pub started: Timestamp,
61    /// Total duration of the statistic period.
62    pub span: TimestampDuration,
63}
64
65/// Description of table
66#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
67#[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), derive(Tsify))]
68#[cfg_attr(feature = "json-camel-case", serde(rename_all = "camelCase"))]
69#[must_use]
70pub struct TableInfo {
71    /// Internal table name
72    pub table_name: String,
73    /// IO statistics since previous query
74    pub io_stats_since_previous: IOStatsInfo,
75    /// IO statistics since database open
76    pub io_stats_overall: IOStatsInfo,
77    /// Total number of columns in the table
78    pub column_count: u32,
79    /// Column descriptions
80    pub columns: Vec<ColumnInfo>,
81}
82
83#[must_use]
84struct TableStoreInner {
85    opened: WeakValueHashMap<String, Weak<TableDBUnlockedInner>>,
86    encryption_key: Option<SharedSecret>,
87    all_table_names: HashMap<String, String>,
88    all_tables_db: Option<Database>,
89    /// Tick subscription
90    tick_subscription: Option<EventBusSubscription>,
91}
92
93impl fmt::Debug for TableStoreInner {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        f.debug_struct("TableStoreInner")
96            .field("opened", &self.opened)
97            .field("encryption_key", &self.encryption_key)
98            .field("all_table_names", &self.all_table_names)
99            .finish()
100    }
101}
102
103/// Veilid Table Storage.
104/// Database for storing key value pairs persistently and securely across runs.
105#[must_use]
106pub struct TableStore {
107    registry: VeilidComponentRegistry,
108    inner: Mutex<TableStoreInner>, // Sync mutex here because TableDB drops can happen at any time
109    table_store_driver: TableStoreDriver,
110    async_lock: Arc<AsyncMutex<()>>,
111    flush_tables_task: TickTask<EyreReport>,
112    cleanup_tables_task: TickTask<EyreReport>,
113}
114
115impl fmt::Debug for TableStore {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        f.debug_struct("TableStore")
118            .field("registry", &self.registry)
119            .field("inner", &self.inner)
120            .field("async_lock", &self.async_lock)
121            .finish()
122    }
123}
124
125impl_veilid_component!(TableStore);
126
127impl TableStore {
128    fn new_inner() -> TableStoreInner {
129        TableStoreInner {
130            opened: WeakValueHashMap::new(),
131            encryption_key: None,
132            all_table_names: HashMap::new(),
133            all_tables_db: None,
134            tick_subscription: None,
135        }
136    }
137    pub(crate) fn new(registry: VeilidComponentRegistry) -> Self {
138        let inner = Self::new_inner();
139        let table_store_driver = TableStoreDriver::new(registry.clone());
140
141        let this = Self {
142            registry,
143            inner: Mutex::new(inner),
144            table_store_driver,
145            async_lock: Arc::new(AsyncMutex::new(())),
146            flush_tables_task: TickTask::new("flush_tables_task", FLUSH_TABLES_INTERVAL_SECS),
147            cleanup_tables_task: TickTask::new("cleanup_tables_task", CLEANUP_TABLES_INTERVAL_SECS),
148        };
149
150        this.setup_tasks();
151
152        this
153    }
154
155    // Flush internal control state
156    async fn flush(&self) {
157        let (all_table_names_value, all_tables_db) = {
158            let inner = self.inner.lock();
159            let all_table_names_value = serialize_json_bytes(&inner.all_table_names);
160            (
161                all_table_names_value,
162                inner.all_tables_db.clone().unwrap_or_log(),
163            )
164        };
165        let mut dbt = DBTransaction::new();
166        dbt.put(0, ALL_TABLE_NAMES, &all_table_names_value);
167        if let Err(e) = all_tables_db.write(dbt).await {
168            error!("failed to write all tables db: {}", e);
169        }
170    }
171
172    // Cleanup/vacuum database
173    async fn cleanup(&self) {
174        // Vacuum the open databases while we're at it
175        let all_open_db: Vec<_> = {
176            let inner = self.inner.lock();
177            inner.opened.values().collect()
178        };
179        for db in all_open_db {
180            let tdb = TableDB::new_from_unlocked_inner(db, 0);
181            if let Err(e) = tdb.cleanup().await {
182                veilid_log!(self error "Error cleaning up database '{}': {}", tdb.table_name(), e);
183            }
184        }
185    }
186
187    // Internal naming support
188    // Adds rename capability and ensures names of tables are totally unique and valid
189
190    fn namespaced_name(&self, table: &str) -> VeilidAPIResult<String> {
191        if !table
192            .chars()
193            .all(|c| char::is_alphanumeric(c) || c == '_' || c == '-')
194        {
195            apibail_invalid_argument!("table name is invalid", "table", table);
196        }
197        let namespace = self.config().namespace.clone();
198        Ok(if namespace.is_empty() {
199            table.to_string()
200        } else {
201            format!("_ns_{}_{}", namespace, table)
202        })
203    }
204
205    fn name_get_or_create(&self, table: &str) -> VeilidAPIResult<String> {
206        let name = self.namespaced_name(table)?;
207
208        let mut inner = self.inner.lock();
209        // Do we have this name yet?
210        if let Some(real_name) = inner.all_table_names.get(&name) {
211            return Ok(real_name.clone());
212        }
213
214        // If not, make a new low level name mapping
215        let mut real_name_bytes = [0u8; 32];
216        random_bytes(&mut real_name_bytes);
217        let real_name = data_encoding::BASE64URL_NOPAD.encode(&real_name_bytes);
218
219        if inner
220            .all_table_names
221            .insert(name.to_owned(), real_name.clone())
222            .is_some()
223        {
224            panic!("should not have had some value");
225        };
226
227        Ok(real_name)
228    }
229
230    #[cfg_attr(
231        feature = "instrument",
232        instrument(level = "trace", target = "tstore", skip_all)
233    )]
234    fn name_delete(&self, table: &str) -> VeilidAPIResult<Option<String>> {
235        let name = self.namespaced_name(table)?;
236        let mut inner = self.inner.lock();
237        let real_name = inner.all_table_names.remove(&name);
238        Ok(real_name)
239    }
240
241    #[cfg_attr(
242        feature = "instrument",
243        instrument(level = "trace", target = "tstore", skip_all)
244    )]
245    fn name_get(&self, table: &str) -> VeilidAPIResult<Option<String>> {
246        let name = self.namespaced_name(table)?;
247        let inner = self.inner.lock();
248        let real_name = inner.all_table_names.get(&name).cloned();
249        Ok(real_name)
250    }
251
252    #[cfg_attr(
253        feature = "instrument",
254        instrument(level = "trace", target = "tstore", skip_all)
255    )]
256    fn name_rename(&self, old_table: &str, new_table: &str) -> VeilidAPIResult<()> {
257        let old_name = self.namespaced_name(old_table)?;
258        let new_name = self.namespaced_name(new_table)?;
259
260        let mut inner = self.inner.lock();
261        // Ensure new name doesn't exist
262        if inner.all_table_names.contains_key(&new_name) {
263            return Err(VeilidAPIError::generic("new table already exists"));
264        }
265        // Do we have this name yet?
266        let Some(real_name) = inner.all_table_names.remove(&old_name) else {
267            return Err(VeilidAPIError::generic("table does not exist"));
268        };
269        // Insert with new name
270        inner.all_table_names.insert(new_name.to_owned(), real_name);
271
272        Ok(())
273    }
274
275    /// List all known tables
276    #[cfg_attr(
277        feature = "instrument",
278        instrument(level = "trace", target = "tstore", skip_all)
279    )]
280    pub fn list_all(&self) -> Vec<(String, String)> {
281        let inner = self.inner.lock();
282        inner
283            .all_table_names
284            .iter()
285            .map(|(k, v)| (k.clone(), v.clone()))
286            .collect::<Vec<(String, String)>>()
287    }
288
289    /// Delete all known tables
290    #[cfg_attr(
291        feature = "instrument",
292        instrument(level = "trace", target = "tstore", skip_all)
293    )]
294    pub async fn delete_all(&self) {
295        // Get all tables
296        let real_names = {
297            let mut inner = self.inner.lock();
298            let real_names = inner
299                .all_table_names
300                .values()
301                .cloned()
302                .collect::<Vec<String>>();
303            inner.all_table_names.clear();
304            real_names
305        };
306
307        // Delete all tables
308        for table_name in real_names {
309            if let Err(e) = self.table_store_driver.delete(&table_name).await {
310                error!("error deleting table: {}", e);
311            }
312        }
313        self.flush().await;
314    }
315
316    #[cfg_attr(
317        feature = "instrument",
318        instrument(level = "trace", target = "tstore", skip_all)
319    )]
320    pub(crate) async fn maybe_unprotect_device_encryption_key(
321        &self,
322        dek_bytes: &[u8],
323        device_encryption_key_password: &str,
324    ) -> EyreResult<SharedSecret> {
325        // Ensure the key is at least as long as necessary
326        // to check for crypto kind
327        if dek_bytes.len() < 4 {
328            bail!("device encryption key is not valid");
329        }
330
331        // Get cryptosystem
332        let kind = CryptoKind::try_from(&dek_bytes[0..4]).unwrap_or_log();
333        let crypto = self.crypto();
334        let Some(vcrypto) = crypto.get_async(kind) else {
335            bail!("unsupported cryptosystem '{kind}'");
336        };
337
338        if !device_encryption_key_password.is_empty() {
339            if dek_bytes.len()
340                != (4
341                    + vcrypto.shared_secret_length()
342                    + vcrypto.aead_overhead()
343                    + vcrypto.nonce_length())
344            {
345                bail!("password protected device encryption key is not valid");
346            }
347            let protected_key =
348                &dek_bytes[4..(4 + vcrypto.shared_secret_length() + vcrypto.aead_overhead())];
349            let nonce = Nonce::new(
350                &dek_bytes[(4 + vcrypto.shared_secret_length() + vcrypto.aead_overhead())..],
351            );
352            let shared_secret = vcrypto
353                .derive_shared_secret(device_encryption_key_password.as_bytes(), &nonce)
354                .await
355                .wrap_err("failed to derive shared secret")?;
356
357            let unprotected_key = vcrypto
358                .decrypt_aead(protected_key, &nonce, &shared_secret, None)
359                .await
360                .wrap_err("failed to decrypt device encryption key")?;
361
362            return Ok(SharedSecret::new(
363                kind,
364                BareSharedSecret::new(unprotected_key.as_slice()),
365            ));
366        }
367
368        if dek_bytes.len() != (4 + vcrypto.shared_secret_length()) {
369            bail!("unprotected device encryption key is not valid");
370        }
371
372        Ok(SharedSecret::new(
373            kind,
374            BareSharedSecret::new(&dek_bytes[4..]),
375        ))
376    }
377
378    #[cfg_attr(
379        feature = "instrument",
380        instrument(level = "trace", target = "tstore", skip_all)
381    )]
382    pub(crate) async fn maybe_protect_device_encryption_key(
383        &self,
384        dek: SharedSecret,
385        device_encryption_key_password: &str,
386    ) -> EyreResult<Vec<u8>> {
387        // Check if we are to protect the key
388        if device_encryption_key_password.is_empty() {
389            veilid_log!(self debug "no dek password");
390            // Return the unprotected key bytes
391            return Ok(Vec::from(dek));
392        }
393
394        // Get cryptosystem
395        let crypto = self.crypto();
396        let Some(vcrypto) = crypto.get_async(dek.kind()) else {
397            bail!("unsupported cryptosystem '{}'", dek.kind());
398        };
399
400        let nonce = vcrypto.random_nonce().await;
401        let shared_secret = vcrypto
402            .derive_shared_secret(device_encryption_key_password.as_bytes(), &nonce)
403            .await
404            .wrap_err("failed to derive shared secret")?;
405        let protected_key = vcrypto
406            .encrypt_aead(dek.ref_value(), &nonce, &shared_secret, None)
407            .await
408            .wrap_err("failed to decrypt device encryption key")?;
409        let mut out = Vec::with_capacity(
410            4 + vcrypto.shared_secret_length() + vcrypto.aead_overhead() + vcrypto.nonce_length(),
411        );
412        out.extend_from_slice(dek.kind().bytes());
413        out.extend_from_slice(&protected_key);
414        out.extend_from_slice(&nonce);
415        assert_eq!(
416            out.len(),
417            4 + vcrypto.shared_secret_length() + vcrypto.aead_overhead() + vcrypto.nonce_length()
418        );
419        Ok(out)
420    }
421
422    #[cfg_attr(
423        feature = "instrument",
424        instrument(level = "trace", target = "tstore", skip_all)
425    )]
426    async fn load_device_encryption_key(&self) -> EyreResult<Option<SharedSecret>> {
427        let dek_bytes: Option<Vec<u8>> = self
428            .protected_store()
429            .load_user_secret("device_encryption_key")?;
430        let Some(dek_bytes) = dek_bytes else {
431            veilid_log!(self debug "no device encryption key");
432            return Ok(None);
433        };
434
435        // Get device encryption key protection password if we have it
436        let device_encryption_key_password = self
437            .config()
438            .protected_store
439            .device_encryption_key_password
440            .clone();
441
442        Ok(Some(
443            self.maybe_unprotect_device_encryption_key(&dek_bytes, &device_encryption_key_password)
444                .await?,
445        ))
446    }
447
448    #[cfg_attr(
449        feature = "instrument",
450        instrument(level = "trace", target = "tstore", skip_all)
451    )]
452    async fn save_device_encryption_key(
453        &self,
454        device_encryption_key: Option<SharedSecret>,
455    ) -> EyreResult<()> {
456        let Some(device_encryption_key) = device_encryption_key else {
457            // Remove the device encryption key
458            let existed = self
459                .protected_store()
460                .remove_user_secret("device_encryption_key")?;
461            veilid_log!(self debug "removed device encryption key. existed: {}", existed);
462            return Ok(());
463        };
464
465        // Get new device encryption key protection password if we are changing it
466        let new_device_encryption_key_password = self
467            .config()
468            .protected_store
469            .new_device_encryption_key_password
470            .clone();
471        let device_encryption_key_password =
472            if let Some(new_device_encryption_key_password) = new_device_encryption_key_password {
473                // Change password
474                veilid_log!(self debug "changing dek password");
475                new_device_encryption_key_password
476            } else {
477                // Get device encryption key protection password if we have it
478                veilid_log!(self debug "saving with existing dek password");
479                self.config()
480                    .protected_store
481                    .device_encryption_key_password
482                    .clone()
483            };
484
485        let dek_bytes = self
486            .maybe_protect_device_encryption_key(
487                device_encryption_key,
488                &device_encryption_key_password,
489            )
490            .await?;
491
492        // Save the new device encryption key
493        let existed = self
494            .protected_store()
495            .save_user_secret("device_encryption_key", &dek_bytes)?;
496        veilid_log!(self debug "saving device encryption key. existed: {}", existed);
497        Ok(())
498    }
499
500    fn log_facilities_impl(&self) -> VeilidComponentLogFacilities {
501        VeilidComponentLogFacilities::new().with_facility(
502            VeilidComponentLogFacility::try_new_with_tags("tstore", ["#common"]).unwrap(),
503        )
504    }
505
506    #[cfg_attr(
507        feature = "instrument",
508        instrument(level = "trace", target = "tstore", skip_all)
509    )]
510    async fn init_async(&self) -> EyreResult<()> {
511        {
512            let _async_guard = self.async_lock.lock().await;
513
514            // Get device encryption key from protected store
515            let mut device_encryption_key = self.load_device_encryption_key().await?;
516            let mut device_encryption_key_changed = false;
517            if let Some(device_encryption_key) = &device_encryption_key {
518                // If encryption in current use is not the best encryption, then run table migration
519                let best_kind = best_crypto_kind();
520                if device_encryption_key.kind() != best_kind {
521                    // XXX: Run migration. See issue #209
522                    veilid_log!(self error "Need to write migration support");
523                }
524            } else {
525                // If we don't have an encryption key yet, then make one with the best cryptography and save it
526                let crypto = self.crypto();
527                let vcrypto = crypto.best_async();
528                let shared_secret = vcrypto.random_shared_secret().await;
529
530                device_encryption_key = Some(shared_secret);
531                device_encryption_key_changed = true;
532            }
533
534            // Check for password change
535            let changing_password = self
536                .config()
537                .protected_store
538                .new_device_encryption_key_password
539                .is_some();
540
541            // Save encryption key if it has changed or if the protecting password wants to change
542            if device_encryption_key_changed || changing_password {
543                self.save_device_encryption_key(device_encryption_key.clone())
544                    .await?;
545            }
546
547            // Deserialize all table names
548            let all_tables_db = match self
549                .table_store_driver
550                .open("__veilid_all_tables", 1, 1)
551                .await
552            {
553                Ok(db) => db,
554                Err(e) => {
555                    veilid_log!(self error "failed to create all tables table: {}", e);
556                    return Err(e.into());
557                }
558            };
559            match all_tables_db.get(0, ALL_TABLE_NAMES).await {
560                Ok(Some(v)) => match deserialize_json_bytes::<HashMap<String, String>>(&v) {
561                    Ok(all_table_names) => {
562                        let mut inner = self.inner.lock();
563                        inner.all_table_names = all_table_names;
564                    }
565                    Err(e) => {
566                        veilid_log!(self error "could not deserialize __veilid_all_tables: {}", e);
567                    }
568                },
569                Ok(None) => {
570                    // No table names yet, that's okay
571                    veilid_log!(self trace "__veilid_all_tables is empty");
572                }
573                Err(e) => {
574                    veilid_log!(self error "could not get __veilid_all_tables: {}", e);
575                }
576            };
577
578            {
579                let mut inner = self.inner.lock();
580                inner.encryption_key = device_encryption_key;
581                inner.all_tables_db = Some(all_tables_db);
582            }
583
584            let do_delete = self.config().table_store.delete;
585
586            if do_delete {
587                self.delete_all().await;
588            }
589        }
590
591        // Set up crypto
592        let crypto = self.crypto();
593        crypto.table_store_setup(self).await?;
594
595        Ok(())
596    }
597
598    #[cfg_attr(
599        feature = "instrument",
600        instrument(level = "trace", target = "tstore", skip_all)
601    )]
602    #[allow(clippy::unused_async)]
603    async fn post_init_async(&self) -> EyreResult<()> {
604        // Register event handlers
605        let tick_subscription = impl_subscribe_event_bus_async!(self, Self, tick_event_handler);
606
607        let mut inner = self.inner.lock();
608
609        // Schedule tick
610        inner.tick_subscription = Some(tick_subscription);
611
612        Ok(())
613    }
614
615    #[cfg_attr(
616        feature = "instrument",
617        instrument(level = "trace", target = "tstore", skip_all)
618    )]
619    async fn pre_terminate_async(&self) {
620        // Stop background operations
621        {
622            let mut inner = self.inner.lock();
623            if let Some(sub) = inner.tick_subscription.take() {
624                self.event_bus().unsubscribe(sub);
625            }
626        }
627
628        // Cancel all tasks associated with the tick future
629        self.cancel_tasks().await;
630    }
631
632    #[cfg_attr(
633        feature = "instrument",
634        instrument(level = "trace", target = "tstore", skip_all)
635    )]
636    async fn terminate_async(&self) {
637        let _async_guard = self.async_lock.lock().await;
638
639        self.flush().await;
640
641        let mut inner = self.inner.lock();
642        inner.opened.shrink_to_fit();
643        if !inner.opened.is_empty() {
644            veilid_log!(self warn
645                "all open databases should have been closed: {:?}",
646                inner.opened
647            );
648            inner.opened.clear();
649        }
650        inner.all_tables_db = None;
651        inner.all_table_names.clear();
652        inner.encryption_key = None;
653    }
654
655    /// Get or create a TableDB database table. If the column count is greater than an
656    /// existing TableDB's column count, the database will be upgraded to add the missing columns.
657    #[cfg_attr(
658        feature = "instrument",
659        instrument(level = "trace", target = "tstore", skip_all)
660    )]
661    pub async fn open(&self, name: &str, column_count: u32) -> VeilidAPIResult<TableDB> {
662        self.open_pooled(name, column_count, 1).await
663    }
664
665    #[cfg_attr(
666        feature = "instrument",
667        instrument(level = "trace", target = "tstore", skip_all)
668    )]
669    pub async fn open_pooled(
670        &self,
671        name: &str,
672        column_count: u32,
673        concurrency: usize,
674    ) -> VeilidAPIResult<TableDB> {
675        let _async_guard = self.async_lock.lock().await;
676
677        // If we aren't initialized yet, bail
678        {
679            let inner = self.inner.lock();
680            if inner.all_tables_db.is_none() {
681                apibail_not_initialized!();
682            }
683        }
684
685        let table_name = self.name_get_or_create(name)?;
686
687        // See if this table is already opened, if so the column count must be the same
688        {
689            let inner = self.inner.lock();
690            if let Some(table_db_unlocked_inner) = inner.opened.get(&table_name) {
691                let tdb = TableDB::new_from_unlocked_inner(table_db_unlocked_inner, column_count);
692
693                // Ensure column count isnt bigger
694                let existing_col_count = tdb.get_column_count()?;
695                if column_count > existing_col_count {
696                    return Err(VeilidAPIError::generic(format!(
697                        "database must be closed before increasing column count {} -> {}",
698                        existing_col_count, column_count,
699                    )));
700                }
701
702                return Ok(tdb);
703            }
704        }
705
706        // Open table db using platform-specific driver
707        let mut db = match self
708            .table_store_driver
709            .open(&table_name, column_count, concurrency)
710            .await
711        {
712            Ok(db) => db,
713            Err(e) => {
714                self.name_delete(name).expect_or_log("removing name failed");
715                self.flush().await;
716                return Err(e);
717            }
718        };
719
720        // Flush table names to disk
721        self.flush().await;
722
723        // If more columns are available, open the low level db with the max column count but restrict the tabledb object to the number requested
724        let existing_col_count = db.num_columns().map_err(VeilidAPIError::from)?;
725        if existing_col_count > column_count {
726            drop(db);
727            db = match self
728                .table_store_driver
729                .open(&table_name, existing_col_count, concurrency)
730                .await
731            {
732                Ok(db) => db,
733                Err(e) => {
734                    self.name_delete(name).expect_or_log("removing name failed");
735                    self.flush().await;
736                    return Err(e);
737                }
738            };
739        }
740
741        // Wrap low-level Database in TableDB object
742        let mut inner = self.inner.lock();
743        let table_db = TableDB::new(
744            table_name.clone(),
745            self.registry(),
746            db,
747            inner.encryption_key.clone(),
748            inner.encryption_key.clone(),
749            column_count,
750        );
751
752        // Keep track of opened DBs
753        inner
754            .opened
755            .insert(table_name.clone(), table_db.unlocked_inner());
756
757        Ok(table_db)
758    }
759
760    /// Delete a TableDB table by name
761    #[cfg_attr(
762        feature = "instrument",
763        instrument(level = "trace", target = "tstore", skip_all)
764    )]
765    pub async fn delete(&self, name: &str) -> VeilidAPIResult<bool> {
766        let _async_guard = self.async_lock.lock().await;
767        // If we aren't initialized yet, bail
768        {
769            let inner = self.inner.lock();
770            if inner.all_tables_db.is_none() {
771                apibail_not_initialized!();
772            }
773        }
774
775        let Some(table_name) = self.name_get(name)? else {
776            // Did not exist in name table
777            return Ok(false);
778        };
779
780        // See if this table is opened
781        {
782            let inner = self.inner.lock();
783            if inner.opened.contains_key(&table_name) {
784                apibail_generic!("Not deleting table that is still opened");
785            }
786        }
787
788        // Delete table db using platform-specific driver
789        let deleted = self.table_store_driver.delete(&table_name).await?;
790        if !deleted {
791            // Table missing? Just remove name
792            veilid_log!(self warn
793                "table existed in name table but not in storage: {} : {}",
794                name, table_name
795            );
796        }
797        self.name_delete(name)
798            .expect_or_log("failed to delete name");
799        self.flush().await;
800
801        Ok(true)
802    }
803
804    /// Get the description of a TableDB table
805    #[cfg_attr(
806        feature = "instrument",
807        instrument(level = "trace", target = "tstore", skip_all)
808    )]
809    pub async fn info(&self, name: &str) -> VeilidAPIResult<Option<TableInfo>> {
810        // Open with the default number of columns
811        let tdb = self.open(name, 0).await?;
812        let internal_name = tdb.table_name();
813        let io_stats_since_previous = tdb.io_stats(IoStatsKind::SincePrevious);
814        let io_stats_overall = tdb.io_stats(IoStatsKind::Overall);
815        let column_count = tdb.get_column_count()?;
816        let mut columns = Vec::<ColumnInfo>::with_capacity(column_count as usize);
817        for col in 0..column_count {
818            let key_count = tdb.get_key_count(col).await?;
819            columns.push(ColumnInfo {
820                key_count: AlignedU64::new(key_count),
821            })
822        }
823        Ok(Some(TableInfo {
824            table_name: internal_name,
825            io_stats_since_previous: IOStatsInfo {
826                transactions: AlignedU64::new(io_stats_since_previous.transactions),
827                reads: AlignedU64::new(io_stats_since_previous.reads),
828                cache_reads: AlignedU64::new(io_stats_since_previous.cache_reads),
829                writes: AlignedU64::new(io_stats_since_previous.writes),
830                bytes_read: ByteCount::new(io_stats_since_previous.bytes_read),
831                cache_read_bytes: ByteCount::new(io_stats_since_previous.cache_read_bytes),
832                bytes_written: ByteCount::new(io_stats_since_previous.bytes_written),
833                started: Timestamp::new(
834                    io_stats_since_previous
835                        .started
836                        .duration_since(std::time::SystemTime::UNIX_EPOCH)
837                        .unwrap_or_default()
838                        .as_micros() as u64,
839                ),
840                span: TimestampDuration::new(io_stats_since_previous.span.as_micros() as u64),
841            },
842            io_stats_overall: IOStatsInfo {
843                transactions: AlignedU64::new(io_stats_overall.transactions),
844                reads: AlignedU64::new(io_stats_overall.reads),
845                cache_reads: AlignedU64::new(io_stats_overall.cache_reads),
846                writes: AlignedU64::new(io_stats_overall.writes),
847                bytes_read: ByteCount::new(io_stats_overall.bytes_read),
848                cache_read_bytes: ByteCount::new(io_stats_overall.cache_read_bytes),
849                bytes_written: ByteCount::new(io_stats_overall.bytes_written),
850                started: Timestamp::new(
851                    io_stats_overall
852                        .started
853                        .duration_since(std::time::SystemTime::UNIX_EPOCH)
854                        .unwrap_or_default()
855                        .as_micros() as u64,
856                ),
857                span: TimestampDuration::new(io_stats_overall.span.as_micros() as u64),
858            },
859            column_count,
860            columns,
861        }))
862    }
863
864    /// Rename a TableDB table
865    #[cfg_attr(
866        feature = "instrument",
867        instrument(level = "trace", target = "tstore", skip_all)
868    )]
869    pub async fn rename(&self, old_name: &str, new_name: &str) -> VeilidAPIResult<()> {
870        let _async_guard = self.async_lock.lock().await;
871        // If we aren't initialized yet, bail
872        {
873            let inner = self.inner.lock();
874            if inner.all_tables_db.is_none() {
875                apibail_not_initialized!();
876            }
877        }
878        veilid_log!(self debug "TableStore::rename {} -> {}", old_name, new_name);
879        self.name_rename(old_name, new_name)?;
880        self.flush().await;
881        Ok(())
882    }
883
884    async fn tick_event_handler(&self, evt: Arc<TickEvent>) {
885        let lag = evt.last_tick_ts.map(|x| evt.cur_tick_ts.duration_since(x));
886        if let Err(e) = self.tick(lag).await {
887            error!("Error in table store tick: {}", e);
888        }
889    }
890}