// reddb_server/api.rs
//! Public API layer for the RedDB crate.
//!
//! This module is the first layer to consume from applications:
//! - stable options and contracts
//! - capability declarations
//! - typed errors and lightweight metadata snapshots
//! - cross-layer traits for catalog/operations observability

9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt;
11use std::io;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use crate::auth::AuthConfig;
17use crate::replication::ReplicationConfig;
18
/// Default value for `RedDBOptions::snapshot_retention`.
pub const DEFAULT_SNAPSHOT_RETENTION: usize = 16;
/// Default value for `RedDBOptions::export_retention`.
pub const DEFAULT_EXPORT_RETENTION: usize = 16;

/// Protocol identifier string for this crate revision.
pub const REDDB_PROTOCOL_VERSION: &str = "reddb-v2";
/// Schema/format version; stamped into `SchemaManifest::format_version`
/// and compared in `RedDBError::SchemaVersionMismatch`.
pub const REDDB_FORMAT_VERSION: u32 = 2;
/// Default group-commit window.
///
/// `0` = "no wait" — the background flusher fsyncs as soon as any
/// writer's pending commit arrives. Under single-writer workloads a
/// non-zero window would be pure latency (no one to batch with),
/// capping individual commit throughput at ~1000/s for window=1.
/// Concurrent writers still batch naturally via the `Mutex<WalWriter>`
/// contention path without needing an explicit timer.
///
/// Operators with many concurrent clients can raise this (e.g. 1-5ms)
/// to amortise fsync cost across a bigger batch — at the cost of p99
/// tail latency going up by the window size.
pub const DEFAULT_GROUP_COMMIT_WINDOW_MS: u64 = 0;
/// Default for `GroupCommitOptions::max_statements`.
pub const DEFAULT_GROUP_COMMIT_MAX_STATEMENTS: usize = 128;
/// Default for `GroupCommitOptions::max_wal_bytes` (1 MiB).
pub const DEFAULT_GROUP_COMMIT_MAX_WAL_BYTES: u64 = 1024 * 1024;

