Skip to main content

reddb_server/
api.rs

1//! Public API layer for the RedDB crate.
2//!
//! This module is the first layer applications should consume:
4//! - stable options and contracts
5//! - capability declarations
6//! - typed errors and lightweight metadata snapshots
7//! - cross-layer traits for catalog/operations observability
8
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt;
11use std::io;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use crate::auth::AuthConfig;
17use crate::replication::ReplicationConfig;
18
/// Default value of `RedDBOptions::snapshot_retention`.
pub const DEFAULT_SNAPSHOT_RETENTION: usize = 16;
/// Default value of `RedDBOptions::export_retention`.
pub const DEFAULT_EXPORT_RETENTION: usize = 16;

/// Protocol identifier string advertised by this build.
pub const REDDB_PROTOCOL_VERSION: &str = "reddb-v2";
/// Format version stamped into every freshly created `SchemaManifest`.
pub const REDDB_FORMAT_VERSION: u32 = 2;

/// Default group-commit window, in milliseconds.
///
/// `0` means "no wait": the background flusher fsyncs as soon as any
/// writer's pending commit arrives. With a single writer, a non-zero window
/// is pure latency because there is nobody to batch with. A window of 1 ms
/// would cap individual commit throughput at roughly 1000/s. Concurrent
/// writers still batch naturally through contention on the
/// `Mutex<WalWriter>`, without an explicit timer.
///
/// Deployments with many concurrent clients can raise this (e.g. 1-5 ms) to
/// spread fsync cost over bigger batches. The trade-off is that p99 tail
/// latency grows by the window size.
pub const DEFAULT_GROUP_COMMIT_WINDOW_MS: u64 = 0;
/// Default `GroupCommitOptions::max_statements`.
pub const DEFAULT_GROUP_COMMIT_MAX_STATEMENTS: usize = 128;
/// Default `GroupCommitOptions::max_wal_bytes` (1 MiB).
pub const DEFAULT_GROUP_COMMIT_MAX_WAL_BYTES: u64 = 1 << 20;
/// Metadata key that `RedDBOptions::in_memory` sets to `"true"` to mark a
/// tempfile-backed ephemeral runtime.
pub(crate) const EPHEMERAL_RUNTIME_METADATA_KEY: &str = "__reddb_ephemeral_runtime";
40
41pub type RedDBResult<T> = Result<T, RedDBError>;
42
/// How a database instance stores its data. `Persistent` is currently the
/// only (and default) mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum StorageMode {
    /// Durable, file-backed database with WAL + checkpointing.
    #[default]
    Persistent,
}

impl StorageMode {
    /// Whether this mode keeps data in a durable file.
    pub const fn is_persistent(self) -> bool {
        // Exhaustive on purpose: a new variant must decide its answer here.
        match self {
            StorageMode::Persistent => true,
        }
    }
}
55
/// Commit durability tier, i.e. how long a writer waits for its WAL record
/// to become durable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum DurabilityMode {
    /// Legacy / opt-out form: every commit pays its own fsync.
    #[default]
    Strict,
    /// Group-commit sync path. The writer waits for durability, but fsyncs
    /// are batched across concurrent writers, so a burst of N commits pays
    /// roughly O(1) fsyncs instead of O(N). This matches PostgreSQL's
    /// `synchronous_commit=on` behaviour.
    WalDurableGrouped,
    /// Fire-and-forget. Writers return as soon as the WAL record is
    /// in the in-memory ring buffer — they do NOT wait for fsync.
    /// The group-commit background thread still flushes on its
    /// cadence, so committed-but-unflushed work is bounded by
    /// `GroupCommitOptions::window_ms`. A crash inside the window
    /// drops whatever wasn't flushed — matches PG's
    /// `synchronous_commit=off` contract.
    Async,
}

