Skip to main content

reddb_server/
api.rs

1//! Public API layer for the RedDB crate.
2//!
//! This module is the first layer that applications consume. It provides:
4//! - stable options and contracts
5//! - capability declarations
6//! - typed errors and lightweight metadata snapshots
7//! - cross-layer traits for catalog/operations observability
8
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt;
11use std::io;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use crate::auth::AuthConfig;
17use crate::replication::ReplicationConfig;
18
/// Initial value of `RedDBOptions::snapshot_retention`.
pub const DEFAULT_SNAPSHOT_RETENTION: usize = 16;
/// Initial value of `RedDBOptions::export_retention`.
pub const DEFAULT_EXPORT_RETENTION: usize = 16;

/// Protocol version identifier string.
pub const REDDB_PROTOCOL_VERSION: &str = "reddb-v2";
/// Format version recorded in `SchemaManifest::format_version` by
/// `SchemaManifest::now`.
pub const REDDB_FORMAT_VERSION: u32 = 2;
/// Default group-commit window.
///
/// `0` = "no wait" — the background flusher fsyncs as soon as any
/// writer's pending commit arrives. Under single-writer workloads a
/// non-zero window would be pure latency (no one to batch with),
/// capping individual commit throughput at ~1000/s for window=1.
/// Concurrent writers still batch naturally via the `Mutex<WalWriter>`
/// contention path without needing an explicit timer.
///
/// Operators with many concurrent clients can raise this (e.g. 1-5ms)
/// to amortise fsync cost across a bigger batch — at the cost of p99
/// tail latency going up by the window size.
pub const DEFAULT_GROUP_COMMIT_WINDOW_MS: u64 = 0;
/// Default for `GroupCommitOptions::max_statements`.
pub const DEFAULT_GROUP_COMMIT_MAX_STATEMENTS: usize = 128;
/// Default for `GroupCommitOptions::max_wal_bytes` (1 MiB).
pub const DEFAULT_GROUP_COMMIT_MAX_WAL_BYTES: u64 = 1024 * 1024;

/// Result alias whose error type is [`RedDBError`].
pub type RedDBResult<T> = Result<T, RedDBError>;
41
/// How the database stores its data. `Persistent` is the only variant and
/// is also the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum StorageMode {
    /// Durable, file-backed database with WAL + checkpointing.
    #[default]
    Persistent,
}

impl StorageMode {
    /// Returns `true` for [`StorageMode::Persistent`].
    pub const fn is_persistent(self) -> bool {
        // Exhaustive on purpose: a new variant must decide its answer here.
        match self {
            Self::Persistent => true,
        }
    }
}
54
/// Commit durability tier.
///
/// `DurabilityMode::default()` is [`DurabilityMode::Strict`];
/// `RedDBOptions::default()` selects `WalDurableGrouped` explicitly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum DurabilityMode {
    /// Legacy / opt-out form: every commit pays its own fsync.
    #[default]
    Strict,
    /// Group-commit sync path: the writer waits for durability, but
    /// fsyncs are batched across concurrent writers.
    WalDurableGrouped,
    /// Fire-and-forget. Writers return as soon as the WAL record is
    /// in the in-memory ring buffer — they do NOT wait for fsync.
    /// The group-commit background thread still flushes on its
    /// cadence, so committed-but-unflushed work is bounded by
    /// `GroupCommitOptions::window_ms`. A crash inside the window
    /// drops whatever wasn't flushed — matches PG's
    /// `synchronous_commit=off` contract.
    Async,
}

