1use std::collections::{BTreeMap, BTreeSet};
10use std::fmt;
11use std::io;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use crate::auth::AuthConfig;
17use crate::replication::ReplicationConfig;
18
/// Value used for `RedDBOptions::snapshot_retention` unless overridden.
pub const DEFAULT_SNAPSHOT_RETENTION: usize = 16;
/// Value used for `RedDBOptions::export_retention` unless overridden.
pub const DEFAULT_EXPORT_RETENTION: usize = 16;

/// Protocol identifier string.
pub const REDDB_PROTOCOL_VERSION: &str = "reddb-v2";
/// Format version; `SchemaManifest::now` stamps it into new manifests.
pub const REDDB_FORMAT_VERSION: u32 = 2;
/// Default for `GroupCommitOptions::window_ms` (milliseconds).
pub const DEFAULT_GROUP_COMMIT_WINDOW_MS: u64 = 0;
/// Default for `GroupCommitOptions::max_statements`.
pub const DEFAULT_GROUP_COMMIT_MAX_STATEMENTS: usize = 128;
/// Default for `GroupCommitOptions::max_wal_bytes`: 1 MiB.
pub const DEFAULT_GROUP_COMMIT_MAX_WAL_BYTES: u64 = 1 << 20;
39
40pub type RedDBResult<T> = Result<T, RedDBError>;
41
/// Storage strategy selected through `RedDBOptions::mode`.
///
/// Only one variant is defined here; `RedDBOptions::in_memory` also uses it,
/// pointing the database at a path under the system temp directory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum StorageMode {
    /// The default (and only) mode.
    #[default]
    Persistent,
}

impl StorageMode {
    /// Returns `true` when the mode is [`StorageMode::Persistent`].
    pub const fn is_persistent(self) -> bool {
        // Exhaustive on purpose: a new variant must decide its answer here.
        match self {
            Self::Persistent => true,
        }
    }
}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
56pub enum DurabilityMode {
57 #[default]
58 Strict,
59 WalDurableGrouped,
60 Async,
68}
69
70impl DurabilityMode {
71 pub const fn as_str(self) -> &'static str {
72 match self {
73 Self::Strict => "strict",
74 Self::WalDurableGrouped => "wal_durable_grouped",
75 Self::Async => "async",
76 }
77 }
78
79 pub fn from_str(value: &str) -> Option<Self> {
80 let normalized = value.trim().to_ascii_lowercase();
81 match normalized.as_str() {
82 "strict" => Some(Self::Strict),
84 "sync"
90 | "wal_durable_grouped"
91 | "wal-durable-grouped"
92 | "grouped"
93 | "wal_grouped"
94 | "wal-grouped" => Some(Self::WalDurableGrouped),
95 "async" | "fire_and_forget" | "fire-and-forget" => Some(Self::Async),
99 _ => None,
100 }
101 }
102}
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq)]
105pub struct GroupCommitOptions {
106 pub window_ms: u64,
107 pub max_statements: usize,
108 pub max_wal_bytes: u64,
109}
110
111impl Default for GroupCommitOptions {
112 fn default() -> Self {
113 Self {
114 window_ms: DEFAULT_GROUP_COMMIT_WINDOW_MS,
115 max_statements: DEFAULT_GROUP_COMMIT_MAX_STATEMENTS,
116 max_wal_bytes: DEFAULT_GROUP_COMMIT_MAX_WAL_BYTES,
117 }
118 }
119}
120
/// A feature that can be switched on in a [`CapabilitySet`].
///
/// The derived ordering follows declaration order, which is also the
/// iteration order of a `CapabilitySet`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Capability {
    /// Named `"table"`.
    Table,
    /// Named `"graph"`.
    Graph,
    /// Named `"vector"`.
    Vector,
    /// Named `"fulltext"`.
    FullText,
    /// Named `"security"`.
    Security,
    /// Named `"encryption"`.
    Encryption,
}

