1use std::collections::{BTreeMap, BTreeSet};
10use std::fmt;
11use std::io;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use crate::auth::AuthConfig;
17use crate::replication::ReplicationConfig;
18
19pub const DEFAULT_SNAPSHOT_RETENTION: usize = 16;
20pub const DEFAULT_EXPORT_RETENTION: usize = 16;
21
22pub const REDDB_PROTOCOL_VERSION: &str = "reddb-v2";
23pub const REDDB_FORMAT_VERSION: u32 = 2;
24pub const DEFAULT_GROUP_COMMIT_WINDOW_MS: u64 = 0;
37pub const DEFAULT_GROUP_COMMIT_MAX_STATEMENTS: usize = 128;
38pub const DEFAULT_GROUP_COMMIT_MAX_WAL_BYTES: u64 = 1024 * 1024;
39
40pub type RedDBResult<T> = Result<T, RedDBError>;
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
43pub enum StorageMode {
44 #[default]
46 Persistent,
47}
48
49impl StorageMode {
50 pub const fn is_persistent(self) -> bool {
51 matches!(self, Self::Persistent)
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
56pub enum DurabilityMode {
57 #[default]
58 Strict,
59 WalDurableGrouped,
60 Async,
68}
69
70impl DurabilityMode {
71 pub const fn as_str(self) -> &'static str {
72 match self {
73 Self::Strict => "strict",
74 Self::WalDurableGrouped => "wal_durable_grouped",
75 Self::Async => "async",
76 }
77 }
78
79 pub fn from_str(value: &str) -> Option<Self> {
80 let normalized = value.trim().to_ascii_lowercase();
81 match normalized.as_str() {
82 "strict" => Some(Self::Strict),
84 "sync"
90 | "wal_durable_grouped"
91 | "wal-durable-grouped"
92 | "grouped"
93 | "wal_grouped"
94 | "wal-grouped" => Some(Self::WalDurableGrouped),
95 "async" | "fire_and_forget" | "fire-and-forget" => Some(Self::Async),
99 _ => None,
100 }
101 }
102}
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq)]
105pub struct GroupCommitOptions {
106 pub window_ms: u64,
107 pub max_statements: usize,
108 pub max_wal_bytes: u64,
109}
110
111impl Default for GroupCommitOptions {
112 fn default() -> Self {
113 Self {
114 window_ms: DEFAULT_GROUP_COMMIT_WINDOW_MS,
115 max_statements: DEFAULT_GROUP_COMMIT_MAX_STATEMENTS,
116 max_wal_bytes: DEFAULT_GROUP_COMMIT_MAX_WAL_BYTES,
117 }
118 }
119}
120
121#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
122pub enum Capability {
123 Table,
125 Graph,
127 Vector,
129 FullText,
131 Security,
133 Encryption,
135}
136
137impl Capability {
138 pub const fn as_str(self) -> &'static str {
139 match self {
140 Self::Table => "table",
141 Self::Graph => "graph",
142 Self::Vector => "vector",
143 Self::FullText => "fulltext",
144 Self::Security => "security",
145 Self::Encryption => "encryption",
146 }
147 }
148}
149
150#[derive(Debug, Clone, Default)]
151pub struct CapabilitySet {
152 items: BTreeSet<Capability>,
153}
154
155impl CapabilitySet {
156 pub fn new() -> Self {
157 Self::default()
158 }
159
160 pub fn with(mut self, capability: Capability) -> Self {
161 self.items.insert(capability);
162 self
163 }
164
165 pub fn with_all(mut self, capabilities: &[Capability]) -> Self {
166 capabilities.iter().copied().for_each(|capability| {
167 self.items.insert(capability);
168 });
169 self
170 }
171
172 pub fn has(&self, capability: Capability) -> bool {
173 self.items.contains(&capability)
174 }
175
176 pub fn as_slice(&self) -> Vec<Capability> {
177 self.items.iter().copied().collect()
178 }
179}
180
181pub struct RedDBOptions {
182 pub mode: StorageMode,
183 pub data_path: Option<PathBuf>,
184 pub read_only: bool,
185 pub create_if_missing: bool,
186 pub verify_checksums: bool,
187 pub durability_mode: DurabilityMode,
188 pub group_commit: GroupCommitOptions,
189 pub auto_checkpoint_pages: u32,
190 pub cache_pages: usize,
191 pub snapshot_retention: usize,
192 pub export_retention: usize,
193 pub feature_gates: CapabilitySet,
194 pub force_create: bool,
195 pub metadata: BTreeMap<String, String>,
196 pub remote_backend: Option<Arc<dyn crate::storage::backend::RemoteBackend>>,
198 pub remote_backend_atomic: Option<Arc<dyn crate::storage::backend::AtomicRemoteBackend>>,
205 pub remote_key: Option<String>,
207 pub replication: ReplicationConfig,
209 pub auth: AuthConfig,
211 pub auto_index_id: bool,
217}
218
219impl fmt::Debug for RedDBOptions {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 let backend_name = self.remote_backend.as_ref().map(|b| b.name().to_string());
222 f.debug_struct("RedDBOptions")
223 .field("mode", &self.mode)
224 .field("data_path", &self.data_path)
225 .field("read_only", &self.read_only)
226 .field("create_if_missing", &self.create_if_missing)
227 .field("verify_checksums", &self.verify_checksums)
228 .field("durability_mode", &self.durability_mode)
229 .field("group_commit", &self.group_commit)
230 .field("auto_checkpoint_pages", &self.auto_checkpoint_pages)
231 .field("cache_pages", &self.cache_pages)
232 .field("snapshot_retention", &self.snapshot_retention)
233 .field("export_retention", &self.export_retention)
234 .field("feature_gates", &self.feature_gates)
235 .field("force_create", &self.force_create)
236 .field("metadata", &self.metadata)
237 .field("remote_backend", &backend_name)
238 .field("remote_key", &self.remote_key)
239 .field("replication", &self.replication)
240 .field("auth", &self.auth)
241 .finish()
242 }
243}
244
245impl Clone for RedDBOptions {
246 fn clone(&self) -> Self {
247 Self {
248 mode: self.mode,
249 data_path: self.data_path.clone(),
250 read_only: self.read_only,
251 create_if_missing: self.create_if_missing,
252 verify_checksums: self.verify_checksums,
253 durability_mode: self.durability_mode,
254 group_commit: self.group_commit,
255 auto_checkpoint_pages: self.auto_checkpoint_pages,
256 cache_pages: self.cache_pages,
257 snapshot_retention: self.snapshot_retention,
258 export_retention: self.export_retention,
259 feature_gates: self.feature_gates.clone(),
260 force_create: self.force_create,
261 metadata: self.metadata.clone(),
262 remote_backend: self.remote_backend.clone(),
263 remote_backend_atomic: self.remote_backend_atomic.clone(),
264 remote_key: self.remote_key.clone(),
265 replication: self.replication.clone(),
266 auth: self.auth.clone(),
267 auto_index_id: self.auto_index_id,
268 }
269 }
270}
271
272impl Default for RedDBOptions {
273 fn default() -> Self {
274 Self {
275 mode: StorageMode::Persistent,
276 data_path: None,
277 read_only: false,
278 create_if_missing: true,
279 verify_checksums: true,
280 durability_mode: DurabilityMode::WalDurableGrouped,
286 group_commit: GroupCommitOptions::default(),
287 auto_checkpoint_pages: 1000,
288 cache_pages: 10_000,
289 snapshot_retention: DEFAULT_SNAPSHOT_RETENTION,
290 export_retention: DEFAULT_EXPORT_RETENTION,
291 feature_gates: CapabilitySet::new()
292 .with(Capability::Table)
293 .with(Capability::Graph)
294 .with(Capability::Vector),
295 force_create: true,
296 metadata: BTreeMap::new(),
297 remote_backend: None,
298 remote_backend_atomic: None,
299 remote_key: None,
300 replication: ReplicationConfig::standalone(),
301 auth: AuthConfig::default(),
302 auto_index_id: true,
303 }
304 }
305}
306
307impl RedDBOptions {
308 pub fn persistent<P: Into<PathBuf>>(path: P) -> Self {
309 Self {
310 mode: StorageMode::Persistent,
311 data_path: Some(path.into()),
312 ..Default::default()
313 }
314 }
315
316 pub fn in_memory() -> Self {
323 static NEXT_EPHEMERAL_ID: std::sync::atomic::AtomicU64 =
324 std::sync::atomic::AtomicU64::new(0);
325
326 let now_nanos = std::time::SystemTime::now()
327 .duration_since(std::time::UNIX_EPOCH)
328 .map(|duration| duration.as_nanos())
329 .unwrap_or(0);
330 let unique = NEXT_EPHEMERAL_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
331 let path = std::env::temp_dir().join(format!(
332 "reddb-ephemeral-{}-{}-{}.rdb",
333 std::process::id(),
334 now_nanos,
335 unique
336 ));
337 let _ = std::fs::remove_file(&path);
338 Self {
339 mode: StorageMode::Persistent,
340 data_path: Some(path),
341 auto_checkpoint_pages: 0,
342 cache_pages: 2_000,
343 snapshot_retention: DEFAULT_SNAPSHOT_RETENTION,
344 export_retention: DEFAULT_EXPORT_RETENTION,
345 read_only: false,
346 force_create: true,
347 ..Default::default()
348 }
349 }
350
351 pub fn with_mode(mut self, mode: StorageMode) -> Self {
352 self.mode = mode;
353 self
354 }
355
356 pub fn with_data_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
357 self.data_path = Some(path.into());
358 self
359 }
360
361 pub fn with_read_only(mut self, read_only: bool) -> Self {
362 self.read_only = read_only;
363 self
364 }
365
366 pub fn with_auto_checkpoint(mut self, pages: u32) -> Self {
367 self.auto_checkpoint_pages = pages;
368 self
369 }
370
371 pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
372 self.durability_mode = mode;
373 self
374 }
375
376 pub fn with_group_commit_window_ms(mut self, window_ms: u64) -> Self {
377 self.group_commit.window_ms = window_ms;
380 self
381 }
382
383 pub fn with_group_commit_max_statements(mut self, max_statements: usize) -> Self {
384 self.group_commit.max_statements = max_statements.max(1);
385 self
386 }
387
388 pub fn with_group_commit_max_wal_bytes(mut self, max_wal_bytes: u64) -> Self {
389 self.group_commit.max_wal_bytes = max_wal_bytes.max(1);
390 self
391 }
392
393 pub fn with_cache_pages(mut self, pages: usize) -> Self {
394 self.cache_pages = pages.max(2);
395 self
396 }
397
398 pub fn with_snapshot_retention(mut self, limit: usize) -> Self {
399 self.snapshot_retention = limit.max(1);
400 self
401 }
402
403 pub fn with_export_retention(mut self, limit: usize) -> Self {
404 self.export_retention = limit.max(1);
405 self
406 }
407
408 pub fn with_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
409 self.metadata.insert(key.into(), value.into());
410 self
411 }
412
413 pub fn with_auto_index_id(mut self, enabled: bool) -> Self {
417 self.auto_index_id = enabled;
418 self
419 }
420
421 pub fn with_capability(mut self, capability: Capability) -> Self {
422 self.feature_gates = self.feature_gates.with(capability);
423 self
424 }
425
426 pub fn with_remote_backend(
432 mut self,
433 backend: Arc<dyn crate::storage::backend::RemoteBackend>,
434 key: impl Into<String>,
435 ) -> Self {
436 self.remote_backend = Some(backend);
437 self.remote_key = Some(key.into());
438 self
439 }
440
441 pub fn with_atomic_remote_backend(
447 mut self,
448 backend: Arc<dyn crate::storage::backend::AtomicRemoteBackend>,
449 ) -> Self {
450 self.remote_backend_atomic = Some(backend);
451 self
452 }
453
454 pub fn with_replication(mut self, config: ReplicationConfig) -> Self {
455 self.replication = config;
456 self
457 }
458
459 pub fn with_auth(mut self, config: AuthConfig) -> Self {
460 self.auth = config;
461 self
462 }
463
464 pub fn resolved_path(&self, fallback: impl AsRef<Path>) -> PathBuf {
465 self.data_path
466 .clone()
467 .unwrap_or_else(|| fallback.as_ref().to_path_buf())
468 }
469
470 pub fn remote_namespace_prefix(&self) -> String {
471 let Some(remote_key) = &self.remote_key else {
472 return String::new();
473 };
474 let normalized = remote_key.trim_matches('/');
475 if normalized.is_empty() {
476 return String::new();
477 }
478 match normalized.rsplit_once('/') {
479 Some((parent, _)) if !parent.is_empty() => format!("{parent}/"),
480 _ => String::new(),
481 }
482 }
483
484 pub fn default_backup_head_key(&self) -> String {
485 if let Some(value) = self.metadata.get("red.config.backup.head_key") {
486 return value.clone();
487 }
488 format!("{}manifests/head.json", self.remote_namespace_prefix())
489 }
490
491 pub fn default_snapshot_prefix(&self) -> String {
492 if let Some(value) = self.metadata.get("red.config.backup.snapshot_prefix") {
493 return value.clone();
494 }
495 format!("{}snapshots/", self.remote_namespace_prefix())
496 }
497
498 pub fn default_wal_archive_prefix(&self) -> String {
499 if let Some(value) = self.metadata.get("red.config.wal.archive.prefix") {
500 return value.clone();
501 }
502 format!("{}wal/", self.remote_namespace_prefix())
503 }
504
505 pub fn has_capability(&self, capability: Capability) -> bool {
506 self.feature_gates.has(capability)
507 }
508}
509
510#[derive(Debug, Clone, Default)]
511pub struct CollectionStats {
512 pub entities: usize,
513 pub cross_refs: usize,
514 pub segments: usize,
515}
516
517#[derive(Debug, Clone)]
518pub struct CatalogSnapshot {
519 pub name: String,
520 pub total_entities: usize,
521 pub total_collections: usize,
522 pub stats_by_collection: BTreeMap<String, CollectionStats>,
523 pub updated_at: SystemTime,
524}
525
526impl Default for CatalogSnapshot {
527 fn default() -> Self {
528 Self {
529 name: String::new(),
530 total_entities: 0,
531 total_collections: 0,
532 stats_by_collection: BTreeMap::new(),
533 updated_at: UNIX_EPOCH,
534 }
535 }
536}
537
538#[derive(Debug, Clone)]
539pub struct SchemaManifest {
540 pub format_version: u32,
541 pub created_at_unix_ms: u128,
542 pub updated_at_unix_ms: u128,
543 pub options: RedDBOptions,
544 pub collection_count: usize,
545}
546
547impl SchemaManifest {
548 pub fn now(options: RedDBOptions, collection_count: usize) -> Self {
549 let now = SystemTime::now()
550 .duration_since(UNIX_EPOCH)
551 .unwrap_or_default()
552 .as_millis();
553 Self {
554 format_version: REDDB_FORMAT_VERSION,
555 created_at_unix_ms: now,
556 updated_at_unix_ms: now,
557 options,
558 collection_count,
559 }
560 }
561}
562
563#[derive(Debug)]
564pub enum RedDBError {
565 InvalidConfig(String),
566 SchemaVersionMismatch {
567 expected: u32,
568 found: u32,
569 },
570 FeatureNotEnabled(String),
571 NotFound(String),
572 ReadOnly(String),
573 InvalidOperation(String),
574 Engine(String),
575 Catalog(String),
576 Query(String),
577 Validation {
578 message: String,
579 validation: crate::json::Value,
580 },
581 Io(io::Error),
582 VersionUnavailable,
583 QuotaExceeded(String),
589 Internal(String),
590}
591
592impl fmt::Display for RedDBError {
593 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
594 match self {
595 Self::InvalidConfig(msg) => write!(f, "invalid config: {msg}"),
596 Self::SchemaVersionMismatch { expected, found } => {
597 write!(
598 f,
599 "schema version mismatch: expected {expected}, found {found}"
600 )
601 }
602 Self::FeatureNotEnabled(msg) => write!(f, "feature disabled: {msg}"),
603 Self::NotFound(msg) => write!(f, "not found: {msg}"),
604 Self::ReadOnly(msg) => write!(f, "read-only violation: {msg}"),
605 Self::InvalidOperation(msg) => write!(f, "INVALID_OPERATION: {msg}"),
606 Self::Engine(msg) => write!(f, "engine error: {msg}"),
607 Self::Catalog(msg) => write!(f, "catalog error: {msg}"),
608 Self::Query(msg) => write!(f, "query error: {msg}"),
609 Self::Validation { message, .. } => write!(f, "validation error: {message}"),
610 Self::Io(err) => write!(f, "io error: {err}"),
611 Self::VersionUnavailable => write!(f, "version information unavailable"),
612 Self::QuotaExceeded(msg) => write!(f, "quota exceeded: {msg}"),
613 Self::Internal(msg) => write!(f, "internal error: {msg}"),
614 }
615 }
616}
617
618impl std::error::Error for RedDBError {}
619
620impl From<io::Error> for RedDBError {
621 fn from(err: io::Error) -> Self {
622 Self::Io(err)
623 }
624}
625
626impl From<crate::storage::engine::DatabaseError> for RedDBError {
627 fn from(err: crate::storage::engine::DatabaseError) -> Self {
628 Self::Engine(err.to_string())
629 }
630}
631
632impl From<crate::storage::wal::TxError> for RedDBError {
633 fn from(err: crate::storage::wal::TxError) -> Self {
634 Self::Engine(err.to_string())
635 }
636}
637
638impl From<crate::storage::StoreError> for RedDBError {
639 fn from(err: crate::storage::StoreError) -> Self {
640 Self::Catalog(err.to_string())
641 }
642}
643
644impl From<crate::storage::unified::devx::DevXError> for RedDBError {
645 fn from(err: crate::storage::unified::devx::DevXError) -> Self {
646 match err {
647 crate::storage::unified::devx::DevXError::Validation(msg) => Self::InvalidConfig(msg),
648 crate::storage::unified::devx::DevXError::Storage(msg) => Self::Engine(msg),
649 crate::storage::unified::devx::DevXError::NotFound(msg) => Self::NotFound(msg),
650 }
651 }
652}
653
654pub trait CatalogService {
655 fn list_collections(&self) -> Vec<String>;
656 fn collection_stats(&self, collection: &str) -> Option<CollectionStats>;
657 fn catalog_snapshot(&self) -> CatalogSnapshot;
658}
659
660pub trait QueryPlanner {
661 fn plan_cost(&self, query: &str) -> Option<f64>;
662}
663
664pub trait DataOps {
665 fn execute_query(&self, query: &str) -> RedDBResult<()>;
666}
667
668pub mod prelude {
669 pub use super::{
670 Capability, CapabilitySet, CatalogService, CatalogSnapshot, CollectionStats, DataOps,
671 QueryPlanner, RedDBError, RedDBOptions, RedDBResult, SchemaManifest, StorageMode,
672 REDDB_FORMAT_VERSION, REDDB_PROTOCOL_VERSION,
673 };
674}