impl DurabilityMode {
    /// Canonical lowercase name of the mode. Each returned string is also
    /// accepted by [`DurabilityMode::from_str`].
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Async => "async",
            Self::WalDurableGrouped => "wal_durable_grouped",
            Self::Strict => "strict",
        }
    }

    /// Parse a mode name. Surrounding whitespace is ignored and matching
    /// is ASCII case-insensitive. Returns `None` for unknown names.
    ///
    /// Accepted spellings:
    /// - `strict`
    /// - `sync`, `wal_durable_grouped`, `wal-durable-grouped`, `grouped`,
    ///   `wal_grouped`, `wal-grouped` (all map to `WalDurableGrouped`,
    ///   which behaves like PostgreSQL's `synchronous_commit=on`)
    /// - `async`, `fire_and_forget`, `fire-and-forget`
    pub fn from_str(value: &str) -> Option<Self> {
        const GROUPED_ALIASES: [&str; 6] = [
            "sync",
            "wal_durable_grouped",
            "wal-durable-grouped",
            "grouped",
            "wal_grouped",
            "wal-grouped",
        ];
        const ASYNC_ALIASES: [&str; 3] = ["async", "fire_and_forget", "fire-and-forget"];

        let token = value.trim();
        let is_one_of = |aliases: &[&str]| {
            aliases
                .iter()
                .any(|alias| token.eq_ignore_ascii_case(alias))
        };

        if token.eq_ignore_ascii_case("strict") {
            Some(Self::Strict)
        } else if is_one_of(&GROUPED_ALIASES) {
            Some(Self::WalDurableGrouped)
        } else if is_one_of(&ASYNC_ALIASES) {
            Some(Self::Async)
        } else {
            None
        }
    }
}
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq)]
105pub struct GroupCommitOptions {
106    pub window_ms: u64,
107    pub max_statements: usize,
108    pub max_wal_bytes: u64,
109}
110
111impl Default for GroupCommitOptions {
112    fn default() -> Self {
113        Self {
114            window_ms: DEFAULT_GROUP_COMMIT_WINDOW_MS,
115            max_statements: DEFAULT_GROUP_COMMIT_MAX_STATEMENTS,
116            max_wal_bytes: DEFAULT_GROUP_COMMIT_MAX_WAL_BYTES,
117        }
118    }
119}
120
/// Feature areas that can be gated through a [`CapabilitySet`].
///
/// The derived ordering follows declaration order, which is also the
/// iteration order of a `CapabilitySet`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Capability {
    /// Structured row storage.
    Table,
    /// Graph nodes/edges.
    Graph,
    /// Vector collections and ANN search.
    Vector,
    /// Full-text / lexical search.
    FullText,
    /// Text/metadata security and enrichment modules.
    Security,
    /// Encryption at rest.
    Encryption,
}