impl DurabilityMode {
    /// Canonical lowercase name. `from_str` accepts every value returned
    /// here.
    pub const fn as_str(self) -> &'static str {
        match self {
            DurabilityMode::Async => "async",
            DurabilityMode::WalDurableGrouped => "wal_durable_grouped",
            DurabilityMode::Strict => "strict",
        }
    }

    /// Parses a durability mode name, ignoring surrounding whitespace and
    /// ASCII case. Several aliases are accepted per mode, e.g. `"sync"`
    /// and `"grouped"` for [`DurabilityMode::WalDurableGrouped`].
    ///
    /// Returns `None` for unrecognised input.
    pub fn from_str(value: &str) -> Option<Self> {
        const ALIASES: &[(&str, DurabilityMode)] = &[
            ("strict", DurabilityMode::Strict),
            ("sync", DurabilityMode::WalDurableGrouped),
            ("wal_durable_grouped", DurabilityMode::WalDurableGrouped),
            ("wal-durable-grouped", DurabilityMode::WalDurableGrouped),
            ("grouped", DurabilityMode::WalDurableGrouped),
            ("wal_grouped", DurabilityMode::WalDurableGrouped),
            ("wal-grouped", DurabilityMode::WalDurableGrouped),
            ("async", DurabilityMode::Async),
            ("fire_and_forget", DurabilityMode::Async),
            ("fire-and-forget", DurabilityMode::Async),
        ];

        let wanted = value.trim();
        ALIASES
            .iter()
            .find(|(alias, _)| alias.eq_ignore_ascii_case(wanted))
            .map(|&(_, mode)| mode)
    }
}
104
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106pub struct GroupCommitOptions {
107    pub window_ms: u64,
108    pub max_statements: usize,
109    pub max_wal_bytes: u64,
110}
111
112impl Default for GroupCommitOptions {
113    fn default() -> Self {
114        Self {
115            window_ms: DEFAULT_GROUP_COMMIT_WINDOW_MS,
116            max_statements: DEFAULT_GROUP_COMMIT_MAX_STATEMENTS,
117            max_wal_bytes: DEFAULT_GROUP_COMMIT_MAX_WAL_BYTES,
118        }
119    }
120}
121
/// A feature family that can be gated on or off per instance.
///
/// Variant order defines the derived `Ord`, and therefore the order in which
/// [`CapabilitySet::as_slice`] lists capabilities.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Capability {
    /// Structured row storage.
    Table,
    /// Graph nodes/edges.
    Graph,
    /// Vector collections and ANN search.
    Vector,
    /// Full-text / lexical search.
    FullText,
    /// Text/metadata security and enrichment modules.
    Security,
    /// Encryption at rest.
    Encryption,
}

impl Capability {
    /// Stable lowercase identifier for this capability.
    pub const fn as_str(self) -> &'static str {
        match self {
            Capability::Table => "table",
            Capability::Graph => "graph",
            Capability::Vector => "vector",
            Capability::FullText => "fulltext",
            Capability::Security => "security",
            Capability::Encryption => "encryption",
        }
    }
}

/// An ordered, de-duplicated set of enabled [`Capability`] values.
#[derive(Debug, Clone, Default)]
pub struct CapabilitySet {
    items: BTreeSet<Capability>,
}

impl CapabilitySet {
    /// An empty set.
    pub fn new() -> Self {
        Self {
            items: BTreeSet::new(),
        }
    }

    /// Builder: returns the set with `capability` added.
    pub fn with(self, capability: Capability) -> Self {
        self.with_all(&[capability])
    }

    /// Builder: returns the set with every entry of `capabilities` added.
    /// Duplicates collapse.
    pub fn with_all(mut self, capabilities: &[Capability]) -> Self {
        self.items.extend(capabilities.iter().copied());
        self
    }

    /// Whether `capability` is enabled.
    pub fn has(&self, capability: Capability) -> bool {
        self.items.contains(&capability)
    }

