yeti_types/backend/manager.rs
1//! Per-table backend registry: `TableInfo`, `BackendManager`, and the
2//! `unix_timestamp` / `now_secs` helpers.
3//!
4//! # Errors
5//!
6//! Result-returning methods on `BackendManager` return
7//! `Result<T, YetiError>` and surface these failure classes:
8//!
//! - **Table not registered** — `get_backend_for_table` returns
//!   `YetiError::NotFound` (with `resource_type: "backend"` and the
//!   requested name as `id`) when the named table has no entry in the
//!   registry, even after the case-insensitive fallback. Indicates an
//!   app-config or schema mismatch.
13//! - **Backend init / open** — `RocksDB` column-family open failure
14//! (corrupt LSM, permission denied, locked DB). Returned as
15//! `YetiError::Internal`.
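//! - **Metadata I/O** — `validate_table_backend` reads and writes the
//!   per-table backend-type marker stored under the
//!   `__yeti_table_backend:` prefix. Failures from the backend's
//!   `get`/`put` and from parsing the stored value are propagated; a
//!   stored or cached type that disagrees with the schema yields
//!   `YetiError::Validation`.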
//! - **Clock skew** — `unix_timestamp` returns
//!   `YetiError::Internal("System time is before UNIX epoch - check system clock")`
//!   if the host clock is set before 1970-01-01. `now_secs` returns `0`
//!   in that situation instead of failing.
22#![allow(clippy::missing_errors_doc)]
23
24use lru::LruCache;
25use std::collections::HashMap;
26use std::num::NonZeroUsize;
27use std::sync::{Arc, RwLock};
28
29use super::config::BackendType;
30use super::config::ConsistencyMode;
31use super::traits::KvBackend;
32use crate::error::{Result, ResultExt, YetiError};
33
/// Key prefix under which per-table backend metadata is stored in the database.
const METADATA_PREFIX: &str = "__yeti_table_backend:";

/// Number of table → backend-type mappings the LRU cache holds by default.
const DEFAULT_CACHE_CAPACITY: usize = 10_000;

/// A `NonZeroUsize` holding `1`.
///
/// `NonZeroUsize::MIN` is the smallest non-zero value, i.e. `1`, so the
/// non-zero invariant needs no fallible construction and no runtime `expect`.
const ONE_NONZERO: NonZeroUsize = NonZeroUsize::MIN;

/// `DEFAULT_CACHE_CAPACITY` as a `NonZeroUsize`, checked during constant
/// evaluation rather than at runtime.
const DEFAULT_CACHE_CAPACITY_NONZERO: NonZeroUsize =
    if let Some(capacity) = NonZeroUsize::new(DEFAULT_CACHE_CAPACITY) {
        capacity
    } else {
        // `DEFAULT_CACHE_CAPACITY` is the literal `10_000`, so this arm is
        // never evaluated. Were the constant ever changed to `0`, this would
        // fail the build instead of panicking at runtime.
        unreachable!()
    };
55
56// ============================================================================
57// TableInfo
58// ============================================================================
59
/// Table information for backend initialization.
///
/// A flattened, per-table view of the settings a backend needs. Every field
/// is public; `Default` yields empty names, the `"data"` database, disk
/// storage, and all optional settings unset.
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Owning application id. Stamped onto every transaction-log
    /// entry's `table.app` field so audit / replication observers
    /// can attribute writes to the right app even when multiple apps
    /// share a database (`@table(database: "...")` declares cross-
    /// app sharing). Empty string is the default for test fixtures
    /// that don't care about app attribution; in production every
    /// `TableInfo` is built from an `AppMetadata` with a populated
    /// `id`.
    pub app_id: String,
    /// Table name (used as column family name)
    pub name: String,
    /// Database name (default: "data")
    pub database: String,
    /// Storage backend type
    pub storage: BackendType,
    /// Per-table consistency mode override. `None` means no override is
    /// declared for this table.
    pub consistency: Option<ConsistencyMode>,
    /// Per-field CRDT type declarations, as string-to-string entries.
    // NOTE(review): presumably keyed by field name with the CRDT type name as
    // the value — confirm against the schema loader.
    pub crdt_fields: HashMap<String, String>,
    /// Residency mode. `None` when the table declares none.
    pub residency: Option<String>,
    /// Per-record TTL expiration from `@table(expiration: N)` — `Some(secs)`
    /// activates the 9-byte record header + `RocksDB` `ExpirationFilter`.
    /// `None` means the column family is written without a header.
    pub expiration_secs: Option<u64>,
    /// True when the schema declared `@audit(capture_state: true)`
    /// on this table. The `LoggingBackend` wrapper uses this flag to
    /// decide whether to read the existing value before each write
    /// and stamp it onto `LogEntry::prev_value` for `AuditEntry.before`.
    /// Costs one extra storage read per write — hence per-table
    /// opt-in.
    pub audit_capture_state: bool,
    /// Per-table durability from `@store(durability:)`. `None` inherits the
    /// database-level default (the WAL setting the `BackendManager` was
    /// opened with). `Some(tier)` overrides this table's `WriteOptions`:
    /// `Lossy` skips the WAL (throughput), `Soft`/`Strong` write through it,
    /// `Strong` additionally fsyncs each write. `RocksDB` applies `disableWAL`
    /// and `sync` per write, so column families in one database can differ.
    pub durability: Option<crate::schema::DurabilityTier>,
    /// The full source [`TableDefinition`](crate::schema::TableDefinition)
    /// this `TableInfo` was projected from. Carried so downstream consumers
    /// (replication's `@distribute`, the router's `@export`/transport flags)
    /// read one source of truth instead of side-channel maps or re-parsing
    /// the schema. `None` for hand-built test fixtures, which only exercise
    /// the flattened backend fields.
    pub def: Option<std::sync::Arc<crate::schema::TableDefinition>>,
}
110
111impl Default for TableInfo {
112 fn default() -> Self {
113 Self {
114 app_id: String::new(),
115 name: String::new(),
116 database: "data".to_owned(),
117 storage: BackendType::Disk,
118 consistency: None,
119 crdt_fields: HashMap::new(),
120 residency: None,
121 expiration_secs: None,
122 audit_capture_state: false,
123 durability: None,
124 def: None,
125 }
126 }
127}
128
129// ============================================================================
130// BackendManager
131// ============================================================================
132
/// Manages storage backends with per-table backend selection.
///
/// Each table gets its own backend instance. Multiple applications safely share
/// databases. Uses LRU cache for bounded memory on backend type validation.
///
/// The table maps are plain fields fixed at construction; the optional
/// collaborators (`pubsub`, `vector`, …) live in `OnceLock` slots so they can
/// be attached later through `&self` setters, at most once each.
pub struct BackendManager {
    /// Map of table name to backend instance
    table_backends: HashMap<String, Arc<dyn KvBackend>>,
    /// Pre-built lowercase → canonical name index for case-insensitive lookup.
    /// Populated at construction time so `get_backend_for_table()` avoids
    /// linear scans with per-call `to_lowercase()` allocations.
    lowercase_index: HashMap<String, String>,
    /// LRU cache tracking which backend type each table uses.
    /// Keyed by table name; filled by `validate_table_backend()`.
    backend_type_cache: RwLock<LruCache<String, BackendType>>,
    /// Metadata key prefix
    metadata_prefix: String,
    /// `PubSub` manager for real-time table change notifications.
    /// Set lazily after construction via `set_pubsub()` — avoids cloning
    /// the `table_backends` `HashMap` just to attach `PubSub`.
    pubsub: std::sync::OnceLock<Arc<crate::pubsub::PubSubManager>>,
    /// Per-table source `TableDefinition`s (by table name). Set once at
    /// startup; the single source of truth for table metadata that the
    /// flattened backend fields don't carry (`@distribute`, `@export`, …).
    table_defs: std::sync::OnceLock<
        Arc<std::collections::HashMap<String, Arc<crate::schema::TableDefinition>>>,
    >,
    /// Per-app vector capabilities (hook, batcher, mappings, cache).
    /// Set once at startup after vector hooks are configured.
    vector: std::sync::OnceLock<Arc<crate::plugins::VectorContext>>,
    /// Per-table secondary-index registry. Populated at app load with one
    /// `Arc<IndexManager>` per table that has any `@indexed` field. Stored
    /// type-erased here because `IndexManager` lives in `yeti-sdk`
    /// (which depends on `yeti-types`); the SDK provides the typed accessor
    /// that downcasts this slot. SDK-side `Table::put_if` consults the
    /// registry to keep hash/range/fulltext indexes in sync with CAS writes
    /// — without this, the `@indexed` directive is a no-op for SDK writes
    /// and worker pools that scan by status pay a full-table scan.
    index_registry: std::sync::OnceLock<Arc<dyn std::any::Any + Send + Sync>>,
    /// Unified transaction log — set at startup by yeti-server when
    /// the deployment opens its transaction-log backend at
    /// `{root_directory}/logs/transactions/{deployment_hash}/`.
    /// Observed by both audit (range-by-time queries) and replication
    /// (tail-by-HLC). `None` when logging is disabled (test fixtures,
    /// dry-run modes).
    transaction_log: std::sync::OnceLock<crate::backend::TransactionLogHandle>,
    /// Shared [`ObserverRegistry`] — every per-table `LoggingBackend`
    /// holds a clone of the same `Arc`, so a single
    /// [`register_observer`](Self::register_observer) call reaches
    /// every commit path. `None` when observer fan-out is disabled.
    ///
    /// [`ObserverRegistry`]: crate::backend::ObserverRegistry
    observer_registry: std::sync::OnceLock<Arc<crate::backend::ObserverRegistry>>,
}
185
186impl std::fmt::Debug for BackendManager {
187 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188 f.debug_struct("BackendManager")
189 .field(
190 "table_backends",
191 &self.table_backends.keys().collect::<Vec<_>>(),
192 )
193 .field("metadata_prefix", &self.metadata_prefix)
194 .field("has_pubsub", &self.pubsub.get().is_some())
195 .field("has_vector", &self.vector.get().is_some())
196 .field("has_transaction_log", &self.transaction_log.get().is_some())
197 .field(
198 "has_observer_registry",
199 &self.observer_registry.get().is_some(),
200 )
201 // `lowercase_index`, `backend_type_cache`, dyn-trait values elided
202 .finish_non_exhaustive()
203 }
204}
205
206impl BackendManager {
207 /// Create an empty `BackendManager` for testing.
208 #[must_use]
209 pub fn empty() -> Self {
210 Self {
211 table_backends: HashMap::new(),
212 lowercase_index: HashMap::new(),
213 backend_type_cache: RwLock::new(LruCache::new(ONE_NONZERO)),
214 metadata_prefix: METADATA_PREFIX.to_owned(),
215 pubsub: std::sync::OnceLock::new(),
216 vector: std::sync::OnceLock::new(),
217 index_registry: std::sync::OnceLock::new(),
218 transaction_log: std::sync::OnceLock::new(),
219 observer_registry: std::sync::OnceLock::new(),
220 table_defs: std::sync::OnceLock::new(),
221 }
222 }
223
224 /// Create from pre-built backends.
225 pub fn from_backends(table_backends: HashMap<String, Arc<dyn KvBackend>>) -> Result<Self> {
226 let cache_capacity = NonZeroUsize::new(DEFAULT_CACHE_CAPACITY).ok_or_else(|| {
227 YetiError::Internal("DEFAULT_CACHE_CAPACITY must be non-zero".to_owned())
228 })?;
229
230 let lowercase_index: HashMap<String, String> = table_backends
231 .keys()
232 .map(|k| (k.to_lowercase(), k.clone()))
233 .collect();
234
235 Ok(Self {
236 table_backends,
237 lowercase_index,
238 backend_type_cache: RwLock::new(LruCache::new(cache_capacity)),
239 metadata_prefix: METADATA_PREFIX.to_owned(),
240 pubsub: std::sync::OnceLock::new(),
241 vector: std::sync::OnceLock::new(),
242 index_registry: std::sync::OnceLock::new(),
243 transaction_log: std::sync::OnceLock::new(),
244 observer_registry: std::sync::OnceLock::new(),
245 table_defs: std::sync::OnceLock::new(),
246 })
247 }
248
249 /// Set the unified transaction log. Can only be called once
250 /// (uses `OnceLock`). Called at startup once the deployment's
251 /// transaction-log backend
252 /// (`{root_directory}/logs/transactions/{deployment_hash}/`)
253 /// has been opened and a [`TransactionLog`] impl built on top.
254 ///
255 /// [`TransactionLog`]: crate::backend::TransactionLog
256 pub fn set_transaction_log(&self, log: crate::backend::TransactionLogHandle) {
257 let _ = self.transaction_log.set(log);
258 }
259
    /// Get the unified transaction log, if set. Audit readers,
    /// replication senders, and any future log consumer use this to
    /// reach the durable record of every committed write.
    ///
    /// Returns `None` until
    /// [`set_transaction_log`](Self::set_transaction_log) has stored a handle.
    #[must_use]
    pub fn transaction_log(&self) -> Option<&crate::backend::TransactionLogHandle> {
        // Borrow of the stored handle; nothing is cloned here.
        self.transaction_log.get()
    }
267
268 /// Set the shared [`ObserverRegistry`]. Can only be called once
269 /// (uses `OnceLock`). Called at startup once the same `Arc` has
270 /// been threaded into every per-table `LoggingBackend` via
271 /// `LogWrapping.observers`, so this is the manager-side handle
272 /// to the same instance.
273 ///
274 /// [`ObserverRegistry`]: crate::backend::ObserverRegistry
275 pub fn set_observer_registry(&self, registry: Arc<crate::backend::ObserverRegistry>) {
276 let _ = self.observer_registry.set(registry);
277 }
278
    /// Get the shared observer registry, if set. Plugins (replication,
    /// audit, etc.) register their [`TransactionObserver`] here during
    /// startup; every per-table `LoggingBackend` reads the same
    /// snapshot on its commit path.
    ///
    /// Returns `None` until
    /// [`set_observer_registry`](Self::set_observer_registry) has stored one.
    ///
    /// [`TransactionObserver`]: crate::backend::TransactionObserver
    #[must_use]
    pub fn observer_registry(&self) -> Option<&Arc<crate::backend::ObserverRegistry>> {
        // Borrow of the stored `Arc`; callers clone it if they need ownership.
        self.observer_registry.get()
    }
289
290 /// Convenience: register an observer on the shared registry. No-op
291 /// when no registry has been attached.
292 pub fn register_observer(&self, observer: crate::backend::ObserverHandle) {
293 if let Some(registry) = self.observer_registry.get() {
294 registry.register(observer);
295 }
296 }
297
298 /// Set the `PubSub` manager. Can only be called once (uses `OnceLock`).
299 /// Called at startup after `BackendManager` is created and wrapped in Arc.
300 pub fn set_pubsub(&self, pubsub: Arc<crate::pubsub::PubSubManager>) {
301 let _ = self.pubsub.set(pubsub);
302 }
303
    /// Get the `PubSub` manager, if set.
    ///
    /// Returns `None` until [`set_pubsub`](Self::set_pubsub) has stored one.
    pub fn pubsub(&self) -> Option<&Arc<crate::pubsub::PubSubManager>> {
        // Borrow of the stored `Arc`; callers clone it if they need ownership.
        self.pubsub.get()
    }
308
309 /// Attach the per-table source [`TableDefinition`](crate::schema::TableDefinition)s
310 /// (keyed by table name). Set once at construction so downstream
311 /// consumers — replication's `@distribute` filter, the router's
312 /// `@export` flags — read this single source of truth rather than a
313 /// side-channel map. Can only be set once (uses `OnceLock`).
314 pub fn set_table_defs(
315 &self,
316 defs: std::collections::HashMap<String, Arc<crate::schema::TableDefinition>>,
317 ) {
318 let _ = self.table_defs.set(Arc::new(defs));
319 }
320
321 /// The source `TableDefinition` for `table_name`, if known. `None` when
322 /// defs weren't attached (test fixtures) or the table is unknown.
323 #[must_use]
324 pub fn table_def(&self, table_name: &str) -> Option<Arc<crate::schema::TableDefinition>> {
325 self.table_defs.get()?.get(table_name).map(Arc::clone)
326 }
327
328 /// Set the per-app vector context. Can only be called once (uses `OnceLock`).
329 /// Called at startup after vector hooks and batcher are configured.
330 pub fn set_vector(&self, vector: Arc<crate::plugins::VectorContext>) {
331 let _ = self.vector.set(vector);
332 }
333
    /// Get the per-app vector context, if set.
    ///
    /// Returns `None` until [`set_vector`](Self::set_vector) has stored one.
    pub fn vector(&self) -> Option<&Arc<crate::plugins::VectorContext>> {
        // Borrow of the stored `Arc`; callers clone it if they need ownership.
        self.vector.get()
    }
338
339 /// Create a new manager with this manager's tables plus additional tables.
340 #[must_use]
341 pub fn with_merged_tables(&self, other: &Self) -> Self {
342 let mut table_backends = self.table_backends.clone();
343 for (name, backend) in &other.table_backends {
344 table_backends
345 .entry(name.clone())
346 .or_insert_with(|| Arc::clone(backend));
347 }
348 let lowercase_index: HashMap<String, String> = table_backends
349 .keys()
350 .map(|k| (k.to_lowercase(), k.clone()))
351 .collect();
352 Self {
353 table_backends,
354 lowercase_index,
355 backend_type_cache: RwLock::new(LruCache::new(DEFAULT_CACHE_CAPACITY_NONZERO)),
356 metadata_prefix: self.metadata_prefix.clone(),
357 pubsub: self
358 .pubsub
359 .get()
360 .cloned()
361 .map_or_else(std::sync::OnceLock::new, |ps| {
362 let lock = std::sync::OnceLock::new();
363 let _ = lock.set(ps);
364 lock
365 }),
366 vector: self
367 .vector
368 .get()
369 .cloned()
370 .map_or_else(std::sync::OnceLock::new, |vc| {
371 let lock = std::sync::OnceLock::new();
372 let _ = lock.set(vc);
373 lock
374 }),
375 index_registry: self.index_registry.get().cloned().map_or_else(
376 std::sync::OnceLock::new,
377 |r| {
378 let lock = std::sync::OnceLock::new();
379 let _ = lock.set(r);
380 lock
381 },
382 ),
383 transaction_log: self.transaction_log.get().cloned().map_or_else(
384 std::sync::OnceLock::new,
385 |l| {
386 let lock = std::sync::OnceLock::new();
387 let _ = lock.set(l);
388 lock
389 },
390 ),
391 observer_registry: self.observer_registry.get().cloned().map_or_else(
392 std::sync::OnceLock::new,
393 |r| {
394 let lock = std::sync::OnceLock::new();
395 let _ = lock.set(r);
396 lock
397 },
398 ),
399 table_defs: self
400 .table_defs
401 .get()
402 .cloned()
403 .map_or_else(std::sync::OnceLock::new, |d| {
404 let lock = std::sync::OnceLock::new();
405 let _ = lock.set(d);
406 lock
407 }),
408 }
409 }
410
411 /// Merge another manager's tables with database-qualified keys.
412 #[must_use]
413 pub fn with_qualified_merge(&self, other: &Self, database: &str) -> Self {
414 let mut table_backends = self.table_backends.clone();
415 for (name, backend) in &other.table_backends {
416 let qualified = format!("{database}.{name}");
417 table_backends
418 .entry(qualified)
419 .or_insert_with(|| Arc::clone(backend));
420 table_backends
421 .entry(name.clone())
422 .or_insert_with(|| Arc::clone(backend));
423 }
424 let lowercase_index: HashMap<String, String> = table_backends
425 .keys()
426 .map(|k| (k.to_lowercase(), k.clone()))
427 .collect();
428 Self {
429 table_backends,
430 lowercase_index,
431 backend_type_cache: RwLock::new(LruCache::new(DEFAULT_CACHE_CAPACITY_NONZERO)),
432 metadata_prefix: self.metadata_prefix.clone(),
433 pubsub: self
434 .pubsub
435 .get()
436 .cloned()
437 .map_or_else(std::sync::OnceLock::new, |ps| {
438 let lock = std::sync::OnceLock::new();
439 let _ = lock.set(ps);
440 lock
441 }),
442 vector: self
443 .vector
444 .get()
445 .cloned()
446 .map_or_else(std::sync::OnceLock::new, |vc| {
447 let lock = std::sync::OnceLock::new();
448 let _ = lock.set(vc);
449 lock
450 }),
451 index_registry: self.index_registry.get().cloned().map_or_else(
452 std::sync::OnceLock::new,
453 |r| {
454 let lock = std::sync::OnceLock::new();
455 let _ = lock.set(r);
456 lock
457 },
458 ),
459 transaction_log: self.transaction_log.get().cloned().map_or_else(
460 std::sync::OnceLock::new,
461 |l| {
462 let lock = std::sync::OnceLock::new();
463 let _ = lock.set(l);
464 lock
465 },
466 ),
467 observer_registry: self.observer_registry.get().cloned().map_or_else(
468 std::sync::OnceLock::new,
469 |r| {
470 let lock = std::sync::OnceLock::new();
471 let _ = lock.set(r);
472 lock
473 },
474 ),
475 table_defs: self
476 .table_defs
477 .get()
478 .cloned()
479 .map_or_else(std::sync::OnceLock::new, |d| {
480 let lock = std::sync::OnceLock::new();
481 let _ = lock.set(d);
482 lock
483 }),
484 }
485 }
486
487 /// Set the per-app secondary-index registry. Type-erased so this crate
488 /// stays free of yeti-sdk imports — yeti-sdk's `Tables::get` does the
489 /// downcast. Can only be called once (uses `OnceLock`). Called at
490 /// startup once `IndexManager`s are built from the loaded schema.
491 pub fn set_index_registry(&self, registry: Arc<dyn std::any::Any + Send + Sync>) {
492 let _ = self.index_registry.set(registry);
493 }
494
    /// Get the per-app secondary-index registry, if set. Callers (yeti-sdk)
    /// downcast to their concrete `HashMap<String, Arc<IndexManager>>`.
    ///
    /// Returns `None` until
    /// [`set_index_registry`](Self::set_index_registry) has stored one.
    pub fn index_registry(&self) -> Option<&Arc<dyn std::any::Any + Send + Sync>> {
        // The value is returned still type-erased; no downcast happens here.
        self.index_registry.get()
    }
500
501 /// List all table names managed by this `BackendManager`.
502 pub fn table_names(&self) -> Vec<String> {
503 self.table_backends.keys().cloned().collect()
504 }
505
506 /// Get the storage backend for a specific table by name.
507 pub fn get_backend_for_table(&self, table_name: &str) -> Result<Arc<dyn KvBackend>> {
508 // Exact match first (no allocation)
509 if let Some(backend) = self.table_backends.get(table_name) {
510 return Ok(Arc::clone(backend));
511 }
512 // Case-insensitive via pre-built index (single to_lowercase, no linear scan)
513 let lower = table_name.to_lowercase();
514 if let Some(canonical) = self.lowercase_index.get(&lower)
515 && let Some(backend) = self.table_backends.get(canonical)
516 {
517 return Ok(Arc::clone(backend));
518 }
519 Err(YetiError::NotFound {
520 resource_type: "backend".to_owned(),
521 id: table_name.to_owned(),
522 })
523 }
524
525 /// Probe storage health by performing lightweight reads across all backends.
526 pub async fn health_probe(&self) -> Result<()> {
527 if self.table_backends.is_empty() {
528 return Err(YetiError::Internal(
529 "No table backends registered".to_owned(),
530 ));
531 }
532 for backend in self.table_backends.values() {
533 backend.get(b"__health_probe__").await?;
534 }
535 Ok(())
536 }
537
538 /// Validate that a table is using the correct backend.
539 pub async fn validate_table_backend(
540 &self,
541 table_name: &str,
542 key_prefix: &str,
543 expected_backend: BackendType,
544 ) -> Result<()> {
545 let metadata_key = format!("{}{}:{}", self.metadata_prefix, key_prefix, table_name);
546
547 {
548 let mut cache = self
549 .backend_type_cache
550 .write()
551 .with_context(|| "table backends cache lock".to_owned())?;
552 if let Some(&existing_backend) = cache.get(table_name) {
553 if existing_backend != expected_backend {
554 return Err(YetiError::Validation(format!(
555 "Table '{table_name}' already exists with {existing_backend} backend but schema specifies {expected_backend} backend. \
556 Storage backend migration is not yet supported."
557 )));
558 }
559 return Ok(());
560 }
561 }
562
563 let backend = self.get_backend_for_table(table_name)?;
564
565 if let Some(stored_value) = backend.get(metadata_key.as_bytes()).await? {
566 let stored_backend_str = String::from_utf8_lossy(&stored_value);
567 let stored_backend: BackendType = stored_backend_str.parse()?;
568
569 if stored_backend != expected_backend {
570 return Err(YetiError::Validation(format!(
571 "Table '{table_name}' has stored backend metadata indicating {stored_backend}, \
572 but schema specifies {expected_backend}. Storage backend migration is not yet supported."
573 )));
574 }
575
576 self.backend_type_cache
577 .write()
578 .with_context(|| "table backends cache lock".to_owned())?
579 .put(table_name.to_owned(), stored_backend);
580 return Ok(());
581 }
582
583 backend
584 .put(
585 metadata_key.as_bytes(),
586 expected_backend.to_string().as_bytes(),
587 )
588 .await?;
589
590 self.backend_type_cache
591 .write()
592 .with_context(|| "table backends cache lock".to_owned())?
593 .put(table_name.to_owned(), expected_backend);
594
595 tracing::debug!(
596 " Initialized new table '{}' with {} backend",
597 table_name,
598 expected_backend
599 );
600
601 Ok(())
602 }
603}
604
605/// Get current Unix timestamp in seconds.
606pub fn unix_timestamp() -> Result<u64> {
607 std::time::SystemTime::now()
608 .duration_since(std::time::UNIX_EPOCH)
609 .map(|d| d.as_secs())
610 .map_err(|_| {
611 YetiError::Internal("System time is before UNIX epoch - check system clock".to_owned())
612 })
613}
614
/// Current Unix time in whole seconds, never failing.
///
/// Falls back to `0` when the system clock reports a time before the Unix
/// epoch, instead of returning an error like `unix_timestamp` does.
#[inline]
#[must_use]
pub fn now_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // The clock is set before 1970-01-01; report the epoch itself.
        Err(_) => 0,
    }
}