/// Crate-wide result alias over [`RedDBError`].
pub type RedDBResult<T> = Result<T, RedDBError>;
41
/// Storage backend selection.
///
/// Currently a single-variant enum — presumably kept as an enum so
/// additional modes can be introduced without an API-shape change
/// (TODO confirm against crate roadmap).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum StorageMode {
    /// Durable, file-backed database with WAL + checkpointing.
    #[default]
    Persistent,
}
48
49impl StorageMode {
50    pub const fn is_persistent(self) -> bool {
51        matches!(self, Self::Persistent)
52    }
53}
54
/// Commit durability contract. String spellings are handled by
/// [`DurabilityMode::from_str`] / [`DurabilityMode::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum DurabilityMode {
    /// Every commit pays its own fsync (see the parser comments below).
    #[default]
    Strict,
    /// Group-commit path: writers wait for durability, but fsyncs are
    /// batched across concurrent writers.
    WalDurableGrouped,
    /// Fire-and-forget. Writers return as soon as the WAL record is
    /// in the in-memory ring buffer — they do NOT wait for fsync.
    /// The group-commit background thread still flushes on its
    /// cadence, so committed-but-unflushed work is bounded by
    /// `GroupCommitOptions::window_ms`. A crash inside the window
    /// drops whatever wasn't flushed — matches PG's
    /// `synchronous_commit=off` contract.
    Async,
}
69
70impl DurabilityMode {
71    pub const fn as_str(self) -> &'static str {
72        match self {
73            Self::Strict => "strict",
74            Self::WalDurableGrouped => "wal_durable_grouped",
75            Self::Async => "async",
76        }
77    }
78
79    pub fn from_str(value: &str) -> Option<Self> {
80        let normalized = value.trim().to_ascii_lowercase();
81        match normalized.as_str() {
82            // Legacy / opt-out form. Every commit pays its own fsync.
83            "strict" => Some(Self::Strict),
84            // Group-commit sync path — the perf-parity default. Matches
85            // PostgreSQL's `synchronous_commit=on` behaviour: the
86            // writer waits for durability, but fsyncs are batched
87            // across concurrent writers so a burst of N commits pays
88            // ~O(1) fsyncs instead of O(N).
89            "sync"
90            | "wal_durable_grouped"
91            | "wal-durable-grouped"
92            | "grouped"
93            | "wal_grouped"
94            | "wal-grouped" => Some(Self::WalDurableGrouped),
95            // Fire-and-forget async: writers return as soon as the
96            // WAL buffer accepts the record; background flusher runs
97            // on its configured cadence.
98            "async" | "fire_and_forget" | "fire-and-forget" => Some(Self::Async),
99            _ => None,
100        }
101    }
102}
103
/// Tuning knobs for the group-commit flusher.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GroupCommitOptions {
    /// Batching window in milliseconds; `0` means "no wait"
    /// (see `DEFAULT_GROUP_COMMIT_WINDOW_MS` for the tradeoff).
    pub window_ms: u64,
    /// Statement-count cap for a single batch.
    pub max_statements: usize,
    /// Pending-WAL-bytes cap for a single batch.
    pub max_wal_bytes: u64,
}
110
impl Default for GroupCommitOptions {
    /// Mirrors the module-level `DEFAULT_GROUP_COMMIT_*` constants.
    fn default() -> Self {
        Self {
            window_ms: DEFAULT_GROUP_COMMIT_WINDOW_MS,
            max_statements: DEFAULT_GROUP_COMMIT_MAX_STATEMENTS,
            max_wal_bytes: DEFAULT_GROUP_COMMIT_MAX_WAL_BYTES,
        }
    }
}
120
/// Feature gates a database instance may enable; string forms come
/// from [`Capability::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Capability {
    /// Structured row storage.
    Table,
    /// Graph nodes/edges.
    Graph,
    /// Vector collections and ANN search.
    Vector,
    /// Full-text / lexical search.
    FullText,
    /// Text/metadata security and enrichment modules.
    Security,
    /// Encryption at rest.
    Encryption,
}
136
137impl Capability {
138    pub const fn as_str(self) -> &'static str {
139        match self {
140            Self::Table => "table",
141            Self::Graph => "graph",
142            Self::Vector => "vector",
143            Self::FullText => "fulltext",
144            Self::Security => "security",
145            Self::Encryption => "encryption",
146        }
147    }
148}
149
/// Deduplicated set of [`Capability`] flags with a builder-style API.
#[derive(Debug, Clone, Default)]
pub struct CapabilitySet {
    // BTreeSet keeps iteration/snapshot order deterministic (sorted).
    items: BTreeSet<Capability>,
}
154
155impl CapabilitySet {
156    pub fn new() -> Self {
157        Self::default()
158    }
159
160    pub fn with(mut self, capability: Capability) -> Self {
161        self.items.insert(capability);
162        self
163    }
164
165    pub fn with_all(mut self, capabilities: &[Capability]) -> Self {
166        capabilities.iter().copied().for_each(|capability| {
167            self.items.insert(capability);
168        });
169        self
170    }
171
172    pub fn has(&self, capability: Capability) -> bool {
173        self.items.contains(&capability)
174    }
175
176    pub fn as_slice(&self) -> Vec<Capability> {
177        self.items.iter().copied().collect()
178    }
179}
180
/// Open-time configuration for a RedDB instance.
///
/// Construct via [`RedDBOptions::persistent`] / [`RedDBOptions::in_memory`]
/// (or `Default`) and refine with the `with_*` builders. `Debug` and
/// `Clone` are implemented manually below because the remote-backend
/// fields hold trait objects.
pub struct RedDBOptions {
    /// Storage backend selection.
    pub mode: StorageMode,
    /// On-disk location; `None` defers to [`RedDBOptions::resolved_path`]'s fallback.
    pub data_path: Option<PathBuf>,
    /// Open the database read-only.
    pub read_only: bool,
    /// Create the database if it does not exist yet.
    pub create_if_missing: bool,
    /// Enable checksum verification.
    pub verify_checksums: bool,
    /// Commit durability contract (see [`DurabilityMode`]).
    pub durability_mode: DurabilityMode,
    /// Group-commit flusher tuning (see [`GroupCommitOptions`]).
    pub group_commit: GroupCommitOptions,
    /// Auto-checkpoint threshold in pages; `0` disables (as used by `in_memory`).
    pub auto_checkpoint_pages: u32,
    /// Page-cache size, in pages (clamped to >= 2 by `with_cache_pages`).
    pub cache_pages: usize,
    /// Snapshot retention count (clamped to >= 1 by `with_snapshot_retention`).
    pub snapshot_retention: usize,
    /// Export retention count (clamped to >= 1 by `with_export_retention`).
    pub export_retention: usize,
    /// Enabled feature capabilities.
    pub feature_gates: CapabilitySet,
    /// Force database (re)creation on open.
    pub force_create: bool,
    /// Free-form metadata; also carries `red.config.*` overrides read by
    /// the `default_*` helper methods below.
    pub metadata: BTreeMap<String, String>,
    /// Optional remote storage backend for snapshot transport.
    pub remote_backend: Option<Arc<dyn crate::storage::backend::RemoteBackend>>,
    /// Optional CAS-capable handle to the same backend, populated by
    /// the factory when the configured backend implements
    /// `AtomicRemoteBackend` (S3/local always; HTTP only when
    /// `RED_HTTP_CONDITIONAL_WRITES=true`). `None` for backends that
    /// do not provide compare-and-swap (Turso, D1, plain HTTP).
    /// `LeaseStore` and any future CAS consumer pull from this field.
    pub remote_backend_atomic: Option<Arc<dyn crate::storage::backend::AtomicRemoteBackend>>,
    /// Remote object key used by the remote backend.
    pub remote_key: Option<String>,
    /// Replication configuration.
    pub replication: ReplicationConfig,
    /// Authentication & authorization configuration.
    pub auth: AuthConfig,
    /// Auto-create a HASH index on a user `id` column the first time a
    /// row carrying that column is inserted into a collection. See
    /// `UnifiedStoreConfig::auto_index_id`. Defaults to `true`; set to
    /// `false` to opt out per workload (e.g. ingest pipelines that
    /// don't need point-lookups by `id`).
    pub auto_index_id: bool,
}
218
219impl fmt::Debug for RedDBOptions {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        let backend_name = self.remote_backend.as_ref().map(|b| b.name().to_string());
222        f.debug_struct("RedDBOptions")
223            .field("mode", &self.mode)
224            .field("data_path", &self.data_path)
225            .field("read_only", &self.read_only)
226            .field("create_if_missing", &self.create_if_missing)
227            .field("verify_checksums", &self.verify_checksums)
228            .field("durability_mode", &self.durability_mode)
229            .field("group_commit", &self.group_commit)
230            .field("auto_checkpoint_pages", &self.auto_checkpoint_pages)
231            .field("cache_pages", &self.cache_pages)
232            .field("snapshot_retention", &self.snapshot_retention)
233            .field("export_retention", &self.export_retention)
234            .field("feature_gates", &self.feature_gates)
235            .field("force_create", &self.force_create)
236            .field("metadata", &self.metadata)
237            .field("remote_backend", &backend_name)
238            .field("remote_key", &self.remote_key)
239            .field("replication", &self.replication)
240            .field("auth", &self.auth)
241            .finish()
242    }
243}
244
impl Clone for RedDBOptions {
    // NOTE(review): every field here is itself `Clone` (the
    // `Arc<dyn …>` handles clone by refcount bump), so a derived
    // `Clone` on the struct would appear sufficient — confirm intent
    // before replacing this manual impl. Until then, any field added
    // to the struct must also be added here.
    fn clone(&self) -> Self {
        Self {
            mode: self.mode,
            data_path: self.data_path.clone(),
            read_only: self.read_only,
            create_if_missing: self.create_if_missing,
            verify_checksums: self.verify_checksums,
            durability_mode: self.durability_mode,
            group_commit: self.group_commit,
            auto_checkpoint_pages: self.auto_checkpoint_pages,
            cache_pages: self.cache_pages,
            snapshot_retention: self.snapshot_retention,
            export_retention: self.export_retention,
            feature_gates: self.feature_gates.clone(),
            force_create: self.force_create,
            metadata: self.metadata.clone(),
            remote_backend: self.remote_backend.clone(),
            remote_backend_atomic: self.remote_backend_atomic.clone(),
            remote_key: self.remote_key.clone(),
            replication: self.replication.clone(),
            auth: self.auth.clone(),
            auto_index_id: self.auto_index_id,
        }
    }
}
271
impl Default for RedDBOptions {
    /// Persistent mode with no data path set, full verification, and
    /// the table/graph/vector capabilities enabled.
    fn default() -> Self {
        Self {
            mode: StorageMode::Persistent,
            data_path: None,
            read_only: false,
            create_if_missing: true,
            verify_checksums: true,
            // Perf-parity default — `WalDurableGrouped` matches
            // PostgreSQL's `synchronous_commit=on` behaviour while
            // amortising fsync cost across concurrent writers. The
            // legacy `Strict` tier (per-commit fsync) stays available
            // via `durability.mode = "strict"` / `REDDB_DURABILITY=strict`.
            durability_mode: DurabilityMode::WalDurableGrouped,
            group_commit: GroupCommitOptions::default(),
            auto_checkpoint_pages: 1000,
            cache_pages: 10_000,
            snapshot_retention: DEFAULT_SNAPSHOT_RETENTION,
            export_retention: DEFAULT_EXPORT_RETENTION,
            feature_gates: CapabilitySet::new()
                .with(Capability::Table)
                .with(Capability::Graph)
                .with(Capability::Vector),
            force_create: true,
            metadata: BTreeMap::new(),
            remote_backend: None,
            remote_backend_atomic: None,
            remote_key: None,
            replication: ReplicationConfig::standalone(),
            auth: AuthConfig::default(),
            auto_index_id: true,
        }
    }
}
306
307impl RedDBOptions {
308    pub fn persistent<P: Into<PathBuf>>(path: P) -> Self {
309        Self {
310            mode: StorageMode::Persistent,
311            data_path: Some(path.into()),
312            ..Default::default()
313        }
314    }
315
316    /// Ephemeral, tempfile-backed database.
317    ///
318    /// The underlying storage is a real persistent file placed under the system
319    /// temp directory with a unique name — there is no longer a true in-memory
320    /// execution mode. Prefer [`RedDBOptions::persistent`] when the data should
321    /// outlive the process.
322    pub fn in_memory() -> Self {
323        static NEXT_EPHEMERAL_ID: std::sync::atomic::AtomicU64 =
324            std::sync::atomic::AtomicU64::new(0);
325
326        let now_nanos = std::time::SystemTime::now()
327            .duration_since(std::time::UNIX_EPOCH)
328            .map(|duration| duration.as_nanos())
329            .unwrap_or(0);
330        let unique = NEXT_EPHEMERAL_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
331        let path = std::env::temp_dir().join(format!(
332            "reddb-ephemeral-{}-{}-{}.rdb",
333            std::process::id(),
334            now_nanos,
335            unique
336        ));
337        let _ = std::fs::remove_file(&path);
338        Self {
339            mode: StorageMode::Persistent,
340            data_path: Some(path),
341            auto_checkpoint_pages: 0,
342            cache_pages: 2_000,
343            snapshot_retention: DEFAULT_SNAPSHOT_RETENTION,
344            export_retention: DEFAULT_EXPORT_RETENTION,
345            read_only: false,
346            force_create: true,
347            ..Default::default()
348        }
349    }
350
351    pub fn with_mode(mut self, mode: StorageMode) -> Self {
352        self.mode = mode;
353        self
354    }
355
356    pub fn with_data_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
357        self.data_path = Some(path.into());
358        self
359    }
360
361    pub fn with_read_only(mut self, read_only: bool) -> Self {
362        self.read_only = read_only;
363        self
364    }
365
366    pub fn with_auto_checkpoint(mut self, pages: u32) -> Self {
367        self.auto_checkpoint_pages = pages;
368        self
369    }
370
371    pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
372        self.durability_mode = mode;
373        self
374    }
375
376    pub fn with_group_commit_window_ms(mut self, window_ms: u64) -> Self {
377        // `0` is a legitimate setting — "no wait, fsync on every wakeup".
378        // See `DEFAULT_GROUP_COMMIT_WINDOW_MS` docs for the tradeoff.
379        self.group_commit.window_ms = window_ms;
380        self
381    }
382
383    pub fn with_group_commit_max_statements(mut self, max_statements: usize) -> Self {
384        self.group_commit.max_statements = max_statements.max(1);
385        self
386    }
387
388    pub fn with_group_commit_max_wal_bytes(mut self, max_wal_bytes: u64) -> Self {
389        self.group_commit.max_wal_bytes = max_wal_bytes.max(1);
390        self
391    }
392
393    pub fn with_cache_pages(mut self, pages: usize) -> Self {
394        self.cache_pages = pages.max(2);
395        self
396    }
397
398    pub fn with_snapshot_retention(mut self, limit: usize) -> Self {
399        self.snapshot_retention = limit.max(1);
400        self
401    }
402
403    pub fn with_export_retention(mut self, limit: usize) -> Self {
404        self.export_retention = limit.max(1);
405        self
406    }
407
408    pub fn with_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
409        self.metadata.insert(key.into(), value.into());
410        self
411    }
412
413    /// Toggle the implicit HASH index on user `id` columns at first
414    /// insert (#112). Defaults to enabled — pass `false` to fall back
415    /// to the legacy "scan unless `CREATE INDEX` is issued" behaviour.
416    pub fn with_auto_index_id(mut self, enabled: bool) -> Self {
417        self.auto_index_id = enabled;
418        self
419    }
420
421    pub fn with_capability(mut self, capability: Capability) -> Self {
422        self.feature_gates = self.feature_gates.with(capability);
423        self
424    }
425
426    /// Attach a remote storage backend for snapshot transport.
427    ///
428    /// On open, the database snapshot is downloaded from the remote `key`
429    /// to the local data path. On flush, the local file is uploaded back
430    /// to the remote backend under the same key.
431    pub fn with_remote_backend(
432        mut self,
433        backend: Arc<dyn crate::storage::backend::RemoteBackend>,
434        key: impl Into<String>,
435    ) -> Self {
436        self.remote_backend = Some(backend);
437        self.remote_key = Some(key.into());
438        self
439    }
440
441    /// Attach a CAS-capable backend handle. Pass the same `Arc` as
442    /// `with_remote_backend` (factories should construct the backend
443    /// once and call both setters); this method exists so the type
444    /// system, not runtime config, decides whether `LeaseStore` is
445    /// reachable.
446    pub fn with_atomic_remote_backend(
447        mut self,
448        backend: Arc<dyn crate::storage::backend::AtomicRemoteBackend>,
449    ) -> Self {
450        self.remote_backend_atomic = Some(backend);
451        self
452    }
453
454    pub fn with_replication(mut self, config: ReplicationConfig) -> Self {
455        self.replication = config;
456        self
457    }
458
459    pub fn with_auth(mut self, config: AuthConfig) -> Self {
460        self.auth = config;
461        self
462    }
463
464    pub fn resolved_path(&self, fallback: impl AsRef<Path>) -> PathBuf {
465        self.data_path
466            .clone()
467            .unwrap_or_else(|| fallback.as_ref().to_path_buf())
468    }
469
470    pub fn remote_namespace_prefix(&self) -> String {
471        let Some(remote_key) = &self.remote_key else {
472            return String::new();
473        };
474        let normalized = remote_key.trim_matches('/');
475        if normalized.is_empty() {
476            return String::new();
477        }
478        match normalized.rsplit_once('/') {
479            Some((parent, _)) if !parent.is_empty() => format!("{parent}/"),
480            _ => String::new(),
481        }
482    }
483
484    pub fn default_backup_head_key(&self) -> String {
485        if let Some(value) = self.metadata.get("red.config.backup.head_key") {
486            return value.clone();
487        }
488        format!("{}manifests/head.json", self.remote_namespace_prefix())
489    }
490
491    pub fn default_snapshot_prefix(&self) -> String {
492        if let Some(value) = self.metadata.get("red.config.backup.snapshot_prefix") {
493            return value.clone();
494        }
495        format!("{}snapshots/", self.remote_namespace_prefix())
496    }
497
498    pub fn default_wal_archive_prefix(&self) -> String {
499        if let Some(value) = self.metadata.get("red.config.wal.archive.prefix") {
500            return value.clone();
501        }
502        format!("{}wal/", self.remote_namespace_prefix())
503    }
504
505    pub fn has_capability(&self, capability: Capability) -> bool {
506        self.feature_gates.has(capability)
507    }
508}
509
/// Per-collection counters reported through [`CatalogService`].
#[derive(Debug, Clone, Default)]
pub struct CollectionStats {
    /// Entity count.
    pub entities: usize,
    /// Cross-reference count.
    pub cross_refs: usize,
    /// Storage-segment count.
    pub segments: usize,
}
516
/// Point-in-time view of the catalog, as produced by
/// [`CatalogService::catalog_snapshot`].
#[derive(Debug, Clone)]
pub struct CatalogSnapshot {
    /// Catalog/database name.
    pub name: String,
    /// Total entity count across all collections.
    pub total_entities: usize,
    /// Number of collections.
    pub total_collections: usize,
    /// Per-collection stats keyed by collection name.
    pub stats_by_collection: BTreeMap<String, CollectionStats>,
    /// When this snapshot was taken (`UNIX_EPOCH` for the default).
    pub updated_at: SystemTime,
}
525
impl Default for CatalogSnapshot {
    // Manual impl because `SystemTime` has no `Default`; the Unix
    // epoch acts as the "never updated" sentinel.
    fn default() -> Self {
        Self {
            name: String::new(),
            total_entities: 0,
            total_collections: 0,
            stats_by_collection: BTreeMap::new(),
            updated_at: UNIX_EPOCH,
        }
    }
}
537
/// Persistable description of a database's schema-level state.
#[derive(Debug, Clone)]
pub struct SchemaManifest {
    /// On-disk format version (see `REDDB_FORMAT_VERSION`).
    pub format_version: u32,
    /// Creation timestamp, Unix ms.
    pub created_at_unix_ms: u128,
    /// Last-update timestamp, Unix ms.
    pub updated_at_unix_ms: u128,
    /// Options the database was opened with.
    pub options: RedDBOptions,
    /// Number of collections at manifest time.
    pub collection_count: usize,
}
546
547impl SchemaManifest {
548    pub fn now(options: RedDBOptions, collection_count: usize) -> Self {
549        let now = SystemTime::now()
550            .duration_since(UNIX_EPOCH)
551            .unwrap_or_default()
552            .as_millis();
553        Self {
554            format_version: REDDB_FORMAT_VERSION,
555            created_at_unix_ms: now,
556            updated_at_unix_ms: now,
557            options,
558            collection_count,
559        }
560    }
561}
562
/// Typed error surface for the public API; rendered for humans by the
/// `Display` impl below.
#[derive(Debug)]
pub enum RedDBError {
    /// Bad or inconsistent configuration.
    InvalidConfig(String),
    /// On-disk format version does not match `REDDB_FORMAT_VERSION`.
    SchemaVersionMismatch {
        expected: u32,
        found: u32,
    },
    /// Operation requires a capability that is not in `feature_gates`.
    FeatureNotEnabled(String),
    /// Named object does not exist.
    NotFound(String),
    /// Write attempted against a read-only database.
    ReadOnly(String),
    /// Operation is not valid in the current state.
    InvalidOperation(String),
    /// Error surfaced from the storage engine.
    Engine(String),
    /// Error surfaced from the catalog layer.
    Catalog(String),
    /// Query parse/execution failure.
    Query(String),
    /// Underlying I/O failure (the only variant retaining its source).
    Io(io::Error),
    /// Version information could not be obtained.
    VersionUnavailable,
    /// Operator-pinned cap exceeded (PLAN.md Phase 4.1). The string
    /// payload should follow the `quota_exceeded:<limit_name>:<current>:<max>`
    /// shape so HTTP / wire surfaces can map to the right status
    /// (507 for storage, 429 for rate, 504 for duration, 413 for
    /// payload).
    QuotaExceeded(String),
    /// Catch-all for invariant violations.
    Internal(String),
}
587
impl fmt::Display for RedDBError {
    /// Human-readable rendering: `<category>: <detail>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidConfig(msg) => write!(f, "invalid config: {msg}"),
            Self::SchemaVersionMismatch { expected, found } => {
                write!(
                    f,
                    "schema version mismatch: expected {expected}, found {found}"
                )
            }
            Self::FeatureNotEnabled(msg) => write!(f, "feature disabled: {msg}"),
            Self::NotFound(msg) => write!(f, "not found: {msg}"),
            Self::ReadOnly(msg) => write!(f, "read-only violation: {msg}"),
            // NOTE(review): uppercase prefix is inconsistent with the
            // lowercase style of every other variant — confirm whether a
            // wire consumer matches this exact string before normalizing.
            Self::InvalidOperation(msg) => write!(f, "INVALID_OPERATION: {msg}"),
            Self::Engine(msg) => write!(f, "engine error: {msg}"),
            Self::Catalog(msg) => write!(f, "catalog error: {msg}"),
            Self::Query(msg) => write!(f, "query error: {msg}"),
            Self::Io(err) => write!(f, "io error: {err}"),
            Self::VersionUnavailable => write!(f, "version information unavailable"),
            Self::QuotaExceeded(msg) => write!(f, "quota exceeded: {msg}"),
            Self::Internal(msg) => write!(f, "internal error: {msg}"),
        }
    }
}
612
impl std::error::Error for RedDBError {}