    /// Returns the enabled capabilities in `Ord` order as a freshly
    /// allocated `Vec`, despite the `as_` prefix.
    pub fn as_slice(&self) -> Vec<Capability> {
        Vec::from_iter(self.items.iter().copied())
    }
}
181
182pub struct RedDBOptions {
183    pub mode: StorageMode,
184    pub data_path: Option<PathBuf>,
185    pub read_only: bool,
186    pub create_if_missing: bool,
187    pub verify_checksums: bool,
188    pub durability_mode: DurabilityMode,
189    pub group_commit: GroupCommitOptions,
190    pub auto_checkpoint_pages: u32,
191    pub cache_pages: usize,
192    pub snapshot_retention: usize,
193    pub export_retention: usize,
194    pub feature_gates: CapabilitySet,
195    pub force_create: bool,
196    pub metadata: BTreeMap<String, String>,
197    /// Optional remote storage backend for snapshot transport.
198    pub remote_backend: Option<Arc<dyn crate::storage::backend::RemoteBackend>>,
199    /// Optional CAS-capable handle to the same backend, populated by
200    /// the factory when the configured backend implements
201    /// `AtomicRemoteBackend` (S3/local always; HTTP only when
202    /// `RED_HTTP_CONDITIONAL_WRITES=true`). `None` for backends that
203    /// do not provide compare-and-swap (Turso, D1, plain HTTP).
204    /// `LeaseStore` and any future CAS consumer pull from this field.
205    pub remote_backend_atomic: Option<Arc<dyn crate::storage::backend::AtomicRemoteBackend>>,
206    /// Remote object key used by the remote backend.
207    pub remote_key: Option<String>,
208    /// Replication configuration.
209    pub replication: ReplicationConfig,
210    /// Authentication & authorization configuration.
211    pub auth: AuthConfig,
212    /// Control Event Ledger configuration (issue #652). Read from
213    /// `REDDB_COMPLIANCE_MODE` at boot.
214    pub control_events: crate::runtime::control_events::ControlEventConfig,
215    /// Scoped data-plane query audit configuration. Disabled by
216    /// default; the regulated preset enables the stream without
217    /// adding catch-all rules.
218    pub query_audit: crate::runtime::query_audit::QueryAuditConfig,
219    /// Auto-create a HASH index on a user `id` column the first time a
220    /// row carrying that column is inserted into a collection. See
221    /// `UnifiedStoreConfig::auto_index_id`. Defaults to `true`; set to
222    /// `false` to opt out per workload (e.g. ingest pipelines that
223    /// don't need point-lookups by `id`).
224    pub auto_index_id: bool,
225    /// Tiered storage layout preset. Drives the defaults of the six
226    /// tier-flag toggles (`.meta.json` sidecar, seq-N catalog journal,
227    /// `-shm` provisioning, audit/slow log destinations, `fold_pager_meta`,
228    /// `fold_dwb_into_wal`). Explicit per-feature setters still win over
229    /// the tier default. See `crate::storage::layout::StorageLayout`.
230    pub layout: crate::storage::layout::StorageLayout,
231    /// Per-feature overrides applied on top of the resolved layout preset.
232    pub layout_overrides: crate::storage::layout::LayoutOverrides,
233    /// Operator-facing storage/deploy profile contract. This records the
234    /// selected posture before the physical directory layout is expanded.
235    pub storage_profile: crate::storage::profile::StorageProfileSelection,
236    /// True only when the caller explicitly selected a layout via
237    /// `with_layout`. When false, `apply_tier_defaults` short-circuits so
238    /// pre-existing process-global toggles (set by tests / env hatches
239    /// before opening a runtime) are not clobbered by the implicit
240    /// `Standard` default. The new tier-driven behavior is opt-in.
241    pub layout_explicit: bool,
242}
243
244impl fmt::Debug for RedDBOptions {
245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246        let backend_name = self.remote_backend.as_ref().map(|b| b.name().to_string());
247        f.debug_struct("RedDBOptions")
248            .field("mode", &self.mode)
249            .field("data_path", &self.data_path)
250            .field("read_only", &self.read_only)
251            .field("create_if_missing", &self.create_if_missing)
252            .field("verify_checksums", &self.verify_checksums)
253            .field("durability_mode", &self.durability_mode)
254            .field("group_commit", &self.group_commit)
255            .field("auto_checkpoint_pages", &self.auto_checkpoint_pages)
256            .field("cache_pages", &self.cache_pages)
257            .field("snapshot_retention", &self.snapshot_retention)
258            .field("export_retention", &self.export_retention)
259            .field("feature_gates", &self.feature_gates)
260            .field("force_create", &self.force_create)
261            .field("metadata", &self.metadata)
262            .field("remote_backend", &backend_name)
263            .field("remote_key", &self.remote_key)
264            .field("replication", &self.replication)
265            .field("auth", &self.auth)
266            .field("control_events", &self.control_events)
267            .field("query_audit", &self.query_audit)
268            .field("layout", &self.layout)
269            .field("layout_overrides", &self.layout_overrides)
270            .field("storage_profile", &self.storage_profile)
271            .finish()
272    }
273}
274
275impl Clone for RedDBOptions {
276    fn clone(&self) -> Self {
277        Self {
278            mode: self.mode,
279            data_path: self.data_path.clone(),
280            read_only: self.read_only,
281            create_if_missing: self.create_if_missing,
282            verify_checksums: self.verify_checksums,
283            durability_mode: self.durability_mode,
284            group_commit: self.group_commit,
285            auto_checkpoint_pages: self.auto_checkpoint_pages,
286            cache_pages: self.cache_pages,
287            snapshot_retention: self.snapshot_retention,
288            export_retention: self.export_retention,
289            feature_gates: self.feature_gates.clone(),
290            force_create: self.force_create,
291            metadata: self.metadata.clone(),
292            remote_backend: self.remote_backend.clone(),
293            remote_backend_atomic: self.remote_backend_atomic.clone(),
294            remote_key: self.remote_key.clone(),
295            replication: self.replication.clone(),
296            auth: self.auth.clone(),
297            control_events: self.control_events,
298            query_audit: self.query_audit.clone(),
299            auto_index_id: self.auto_index_id,
300            layout: self.layout,
301            layout_overrides: self.layout_overrides.clone(),
302            storage_profile: self.storage_profile,
303            layout_explicit: self.layout_explicit,
304        }
305    }
306}
307
308impl Default for RedDBOptions {
309    fn default() -> Self {
310        Self {
311            mode: StorageMode::Persistent,
312            data_path: None,
313            read_only: false,
314            create_if_missing: true,
315            verify_checksums: true,
316            // Perf-parity default — `WalDurableGrouped` matches
317            // PostgreSQL's `synchronous_commit=on` behaviour while
318            // amortising fsync cost across concurrent writers. The
319            // legacy `Strict` tier (per-commit fsync) stays available
320            // via `durability.mode = "strict"` / `REDDB_DURABILITY=strict`.
321            durability_mode: DurabilityMode::WalDurableGrouped,
322            group_commit: GroupCommitOptions::default(),
323            auto_checkpoint_pages: 1000,
324            cache_pages: 10_000,
325            snapshot_retention: DEFAULT_SNAPSHOT_RETENTION,
326            export_retention: DEFAULT_EXPORT_RETENTION,
327            feature_gates: CapabilitySet::new()
328                .with(Capability::Table)
329                .with(Capability::Graph)
330                .with(Capability::Vector),
331            force_create: true,
332            metadata: BTreeMap::new(),
333            remote_backend: None,
334            remote_backend_atomic: None,
335            remote_key: None,
336            replication: ReplicationConfig::standalone(),
337            auth: AuthConfig::default(),
338            control_events: crate::runtime::control_events::ControlEventConfig::default(),
339            query_audit: crate::runtime::query_audit::QueryAuditConfig::default(),
340            auto_index_id: true,
341            layout: crate::storage::layout::StorageLayout::default(),
342            layout_overrides: crate::storage::layout::LayoutOverrides::default(),
343            storage_profile: crate::storage::profile::StorageProfileSelection::embedded_single_file(
344            ),
345            layout_explicit: false,
346        }
347    }
348}
349
350impl RedDBOptions {
351    pub fn persistent<P: Into<PathBuf>>(path: P) -> Self {
352        Self {
353            mode: StorageMode::Persistent,
354            data_path: Some(path.into()),
355            ..Default::default()
356        }
357    }
358
359    /// Ephemeral, tempfile-backed database.
360    ///
361    /// The underlying storage is a real persistent file placed under the system
362    /// temp directory with a unique name — there is no longer a true in-memory
363    /// execution mode. Prefer [`RedDBOptions::persistent`] when the data should
364    /// outlive the process.
365    pub fn in_memory() -> Self {
366        static NEXT_EPHEMERAL_ID: std::sync::atomic::AtomicU64 =
367            std::sync::atomic::AtomicU64::new(0);
368
369        let now_nanos = std::time::SystemTime::now()
370            .duration_since(std::time::UNIX_EPOCH)
371            .map(|duration| duration.as_nanos())
372            .unwrap_or(0);
373        let unique = NEXT_EPHEMERAL_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
374        // Each ephemeral runtime gets its OWN directory, not just a unique
375        // filename in the shared temp dir. Sibling artifacts (the audit log,
376        // WAL, snapshots) are derived from this data path's PARENT, so a
377        // shared parent collapses every ephemeral runtime's `.audit.log`
378        // onto one file — which parallel test processes (nextest runs each
379        // test in its own process, frequently under one shared `TMPDIR`)
380        // then truncate/remove out from under one another. A per-instance
381        // directory keeps each runtime's siblings self-contained.
382        let dir = std::env::temp_dir().join(format!(
383            "reddb-ephemeral-{}-{}-{}",
384            std::process::id(),
385            now_nanos,
386            unique
387        ));
388        let _ = std::fs::create_dir_all(&dir);
389        let path = dir.join("db.rdb");
390        let _ = std::fs::remove_file(&path);
391        let mut metadata = BTreeMap::new();
392        metadata.insert(
393            EPHEMERAL_RUNTIME_METADATA_KEY.to_string(),
394            "true".to_string(),
395        );
396        Self {
397            mode: StorageMode::Persistent,
398            data_path: Some(path),
399            auto_checkpoint_pages: 0,
400            cache_pages: 2_000,
401            snapshot_retention: DEFAULT_SNAPSHOT_RETENTION,
402            export_retention: DEFAULT_EXPORT_RETENTION,
403            read_only: false,
404            force_create: true,
405            metadata,
406            ..Default::default()
407        }
408    }
409
410    pub fn with_mode(mut self, mode: StorageMode) -> Self {
411        self.mode = mode;
412        self
413    }
414
415    pub fn with_data_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
416        // `in_memory()` eagerly creates a `reddb-ephemeral-*` dir and points
417        // `data_path` at it. Overriding the path abandons that dir — and the
418        // per-runtime cleanup keys off the FINAL `data_path`, so the orphan
419        // would leak in TMPDIR (caught by scripts/check-temp-residue.sh). Remove
420        // it now if the previous path was such an eagerly-created ephemeral dir.
421        if let Some(old) = self.data_path.take() {
422            let tmp = std::env::temp_dir();
423            if let Some(parent) = old.parent() {
424                let orphaned = parent
425                    .file_name()
426                    .and_then(|name| name.to_str())
427                    .is_some_and(|name| name.starts_with("reddb-ephemeral-"))
428                    && parent.parent() == Some(tmp.as_path());
429                if orphaned {
430                    let _ = std::fs::remove_dir_all(parent);
431                }
432            }
433        }
434        self.data_path = Some(path.into());
435        self
436    }
437
438    pub fn with_read_only(mut self, read_only: bool) -> Self {
439        self.read_only = read_only;
440        self
441    }
442
443    pub fn with_auto_checkpoint(mut self, pages: u32) -> Self {
444        self.auto_checkpoint_pages = pages;
445        self
446    }
447
448    pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
449        self.durability_mode = mode;
450        self
451    }
452
453    pub fn with_group_commit_window_ms(mut self, window_ms: u64) -> Self {
454        // `0` is a legitimate setting — "no wait, fsync on every wakeup".
455        // See `DEFAULT_GROUP_COMMIT_WINDOW_MS` docs for the tradeoff.
456        self.group_commit.window_ms = window_ms;
457        self
458    }
459
460    pub fn with_group_commit_max_statements(mut self, max_statements: usize) -> Self {
461        self.group_commit.max_statements = max_statements.max(1);
462        self
463    }
464
465    pub fn with_group_commit_max_wal_bytes(mut self, max_wal_bytes: u64) -> Self {
466        self.group_commit.max_wal_bytes = max_wal_bytes.max(1);
467        self
468    }
469
470    pub fn with_cache_pages(mut self, pages: usize) -> Self {
471        self.cache_pages = pages.max(2);
472        self
473    }
474
475    pub fn with_snapshot_retention(mut self, limit: usize) -> Self {
476        self.snapshot_retention = limit.max(1);
477        self
478    }
479
480    pub fn with_export_retention(mut self, limit: usize) -> Self {
481        self.export_retention = limit.max(1);
482        self
483    }
484
485    pub fn with_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
486        self.metadata.insert(key.into(), value.into());
487        self
488    }
489
490    /// Toggle the implicit HASH index on user `id` columns at first
491    /// insert (#112). Defaults to enabled — pass `false` to fall back
492    /// to the legacy "scan unless `CREATE INDEX` is issued" behaviour.
493    pub fn with_auto_index_id(mut self, enabled: bool) -> Self {
494        self.auto_index_id = enabled;
495        self
496    }
497
498    pub fn with_capability(mut self, capability: Capability) -> Self {
499        self.feature_gates = self.feature_gates.with(capability);
500        self
501    }
502
503    /// Attach a remote storage backend for snapshot transport.
504    ///
505    /// On open, the database snapshot is downloaded from the remote `key`
506    /// to the local data path. On flush, the local file is uploaded back
507    /// to the remote backend under the same key.
508    pub fn with_remote_backend(
509        mut self,
510        backend: Arc<dyn crate::storage::backend::RemoteBackend>,
511        key: impl Into<String>,
512    ) -> Self {
513        self.remote_backend = Some(backend);
514        self.remote_key = Some(key.into());
515        self
516    }
517
518    /// Attach a CAS-capable backend handle. Pass the same `Arc` as
519    /// `with_remote_backend` (factories should construct the backend
520    /// once and call both setters); this method exists so the type
521    /// system, not runtime config, decides whether `LeaseStore` is
522    /// reachable.
523    pub fn with_atomic_remote_backend(
524        mut self,
525        backend: Arc<dyn crate::storage::backend::AtomicRemoteBackend>,
526    ) -> Self {
527        self.remote_backend_atomic = Some(backend);
528        self
529    }
530
531    pub fn with_replication(mut self, config: ReplicationConfig) -> Self {
532        self.replication = config;
533        self
534    }
535
536    pub fn with_auth(mut self, config: AuthConfig) -> Self {
537        self.auth = config;
538        self
539    }
540
541    pub fn resolved_path(&self, fallback: impl AsRef<Path>) -> PathBuf {
542        self.data_path
543            .clone()
544            .unwrap_or_else(|| fallback.as_ref().to_path_buf())
545    }
546
547    pub fn remote_namespace_prefix(&self) -> String {
548        let Some(remote_key) = &self.remote_key else {
549            return String::new();
550        };
551        let normalized = remote_key.trim_matches('/');
552        if normalized.is_empty() {
553            return String::new();
554        }
555        match normalized.rsplit_once('/') {
556            Some((parent, _)) if !parent.is_empty() => format!("{parent}/"),
557            _ => String::new(),
558        }
559    }
560
561    pub fn default_backup_head_key(&self) -> String {
562        if let Some(value) = self.metadata.get("red.config.backup.head_key") {
563            return value.clone();
564        }
565        reddb_file::backup_head_key(&self.remote_namespace_prefix())
566    }
567
568    pub fn default_snapshot_prefix(&self) -> String {
569        if let Some(value) = self.metadata.get("red.config.backup.snapshot_prefix") {
570            return value.clone();
571        }
572        reddb_file::backup_snapshot_prefix(&self.remote_namespace_prefix())
573    }
574
575    pub fn default_wal_archive_prefix(&self) -> String {
576        if let Some(value) = self.metadata.get("red.config.wal.archive.prefix") {
577            return value.clone();
578        }
579        reddb_file::backup_wal_prefix(&self.remote_namespace_prefix())
580    }
581
582    pub fn has_capability(&self, capability: Capability) -> bool {
583        self.feature_gates.has(capability)
584    }
585
586    /// Select the active storage-layout preset. Drives tier defaults for
587    /// the six tier-flag toggles. Per-feature setters still win.
588    pub fn with_layout(mut self, layout: crate::storage::layout::StorageLayout) -> Self {
589        self.layout = layout;
590        self.layout_explicit = true;
591        self
592    }
593
594    /// Override individual layout knobs (dedicated dirs + log routing) on
595    /// top of the active preset.
596    pub fn with_layout_overrides(
597        mut self,
598        overrides: crate::storage::layout::LayoutOverrides,
599    ) -> Self {
600        self.layout_overrides = overrides;
601        self
602    }
603
604    pub fn with_storage_profile(
605        mut self,
606        selection: crate::storage::profile::StorageProfileSelection,
607    ) -> Result<Self, String> {
608        self.storage_profile = selection.validate()?;
609        Ok(self)
610    }
611
612    /// Resolve `(data_path, TieredLayoutPaths)` for this options bundle.
613    /// Returns `None` when `data_path` is unset (in-memory ephemeral
614    /// instances don't materialise a support tree).
615    pub fn resolve_tiered_layout(
616        &self,
617    ) -> Option<(PathBuf, crate::storage::layout::TieredLayoutPaths)> {
618        let data_path = self.data_path.clone()?;
619        let paths = crate::storage::layout::TieredLayoutPaths::new(
620            &data_path,
621            self.layout,
622            self.layout_overrides.clone(),
623        );
624        Some((data_path, paths))
625    }
626
627    /// Flip the process-global tier-flag toggles to match this options
628    /// bundle's layout, and stash the resolved [`TieredLayoutPaths`] for
629    /// status surfaces. Idempotent. Per-feature env escape hatches
630    /// (`REDDB_META_JSON_SIDECAR=...` and friends) still override what
631    /// this method sets — they are read inside the per-toggle getter, not
632    /// here.
633    ///
634    /// Tier defaults:
635    ///
636    /// | toggle                     | minimal | standard | performance | max |
637    /// |----------------------------|:-------:|:--------:|:-----------:|:---:|
638    /// | `.meta.json` sidecar       |   off   |    off   |     off     |  on |
639    /// | seq-N catalog journal      |   off   |    off   |     off     |  on |
640    /// | `-shm` provisioning        |   off   | **on**   |   **on**    |  on |
641    /// | `fold_pager_meta`          |   off   |    off   |     off     |  on |
642    /// | `fold_dwb_into_wal`        |   off   |    off   |     off     |  on |
643    /// | audit/slow log destination | stderr  |  stderr  |  file       | file|
644    ///
645    /// Retention for the seq-N journal: 32 at `Max`, 4 when the operator
646    /// opts in on a lower tier via `REDDB_SEQN_JOURNAL=1`.
647    pub fn apply_tier_defaults(&self) {
648        use crate::storage::layout::StorageLayout;
649
650        // Opt-in: only flip the process-global toggles when the operator
651        // explicitly chose a layout via `with_layout`. Tests and env
652        // hatches that pre-set toggles before opening a runtime must not
653        // be silently overridden by the implicit `Standard` default.
654        if !self.layout_explicit {
655            if let Some((_, paths)) = self.resolve_tiered_layout() {
656                tier_wiring::stash_layout_paths(paths);
657            }
658            return;
659        }
660
661        let layout = self.layout;
662        // .meta.json sidecar — Max only.
663        crate::physical::set_meta_json_sidecar_enabled(matches!(layout, StorageLayout::Max));
664
665        // Seq-N catalog journal — Max only. Retention = 32 for Max,
666        // OPT_IN baseline (4) for lower tiers (the value is consulted
667        // when an env hatch flips the toggle on outside Max).
668        crate::physical::set_seqn_journal_enabled(matches!(layout, StorageLayout::Max));
669        crate::physical::set_seqn_journal_retention(match layout {
670            StorageLayout::Max => crate::physical::DEFAULT_METADATA_JOURNAL_RETENTION,
671            _ => crate::physical::OPT_IN_METADATA_JOURNAL_RETENTION,
672        });
673
674        // `-shm` provisioning — anything ≥ Standard (gh-475 acceptance).
675        crate::physical::set_shm_provisioning_enabled(matches!(
676            layout,
677            StorageLayout::Standard | StorageLayout::Performance | StorageLayout::Max
678        ));
679
680        // Fold pager meta + fold DWB into WAL — Max only (single
681        // datafile + single recovery substrate is the Max story).
682        crate::physical::set_fold_pager_meta_enabled(matches!(layout, StorageLayout::Max));
683        crate::physical::set_fold_dwb_into_wal_enabled(matches!(layout, StorageLayout::Max));
684
685        // Cache resolved layout paths for `red status` / diagnostics.
686        if let Some((_, paths)) = self.resolve_tiered_layout() {
687            tier_wiring::stash_layout_paths(paths);
688        }
689    }
690}
691
692/// Process-global cache of the most recently applied [`TieredLayoutPaths`].
693/// Read by `red status` to surface resolved log destinations. Written by
694/// `RedDBOptions::apply_tier_defaults` after each open.
695pub mod tier_wiring {
696    use std::sync::Mutex;
697
698    use crate::storage::layout::{LogDestination, TieredLayoutPaths};
699
700    static CURRENT_LAYOUT_PATHS: Mutex<Option<TieredLayoutPaths>> = Mutex::new(None);
701
702    pub fn stash_layout_paths(paths: TieredLayoutPaths) {
703        if let Ok(mut slot) = CURRENT_LAYOUT_PATHS.lock() {
704            *slot = Some(paths);
705        }
706    }
707
708    pub fn current_layout_paths() -> Option<TieredLayoutPaths> {
709        CURRENT_LAYOUT_PATHS
710            .lock()
711            .ok()
712            .and_then(|slot| slot.clone())
713    }
714
715    /// `(audit_log, slow_log)` destinations resolved from the active
716    /// layout. Falls back to `(Stderr, Stderr)` when no layout has been
717    /// applied yet (e.g. ephemeral in-memory paths).
718    pub fn current_log_destinations() -> (LogDestination, LogDestination) {
719        match current_layout_paths() {
720            Some(p) => (p.audit_log_destination, p.slow_log_destination),
721            None => (LogDestination::Stderr, LogDestination::Stderr),
722        }
723    }
724}
725
/// Per-collection counters reported inside a [`CatalogSnapshot`].
#[derive(Debug, Clone, Default)]
pub struct CollectionStats {
    pub entities: usize,
    pub cross_refs: usize,
    pub segments: usize,
}