impl Capability {
    /// Stable lowercase identifier for the capability.
    pub const fn as_str(self) -> &'static str {
        match self {
            Capability::Table => "table",
            Capability::Graph => "graph",
            Capability::Vector => "vector",
            Capability::FullText => "fulltext",
            Capability::Security => "security",
            Capability::Encryption => "encryption",
        }
    }
}
149
150#[derive(Debug, Clone, Default)]
151pub struct CapabilitySet {
152 items: BTreeSet<Capability>,
153}
154
155impl CapabilitySet {
156 pub fn new() -> Self {
157 Self::default()
158 }
159
160 pub fn with(mut self, capability: Capability) -> Self {
161 self.items.insert(capability);
162 self
163 }
164
165 pub fn with_all(mut self, capabilities: &[Capability]) -> Self {
166 capabilities.iter().copied().for_each(|capability| {
167 self.items.insert(capability);
168 });
169 self
170 }
171
172 pub fn has(&self, capability: Capability) -> bool {
173 self.items.contains(&capability)
174 }
175
176 pub fn as_slice(&self) -> Vec<Capability> {
177 self.items.iter().copied().collect()
178 }
179}
180
181pub struct RedDBOptions {
182 pub mode: StorageMode,
183 pub data_path: Option<PathBuf>,
184 pub read_only: bool,
185 pub create_if_missing: bool,
186 pub verify_checksums: bool,
187 pub durability_mode: DurabilityMode,
188 pub group_commit: GroupCommitOptions,
189 pub auto_checkpoint_pages: u32,
190 pub cache_pages: usize,
191 pub snapshot_retention: usize,
192 pub export_retention: usize,
193 pub feature_gates: CapabilitySet,
194 pub force_create: bool,
195 pub metadata: BTreeMap<String, String>,
196 pub remote_backend: Option<Arc<dyn crate::storage::backend::RemoteBackend>>,
198 pub remote_backend_atomic: Option<Arc<dyn crate::storage::backend::AtomicRemoteBackend>>,
205 pub remote_key: Option<String>,
207 pub replication: ReplicationConfig,
209 pub auth: AuthConfig,
211 pub control_events: crate::runtime::control_events::ControlEventConfig,
214 pub query_audit: crate::runtime::query_audit::QueryAuditConfig,
218 pub auto_index_id: bool,
224 pub layout: crate::storage::layout::StorageLayout,
230 pub layout_overrides: crate::storage::layout::LayoutOverrides,
232 pub layout_explicit: bool,
238}
239
240impl fmt::Debug for RedDBOptions {
241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242 let backend_name = self.remote_backend.as_ref().map(|b| b.name().to_string());
243 f.debug_struct("RedDBOptions")
244 .field("mode", &self.mode)
245 .field("data_path", &self.data_path)
246 .field("read_only", &self.read_only)
247 .field("create_if_missing", &self.create_if_missing)
248 .field("verify_checksums", &self.verify_checksums)
249 .field("durability_mode", &self.durability_mode)
250 .field("group_commit", &self.group_commit)
251 .field("auto_checkpoint_pages", &self.auto_checkpoint_pages)
252 .field("cache_pages", &self.cache_pages)
253 .field("snapshot_retention", &self.snapshot_retention)
254 .field("export_retention", &self.export_retention)
255 .field("feature_gates", &self.feature_gates)
256 .field("force_create", &self.force_create)
257 .field("metadata", &self.metadata)
258 .field("remote_backend", &backend_name)
259 .field("remote_key", &self.remote_key)
260 .field("replication", &self.replication)
261 .field("auth", &self.auth)
262 .field("control_events", &self.control_events)
263 .field("query_audit", &self.query_audit)
264 .field("layout", &self.layout)
265 .field("layout_overrides", &self.layout_overrides)
266 .finish()
267 }
268}
269
270impl Clone for RedDBOptions {
271 fn clone(&self) -> Self {
272 Self {
273 mode: self.mode,
274 data_path: self.data_path.clone(),
275 read_only: self.read_only,
276 create_if_missing: self.create_if_missing,
277 verify_checksums: self.verify_checksums,
278 durability_mode: self.durability_mode,
279 group_commit: self.group_commit,
280 auto_checkpoint_pages: self.auto_checkpoint_pages,
281 cache_pages: self.cache_pages,
282 snapshot_retention: self.snapshot_retention,
283 export_retention: self.export_retention,
284 feature_gates: self.feature_gates.clone(),
285 force_create: self.force_create,
286 metadata: self.metadata.clone(),
287 remote_backend: self.remote_backend.clone(),
288 remote_backend_atomic: self.remote_backend_atomic.clone(),
289 remote_key: self.remote_key.clone(),
290 replication: self.replication.clone(),
291 auth: self.auth.clone(),
292 control_events: self.control_events,
293 query_audit: self.query_audit.clone(),
294 auto_index_id: self.auto_index_id,
295 layout: self.layout,
296 layout_overrides: self.layout_overrides.clone(),
297 layout_explicit: self.layout_explicit,
298 }
299 }
300}
301
302impl Default for RedDBOptions {
303 fn default() -> Self {
304 Self {
305 mode: StorageMode::Persistent,
306 data_path: None,
307 read_only: false,
308 create_if_missing: true,
309 verify_checksums: true,
310 durability_mode: DurabilityMode::WalDurableGrouped,
316 group_commit: GroupCommitOptions::default(),
317 auto_checkpoint_pages: 1000,
318 cache_pages: 10_000,
319 snapshot_retention: DEFAULT_SNAPSHOT_RETENTION,
320 export_retention: DEFAULT_EXPORT_RETENTION,
321 feature_gates: CapabilitySet::new()
322 .with(Capability::Table)
323 .with(Capability::Graph)
324 .with(Capability::Vector),
325 force_create: true,
326 metadata: BTreeMap::new(),
327 remote_backend: None,
328 remote_backend_atomic: None,
329 remote_key: None,
330 replication: ReplicationConfig::standalone(),
331 auth: AuthConfig::default(),
332 control_events: crate::runtime::control_events::ControlEventConfig::default(),
333 query_audit: crate::runtime::query_audit::QueryAuditConfig::default(),
334 auto_index_id: true,
335 layout: crate::storage::layout::StorageLayout::default(),
336 layout_overrides: crate::storage::layout::LayoutOverrides::default(),
337 layout_explicit: false,
338 }
339 }
340}
341
342impl RedDBOptions {
343 pub fn persistent<P: Into<PathBuf>>(path: P) -> Self {
344 Self {
345 mode: StorageMode::Persistent,
346 data_path: Some(path.into()),
347 ..Default::default()
348 }
349 }
350
351 pub fn in_memory() -> Self {
358 static NEXT_EPHEMERAL_ID: std::sync::atomic::AtomicU64 =
359 std::sync::atomic::AtomicU64::new(0);
360
361 let now_nanos = std::time::SystemTime::now()
362 .duration_since(std::time::UNIX_EPOCH)
363 .map(|duration| duration.as_nanos())
364 .unwrap_or(0);
365 let unique = NEXT_EPHEMERAL_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
366 let path = std::env::temp_dir().join(format!(
367 "reddb-ephemeral-{}-{}-{}.rdb",
368 std::process::id(),
369 now_nanos,
370 unique
371 ));
372 let _ = std::fs::remove_file(&path);
373 Self {
374 mode: StorageMode::Persistent,
375 data_path: Some(path),
376 auto_checkpoint_pages: 0,
377 cache_pages: 2_000,
378 snapshot_retention: DEFAULT_SNAPSHOT_RETENTION,
379 export_retention: DEFAULT_EXPORT_RETENTION,
380 read_only: false,
381 force_create: true,
382 ..Default::default()
383 }
384 }
385
386 pub fn with_mode(mut self, mode: StorageMode) -> Self {
387 self.mode = mode;
388 self
389 }
390
391 pub fn with_data_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
392 self.data_path = Some(path.into());
393 self
394 }
395
396 pub fn with_read_only(mut self, read_only: bool) -> Self {
397 self.read_only = read_only;
398 self
399 }
400
401 pub fn with_auto_checkpoint(mut self, pages: u32) -> Self {
402 self.auto_checkpoint_pages = pages;
403 self
404 }
405
406 pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
407 self.durability_mode = mode;
408 self
409 }
410
411 pub fn with_group_commit_window_ms(mut self, window_ms: u64) -> Self {
412 self.group_commit.window_ms = window_ms;
415 self
416 }
417
418 pub fn with_group_commit_max_statements(mut self, max_statements: usize) -> Self {
419 self.group_commit.max_statements = max_statements.max(1);
420 self
421 }
422
423 pub fn with_group_commit_max_wal_bytes(mut self, max_wal_bytes: u64) -> Self {
424 self.group_commit.max_wal_bytes = max_wal_bytes.max(1);
425 self
426 }
427
428 pub fn with_cache_pages(mut self, pages: usize) -> Self {
429 self.cache_pages = pages.max(2);
430 self
431 }
432
433 pub fn with_snapshot_retention(mut self, limit: usize) -> Self {
434 self.snapshot_retention = limit.max(1);
435 self
436 }
437
438 pub fn with_export_retention(mut self, limit: usize) -> Self {
439 self.export_retention = limit.max(1);
440 self
441 }
442
443 pub fn with_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
444 self.metadata.insert(key.into(), value.into());
445 self
446 }
447
448 pub fn with_auto_index_id(mut self, enabled: bool) -> Self {
452 self.auto_index_id = enabled;
453 self
454 }
455
456 pub fn with_capability(mut self, capability: Capability) -> Self {
457 self.feature_gates = self.feature_gates.with(capability);
458 self
459 }
460
461 pub fn with_remote_backend(
467 mut self,
468 backend: Arc<dyn crate::storage::backend::RemoteBackend>,
469 key: impl Into<String>,
470 ) -> Self {
471 self.remote_backend = Some(backend);
472 self.remote_key = Some(key.into());
473 self
474 }
475
476 pub fn with_atomic_remote_backend(
482 mut self,
483 backend: Arc<dyn crate::storage::backend::AtomicRemoteBackend>,
484 ) -> Self {
485 self.remote_backend_atomic = Some(backend);
486 self
487 }
488
489 pub fn with_replication(mut self, config: ReplicationConfig) -> Self {
490 self.replication = config;
491 self
492 }
493
494 pub fn with_auth(mut self, config: AuthConfig) -> Self {
495 self.auth = config;
496 self
497 }
498
499 pub fn resolved_path(&self, fallback: impl AsRef<Path>) -> PathBuf {
500 self.data_path
501 .clone()
502 .unwrap_or_else(|| fallback.as_ref().to_path_buf())
503 }
504
505 pub fn remote_namespace_prefix(&self) -> String {
506 let Some(remote_key) = &self.remote_key else {
507 return String::new();
508 };
509 let normalized = remote_key.trim_matches('/');
510 if normalized.is_empty() {
511 return String::new();
512 }
513 match normalized.rsplit_once('/') {
514 Some((parent, _)) if !parent.is_empty() => format!("{parent}/"),
515 _ => String::new(),
516 }
517 }
518
519 pub fn default_backup_head_key(&self) -> String {
520 if let Some(value) = self.metadata.get("red.config.backup.head_key") {
521 return value.clone();
522 }
523 format!("{}manifests/head.json", self.remote_namespace_prefix())
524 }
525
526 pub fn default_snapshot_prefix(&self) -> String {
527 if let Some(value) = self.metadata.get("red.config.backup.snapshot_prefix") {
528 return value.clone();
529 }
530 format!("{}snapshots/", self.remote_namespace_prefix())
531 }
532
533 pub fn default_wal_archive_prefix(&self) -> String {
534 if let Some(value) = self.metadata.get("red.config.wal.archive.prefix") {
535 return value.clone();
536 }
537 format!("{}wal/", self.remote_namespace_prefix())
538 }
539
540 pub fn has_capability(&self, capability: Capability) -> bool {
541 self.feature_gates.has(capability)
542 }
543
544 pub fn with_layout(mut self, layout: crate::storage::layout::StorageLayout) -> Self {
547 self.layout = layout;
548 self.layout_explicit = true;
549 self
550 }
551
552 pub fn with_layout_overrides(
555 mut self,
556 overrides: crate::storage::layout::LayoutOverrides,
557 ) -> Self {
558 self.layout_overrides = overrides;
559 self
560 }
561
562 pub fn resolve_tiered_layout(
566 &self,
567 ) -> Option<(PathBuf, crate::storage::layout::TieredLayoutPaths)> {
568 let data_path = self.data_path.clone()?;
569 let paths = crate::storage::layout::TieredLayoutPaths::new(
570 &data_path,
571 self.layout,
572 self.layout_overrides.clone(),
573 );
574 Some((data_path, paths))
575 }
576
577 pub fn apply_tier_defaults(&self) {
598 use crate::storage::layout::StorageLayout;
599
600 if !self.layout_explicit {
605 if let Some((_, paths)) = self.resolve_tiered_layout() {
606 tier_wiring::stash_layout_paths(paths);
607 }
608 return;
609 }
610
611 let layout = self.layout;
612 crate::physical::set_meta_json_sidecar_enabled(matches!(layout, StorageLayout::Max));
614
615 crate::physical::set_seqn_journal_enabled(matches!(layout, StorageLayout::Max));
619 crate::physical::set_seqn_journal_retention(match layout {
620 StorageLayout::Max => crate::physical::DEFAULT_METADATA_JOURNAL_RETENTION,
621 _ => crate::physical::OPT_IN_METADATA_JOURNAL_RETENTION,
622 });
623
624 crate::physical::set_shm_provisioning_enabled(matches!(
626 layout,
627 StorageLayout::Standard | StorageLayout::Performance | StorageLayout::Max
628 ));
629
630 crate::physical::set_fold_pager_meta_enabled(matches!(layout, StorageLayout::Max));
633 crate::physical::set_fold_dwb_into_wal_enabled(matches!(layout, StorageLayout::Max));
634
635 if let Some((_, paths)) = self.resolve_tiered_layout() {
637 tier_wiring::stash_layout_paths(paths);
638 }
639 }
640}
641
642pub mod tier_wiring {
646 use std::sync::Mutex;
647
648 use crate::storage::layout::{LogDestination, TieredLayoutPaths};
649
650 static CURRENT_LAYOUT_PATHS: Mutex<Option<TieredLayoutPaths>> = Mutex::new(None);
651
652 pub fn stash_layout_paths(paths: TieredLayoutPaths) {
653 if let Ok(mut slot) = CURRENT_LAYOUT_PATHS.lock() {
654 *slot = Some(paths);
655 }
656 }
657
658 pub fn current_layout_paths() -> Option<TieredLayoutPaths> {
659 CURRENT_LAYOUT_PATHS
660 .lock()
661 .ok()
662 .and_then(|slot| slot.clone())
663 }
664
665 pub fn current_log_destinations() -> (LogDestination, LogDestination) {
669 match current_layout_paths() {
670 Some(p) => (p.audit_log_destination, p.slow_log_destination),
671 None => (LogDestination::Stderr, LogDestination::Stderr),
672 }
673 }
674}
675
/// Counters describing one collection; every counter starts at zero.
#[derive(Debug, Clone, Default)]
pub struct CollectionStats {
    /// Entity count.
    pub entities: usize,
    /// Cross-reference count.
    pub cross_refs: usize,
    /// Segment count.
    pub segments: usize,
}
682
683#[derive(Debug, Clone)]
684pub struct CatalogSnapshot {
685 pub name: String,
686 pub total_entities: usize,
687 pub total_collections: usize,
688 pub stats_by_collection: BTreeMap<String, CollectionStats>,
689 pub updated_at: SystemTime,
690}
691
692impl Default for CatalogSnapshot {
693 fn default() -> Self {
694 Self {
695 name: String::new(),
696 total_entities: 0,
697 total_collections: 0,
698 stats_by_collection: BTreeMap::new(),
699 updated_at: UNIX_EPOCH,
700 }
701 }
702}
703
704#[derive(Debug, Clone)]
705pub struct SchemaManifest {
706 pub format_version: u32,
707 pub created_at_unix_ms: u128,
708 pub updated_at_unix_ms: u128,
709 pub options: RedDBOptions,
710 pub collection_count: usize,
711}
712
713impl SchemaManifest {
714 pub fn now(options: RedDBOptions, collection_count: usize) -> Self {
715 let now = SystemTime::now()
716 .duration_since(UNIX_EPOCH)
717 .unwrap_or_default()
718 .as_millis();
719 Self {
720 format_version: REDDB_FORMAT_VERSION,
721 created_at_unix_ms: now,
722 updated_at_unix_ms: now,
723 options,
724 collection_count,
725 }
726 }
727}
728
729#[derive(Debug)]
730pub enum RedDBError {
731 InvalidConfig(String),
732 SchemaVersionMismatch {
733 expected: u32,
734 found: u32,
735 },
736 FeatureNotEnabled(String),
737 NotFound(String),
738 ReadOnly(String),
739 InvalidOperation(String),
740 Engine(String),
741 Catalog(String),
742 Query(String),
743 Validation {
744 message: String,
745 validation: crate::json::Value,
746 },
747 Io(io::Error),
748 VersionUnavailable,
749 QuotaExceeded(String),
755 MaterializationLimitExceeded {
764 executor: &'static str,
765 limit: usize,
766 current: usize,
767 },
768 Internal(String),
769}
770
771impl fmt::Display for RedDBError {
772 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
773 match self {
774 Self::InvalidConfig(msg) => write!(f, "invalid config: {msg}"),
775 Self::SchemaVersionMismatch { expected, found } => {
776 write!(
777 f,
778 "schema version mismatch: expected {expected}, found {found}"
779 )
780 }
781 Self::FeatureNotEnabled(msg) => write!(f, "feature disabled: {msg}"),
782 Self::NotFound(msg) => write!(f, "not found: {msg}"),
783 Self::ReadOnly(msg) => write!(f, "read-only violation: {msg}"),
784 Self::InvalidOperation(msg) => write!(f, "INVALID_OPERATION: {msg}"),
785 Self::Engine(msg) => write!(f, "engine error: {msg}"),
786 Self::Catalog(msg) => write!(f, "catalog error: {msg}"),
787 Self::Query(msg) => write!(f, "query error: {msg}"),
788 Self::Validation { message, .. } => write!(f, "validation error: {message}"),
789 Self::Io(err) => write!(f, "io error: {err}"),
790 Self::VersionUnavailable => write!(f, "version information unavailable"),
791 Self::QuotaExceeded(msg) => write!(f, "quota exceeded: {msg}"),
792 Self::MaterializationLimitExceeded {
793 executor,
794 limit,
795 current,
796 } => write!(
797 f,
798 "materialization limit exceeded: executor={executor} current={current} limit={limit}"
799 ),
800 Self::Internal(msg) => write!(f, "internal error: {msg}"),
801 }
802 }
803}
804
805impl std::error::Error for RedDBError {}
806
807impl From<io::Error> for RedDBError {
808 fn from(err: io::Error) -> Self {
809 Self::Io(err)
810 }
811}
812
813impl From<crate::storage::engine::DatabaseError> for RedDBError {
814 fn from(err: crate::storage::engine::DatabaseError) -> Self {
815 Self::Engine(err.to_string())
816 }
817}
818
819impl From<crate::storage::wal::TxError> for RedDBError {
820 fn from(err: crate::storage::wal::TxError) -> Self {
821 Self::Engine(err.to_string())
822 }
823}
824
825impl From<crate::storage::StoreError> for RedDBError {
826 fn from(err: crate::storage::StoreError) -> Self {
827 Self::Catalog(err.to_string())
828 }
829}
830
831impl From<crate::storage::unified::devx::DevXError> for RedDBError {
832 fn from(err: crate::storage::unified::devx::DevXError) -> Self {
833 match err {
834 crate::storage::unified::devx::DevXError::Validation(msg) => Self::InvalidConfig(msg),
835 crate::storage::unified::devx::DevXError::Storage(msg) => Self::Engine(msg),
836 crate::storage::unified::devx::DevXError::NotFound(msg) => Self::NotFound(msg),
837 }
838 }
839}
840
841pub trait CatalogService {
842 fn list_collections(&self) -> Vec<String>;
843 fn collection_stats(&self, collection: &str) -> Option<CollectionStats>;
844 fn catalog_snapshot(&self) -> CatalogSnapshot;
845}
846
/// Cost estimation for queries.
pub trait QueryPlanner {
    /// Estimated cost of `query`, or `None` when no estimate is available.
    fn plan_cost(&self, query: &str) -> Option<f64>;
}
850
851pub trait DataOps {
852 fn execute_query(&self, query: &str) -> RedDBResult<()>;
853}
854
855pub mod prelude {
856 pub use super::{
857 Capability, CapabilitySet, CatalogService, CatalogSnapshot, CollectionStats, DataOps,
858 QueryPlanner, RedDBError, RedDBOptions, RedDBResult, SchemaManifest, StorageMode,
859 REDDB_FORMAT_VERSION, REDDB_PROTOCOL_VERSION,
860 };
861}