impl From<io::Error> for RedDBError {
    // The one lossless conversion: the `io::Error` is kept whole.
    fn from(err: io::Error) -> Self {
        Self::Io(err)
    }
}

// NOTE(review): the conversions below stringify the source error, so
// the original error value (and any `source()` chain) is dropped —
// apparently deliberate to keep `RedDBError` independent of internal
// error types; confirm before adding structured sources.

impl From<crate::storage::engine::DatabaseError> for RedDBError {
    fn from(err: crate::storage::engine::DatabaseError) -> Self {
        Self::Engine(err.to_string())
    }
}

impl From<crate::storage::wal::TxError> for RedDBError {
    fn from(err: crate::storage::wal::TxError) -> Self {
        Self::Engine(err.to_string())
    }
}

impl From<crate::storage::StoreError> for RedDBError {
    fn from(err: crate::storage::StoreError) -> Self {
        Self::Catalog(err.to_string())
    }
}

impl From<crate::storage::unified::devx::DevXError> for RedDBError {
    // DevX errors map variant-by-variant onto the closest API category.
    fn from(err: crate::storage::unified::devx::DevXError) -> Self {
        match err {
            crate::storage::unified::devx::DevXError::Validation(msg) => Self::InvalidConfig(msg),
            crate::storage::unified::devx::DevXError::Storage(msg) => Self::Engine(msg),
            crate::storage::unified::devx::DevXError::NotFound(msg) => Self::NotFound(msg),
        }
    }
}
648
/// Read-only view into the catalog, for cross-layer observability.
pub trait CatalogService {
    /// Names of all collections.
    fn list_collections(&self) -> Vec<String>;
    /// Stats for one collection, `None` if it does not exist.
    fn collection_stats(&self, collection: &str) -> Option<CollectionStats>;
    /// Point-in-time snapshot of the whole catalog.
    fn catalog_snapshot(&self) -> CatalogSnapshot;
}

/// Optional query cost estimation hook.
pub trait QueryPlanner {
    /// Estimated plan cost for `query`, `None` if no estimate is available.
    fn plan_cost(&self, query: &str) -> Option<f64>;
}

/// Minimal query-execution hook.
pub trait DataOps {
    /// Execute `query`, discarding any result rows.
    fn execute_query(&self, query: &str) -> RedDBResult<()>;
}
662
663pub mod prelude {
664    pub use super::{
665        Capability, CapabilitySet, CatalogService, CatalogSnapshot, CollectionStats, DataOps,
666        QueryPlanner, RedDBError, RedDBOptions, RedDBResult, SchemaManifest, StorageMode,
667        REDDB_FORMAT_VERSION, REDDB_PROTOCOL_VERSION,
668    };
669}