/// Lightweight point-in-time summary of the catalog.
#[derive(Debug, Clone)]
pub struct CatalogSnapshot {
    pub name: String,
    pub total_entities: usize,
    pub total_collections: usize,
    /// Counters keyed by collection name.
    pub stats_by_collection: BTreeMap<String, CollectionStats>,
    pub updated_at: SystemTime,
}

impl Default for CatalogSnapshot {
    /// An empty, unnamed snapshot stamped at the Unix epoch. `SystemTime`
    /// has no `Default`, so this impl is written by hand.
    fn default() -> Self {
        Self {
            updated_at: UNIX_EPOCH,
            stats_by_collection: BTreeMap::default(),
            total_collections: 0,
            total_entities: 0,
            name: String::default(),
        }
    }
}
753
754#[derive(Debug, Clone)]
755pub struct SchemaManifest {
756    pub format_version: u32,
757    pub created_at_unix_ms: u128,
758    pub updated_at_unix_ms: u128,
759    pub options: RedDBOptions,
760    pub collection_count: usize,
761}
762
763impl SchemaManifest {
764    pub fn now(options: RedDBOptions, collection_count: usize) -> Self {
765        let now = SystemTime::now()
766            .duration_since(UNIX_EPOCH)
767            .unwrap_or_default()
768            .as_millis();
769        Self {
770            format_version: REDDB_FORMAT_VERSION,
771            created_at_unix_ms: now,
772            updated_at_unix_ms: now,
773            options,
774            collection_count,
775        }
776    }
777}
778
779#[derive(Debug)]
780pub enum RedDBError {
781    InvalidConfig(String),
782    SchemaVersionMismatch {
783        expected: u32,
784        found: u32,
785    },
786    FeatureNotEnabled(String),
787    NotFound(String),
788    ReadOnly(String),
789    InvalidOperation(String),
790    Engine(String),
791    Catalog(String),
792    Query(String),
793    Validation {
794        message: String,
795        validation: crate::json::Value,
796    },
797    Io(io::Error),
798    VersionUnavailable,
799    /// Operator-pinned cap exceeded (PLAN.md Phase 4.1). The string
800    /// payload should follow the `quota_exceeded:<limit_name>:<current>:<max>`
801    /// shape so HTTP / wire surfaces can map to the right status
802    /// (507 for storage, 429 for rate, 504 for duration, 413 for
803    /// payload).
804    QuotaExceeded(String),
805    /// Issue #769 (PRD #759 / S10) — an aggregating executor
806    /// (`aggregation`, `sort`, `window`) exceeded
807    /// `stream.executor.max_materialized_rows` in its in-memory state.
808    /// Carries the executor that fired, the configured ceiling and the
809    /// live row count at the breach so the client can decide whether to
810    /// redesign the query, raise the limit, or pre-aggregate. Surfaces
811    /// as HTTP 507 with a structured `materialization_limit_exceeded`
812    /// envelope; NDJSON streams emit the same code mid-stream.
813    MaterializationLimitExceeded {
814        executor: &'static str,
815        limit: usize,
816        current: usize,
817    },
818    Internal(String),
819}
820
821impl fmt::Display for RedDBError {
822    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
823        match self {
824            Self::InvalidConfig(msg) => write!(f, "invalid config: {msg}"),
825            Self::SchemaVersionMismatch { expected, found } => {
826                write!(
827                    f,
828                    "schema version mismatch: expected {expected}, found {found}"
829                )
830            }
831            Self::FeatureNotEnabled(msg) => write!(f, "feature disabled: {msg}"),
832            Self::NotFound(msg) => write!(f, "not found: {msg}"),
833            Self::ReadOnly(msg) => write!(f, "read-only violation: {msg}"),
834            Self::InvalidOperation(msg) => write!(f, "INVALID_OPERATION: {msg}"),
835            Self::Engine(msg) => write!(f, "engine error: {msg}"),
836            Self::Catalog(msg) => write!(f, "catalog error: {msg}"),
837            Self::Query(msg) => write!(f, "query error: {msg}"),
838            Self::Validation { message, .. } => write!(f, "validation error: {message}"),
839            Self::Io(err) => write!(f, "io error: {err}"),
840            Self::VersionUnavailable => write!(f, "version information unavailable"),
841            Self::QuotaExceeded(msg) => write!(f, "quota exceeded: {msg}"),
842            Self::MaterializationLimitExceeded {
843                executor,
844                limit,
845                current,
846            } => write!(
847                f,
848                "materialization limit exceeded: executor={executor} current={current} limit={limit}"
849            ),
850            Self::Internal(msg) => write!(f, "internal error: {msg}"),
851        }
852    }
853}
854
855impl std::error::Error for RedDBError {}
856
857impl From<io::Error> for RedDBError {
858    fn from(err: io::Error) -> Self {
859        Self::Io(err)
860    }
861}
862
863impl From<crate::storage::engine::DatabaseError> for RedDBError {
864    fn from(err: crate::storage::engine::DatabaseError) -> Self {
865        Self::Engine(err.to_string())
866    }
867}
868
869impl From<crate::storage::wal::TxError> for RedDBError {
870    fn from(err: crate::storage::wal::TxError) -> Self {
871        Self::Engine(err.to_string())
872    }
873}
874
875impl From<crate::storage::StoreError> for RedDBError {
876    fn from(err: crate::storage::StoreError) -> Self {
877        Self::Catalog(err.to_string())
878    }
879}
880
881impl From<crate::storage::unified::devx::DevXError> for RedDBError {
882    fn from(err: crate::storage::unified::devx::DevXError) -> Self {
883        match err {
884            crate::storage::unified::devx::DevXError::Validation(msg) => Self::InvalidConfig(msg),
885            crate::storage::unified::devx::DevXError::Storage(msg) => Self::Engine(msg),
886            crate::storage::unified::devx::DevXError::NotFound(msg) => Self::NotFound(msg),
887        }
888    }
889}
890
891pub trait CatalogService {
892    fn list_collections(&self) -> Vec<String>;
893    fn collection_stats(&self, collection: &str) -> Option<CollectionStats>;
894    fn catalog_snapshot(&self) -> CatalogSnapshot;
895}
896
897pub trait QueryPlanner {
898    fn plan_cost(&self, query: &str) -> Option<f64>;
899}
900
901pub trait DataOps {
902    fn execute_query(&self, query: &str) -> RedDBResult<()>;
903}
904
905pub mod prelude {
906    pub use super::{
907        Capability, CapabilitySet, CatalogService, CatalogSnapshot, CollectionStats, DataOps,
908        QueryPlanner, RedDBError, RedDBOptions, RedDBResult, SchemaManifest, StorageMode,
909        REDDB_FORMAT_VERSION, REDDB_PROTOCOL_VERSION,
910    };
911}