Skip to main content

yeti_types/backend/
manager.rs

1//! Per-table backend registry: `TableInfo`, `BackendManager`, and the
2//! `unix_timestamp` / `now_secs` helpers.
3//!
4//! # Errors
5//!
6//! Result-returning methods on `BackendManager` return
7//! `Result<T, YetiError>` and surface these failure classes:
8//!
//! - **Table not registered** — `get_backend_for_table` returns
//!   `YetiError::NotFound` (`resource_type: "backend"`, `id` = the
//!   requested name) when the named table has no entry in the registry,
//!   even after the case-insensitive fallback. Indicates an app-config
//!   or schema mismatch.
13//! - **Backend init / open** — `RocksDB` column-family open failure
14//!   (corrupt LSM, permission denied, locked DB). Returned as
15//!   `YetiError::Internal`.
16//! - **Metadata I/O** — encode/decode of stored `TableInfo` records
17//!   (under `__yeti_table_backend:` prefix). `YetiError::Internal`
18//!   from msgpack or `RocksDB`.
//! - **Clock skew** — `unix_timestamp` returns
//!   `YetiError::Internal("System time is before UNIX epoch - check system clock")`
//!   if the host clock is set before 1970-01-01.
22#![allow(clippy::missing_errors_doc)]
23
24use lru::LruCache;
25use std::collections::HashMap;
26use std::num::NonZeroUsize;
27use std::sync::{Arc, RwLock};
28
29use super::config::BackendType;
30use super::config::ConsistencyMode;
31use super::traits::KvBackend;
32use crate::error::{Result, ResultExt, YetiError};
33
/// Key prefix under which per-table backend metadata is stored in the database.
const METADATA_PREFIX: &str = "__yeti_table_backend:";

/// Number of table → backend-type entries the LRU cache holds by default.
const DEFAULT_CACHE_CAPACITY: usize = 10_000;

/// The value `1` as a `NonZeroUsize`. `NonZeroUsize::MIN` is that value by
/// definition, so no runtime check (and no `expect`) is involved.
const ONE_NONZERO: NonZeroUsize = NonZeroUsize::MIN;

/// `DEFAULT_CACHE_CAPACITY` as a `NonZeroUsize`. The check runs during
/// constant evaluation: if the capacity were ever edited to `0`, the build
/// would fail instead of the program panicking at runtime.
const DEFAULT_CACHE_CAPACITY_NONZERO: NonZeroUsize =
    if let Some(capacity) = NonZeroUsize::new(DEFAULT_CACHE_CAPACITY) {
        capacity
    } else {
        panic!("DEFAULT_CACHE_CAPACITY must be non-zero")
    };
