1use super::*;
2
3mod table_db;
4mod tasks;
5
6pub use table_db::*;
7
8#[cfg(any(test, feature = "test-util"))]
9#[doc(hidden)]
10pub mod tests_table_store;
11
12#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
13mod wasm;
14#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
15use wasm::*;
16#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
17mod native;
18#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
19use native::*;
20
21use keyvaluedb::*;
22use weak_table::WeakValueHashMap;
23
24impl_veilid_log_facility!("tstore");
25
26const ALL_TABLE_NAMES: &[u8] = b"all_table_names";
27const FLUSH_TABLES_INTERVAL_SECS: u32 = 60;
28const CLEANUP_TABLES_INTERVAL_SECS: u32 = 600;
29
30#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
32#[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), derive(Tsify))]
33#[cfg_attr(feature = "json-camel-case", serde(rename_all = "camelCase"))]
34#[must_use]
35pub struct ColumnInfo {
36 pub key_count: AlignedU64,
37}
38
39#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
41#[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), derive(Tsify))]
42#[cfg_attr(feature = "json-camel-case", serde(rename_all = "camelCase"))]
43#[must_use]
44pub struct IOStatsInfo {
45 pub transactions: AlignedU64,
47 pub reads: AlignedU64,
49 pub cache_reads: AlignedU64,
51 pub writes: AlignedU64,
53 pub bytes_read: ByteCount,
55 pub cache_read_bytes: ByteCount,
57 pub bytes_written: ByteCount,
59 pub started: Timestamp,
61 pub span: TimestampDuration,
63}
64
65#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
67#[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), derive(Tsify))]
68#[cfg_attr(feature = "json-camel-case", serde(rename_all = "camelCase"))]
69#[must_use]
70pub struct TableInfo {
71 pub table_name: String,
73 pub io_stats_since_previous: IOStatsInfo,
75 pub io_stats_overall: IOStatsInfo,
77 pub column_count: u32,
79 pub columns: Vec<ColumnInfo>,
81}
82
83#[must_use]
84struct TableStoreInner {
85 opened: WeakValueHashMap<String, Weak<TableDBUnlockedInner>>,
86 encryption_key: Option<SharedSecret>,
87 all_table_names: HashMap<String, String>,
88 all_tables_db: Option<Database>,
89 tick_subscription: Option<EventBusSubscription>,
91}
92
93impl fmt::Debug for TableStoreInner {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 f.debug_struct("TableStoreInner")
96 .field("opened", &self.opened)
97 .field("encryption_key", &self.encryption_key)
98 .field("all_table_names", &self.all_table_names)
99 .finish()
100 }
101}
102
103#[must_use]
106pub struct TableStore {
107 registry: VeilidComponentRegistry,
108 inner: Mutex<TableStoreInner>, table_store_driver: TableStoreDriver,
110 async_lock: Arc<AsyncMutex<()>>,
111 flush_tables_task: TickTask<EyreReport>,
112 cleanup_tables_task: TickTask<EyreReport>,
113}
114
115impl fmt::Debug for TableStore {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.debug_struct("TableStore")
118 .field("registry", &self.registry)
119 .field("inner", &self.inner)
120 .field("async_lock", &self.async_lock)
121 .finish()
122 }
123}
124
125impl_veilid_component!(TableStore);
126
127impl TableStore {
128 fn new_inner() -> TableStoreInner {
129 TableStoreInner {
130 opened: WeakValueHashMap::new(),
131 encryption_key: None,
132 all_table_names: HashMap::new(),
133 all_tables_db: None,
134 tick_subscription: None,
135 }
136 }
137 pub(crate) fn new(registry: VeilidComponentRegistry) -> Self {
138 let inner = Self::new_inner();
139 let table_store_driver = TableStoreDriver::new(registry.clone());
140
141 let this = Self {
142 registry,
143 inner: Mutex::new(inner),
144 table_store_driver,
145 async_lock: Arc::new(AsyncMutex::new(())),
146 flush_tables_task: TickTask::new("flush_tables_task", FLUSH_TABLES_INTERVAL_SECS),
147 cleanup_tables_task: TickTask::new("cleanup_tables_task", CLEANUP_TABLES_INTERVAL_SECS),
148 };
149
150 this.setup_tasks();
151
152 this
153 }
154
155 async fn flush(&self) {
157 let (all_table_names_value, all_tables_db) = {
158 let inner = self.inner.lock();
159 let all_table_names_value = serialize_json_bytes(&inner.all_table_names);
160 (
161 all_table_names_value,
162 inner.all_tables_db.clone().unwrap_or_log(),
163 )
164 };
165 let mut dbt = DBTransaction::new();
166 dbt.put(0, ALL_TABLE_NAMES, &all_table_names_value);
167 if let Err(e) = all_tables_db.write(dbt).await {
168 error!("failed to write all tables db: {}", e);
169 }
170 }
171
172 async fn cleanup(&self) {
174 let all_open_db: Vec<_> = {
176 let inner = self.inner.lock();
177 inner.opened.values().collect()
178 };
179 for db in all_open_db {
180 let tdb = TableDB::new_from_unlocked_inner(db, 0);
181 if let Err(e) = tdb.cleanup().await {
182 veilid_log!(self error "Error cleaning up database '{}': {}", tdb.table_name(), e);
183 }
184 }
185 }
186
187 fn namespaced_name(&self, table: &str) -> VeilidAPIResult<String> {
191 if !table
192 .chars()
193 .all(|c| char::is_alphanumeric(c) || c == '_' || c == '-')
194 {
195 apibail_invalid_argument!("table name is invalid", "table", table);
196 }
197 let namespace = self.config().namespace.clone();
198 Ok(if namespace.is_empty() {
199 table.to_string()
200 } else {
201 format!("_ns_{}_{}", namespace, table)
202 })
203 }
204
205 fn name_get_or_create(&self, table: &str) -> VeilidAPIResult<String> {
206 let name = self.namespaced_name(table)?;
207
208 let mut inner = self.inner.lock();
209 if let Some(real_name) = inner.all_table_names.get(&name) {
211 return Ok(real_name.clone());
212 }
213
214 let mut real_name_bytes = [0u8; 32];
216 random_bytes(&mut real_name_bytes);
217 let real_name = data_encoding::BASE64URL_NOPAD.encode(&real_name_bytes);
218
219 if inner
220 .all_table_names
221 .insert(name.to_owned(), real_name.clone())
222 .is_some()
223 {
224 panic!("should not have had some value");
225 };
226
227 Ok(real_name)
228 }
229
230 #[cfg_attr(
231 feature = "instrument",
232 instrument(level = "trace", target = "tstore", skip_all)
233 )]
234 fn name_delete(&self, table: &str) -> VeilidAPIResult<Option<String>> {
235 let name = self.namespaced_name(table)?;
236 let mut inner = self.inner.lock();
237 let real_name = inner.all_table_names.remove(&name);
238 Ok(real_name)
239 }
240
241 #[cfg_attr(
242 feature = "instrument",
243 instrument(level = "trace", target = "tstore", skip_all)
244 )]
245 fn name_get(&self, table: &str) -> VeilidAPIResult<Option<String>> {
246 let name = self.namespaced_name(table)?;
247 let inner = self.inner.lock();
248 let real_name = inner.all_table_names.get(&name).cloned();
249 Ok(real_name)
250 }
251
252 #[cfg_attr(
253 feature = "instrument",
254 instrument(level = "trace", target = "tstore", skip_all)
255 )]
256 fn name_rename(&self, old_table: &str, new_table: &str) -> VeilidAPIResult<()> {
257 let old_name = self.namespaced_name(old_table)?;
258 let new_name = self.namespaced_name(new_table)?;
259
260 let mut inner = self.inner.lock();
261 if inner.all_table_names.contains_key(&new_name) {
263 return Err(VeilidAPIError::generic("new table already exists"));
264 }
265 let Some(real_name) = inner.all_table_names.remove(&old_name) else {
267 return Err(VeilidAPIError::generic("table does not exist"));
268 };
269 inner.all_table_names.insert(new_name.to_owned(), real_name);
271
272 Ok(())
273 }
274
275 #[cfg_attr(
277 feature = "instrument",
278 instrument(level = "trace", target = "tstore", skip_all)
279 )]
280 pub fn list_all(&self) -> Vec<(String, String)> {
281 let inner = self.inner.lock();
282 inner
283 .all_table_names
284 .iter()
285 .map(|(k, v)| (k.clone(), v.clone()))
286 .collect::<Vec<(String, String)>>()
287 }
288
289 #[cfg_attr(
291 feature = "instrument",
292 instrument(level = "trace", target = "tstore", skip_all)
293 )]
294 pub async fn delete_all(&self) {
295 let real_names = {
297 let mut inner = self.inner.lock();
298 let real_names = inner
299 .all_table_names
300 .values()
301 .cloned()
302 .collect::<Vec<String>>();
303 inner.all_table_names.clear();
304 real_names
305 };
306
307 for table_name in real_names {
309 if let Err(e) = self.table_store_driver.delete(&table_name).await {
310 error!("error deleting table: {}", e);
311 }
312 }
313 self.flush().await;
314 }
315
316 #[cfg_attr(
317 feature = "instrument",
318 instrument(level = "trace", target = "tstore", skip_all)
319 )]
320 pub(crate) async fn maybe_unprotect_device_encryption_key(
321 &self,
322 dek_bytes: &[u8],
323 device_encryption_key_password: &str,
324 ) -> EyreResult<SharedSecret> {
325 if dek_bytes.len() < 4 {
328 bail!("device encryption key is not valid");
329 }
330
331 let kind = CryptoKind::try_from(&dek_bytes[0..4]).unwrap_or_log();
333 let crypto = self.crypto();
334 let Some(vcrypto) = crypto.get_async(kind) else {
335 bail!("unsupported cryptosystem '{kind}'");
336 };
337
338 if !device_encryption_key_password.is_empty() {
339 if dek_bytes.len()
340 != (4
341 + vcrypto.shared_secret_length()
342 + vcrypto.aead_overhead()
343 + vcrypto.nonce_length())
344 {
345 bail!("password protected device encryption key is not valid");
346 }
347 let protected_key =
348 &dek_bytes[4..(4 + vcrypto.shared_secret_length() + vcrypto.aead_overhead())];
349 let nonce = Nonce::new(
350 &dek_bytes[(4 + vcrypto.shared_secret_length() + vcrypto.aead_overhead())..],
351 );
352 let shared_secret = vcrypto
353 .derive_shared_secret(device_encryption_key_password.as_bytes(), &nonce)
354 .await
355 .wrap_err("failed to derive shared secret")?;
356
357 let unprotected_key = vcrypto
358 .decrypt_aead(protected_key, &nonce, &shared_secret, None)
359 .await
360 .wrap_err("failed to decrypt device encryption key")?;
361
362 return Ok(SharedSecret::new(
363 kind,
364 BareSharedSecret::new(unprotected_key.as_slice()),
365 ));
366 }
367
368 if dek_bytes.len() != (4 + vcrypto.shared_secret_length()) {
369 bail!("unprotected device encryption key is not valid");
370 }
371
372 Ok(SharedSecret::new(
373 kind,
374 BareSharedSecret::new(&dek_bytes[4..]),
375 ))
376 }
377
378 #[cfg_attr(
379 feature = "instrument",
380 instrument(level = "trace", target = "tstore", skip_all)
381 )]
382 pub(crate) async fn maybe_protect_device_encryption_key(
383 &self,
384 dek: SharedSecret,
385 device_encryption_key_password: &str,
386 ) -> EyreResult<Vec<u8>> {
387 if device_encryption_key_password.is_empty() {
389 veilid_log!(self debug "no dek password");
390 return Ok(Vec::from(dek));
392 }
393
394 let crypto = self.crypto();
396 let Some(vcrypto) = crypto.get_async(dek.kind()) else {
397 bail!("unsupported cryptosystem '{}'", dek.kind());
398 };
399
400 let nonce = vcrypto.random_nonce().await;
401 let shared_secret = vcrypto
402 .derive_shared_secret(device_encryption_key_password.as_bytes(), &nonce)
403 .await
404 .wrap_err("failed to derive shared secret")?;
405 let protected_key = vcrypto
406 .encrypt_aead(dek.ref_value(), &nonce, &shared_secret, None)
407 .await
408 .wrap_err("failed to decrypt device encryption key")?;
409 let mut out = Vec::with_capacity(
410 4 + vcrypto.shared_secret_length() + vcrypto.aead_overhead() + vcrypto.nonce_length(),
411 );
412 out.extend_from_slice(dek.kind().bytes());
413 out.extend_from_slice(&protected_key);
414 out.extend_from_slice(&nonce);
415 assert_eq!(
416 out.len(),
417 4 + vcrypto.shared_secret_length() + vcrypto.aead_overhead() + vcrypto.nonce_length()
418 );
419 Ok(out)
420 }
421
422 #[cfg_attr(
423 feature = "instrument",
424 instrument(level = "trace", target = "tstore", skip_all)
425 )]
426 async fn load_device_encryption_key(&self) -> EyreResult<Option<SharedSecret>> {
427 let dek_bytes: Option<Vec<u8>> = self
428 .protected_store()
429 .load_user_secret("device_encryption_key")?;
430 let Some(dek_bytes) = dek_bytes else {
431 veilid_log!(self debug "no device encryption key");
432 return Ok(None);
433 };
434
435 let device_encryption_key_password = self
437 .config()
438 .protected_store
439 .device_encryption_key_password
440 .clone();
441
442 Ok(Some(
443 self.maybe_unprotect_device_encryption_key(&dek_bytes, &device_encryption_key_password)
444 .await?,
445 ))
446 }
447
448 #[cfg_attr(
449 feature = "instrument",
450 instrument(level = "trace", target = "tstore", skip_all)
451 )]
452 async fn save_device_encryption_key(
453 &self,
454 device_encryption_key: Option<SharedSecret>,
455 ) -> EyreResult<()> {
456 let Some(device_encryption_key) = device_encryption_key else {
457 let existed = self
459 .protected_store()
460 .remove_user_secret("device_encryption_key")?;
461 veilid_log!(self debug "removed device encryption key. existed: {}", existed);
462 return Ok(());
463 };
464
465 let new_device_encryption_key_password = self
467 .config()
468 .protected_store
469 .new_device_encryption_key_password
470 .clone();
471 let device_encryption_key_password =
472 if let Some(new_device_encryption_key_password) = new_device_encryption_key_password {
473 veilid_log!(self debug "changing dek password");
475 new_device_encryption_key_password
476 } else {
477 veilid_log!(self debug "saving with existing dek password");
479 self.config()
480 .protected_store
481 .device_encryption_key_password
482 .clone()
483 };
484
485 let dek_bytes = self
486 .maybe_protect_device_encryption_key(
487 device_encryption_key,
488 &device_encryption_key_password,
489 )
490 .await?;
491
492 let existed = self
494 .protected_store()
495 .save_user_secret("device_encryption_key", &dek_bytes)?;
496 veilid_log!(self debug "saving device encryption key. existed: {}", existed);
497 Ok(())
498 }
499
500 fn log_facilities_impl(&self) -> VeilidComponentLogFacilities {
501 VeilidComponentLogFacilities::new().with_facility(
502 VeilidComponentLogFacility::try_new_with_tags("tstore", ["#common"]).unwrap(),
503 )
504 }
505
506 #[cfg_attr(
507 feature = "instrument",
508 instrument(level = "trace", target = "tstore", skip_all)
509 )]
510 async fn init_async(&self) -> EyreResult<()> {
511 {
512 let _async_guard = self.async_lock.lock().await;
513
514 let mut device_encryption_key = self.load_device_encryption_key().await?;
516 let mut device_encryption_key_changed = false;
517 if let Some(device_encryption_key) = &device_encryption_key {
518 let best_kind = best_crypto_kind();
520 if device_encryption_key.kind() != best_kind {
521 veilid_log!(self error "Need to write migration support");
523 }
524 } else {
525 let crypto = self.crypto();
527 let vcrypto = crypto.best_async();
528 let shared_secret = vcrypto.random_shared_secret().await;
529
530 device_encryption_key = Some(shared_secret);
531 device_encryption_key_changed = true;
532 }
533
534 let changing_password = self
536 .config()
537 .protected_store
538 .new_device_encryption_key_password
539 .is_some();
540
541 if device_encryption_key_changed || changing_password {
543 self.save_device_encryption_key(device_encryption_key.clone())
544 .await?;
545 }
546
547 let all_tables_db = match self
549 .table_store_driver
550 .open("__veilid_all_tables", 1, 1)
551 .await
552 {
553 Ok(db) => db,
554 Err(e) => {
555 veilid_log!(self error "failed to create all tables table: {}", e);
556 return Err(e.into());
557 }
558 };
559 match all_tables_db.get(0, ALL_TABLE_NAMES).await {
560 Ok(Some(v)) => match deserialize_json_bytes::<HashMap<String, String>>(&v) {
561 Ok(all_table_names) => {
562 let mut inner = self.inner.lock();
563 inner.all_table_names = all_table_names;
564 }
565 Err(e) => {
566 veilid_log!(self error "could not deserialize __veilid_all_tables: {}", e);
567 }
568 },
569 Ok(None) => {
570 veilid_log!(self trace "__veilid_all_tables is empty");
572 }
573 Err(e) => {
574 veilid_log!(self error "could not get __veilid_all_tables: {}", e);
575 }
576 };
577
578 {
579 let mut inner = self.inner.lock();
580 inner.encryption_key = device_encryption_key;
581 inner.all_tables_db = Some(all_tables_db);
582 }
583
584 let do_delete = self.config().table_store.delete;
585
586 if do_delete {
587 self.delete_all().await;
588 }
589 }
590
591 let crypto = self.crypto();
593 crypto.table_store_setup(self).await?;
594
595 Ok(())
596 }
597
598 #[cfg_attr(
599 feature = "instrument",
600 instrument(level = "trace", target = "tstore", skip_all)
601 )]
602 #[allow(clippy::unused_async)]
603 async fn post_init_async(&self) -> EyreResult<()> {
604 let tick_subscription = impl_subscribe_event_bus_async!(self, Self, tick_event_handler);
606
607 let mut inner = self.inner.lock();
608
609 inner.tick_subscription = Some(tick_subscription);
611
612 Ok(())
613 }
614
615 #[cfg_attr(
616 feature = "instrument",
617 instrument(level = "trace", target = "tstore", skip_all)
618 )]
619 async fn pre_terminate_async(&self) {
620 {
622 let mut inner = self.inner.lock();
623 if let Some(sub) = inner.tick_subscription.take() {
624 self.event_bus().unsubscribe(sub);
625 }
626 }
627
628 self.cancel_tasks().await;
630 }
631
632 #[cfg_attr(
633 feature = "instrument",
634 instrument(level = "trace", target = "tstore", skip_all)
635 )]
636 async fn terminate_async(&self) {
637 let _async_guard = self.async_lock.lock().await;
638
639 self.flush().await;
640
641 let mut inner = self.inner.lock();
642 inner.opened.shrink_to_fit();
643 if !inner.opened.is_empty() {
644 veilid_log!(self warn
645 "all open databases should have been closed: {:?}",
646 inner.opened
647 );
648 inner.opened.clear();
649 }
650 inner.all_tables_db = None;
651 inner.all_table_names.clear();
652 inner.encryption_key = None;
653 }
654
655 #[cfg_attr(
658 feature = "instrument",
659 instrument(level = "trace", target = "tstore", skip_all)
660 )]
661 pub async fn open(&self, name: &str, column_count: u32) -> VeilidAPIResult<TableDB> {
662 self.open_pooled(name, column_count, 1).await
663 }
664
665 #[cfg_attr(
666 feature = "instrument",
667 instrument(level = "trace", target = "tstore", skip_all)
668 )]
669 pub async fn open_pooled(
670 &self,
671 name: &str,
672 column_count: u32,
673 concurrency: usize,
674 ) -> VeilidAPIResult<TableDB> {
675 let _async_guard = self.async_lock.lock().await;
676
677 {
679 let inner = self.inner.lock();
680 if inner.all_tables_db.is_none() {
681 apibail_not_initialized!();
682 }
683 }
684
685 let table_name = self.name_get_or_create(name)?;
686
687 {
689 let inner = self.inner.lock();
690 if let Some(table_db_unlocked_inner) = inner.opened.get(&table_name) {
691 let tdb = TableDB::new_from_unlocked_inner(table_db_unlocked_inner, column_count);
692
693 let existing_col_count = tdb.get_column_count()?;
695 if column_count > existing_col_count {
696 return Err(VeilidAPIError::generic(format!(
697 "database must be closed before increasing column count {} -> {}",
698 existing_col_count, column_count,
699 )));
700 }
701
702 return Ok(tdb);
703 }
704 }
705
706 let mut db = match self
708 .table_store_driver
709 .open(&table_name, column_count, concurrency)
710 .await
711 {
712 Ok(db) => db,
713 Err(e) => {
714 self.name_delete(name).expect_or_log("removing name failed");
715 self.flush().await;
716 return Err(e);
717 }
718 };
719
720 self.flush().await;
722
723 let existing_col_count = db.num_columns().map_err(VeilidAPIError::from)?;
725 if existing_col_count > column_count {
726 drop(db);
727 db = match self
728 .table_store_driver
729 .open(&table_name, existing_col_count, concurrency)
730 .await
731 {
732 Ok(db) => db,
733 Err(e) => {
734 self.name_delete(name).expect_or_log("removing name failed");
735 self.flush().await;
736 return Err(e);
737 }
738 };
739 }
740
741 let mut inner = self.inner.lock();
743 let table_db = TableDB::new(
744 table_name.clone(),
745 self.registry(),
746 db,
747 inner.encryption_key.clone(),
748 inner.encryption_key.clone(),
749 column_count,
750 );
751
752 inner
754 .opened
755 .insert(table_name.clone(), table_db.unlocked_inner());
756
757 Ok(table_db)
758 }
759
760 #[cfg_attr(
762 feature = "instrument",
763 instrument(level = "trace", target = "tstore", skip_all)
764 )]
765 pub async fn delete(&self, name: &str) -> VeilidAPIResult<bool> {
766 let _async_guard = self.async_lock.lock().await;
767 {
769 let inner = self.inner.lock();
770 if inner.all_tables_db.is_none() {
771 apibail_not_initialized!();
772 }
773 }
774
775 let Some(table_name) = self.name_get(name)? else {
776 return Ok(false);
778 };
779
780 {
782 let inner = self.inner.lock();
783 if inner.opened.contains_key(&table_name) {
784 apibail_generic!("Not deleting table that is still opened");
785 }
786 }
787
788 let deleted = self.table_store_driver.delete(&table_name).await?;
790 if !deleted {
791 veilid_log!(self warn
793 "table existed in name table but not in storage: {} : {}",
794 name, table_name
795 );
796 }
797 self.name_delete(name)
798 .expect_or_log("failed to delete name");
799 self.flush().await;
800
801 Ok(true)
802 }
803
804 #[cfg_attr(
806 feature = "instrument",
807 instrument(level = "trace", target = "tstore", skip_all)
808 )]
809 pub async fn info(&self, name: &str) -> VeilidAPIResult<Option<TableInfo>> {
810 let tdb = self.open(name, 0).await?;
812 let internal_name = tdb.table_name();
813 let io_stats_since_previous = tdb.io_stats(IoStatsKind::SincePrevious);
814 let io_stats_overall = tdb.io_stats(IoStatsKind::Overall);
815 let column_count = tdb.get_column_count()?;
816 let mut columns = Vec::<ColumnInfo>::with_capacity(column_count as usize);
817 for col in 0..column_count {
818 let key_count = tdb.get_key_count(col).await?;
819 columns.push(ColumnInfo {
820 key_count: AlignedU64::new(key_count),
821 })
822 }
823 Ok(Some(TableInfo {
824 table_name: internal_name,
825 io_stats_since_previous: IOStatsInfo {
826 transactions: AlignedU64::new(io_stats_since_previous.transactions),
827 reads: AlignedU64::new(io_stats_since_previous.reads),
828 cache_reads: AlignedU64::new(io_stats_since_previous.cache_reads),
829 writes: AlignedU64::new(io_stats_since_previous.writes),
830 bytes_read: ByteCount::new(io_stats_since_previous.bytes_read),
831 cache_read_bytes: ByteCount::new(io_stats_since_previous.cache_read_bytes),
832 bytes_written: ByteCount::new(io_stats_since_previous.bytes_written),
833 started: Timestamp::new(
834 io_stats_since_previous
835 .started
836 .duration_since(std::time::SystemTime::UNIX_EPOCH)
837 .unwrap_or_default()
838 .as_micros() as u64,
839 ),
840 span: TimestampDuration::new(io_stats_since_previous.span.as_micros() as u64),
841 },
842 io_stats_overall: IOStatsInfo {
843 transactions: AlignedU64::new(io_stats_overall.transactions),
844 reads: AlignedU64::new(io_stats_overall.reads),
845 cache_reads: AlignedU64::new(io_stats_overall.cache_reads),
846 writes: AlignedU64::new(io_stats_overall.writes),
847 bytes_read: ByteCount::new(io_stats_overall.bytes_read),
848 cache_read_bytes: ByteCount::new(io_stats_overall.cache_read_bytes),
849 bytes_written: ByteCount::new(io_stats_overall.bytes_written),
850 started: Timestamp::new(
851 io_stats_overall
852 .started
853 .duration_since(std::time::SystemTime::UNIX_EPOCH)
854 .unwrap_or_default()
855 .as_micros() as u64,
856 ),
857 span: TimestampDuration::new(io_stats_overall.span.as_micros() as u64),
858 },
859 column_count,
860 columns,
861 }))
862 }
863
864 #[cfg_attr(
866 feature = "instrument",
867 instrument(level = "trace", target = "tstore", skip_all)
868 )]
869 pub async fn rename(&self, old_name: &str, new_name: &str) -> VeilidAPIResult<()> {
870 let _async_guard = self.async_lock.lock().await;
871 {
873 let inner = self.inner.lock();
874 if inner.all_tables_db.is_none() {
875 apibail_not_initialized!();
876 }
877 }
878 veilid_log!(self debug "TableStore::rename {} -> {}", old_name, new_name);
879 self.name_rename(old_name, new_name)?;
880 self.flush().await;
881 Ok(())
882 }
883
884 async fn tick_event_handler(&self, evt: Arc<TickEvent>) {
885 let lag = evt.last_tick_ts.map(|x| evt.cur_tick_ts.duration_since(x));
886 if let Err(e) = self.tick(lag).await {
887 error!("Error in table store tick: {}", e);
888 }
889 }
890}