impl Capability {
    /// Stable lowercase identifier for the capability.
    pub const fn as_str(self) -> &'static str {
        let label = match self {
            Self::Encryption => "encryption",
            Self::Security => "security",
            Self::FullText => "fulltext",
            Self::Vector => "vector",
            Self::Graph => "graph",
            Self::Table => "table",
        };
        label
    }
}
149
150#[derive(Debug, Clone, Default)]
151pub struct CapabilitySet {
152    items: BTreeSet<Capability>,
153}
154
155impl CapabilitySet {
156    pub fn new() -> Self {
157        Self::default()
158    }
159
160    pub fn with(mut self, capability: Capability) -> Self {
161        self.items.insert(capability);
162        self
163    }
164
165    pub fn with_all(mut self, capabilities: &[Capability]) -> Self {
166        capabilities.iter().copied().for_each(|capability| {
167            self.items.insert(capability);
168        });
169        self
170    }
171
172    pub fn has(&self, capability: Capability) -> bool {
173        self.items.contains(&capability)
174    }
175
176    pub fn as_slice(&self) -> Vec<Capability> {
177        self.items.iter().copied().collect()
178    }
179}
180
/// Options bundle used to open a RedDB instance.
///
/// `Debug` and `Clone` are implemented by hand below rather than derived.
/// Start from [`RedDBOptions::default`], [`RedDBOptions::persistent`] or
/// [`RedDBOptions::in_memory`] and refine with the `with_*` builders.
pub struct RedDBOptions {
    /// Storage mode; `StorageMode::Persistent` is the only variant.
    pub mode: StorageMode,
    /// Location of the database file. `None` makes `resolved_path` return
    /// its caller-supplied fallback and `resolve_tiered_layout` return `None`.
    pub data_path: Option<PathBuf>,
    /// Read-only flag (defaults to `false`). Enforcement happens outside
    /// this module.
    pub read_only: bool,
    /// Defaults to `true`. NOTE(review): presumably creates the database
    /// when the file is absent — confirm against the engine.
    pub create_if_missing: bool,
    /// Defaults to `true`. NOTE(review): presumably enables checksum
    /// verification on read — confirm against the engine.
    pub verify_checksums: bool,
    /// Commit durability tier; `Default` selects
    /// `DurabilityMode::WalDurableGrouped`.
    pub durability_mode: DurabilityMode,
    /// Group-commit tuning (window, statement cap, WAL-byte cap).
    pub group_commit: GroupCommitOptions,
    /// Auto-checkpoint setting in pages. Defaults to `1000`; `in_memory()`
    /// sets it to `0`.
    pub auto_checkpoint_pages: u32,
    /// Page-cache size in pages. Defaults to `10_000`; `with_cache_pages`
    /// enforces a minimum of 2.
    pub cache_pages: usize,
    /// Snapshot retention limit; `with_snapshot_retention` enforces a
    /// minimum of 1.
    pub snapshot_retention: usize,
    /// Export retention limit; `with_export_retention` enforces a
    /// minimum of 1.
    pub export_retention: usize,
    /// Enabled capabilities. `Default` enables `Table`, `Graph` and `Vector`.
    pub feature_gates: CapabilitySet,
    /// Defaults to `true`. NOTE(review): exact semantics are defined by
    /// the engine, not visible here.
    pub force_create: bool,
    /// Free-form string settings. The keys `red.config.backup.head_key`,
    /// `red.config.backup.snapshot_prefix` and
    /// `red.config.wal.archive.prefix` override the derived remote keys.
    pub metadata: BTreeMap<String, String>,
    /// Optional remote storage backend for snapshot transport.
    pub remote_backend: Option<Arc<dyn crate::storage::backend::RemoteBackend>>,
    /// Optional CAS-capable handle to the same backend, populated by
    /// the factory when the configured backend implements
    /// `AtomicRemoteBackend` (S3/local always; HTTP only when
    /// `RED_HTTP_CONDITIONAL_WRITES=true`). `None` for backends that
    /// do not provide compare-and-swap (Turso, D1, plain HTTP).
    /// `LeaseStore` and any future CAS consumer pull from this field.
    pub remote_backend_atomic: Option<Arc<dyn crate::storage::backend::AtomicRemoteBackend>>,
    /// Remote object key used by the remote backend.
    pub remote_key: Option<String>,
    /// Replication configuration.
    pub replication: ReplicationConfig,
    /// Authentication & authorization configuration.
    pub auth: AuthConfig,
    /// Control Event Ledger configuration (issue #652). Read from
    /// `REDDB_COMPLIANCE_MODE` at boot.
    pub control_events: crate::runtime::control_events::ControlEventConfig,
    /// Scoped data-plane query audit configuration. Disabled by
    /// default; the regulated preset enables the stream without
    /// adding catch-all rules.
    pub query_audit: crate::runtime::query_audit::QueryAuditConfig,
    /// Auto-create a HASH index on a user `id` column the first time a
    /// row carrying that column is inserted into a collection. See
    /// `UnifiedStoreConfig::auto_index_id`. Defaults to `true`; set to
    /// `false` to opt out per workload (e.g. ingest pipelines that
    /// don't need point-lookups by `id`).
    pub auto_index_id: bool,
    /// Tiered storage layout preset. Drives the defaults of the six
    /// tier-flag toggles (`.meta.json` sidecar, seq-N catalog journal,
    /// `-shm` provisioning, audit/slow log destinations, `fold_pager_meta`,
    /// `fold_dwb_into_wal`). Explicit per-feature setters still win over
    /// the tier default. See `crate::storage::layout::StorageLayout`.
    pub layout: crate::storage::layout::StorageLayout,
    /// Per-feature overrides applied on top of the resolved layout preset.
    pub layout_overrides: crate::storage::layout::LayoutOverrides,
    /// True only when the caller explicitly selected a layout via
    /// `with_layout`. When false, `apply_tier_defaults` short-circuits so
    /// pre-existing process-global toggles (set by tests / env hatches
    /// before opening a runtime) are not clobbered by the implicit
    /// `Standard` default. The new tier-driven behavior is opt-in.
    pub layout_explicit: bool,
}
239
240impl fmt::Debug for RedDBOptions {
241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242        let backend_name = self.remote_backend.as_ref().map(|b| b.name().to_string());
243        f.debug_struct("RedDBOptions")
244            .field("mode", &self.mode)
245            .field("data_path", &self.data_path)
246            .field("read_only", &self.read_only)
247            .field("create_if_missing", &self.create_if_missing)
248            .field("verify_checksums", &self.verify_checksums)
249            .field("durability_mode", &self.durability_mode)
250            .field("group_commit", &self.group_commit)
251            .field("auto_checkpoint_pages", &self.auto_checkpoint_pages)
252            .field("cache_pages", &self.cache_pages)
253            .field("snapshot_retention", &self.snapshot_retention)
254            .field("export_retention", &self.export_retention)
255            .field("feature_gates", &self.feature_gates)
256            .field("force_create", &self.force_create)
257            .field("metadata", &self.metadata)
258            .field("remote_backend", &backend_name)
259            .field("remote_key", &self.remote_key)
260            .field("replication", &self.replication)
261            .field("auth", &self.auth)
262            .field("control_events", &self.control_events)
263            .field("query_audit", &self.query_audit)
264            .field("layout", &self.layout)
265            .field("layout_overrides", &self.layout_overrides)
266            .finish()
267    }
268}
269
270impl Clone for RedDBOptions {
271    fn clone(&self) -> Self {
272        Self {
273            mode: self.mode,
274            data_path: self.data_path.clone(),
275            read_only: self.read_only,
276            create_if_missing: self.create_if_missing,
277            verify_checksums: self.verify_checksums,
278            durability_mode: self.durability_mode,
279            group_commit: self.group_commit,
280            auto_checkpoint_pages: self.auto_checkpoint_pages,
281            cache_pages: self.cache_pages,
282            snapshot_retention: self.snapshot_retention,
283            export_retention: self.export_retention,
284            feature_gates: self.feature_gates.clone(),
285            force_create: self.force_create,
286            metadata: self.metadata.clone(),
287            remote_backend: self.remote_backend.clone(),
288            remote_backend_atomic: self.remote_backend_atomic.clone(),
289            remote_key: self.remote_key.clone(),
290            replication: self.replication.clone(),
291            auth: self.auth.clone(),
292            control_events: self.control_events,
293            query_audit: self.query_audit.clone(),
294            auto_index_id: self.auto_index_id,
295            layout: self.layout,
296            layout_overrides: self.layout_overrides.clone(),
297            layout_explicit: self.layout_explicit,
298        }
299    }
300}
301
302impl Default for RedDBOptions {
303    fn default() -> Self {
304        Self {
305            mode: StorageMode::Persistent,
306            data_path: None,
307            read_only: false,
308            create_if_missing: true,
309            verify_checksums: true,
310            // Perf-parity default — `WalDurableGrouped` matches
311            // PostgreSQL's `synchronous_commit=on` behaviour while
312            // amortising fsync cost across concurrent writers. The
313            // legacy `Strict` tier (per-commit fsync) stays available
314            // via `durability.mode = "strict"` / `REDDB_DURABILITY=strict`.
315            durability_mode: DurabilityMode::WalDurableGrouped,
316            group_commit: GroupCommitOptions::default(),
317            auto_checkpoint_pages: 1000,
318            cache_pages: 10_000,
319            snapshot_retention: DEFAULT_SNAPSHOT_RETENTION,
320            export_retention: DEFAULT_EXPORT_RETENTION,
321            feature_gates: CapabilitySet::new()
322                .with(Capability::Table)
323                .with(Capability::Graph)
324                .with(Capability::Vector),
325            force_create: true,
326            metadata: BTreeMap::new(),
327            remote_backend: None,
328            remote_backend_atomic: None,
329            remote_key: None,
330            replication: ReplicationConfig::standalone(),
331            auth: AuthConfig::default(),
332            control_events: crate::runtime::control_events::ControlEventConfig::default(),
333            query_audit: crate::runtime::query_audit::QueryAuditConfig::default(),
334            auto_index_id: true,
335            layout: crate::storage::layout::StorageLayout::default(),
336            layout_overrides: crate::storage::layout::LayoutOverrides::default(),
337            layout_explicit: false,
338        }
339    }
340}
341
342impl RedDBOptions {
343    pub fn persistent<P: Into<PathBuf>>(path: P) -> Self {
344        Self {
345            mode: StorageMode::Persistent,
346            data_path: Some(path.into()),
347            ..Default::default()
348        }
349    }
350
351    /// Ephemeral, tempfile-backed database.
352    ///
353    /// The underlying storage is a real persistent file placed under the system
354    /// temp directory with a unique name — there is no longer a true in-memory
355    /// execution mode. Prefer [`RedDBOptions::persistent`] when the data should
356    /// outlive the process.
357    pub fn in_memory() -> Self {
358        static NEXT_EPHEMERAL_ID: std::sync::atomic::AtomicU64 =
359            std::sync::atomic::AtomicU64::new(0);
360
361        let now_nanos = std::time::SystemTime::now()
362            .duration_since(std::time::UNIX_EPOCH)
363            .map(|duration| duration.as_nanos())
364            .unwrap_or(0);
365        let unique = NEXT_EPHEMERAL_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
366        let path = std::env::temp_dir().join(format!(
367            "reddb-ephemeral-{}-{}-{}.rdb",
368            std::process::id(),
369            now_nanos,
370            unique
371        ));
372        let _ = std::fs::remove_file(&path);
373        Self {
374            mode: StorageMode::Persistent,
375            data_path: Some(path),
376            auto_checkpoint_pages: 0,
377            cache_pages: 2_000,
378            snapshot_retention: DEFAULT_SNAPSHOT_RETENTION,
379            export_retention: DEFAULT_EXPORT_RETENTION,
380            read_only: false,
381            force_create: true,
382            ..Default::default()
383        }
384    }
385
386    pub fn with_mode(mut self, mode: StorageMode) -> Self {
387        self.mode = mode;
388        self
389    }
390
391    pub fn with_data_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
392        self.data_path = Some(path.into());
393        self
394    }
395
396    pub fn with_read_only(mut self, read_only: bool) -> Self {
397        self.read_only = read_only;
398        self
399    }
400
401    pub fn with_auto_checkpoint(mut self, pages: u32) -> Self {
402        self.auto_checkpoint_pages = pages;
403        self
404    }
405
406    pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
407        self.durability_mode = mode;
408        self
409    }
410
411    pub fn with_group_commit_window_ms(mut self, window_ms: u64) -> Self {
412        // `0` is a legitimate setting — "no wait, fsync on every wakeup".
413        // See `DEFAULT_GROUP_COMMIT_WINDOW_MS` docs for the tradeoff.
414        self.group_commit.window_ms = window_ms;
415        self
416    }
417
418    pub fn with_group_commit_max_statements(mut self, max_statements: usize) -> Self {
419        self.group_commit.max_statements = max_statements.max(1);
420        self
421    }
422
423    pub fn with_group_commit_max_wal_bytes(mut self, max_wal_bytes: u64) -> Self {
424        self.group_commit.max_wal_bytes = max_wal_bytes.max(1);
425        self
426    }
427
428    pub fn with_cache_pages(mut self, pages: usize) -> Self {
429        self.cache_pages = pages.max(2);
430        self
431    }
432
433    pub fn with_snapshot_retention(mut self, limit: usize) -> Self {
434        self.snapshot_retention = limit.max(1);
435        self
436    }
437
438    pub fn with_export_retention(mut self, limit: usize) -> Self {
439        self.export_retention = limit.max(1);
440        self
441    }
442
443    pub fn with_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
444        self.metadata.insert(key.into(), value.into());
445        self
446    }
447
448    /// Toggle the implicit HASH index on user `id` columns at first
449    /// insert (#112). Defaults to enabled — pass `false` to fall back
450    /// to the legacy "scan unless `CREATE INDEX` is issued" behaviour.
451    pub fn with_auto_index_id(mut self, enabled: bool) -> Self {
452        self.auto_index_id = enabled;
453        self
454    }
455
456    pub fn with_capability(mut self, capability: Capability) -> Self {
457        self.feature_gates = self.feature_gates.with(capability);
458        self
459    }
460
461    /// Attach a remote storage backend for snapshot transport.
462    ///
463    /// On open, the database snapshot is downloaded from the remote `key`
464    /// to the local data path. On flush, the local file is uploaded back
465    /// to the remote backend under the same key.
466    pub fn with_remote_backend(
467        mut self,
468        backend: Arc<dyn crate::storage::backend::RemoteBackend>,
469        key: impl Into<String>,
470    ) -> Self {
471        self.remote_backend = Some(backend);
472        self.remote_key = Some(key.into());
473        self
474    }
475
476    /// Attach a CAS-capable backend handle. Pass the same `Arc` as
477    /// `with_remote_backend` (factories should construct the backend
478    /// once and call both setters); this method exists so the type
479    /// system, not runtime config, decides whether `LeaseStore` is
480    /// reachable.
481    pub fn with_atomic_remote_backend(
482        mut self,
483        backend: Arc<dyn crate::storage::backend::AtomicRemoteBackend>,
484    ) -> Self {
485        self.remote_backend_atomic = Some(backend);
486        self
487    }
488
489    pub fn with_replication(mut self, config: ReplicationConfig) -> Self {
490        self.replication = config;
491        self
492    }
493
494    pub fn with_auth(mut self, config: AuthConfig) -> Self {
495        self.auth = config;
496        self
497    }
498
499    pub fn resolved_path(&self, fallback: impl AsRef<Path>) -> PathBuf {
500        self.data_path
501            .clone()
502            .unwrap_or_else(|| fallback.as_ref().to_path_buf())
503    }
504
505    pub fn remote_namespace_prefix(&self) -> String {
506        let Some(remote_key) = &self.remote_key else {
507            return String::new();
508        };
509        let normalized = remote_key.trim_matches('/');
510        if normalized.is_empty() {
511            return String::new();
512        }
513        match normalized.rsplit_once('/') {
514            Some((parent, _)) if !parent.is_empty() => format!("{parent}/"),
515            _ => String::new(),
516        }
517    }
518
519    pub fn default_backup_head_key(&self) -> String {
520        if let Some(value) = self.metadata.get("red.config.backup.head_key") {
521            return value.clone();
522        }
523        format!("{}manifests/head.json", self.remote_namespace_prefix())
524    }
525
526    pub fn default_snapshot_prefix(&self) -> String {
527        if let Some(value) = self.metadata.get("red.config.backup.snapshot_prefix") {
528            return value.clone();
529        }
530        format!("{}snapshots/", self.remote_namespace_prefix())
531    }
532
533    pub fn default_wal_archive_prefix(&self) -> String {
534        if let Some(value) = self.metadata.get("red.config.wal.archive.prefix") {
535            return value.clone();
536        }
537        format!("{}wal/", self.remote_namespace_prefix())
538    }
539
540    pub fn has_capability(&self, capability: Capability) -> bool {
541        self.feature_gates.has(capability)
542    }
543
544    /// Select the active storage-layout preset. Drives tier defaults for
545    /// the six tier-flag toggles. Per-feature setters still win.
546    pub fn with_layout(mut self, layout: crate::storage::layout::StorageLayout) -> Self {
547        self.layout = layout;
548        self.layout_explicit = true;
549        self
550    }
551
552    /// Override individual layout knobs (dedicated dirs + log routing) on
553    /// top of the active preset.
554    pub fn with_layout_overrides(
555        mut self,
556        overrides: crate::storage::layout::LayoutOverrides,
557    ) -> Self {
558        self.layout_overrides = overrides;
559        self
560    }
561
562    /// Resolve `(data_path, TieredLayoutPaths)` for this options bundle.
563    /// Returns `None` when `data_path` is unset (in-memory ephemeral
564    /// instances don't materialise a support tree).
565    pub fn resolve_tiered_layout(
566        &self,
567    ) -> Option<(PathBuf, crate::storage::layout::TieredLayoutPaths)> {
568        let data_path = self.data_path.clone()?;
569        let paths = crate::storage::layout::TieredLayoutPaths::new(
570            &data_path,
571            self.layout,
572            self.layout_overrides.clone(),
573        );
574        Some((data_path, paths))
575    }
576
577    /// Flip the process-global tier-flag toggles to match this options
578    /// bundle's layout, and stash the resolved [`TieredLayoutPaths`] for
579    /// status surfaces. Idempotent. Per-feature env escape hatches
580    /// (`REDDB_META_JSON_SIDECAR=...` and friends) still override what
581    /// this method sets — they are read inside the per-toggle getter, not
582    /// here.
583    ///
584    /// Tier defaults:
585    ///
586    /// | toggle                     | minimal | standard | performance | max |
587    /// |----------------------------|:-------:|:--------:|:-----------:|:---:|
588    /// | `.meta.json` sidecar       |   off   |    off   |     off     |  on |
589    /// | seq-N catalog journal      |   off   |    off   |     off     |  on |
590    /// | `-shm` provisioning        |   off   | **on**   |   **on**    |  on |
591    /// | `fold_pager_meta`          |   off   |    off   |     off     |  on |
592    /// | `fold_dwb_into_wal`        |   off   |    off   |     off     |  on |
593    /// | audit/slow log destination | stderr  |  stderr  |  file       | file|
594    ///
595    /// Retention for the seq-N journal: 32 at `Max`, 4 when the operator
596    /// opts in on a lower tier via `REDDB_SEQN_JOURNAL=1`.
597    pub fn apply_tier_defaults(&self) {
598        use crate::storage::layout::StorageLayout;
599
600        // Opt-in: only flip the process-global toggles when the operator
601        // explicitly chose a layout via `with_layout`. Tests and env
602        // hatches that pre-set toggles before opening a runtime must not
603        // be silently overridden by the implicit `Standard` default.
604        if !self.layout_explicit {
605            if let Some((_, paths)) = self.resolve_tiered_layout() {
606                tier_wiring::stash_layout_paths(paths);
607            }
608            return;
609        }
610
611        let layout = self.layout;
612        // .meta.json sidecar — Max only.
613        crate::physical::set_meta_json_sidecar_enabled(matches!(layout, StorageLayout::Max));
614
615        // Seq-N catalog journal — Max only. Retention = 32 for Max,
616        // OPT_IN baseline (4) for lower tiers (the value is consulted
617        // when an env hatch flips the toggle on outside Max).
618        crate::physical::set_seqn_journal_enabled(matches!(layout, StorageLayout::Max));
619        crate::physical::set_seqn_journal_retention(match layout {
620            StorageLayout::Max => crate::physical::DEFAULT_METADATA_JOURNAL_RETENTION,
621            _ => crate::physical::OPT_IN_METADATA_JOURNAL_RETENTION,
622        });
623
624        // `-shm` provisioning — anything ≥ Standard (gh-475 acceptance).
625        crate::physical::set_shm_provisioning_enabled(matches!(
626            layout,
627            StorageLayout::Standard | StorageLayout::Performance | StorageLayout::Max
628        ));
629
630        // Fold pager meta + fold DWB into WAL — Max only (single
631        // datafile + single recovery substrate is the Max story).
632        crate::physical::set_fold_pager_meta_enabled(matches!(layout, StorageLayout::Max));
633        crate::physical::set_fold_dwb_into_wal_enabled(matches!(layout, StorageLayout::Max));
634
635        // Cache resolved layout paths for `red status` / diagnostics.
636        if let Some((_, paths)) = self.resolve_tiered_layout() {
637            tier_wiring::stash_layout_paths(paths);
638        }
639    }
640}
641
642/// Process-global cache of the most recently applied [`TieredLayoutPaths`].
643/// Read by `red status` to surface resolved log destinations. Written by
644/// `RedDBOptions::apply_tier_defaults` after each open.
645pub mod tier_wiring {
646    use std::sync::Mutex;
647
648    use crate::storage::layout::{LogDestination, TieredLayoutPaths};
649
650    static CURRENT_LAYOUT_PATHS: Mutex<Option<TieredLayoutPaths>> = Mutex::new(None);
651
652    pub fn stash_layout_paths(paths: TieredLayoutPaths) {
653        if let Ok(mut slot) = CURRENT_LAYOUT_PATHS.lock() {
654            *slot = Some(paths);
655        }
656    }
657
658    pub fn current_layout_paths() -> Option<TieredLayoutPaths> {
659        CURRENT_LAYOUT_PATHS
660            .lock()
661            .ok()
662            .and_then(|slot| slot.clone())
663    }
664
665    /// `(audit_log, slow_log)` destinations resolved from the active
666    /// layout. Falls back to `(Stderr, Stderr)` when no layout has been
667    /// applied yet (e.g. ephemeral in-memory paths).
668    pub fn current_log_destinations() -> (LogDestination, LogDestination) {
669        match current_layout_paths() {
670            Some(p) => (p.audit_log_destination, p.slow_log_destination),
671            None => (LogDestination::Stderr, LogDestination::Stderr),
672        }
673    }
674}
675
/// Per-collection counters, returned by `CatalogService::collection_stats`
/// and stored in `CatalogSnapshot::stats_by_collection`. All counters are
/// zero in the `Default` value.
#[derive(Debug, Clone, Default)]
pub struct CollectionStats {
    /// Entity count. NOTE(review): presumably entities stored in the
    /// collection — confirm with the producer.
    pub entities: usize,
    /// Cross-reference count. NOTE(review): exact definition lives with
    /// the producer.
    pub cross_refs: usize,
    /// Segment count. NOTE(review): presumably storage segments backing
    /// the collection — confirm.
    pub segments: usize,
}
682
683#[derive(Debug, Clone)]
684pub struct CatalogSnapshot {
685    pub name: String,
686    pub total_entities: usize,
687    pub total_collections: usize,
688    pub stats_by_collection: BTreeMap<String, CollectionStats>,
689    pub updated_at: SystemTime,
690}
691
692impl Default for CatalogSnapshot {
693    fn default() -> Self {
694        Self {
695            name: String::new(),
696            total_entities: 0,
697            total_collections: 0,
698            stats_by_collection: BTreeMap::new(),
699            updated_at: UNIX_EPOCH,
700        }
701    }
702}
703
704#[derive(Debug, Clone)]
705pub struct SchemaManifest {
706    pub format_version: u32,
707    pub created_at_unix_ms: u128,
708    pub updated_at_unix_ms: u128,
709    pub options: RedDBOptions,
710    pub collection_count: usize,
711}
712
713impl SchemaManifest {
714    pub fn now(options: RedDBOptions, collection_count: usize) -> Self {
715        let now = SystemTime::now()
716            .duration_since(UNIX_EPOCH)
717            .unwrap_or_default()
718            .as_millis();
719        Self {
720            format_version: REDDB_FORMAT_VERSION,
721            created_at_unix_ms: now,
722            updated_at_unix_ms: now,
723            options,
724            collection_count,
725        }
726    }
727}
728
/// Error type of the public API; see [`RedDBResult`].
///
/// Only `Debug` is derived: the `Io` variant wraps `io::Error`, which is
/// not `Clone`. The `Display` text for each variant is a fixed prefix
/// followed by the payload.
#[derive(Debug)]
pub enum RedDBError {
    /// Invalid configuration. Also produced from `DevXError::Validation`.
    InvalidConfig(String),
    /// The schema version found does not match the expected one.
    SchemaVersionMismatch {
        expected: u32,
        found: u32,
    },
    /// A feature is disabled (displayed as `feature disabled: …`).
    FeatureNotEnabled(String),
    /// Something was not found. Also produced from `DevXError::NotFound`.
    NotFound(String),
    /// Read-only violation.
    ReadOnly(String),
    /// Invalid operation (displayed with the `INVALID_OPERATION:` prefix).
    InvalidOperation(String),
    /// Engine-level failure. Produced from `DatabaseError`, `TxError` and
    /// `DevXError::Storage`, keeping only the message text.
    Engine(String),
    /// Catalog failure. Produced from `StoreError`, keeping only the
    /// message text.
    Catalog(String),
    /// Query failure.
    Query(String),
    /// Validation failure. `Display` prints only `message`; the structured
    /// `validation` value is available to callers that match on the variant.
    Validation {
        message: String,
        validation: crate::json::Value,
    },
    /// Wrapped I/O error (see the `From<io::Error>` impl).
    Io(io::Error),
    /// Version information is unavailable.
    VersionUnavailable,
    /// Operator-pinned cap exceeded (PLAN.md Phase 4.1). The string
    /// payload should follow the `quota_exceeded:<limit_name>:<current>:<max>`
    /// shape so HTTP / wire surfaces can map to the right status
    /// (507 for storage, 429 for rate, 504 for duration, 413 for
    /// payload).
    QuotaExceeded(String),
    /// Issue #769 (PRD #759 / S10) — an aggregating executor
    /// (`aggregation`, `sort`, `window`) exceeded
    /// `stream.executor.max_materialized_rows` in its in-memory state.
    /// Carries the executor that fired, the configured ceiling and the
    /// live row count at the breach so the client can decide whether to
    /// redesign the query, raise the limit, or pre-aggregate. Surfaces
    /// as HTTP 507 with a structured `materialization_limit_exceeded`
    /// envelope; NDJSON streams emit the same code mid-stream.
    MaterializationLimitExceeded {
        executor: &'static str,
        limit: usize,
        current: usize,
    },
    /// Internal error.
    Internal(String),
}
770
771impl fmt::Display for RedDBError {
772    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
773        match self {
774            Self::InvalidConfig(msg) => write!(f, "invalid config: {msg}"),
775            Self::SchemaVersionMismatch { expected, found } => {
776                write!(
777                    f,
778                    "schema version mismatch: expected {expected}, found {found}"
779                )
780            }
781            Self::FeatureNotEnabled(msg) => write!(f, "feature disabled: {msg}"),
782            Self::NotFound(msg) => write!(f, "not found: {msg}"),
783            Self::ReadOnly(msg) => write!(f, "read-only violation: {msg}"),
784            Self::InvalidOperation(msg) => write!(f, "INVALID_OPERATION: {msg}"),
785            Self::Engine(msg) => write!(f, "engine error: {msg}"),
786            Self::Catalog(msg) => write!(f, "catalog error: {msg}"),
787            Self::Query(msg) => write!(f, "query error: {msg}"),
788            Self::Validation { message, .. } => write!(f, "validation error: {message}"),
789            Self::Io(err) => write!(f, "io error: {err}"),
790            Self::VersionUnavailable => write!(f, "version information unavailable"),
791            Self::QuotaExceeded(msg) => write!(f, "quota exceeded: {msg}"),
792            Self::MaterializationLimitExceeded {
793                executor,
794                limit,
795                current,
796            } => write!(
797                f,
798                "materialization limit exceeded: executor={executor} current={current} limit={limit}"
799            ),
800            Self::Internal(msg) => write!(f, "internal error: {msg}"),
801        }
802    }
803}
804
// `source()` keeps its default (`None`): the `Io` variant's inner error is
// rendered inline by the `Display` impl rather than exposed as a source.
impl std::error::Error for RedDBError {}
806
807impl From<io::Error> for RedDBError {
808    fn from(err: io::Error) -> Self {
809        Self::Io(err)
810    }
811}
812
813impl From<crate::storage::engine::DatabaseError> for RedDBError {
814    fn from(err: crate::storage::engine::DatabaseError) -> Self {
815        Self::Engine(err.to_string())
816    }
817}
818
819impl From<crate::storage::wal::TxError> for RedDBError {
820    fn from(err: crate::storage::wal::TxError) -> Self {
821        Self::Engine(err.to_string())
822    }
823}
824
825impl From<crate::storage::StoreError> for RedDBError {
826    fn from(err: crate::storage::StoreError) -> Self {
827        Self::Catalog(err.to_string())
828    }
829}
830
831impl From<crate::storage::unified::devx::DevXError> for RedDBError {
832    fn from(err: crate::storage::unified::devx::DevXError) -> Self {
833        match err {
834            crate::storage::unified::devx::DevXError::Validation(msg) => Self::InvalidConfig(msg),
835            crate::storage::unified::devx::DevXError::Storage(msg) => Self::Engine(msg),
836            crate::storage::unified::devx::DevXError::NotFound(msg) => Self::NotFound(msg),
837        }
838    }
839}
840
/// Read-only view of the catalog for cross-layer observability.
pub trait CatalogService {
    /// Names of the known collections.
    fn list_collections(&self) -> Vec<String>;
    /// Statistics for `collection`; the `Option` return lets implementors
    /// report that no statistics are available for that name.
    fn collection_stats(&self, collection: &str) -> Option<CollectionStats>;
    /// Summary of the catalog as a [`CatalogSnapshot`].
    fn catalog_snapshot(&self) -> CatalogSnapshot;
}
846
/// Planner-side cost estimation hook.
pub trait QueryPlanner {
    /// Cost estimate for the query text `query`, or `None` when the
    /// implementor has no estimate.
    /// NOTE(review): the unit and scale of the cost are defined by the
    /// implementor and are not visible here.
    fn plan_cost(&self, query: &str) -> Option<f64>;
}
850
/// Minimal data-plane execution hook.
pub trait DataOps {
    /// Execute the query text `query`. Success carries no payload; failure
    /// is reported as a [`RedDBError`].
    fn execute_query(&self, query: &str) -> RedDBResult<()>;
}
854
855pub mod prelude {
856    pub use super::{
857        Capability, CapabilitySet, CatalogService, CatalogSnapshot, CollectionStats, DataOps,
858        QueryPlanner, RedDBError, RedDBOptions, RedDBResult, SchemaManifest, StorageMode,
859        REDDB_FORMAT_VERSION, REDDB_PROTOCOL_VERSION,
860    };
861}