55
56// ============================================================================
57// TableInfo
58// ============================================================================
59
60/// Table information for backend initialization.
61#[derive(Debug, Clone)]
62pub struct TableInfo {
63    /// Owning application id. Stamped onto every transaction-log
64    /// entry's `table.app` field so audit / replication observers
65    /// can attribute writes to the right app even when multiple apps
66    /// share a database (`@table(database: "...")` declares cross-
67    /// app sharing). Empty string is the default for test fixtures
68    /// that don't care about app attribution; in production every
69    /// `TableInfo` is built from an `AppMetadata` with a populated
70    /// `id`.
71    pub app_id: String,
72    /// Table name (used as column family name)
73    pub name: String,
74    /// Database name (default: "data")
75    pub database: String,
76    /// Storage backend type
77    pub storage: BackendType,
78    /// Per-table consistency mode override
79    pub consistency: Option<ConsistencyMode>,
80    /// Per-field CRDT type declarations
81    pub crdt_fields: HashMap<String, String>,
82    /// Residency mode
83    pub residency: Option<String>,
84    /// Per-record TTL expiration from `@table(expiration: N)` — `Some(secs)`
85    /// activates the 9-byte record header + `RocksDB` `ExpirationFilter`.
86    /// `None` means the column family is written without a header.
87    pub expiration_secs: Option<u64>,
88    /// True when the schema declared `@audit(capture_state: true)`
89    /// on this table. The `LoggingBackend` wrapper uses this flag to
90    /// decide whether to read the existing value before each write
91    /// and stamp it onto `LogEntry::prev_value` for `AuditEntry.before`.
92    /// Costs one extra storage read per write — hence per-table
93    /// opt-in.
94    pub audit_capture_state: bool,
95    /// Per-table durability from `@store(durability:)`. `None` inherits the
96    /// database-level default (the WAL setting the `BackendManager` was
97    /// opened with). `Some(tier)` overrides this table's `WriteOptions`:
98    /// `Lossy` skips the WAL (throughput), `Soft`/`Strong` write through it,
99    /// `Strong` additionally fsyncs each write. `RocksDB` applies `disableWAL`
100    /// and `sync` per write, so column families in one database can differ.
101    pub durability: Option<crate::schema::DurabilityTier>,
102    /// The full source [`TableDefinition`](crate::schema::TableDefinition)
103    /// this `TableInfo` was projected from. Carried so downstream consumers
104    /// (replication's `@distribute`, the router's `@export`/transport flags)
105    /// read one source of truth instead of side-channel maps or re-parsing
106    /// the schema. `None` for hand-built test fixtures, which only exercise
107    /// the flattened backend fields.
108    pub def: Option<std::sync::Arc<crate::schema::TableDefinition>>,
109}
110
111impl Default for TableInfo {
112    fn default() -> Self {
113        Self {
114            app_id: String::new(),
115            name: String::new(),
116            database: "data".to_owned(),
117            storage: BackendType::Disk,
118            consistency: None,
119            crdt_fields: HashMap::new(),
120            residency: None,
121            expiration_secs: None,
122            audit_capture_state: false,
123            durability: None,
124            def: None,
125        }
126    }
127}
128
129// ============================================================================
130// BackendManager
131// ============================================================================
132
133/// Manages storage backends with per-table backend selection.
134///
135/// Each table gets its own backend instance. Multiple applications safely share
136/// databases. Uses LRU cache for bounded memory on backend type validation.
137pub struct BackendManager {
138    /// Map of table name to backend instance
139    table_backends: HashMap<String, Arc<dyn KvBackend>>,
140    /// Pre-built lowercase → canonical name index for case-insensitive lookup.
141    /// Populated at construction time so `get_backend_for_table()` avoids
142    /// linear scans with per-call `to_lowercase()` allocations.
143    lowercase_index: HashMap<String, String>,
144    /// LRU cache tracking which backend type each table uses
145    backend_type_cache: RwLock<LruCache<String, BackendType>>,
146    /// Metadata key prefix
147    metadata_prefix: String,
148    /// `PubSub` manager for real-time table change notifications.
149    /// Set lazily after construction via `set_pubsub()` — avoids cloning
150    /// the `table_backends` `HashMap` just to attach `PubSub`.
151    pubsub: std::sync::OnceLock<Arc<crate::pubsub::PubSubManager>>,
152    /// Per-table source `TableDefinition`s (by table name). Set once at
153    /// startup; the single source of truth for table metadata that the
154    /// flattened backend fields don't carry (`@distribute`, `@export`, …).
155    table_defs: std::sync::OnceLock<
156        Arc<std::collections::HashMap<String, Arc<crate::schema::TableDefinition>>>,
157    >,
158    /// Per-app vector capabilities (hook, batcher, mappings, cache).
159    /// Set once at startup after vector hooks are configured.
160    vector: std::sync::OnceLock<Arc<crate::plugins::VectorContext>>,
161    /// Per-table secondary-index registry. Populated at app load with one
162    /// `Arc<IndexManager>` per table that has any `@indexed` field. Stored
163    /// type-erased here because `IndexManager` lives in `yeti-sdk`
164    /// (which depends on `yeti-types`); the SDK provides the typed accessor
165    /// that downcasts this slot. SDK-side `Table::put_if` consults the
166    /// registry to keep hash/range/fulltext indexes in sync with CAS writes
167    /// — without this, the `@indexed` directive is a no-op for SDK writes
168    /// and worker pools that scan by status pay a full-table scan.
169    index_registry: std::sync::OnceLock<Arc<dyn std::any::Any + Send + Sync>>,
170    /// Unified transaction log — set at startup by yeti-server when
171    /// the deployment opens its transaction-log backend at
172    /// `{root_directory}/logs/transactions/{deployment_hash}/`.
173    /// Observed by both audit (range-by-time queries) and replication
174    /// (tail-by-HLC). `None` when logging is disabled (test fixtures,
175    /// dry-run modes).
176    transaction_log: std::sync::OnceLock<crate::backend::TransactionLogHandle>,
177    /// Shared [`ObserverRegistry`] — every per-table `LoggingBackend`
178    /// holds a clone of the same `Arc`, so a single
179    /// [`register_observer`](Self::register_observer) call reaches
180    /// every commit path. `None` when observer fan-out is disabled.
181    ///
182    /// [`ObserverRegistry`]: crate::backend::ObserverRegistry
183    observer_registry: std::sync::OnceLock<Arc<crate::backend::ObserverRegistry>>,
184}
185
186impl std::fmt::Debug for BackendManager {
187    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188        f.debug_struct("BackendManager")
189            .field(
190                "table_backends",
191                &self.table_backends.keys().collect::<Vec<_>>(),
192            )
193            .field("metadata_prefix", &self.metadata_prefix)
194            .field("has_pubsub", &self.pubsub.get().is_some())
195            .field("has_vector", &self.vector.get().is_some())
196            .field("has_transaction_log", &self.transaction_log.get().is_some())
197            .field(
198                "has_observer_registry",
199                &self.observer_registry.get().is_some(),
200            )
201            // `lowercase_index`, `backend_type_cache`, dyn-trait values elided
202            .finish_non_exhaustive()
203    }
204}
205
206impl BackendManager {
207    /// Create an empty `BackendManager` for testing.
208    #[must_use]
209    pub fn empty() -> Self {
210        Self {
211            table_backends: HashMap::new(),
212            lowercase_index: HashMap::new(),
213            backend_type_cache: RwLock::new(LruCache::new(ONE_NONZERO)),
214            metadata_prefix: METADATA_PREFIX.to_owned(),
215            pubsub: std::sync::OnceLock::new(),
216            vector: std::sync::OnceLock::new(),
217            index_registry: std::sync::OnceLock::new(),
218            transaction_log: std::sync::OnceLock::new(),
219            observer_registry: std::sync::OnceLock::new(),
220            table_defs: std::sync::OnceLock::new(),
221        }
222    }
223
224    /// Create from pre-built backends.
225    pub fn from_backends(table_backends: HashMap<String, Arc<dyn KvBackend>>) -> Result<Self> {
226        let cache_capacity = NonZeroUsize::new(DEFAULT_CACHE_CAPACITY).ok_or_else(|| {
227            YetiError::Internal("DEFAULT_CACHE_CAPACITY must be non-zero".to_owned())
228        })?;
229
230        let lowercase_index: HashMap<String, String> = table_backends
231            .keys()
232            .map(|k| (k.to_lowercase(), k.clone()))
233            .collect();
234
235        Ok(Self {
236            table_backends,
237            lowercase_index,
238            backend_type_cache: RwLock::new(LruCache::new(cache_capacity)),
239            metadata_prefix: METADATA_PREFIX.to_owned(),
240            pubsub: std::sync::OnceLock::new(),
241            vector: std::sync::OnceLock::new(),
242            index_registry: std::sync::OnceLock::new(),
243            transaction_log: std::sync::OnceLock::new(),
244            observer_registry: std::sync::OnceLock::new(),
245            table_defs: std::sync::OnceLock::new(),
246        })
247    }
248
249    /// Set the unified transaction log. Can only be called once
250    /// (uses `OnceLock`). Called at startup once the deployment's
251    /// transaction-log backend
252    /// (`{root_directory}/logs/transactions/{deployment_hash}/`)
253    /// has been opened and a [`TransactionLog`] impl built on top.
254    ///
255    /// [`TransactionLog`]: crate::backend::TransactionLog
256    pub fn set_transaction_log(&self, log: crate::backend::TransactionLogHandle) {
257        let _ = self.transaction_log.set(log);
258    }
259
260    /// Get the unified transaction log, if set. Audit readers,
261    /// replication senders, and any future log consumer use this to
262    /// reach the durable record of every committed write.
263    #[must_use]
264    pub fn transaction_log(&self) -> Option<&crate::backend::TransactionLogHandle> {
265        self.transaction_log.get()
266    }
267
268    /// Set the shared [`ObserverRegistry`]. Can only be called once
269    /// (uses `OnceLock`). Called at startup once the same `Arc` has
270    /// been threaded into every per-table `LoggingBackend` via
271    /// `LogWrapping.observers`, so this is the manager-side handle
272    /// to the same instance.
273    ///
274    /// [`ObserverRegistry`]: crate::backend::ObserverRegistry
275    pub fn set_observer_registry(&self, registry: Arc<crate::backend::ObserverRegistry>) {
276        let _ = self.observer_registry.set(registry);
277    }
278
279    /// Get the shared observer registry, if set. Plugins (replication,
280    /// audit, etc.) register their [`TransactionObserver`] here during
281    /// startup; every per-table `LoggingBackend` reads the same
282    /// snapshot on its commit path.
283    ///
284    /// [`TransactionObserver`]: crate::backend::TransactionObserver
285    #[must_use]
286    pub fn observer_registry(&self) -> Option<&Arc<crate::backend::ObserverRegistry>> {
287        self.observer_registry.get()
288    }
289
290    /// Convenience: register an observer on the shared registry. No-op
291    /// when no registry has been attached.
292    pub fn register_observer(&self, observer: crate::backend::ObserverHandle) {
293        if let Some(registry) = self.observer_registry.get() {
294            registry.register(observer);
295        }
296    }
297
298    /// Set the `PubSub` manager. Can only be called once (uses `OnceLock`).
299    /// Called at startup after `BackendManager` is created and wrapped in Arc.
300    pub fn set_pubsub(&self, pubsub: Arc<crate::pubsub::PubSubManager>) {
301        let _ = self.pubsub.set(pubsub);
302    }
303
304    /// Get the `PubSub` manager, if set.
305    pub fn pubsub(&self) -> Option<&Arc<crate::pubsub::PubSubManager>> {
306        self.pubsub.get()
307    }
308
309    /// Attach the per-table source [`TableDefinition`](crate::schema::TableDefinition)s
310    /// (keyed by table name). Set once at construction so downstream
311    /// consumers — replication's `@distribute` filter, the router's
312    /// `@export` flags — read this single source of truth rather than a
313    /// side-channel map. Can only be set once (uses `OnceLock`).
314    pub fn set_table_defs(
315        &self,
316        defs: std::collections::HashMap<String, Arc<crate::schema::TableDefinition>>,
317    ) {
318        let _ = self.table_defs.set(Arc::new(defs));
319    }
320
321    /// The source `TableDefinition` for `table_name`, if known. `None` when
322    /// defs weren't attached (test fixtures) or the table is unknown.
323    #[must_use]
324    pub fn table_def(&self, table_name: &str) -> Option<Arc<crate::schema::TableDefinition>> {
325        self.table_defs.get()?.get(table_name).map(Arc::clone)
326    }
327
328    /// Set the per-app vector context. Can only be called once (uses `OnceLock`).
329    /// Called at startup after vector hooks and batcher are configured.
330    pub fn set_vector(&self, vector: Arc<crate::plugins::VectorContext>) {
331        let _ = self.vector.set(vector);
332    }
333
334    /// Get the per-app vector context, if set.
335    pub fn vector(&self) -> Option<&Arc<crate::plugins::VectorContext>> {
336        self.vector.get()
337    }
338
339    /// Create a new manager with this manager's tables plus additional tables.
340    #[must_use]
341    pub fn with_merged_tables(&self, other: &Self) -> Self {
342        let mut table_backends = self.table_backends.clone();
343        for (name, backend) in &other.table_backends {
344            table_backends
345                .entry(name.clone())
346                .or_insert_with(|| Arc::clone(backend));
347        }
348        let lowercase_index: HashMap<String, String> = table_backends
349            .keys()
350            .map(|k| (k.to_lowercase(), k.clone()))
351            .collect();
352        Self {
353            table_backends,
354            lowercase_index,
355            backend_type_cache: RwLock::new(LruCache::new(DEFAULT_CACHE_CAPACITY_NONZERO)),
356            metadata_prefix: self.metadata_prefix.clone(),
357            pubsub: self
358                .pubsub
359                .get()
360                .cloned()
361                .map_or_else(std::sync::OnceLock::new, |ps| {
362                    let lock = std::sync::OnceLock::new();
363                    let _ = lock.set(ps);
364                    lock
365                }),
366            vector: self
367                .vector
368                .get()
369                .cloned()
370                .map_or_else(std::sync::OnceLock::new, |vc| {
371                    let lock = std::sync::OnceLock::new();
372                    let _ = lock.set(vc);
373                    lock
374                }),
375            index_registry: self.index_registry.get().cloned().map_or_else(
376                std::sync::OnceLock::new,
377                |r| {
378                    let lock = std::sync::OnceLock::new();
379                    let _ = lock.set(r);
380                    lock
381                },
382            ),
383            transaction_log: self.transaction_log.get().cloned().map_or_else(
384                std::sync::OnceLock::new,
385                |l| {
386                    let lock = std::sync::OnceLock::new();
387                    let _ = lock.set(l);
388                    lock
389                },
390            ),
391            observer_registry: self.observer_registry.get().cloned().map_or_else(
392                std::sync::OnceLock::new,
393                |r| {
394                    let lock = std::sync::OnceLock::new();
395                    let _ = lock.set(r);
396                    lock
397                },
398            ),
399            table_defs: self
400                .table_defs
401                .get()
402                .cloned()
403                .map_or_else(std::sync::OnceLock::new, |d| {
404                    let lock = std::sync::OnceLock::new();
405                    let _ = lock.set(d);
406                    lock
407                }),
408        }
409    }
410
411    /// Merge another manager's tables with database-qualified keys.
412    #[must_use]
413    pub fn with_qualified_merge(&self, other: &Self, database: &str) -> Self {
414        let mut table_backends = self.table_backends.clone();
415        for (name, backend) in &other.table_backends {
416            let qualified = format!("{database}.{name}");
417            table_backends
418                .entry(qualified)
419                .or_insert_with(|| Arc::clone(backend));
420            table_backends
421                .entry(name.clone())
422                .or_insert_with(|| Arc::clone(backend));
423        }
424        let lowercase_index: HashMap<String, String> = table_backends
425            .keys()
426            .map(|k| (k.to_lowercase(), k.clone()))
427            .collect();
428        Self {
429            table_backends,
430            lowercase_index,
431            backend_type_cache: RwLock::new(LruCache::new(DEFAULT_CACHE_CAPACITY_NONZERO)),
432            metadata_prefix: self.metadata_prefix.clone(),
433            pubsub: self
434                .pubsub
435                .get()
436                .cloned()
437                .map_or_else(std::sync::OnceLock::new, |ps| {
438                    let lock = std::sync::OnceLock::new();
439                    let _ = lock.set(ps);
440                    lock
441                }),
442            vector: self
443                .vector
444                .get()
445                .cloned()
446                .map_or_else(std::sync::OnceLock::new, |vc| {
447                    let lock = std::sync::OnceLock::new();
448                    let _ = lock.set(vc);
449                    lock
450                }),
451            index_registry: self.index_registry.get().cloned().map_or_else(
452                std::sync::OnceLock::new,
453                |r| {
454                    let lock = std::sync::OnceLock::new();
455                    let _ = lock.set(r);
456                    lock
457                },
458            ),
459            transaction_log: self.transaction_log.get().cloned().map_or_else(
460                std::sync::OnceLock::new,
461                |l| {
462                    let lock = std::sync::OnceLock::new();
463                    let _ = lock.set(l);
464                    lock
465                },
466            ),
467            observer_registry: self.observer_registry.get().cloned().map_or_else(
468                std::sync::OnceLock::new,
469                |r| {
470                    let lock = std::sync::OnceLock::new();
471                    let _ = lock.set(r);
472                    lock
473                },
474            ),
475            table_defs: self
476                .table_defs
477                .get()
478                .cloned()
479                .map_or_else(std::sync::OnceLock::new, |d| {
480                    let lock = std::sync::OnceLock::new();
481                    let _ = lock.set(d);
482                    lock
483                }),
484        }
485    }
486
487    /// Set the per-app secondary-index registry. Type-erased so this crate
488    /// stays free of yeti-sdk imports — yeti-sdk's `Tables::get` does the
489    /// downcast. Can only be called once (uses `OnceLock`). Called at
490    /// startup once `IndexManager`s are built from the loaded schema.
491    pub fn set_index_registry(&self, registry: Arc<dyn std::any::Any + Send + Sync>) {
492        let _ = self.index_registry.set(registry);
493    }
494
495    /// Get the per-app secondary-index registry, if set. Callers (yeti-sdk)
496    /// downcast to their concrete `HashMap<String, Arc<IndexManager>>`.
497    pub fn index_registry(&self) -> Option<&Arc<dyn std::any::Any + Send + Sync>> {
498        self.index_registry.get()
499    }
500
501    /// List all table names managed by this `BackendManager`.
502    pub fn table_names(&self) -> Vec<String> {
503        self.table_backends.keys().cloned().collect()
504    }
505
506    /// Get the storage backend for a specific table by name.
507    pub fn get_backend_for_table(&self, table_name: &str) -> Result<Arc<dyn KvBackend>> {
508        // Exact match first (no allocation)
509        if let Some(backend) = self.table_backends.get(table_name) {
510            return Ok(Arc::clone(backend));
511        }
512        // Case-insensitive via pre-built index (single to_lowercase, no linear scan)
513        let lower = table_name.to_lowercase();
514        if let Some(canonical) = self.lowercase_index.get(&lower)
515            && let Some(backend) = self.table_backends.get(canonical)
516        {
517            return Ok(Arc::clone(backend));
518        }
519        Err(YetiError::NotFound {
520            resource_type: "backend".to_owned(),
521            id: table_name.to_owned(),
522        })
523    }
524
525    /// Probe storage health by performing lightweight reads across all backends.
526    pub async fn health_probe(&self) -> Result<()> {
527        if self.table_backends.is_empty() {
528            return Err(YetiError::Internal(
529                "No table backends registered".to_owned(),
530            ));
531        }
532        for backend in self.table_backends.values() {
533            backend.get(b"__health_probe__").await?;
534        }
535        Ok(())
536    }
537
538    /// Validate that a table is using the correct backend.
539    pub async fn validate_table_backend(
540        &self,
541        table_name: &str,
542        key_prefix: &str,
543        expected_backend: BackendType,
544    ) -> Result<()> {
545        let metadata_key = format!("{}{}:{}", self.metadata_prefix, key_prefix, table_name);
546
547        {
548            let mut cache = self
549                .backend_type_cache
550                .write()
551                .with_context(|| "table backends cache lock".to_owned())?;
552            if let Some(&existing_backend) = cache.get(table_name) {
553                if existing_backend != expected_backend {
554                    return Err(YetiError::Validation(format!(
555                        "Table '{table_name}' already exists with {existing_backend} backend but schema specifies {expected_backend} backend. \
556                         Storage backend migration is not yet supported."
557                    )));
558                }
559                return Ok(());
560            }
561        }
562
563        let backend = self.get_backend_for_table(table_name)?;
564
565        if let Some(stored_value) = backend.get(metadata_key.as_bytes()).await? {
566            let stored_backend_str = String::from_utf8_lossy(&stored_value);
567            let stored_backend: BackendType = stored_backend_str.parse()?;
568
569            if stored_backend != expected_backend {
570                return Err(YetiError::Validation(format!(
571                    "Table '{table_name}' has stored backend metadata indicating {stored_backend}, \
572                     but schema specifies {expected_backend}. Storage backend migration is not yet supported."
573                )));
574            }
575
576            self.backend_type_cache
577                .write()
578                .with_context(|| "table backends cache lock".to_owned())?
579                .put(table_name.to_owned(), stored_backend);
580            return Ok(());
581        }
582
583        backend
584            .put(
585                metadata_key.as_bytes(),
586                expected_backend.to_string().as_bytes(),
587            )
588            .await?;
589
590        self.backend_type_cache
591            .write()
592            .with_context(|| "table backends cache lock".to_owned())?
593            .put(table_name.to_owned(), expected_backend);
594
595        tracing::debug!(
596            "  Initialized new table '{}' with {} backend",
597            table_name,
598            expected_backend
599        );
600
601        Ok(())
602    }
603}
604
605/// Get current Unix timestamp in seconds.
606pub fn unix_timestamp() -> Result<u64> {
607    std::time::SystemTime::now()
608        .duration_since(std::time::UNIX_EPOCH)
609        .map(|d| d.as_secs())
610        .map_err(|_| {
611            YetiError::Internal("System time is before UNIX epoch - check system clock".to_owned())
612        })
613}
614
/// Current wall-clock time as whole seconds since the Unix epoch, never
/// failing: if the system clock reports a time before the epoch, `0` is
/// returned instead of an error.
#[inline]
#[must_use]
pub fn now_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}