1use std::collections::BTreeMap;
4use std::fs;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::api::{
10 CatalogSnapshot, CollectionStats, RedDBOptions, SchemaManifest, StorageMode,
11 REDDB_FORMAT_VERSION,
12};
13use crate::index::IndexKind;
14use crate::json::{from_slice, parse_json, to_vec};
15use crate::serde_json::{Map, Value as JsonValue};
16
/// Default `GridLayout::block_size` (`512 * 1024`).
pub const DEFAULT_GRID_BLOCK_SIZE: usize = 512 * 1024;
/// Default `GridLayout::page_size`.
pub const DEFAULT_PAGE_SIZE: usize = 4096;
/// Default superblock copy count, used by both `SuperblockHeader::default`
/// and `GridLayout::default`.
pub const DEFAULT_SUPERBLOCK_COPIES: u8 = 4;
/// Protocol tag for physical metadata; presumably what is stored in
/// `PhysicalMetadataFile::protocol_version` — confirm against the writer.
pub const PHYSICAL_METADATA_PROTOCOL_VERSION: &str = "reddb-physical-v1";
/// File extension used for the binary physical-metadata encoding.
pub const PHYSICAL_METADATA_BINARY_EXTENSION: &str = "meta.rdbx";
/// Default manifest event history size; presumably a cap on retained
/// `ManifestEvent`s — confirm at the call site.
pub const DEFAULT_MANIFEST_EVENT_HISTORY: usize = 256;
/// Default metadata journal retention count.
pub const DEFAULT_METADATA_JOURNAL_RETENTION: usize = 32;
/// Retention returned by [`seqn_journal_retention`] when there is no explicit
/// override and no valid `REDDB_SEQN_JOURNAL_RETENTION` value.
pub const OPT_IN_METADATA_JOURNAL_RETENTION: usize = 4;

use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};

// Every `*_POLICY` static below is a tri-state override:
// `0` = unset (fall back to the environment), `1` = forced on, `2` = forced off.

/// Override for the metadata JSON sidecar toggle (env: `REDDB_META_JSON_SIDECAR`).
static META_JSON_SIDECAR_POLICY: AtomicU8 = AtomicU8::new(0);

/// Forces the metadata JSON sidecar on or off, overriding `REDDB_META_JSON_SIDECAR`.
pub fn set_meta_json_sidecar_enabled(enabled: bool) {
    toggle_store(&META_JSON_SIDECAR_POLICY, enabled);
}

/// Reports whether the metadata JSON sidecar is enabled.
///
/// An explicit [`set_meta_json_sidecar_enabled`] call wins. Otherwise the
/// `REDDB_META_JSON_SIDECAR` env var decides: `1`/`true`/`yes`/`on`, in any
/// ASCII case and surrounding whitespace ignored. The default is `false`.
pub fn meta_json_sidecar_enabled() -> bool {
    toggle_resolve(&META_JSON_SIDECAR_POLICY, "REDDB_META_JSON_SIDECAR")
}

/// Override for the seqn journal toggle (env: `REDDB_SEQN_JOURNAL`).
static SEQN_JOURNAL_POLICY: AtomicU8 = AtomicU8::new(0);
/// Explicit seqn journal retention; `0` means "unset, consult the environment".
static SEQN_JOURNAL_RETENTION: AtomicUsize = AtomicUsize::new(0);

/// Forces the seqn journal on or off, overriding `REDDB_SEQN_JOURNAL`.
pub fn set_seqn_journal_enabled(enabled: bool) {
    toggle_store(&SEQN_JOURNAL_POLICY, enabled);
}

/// Reports whether the seqn journal is enabled.
///
/// An explicit [`set_seqn_journal_enabled`] call wins; otherwise the
/// `REDDB_SEQN_JOURNAL` env var is parsed like the other toggles (default `false`).
pub fn seqn_journal_enabled() -> bool {
    toggle_resolve(&SEQN_JOURNAL_POLICY, "REDDB_SEQN_JOURNAL")
}

/// Override for the fold-pager-meta toggle (env: `REDDB_FOLD_PAGER_META`).
static FOLD_PAGER_META_POLICY: AtomicU8 = AtomicU8::new(0);

/// Forces fold-pager-meta on or off, overriding `REDDB_FOLD_PAGER_META`.
pub fn set_fold_pager_meta_enabled(enabled: bool) {
    toggle_store(&FOLD_PAGER_META_POLICY, enabled);
}

/// Reports whether fold-pager-meta is enabled.
///
/// An explicit [`set_fold_pager_meta_enabled`] call wins; otherwise the
/// `REDDB_FOLD_PAGER_META` env var is parsed like the other toggles (default `false`).
pub fn fold_pager_meta_enabled() -> bool {
    toggle_resolve(&FOLD_PAGER_META_POLICY, "REDDB_FOLD_PAGER_META")
}

/// Override for the fold-DWB-into-WAL toggle (env: `REDDB_FOLD_DWB_INTO_WAL`).
static FOLD_DWB_INTO_WAL_POLICY: AtomicU8 = AtomicU8::new(0);

/// Forces fold-DWB-into-WAL on or off, overriding `REDDB_FOLD_DWB_INTO_WAL`.
pub fn set_fold_dwb_into_wal_enabled(enabled: bool) {
    toggle_store(&FOLD_DWB_INTO_WAL_POLICY, enabled);
}

/// Reports whether fold-DWB-into-WAL is enabled.
///
/// An explicit [`set_fold_dwb_into_wal_enabled`] call wins; otherwise the
/// `REDDB_FOLD_DWB_INTO_WAL` env var is parsed like the other toggles (default `false`).
pub fn fold_dwb_into_wal_enabled() -> bool {
    toggle_resolve(&FOLD_DWB_INTO_WAL_POLICY, "REDDB_FOLD_DWB_INTO_WAL")
}

/// Sets an explicit seqn journal retention. Passing `0` clears the override,
/// so [`seqn_journal_retention`] falls back to the environment again.
pub fn set_seqn_journal_retention(retention: usize) {
    SEQN_JOURNAL_RETENTION.store(retention, Ordering::Relaxed);
}

/// Returns the seqn journal retention.
///
/// The value is resolved in this order:
/// 1. A non-zero explicit [`set_seqn_journal_retention`] value.
/// 2. A positive integer in `REDDB_SEQN_JOURNAL_RETENTION`, with surrounding
///    whitespace ignored.
/// 3. [`OPT_IN_METADATA_JOURNAL_RETENTION`].
pub fn seqn_journal_retention() -> usize {
    match SEQN_JOURNAL_RETENTION.load(Ordering::Relaxed) {
        0 => std::env::var("REDDB_SEQN_JOURNAL_RETENTION")
            .ok()
            .and_then(|raw| raw.trim().parse::<usize>().ok())
            .filter(|&n| n > 0)
            .unwrap_or(OPT_IN_METADATA_JOURNAL_RETENTION),
        stored => stored,
    }
}

/// Records an explicit override in a tri-state toggle (`1` = on, `2` = off).
fn toggle_store(policy: &AtomicU8, enabled: bool) {
    policy.store(if enabled { 1 } else { 2 }, Ordering::Relaxed);
}

/// Resolves a tri-state toggle.
///
/// An explicit override wins. Otherwise `env_key` is read and parsed with
/// `env_flag_is_truthy`. A missing or non-UTF-8 value counts as `false`.
fn toggle_resolve(policy: &AtomicU8, env_key: &str) -> bool {
    match policy.load(Ordering::Relaxed) {
        1 => true,
        2 => false,
        _ => std::env::var(env_key)
            .map(|raw| env_flag_is_truthy(&raw))
            .unwrap_or(false),
    }
}

/// Returns `true` for `1`, `true`, `yes` or `on`, ignoring ASCII case and
/// surrounding whitespace. Previously only the exact spellings
/// `1`/`true`/`TRUE`/`yes`/`on` were accepted, so `True`, `YES` or `ON`
/// silently disabled the feature.
fn env_flag_is_truthy(raw: &str) -> bool {
    let value = raw.trim();
    ["1", "true", "yes", "on"]
        .iter()
        .any(|&accepted| value.eq_ignore_ascii_case(accepted))
}
170
/// Which metadata representation was used: the binary file, the binary
/// journal, or JSON.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhysicalMetadataSource {
    Binary,
    BinaryJournal,
    Json,
}

impl PhysicalMetadataSource {
    /// Stable lowercase label for this source.
    pub fn as_str(self) -> &'static str {
        match self {
            PhysicalMetadataSource::Json => "json",
            PhysicalMetadataSource::BinaryJournal => "binary_journal",
            PhysicalMetadataSource::Binary => "binary",
        }
    }
}
/// Reference to a block by index, paired with a checksum (presumably of the
/// block's contents — confirm against the writer). `Default` is index 0 /
/// checksum 0.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct BlockReference {
    pub index: u64,
    pub checksum: u128,
}

/// Block references to the oldest and newest manifest blocks, as held by
/// `SuperblockHeader::manifest`.
#[derive(Debug, Clone, Default)]
pub struct ManifestPointers {
    pub oldest: BlockReference,
    pub newest: BlockReference,
}
198
199#[derive(Debug, Clone)]
200pub struct SuperblockHeader {
201 pub format_version: u32,
202 pub sequence: u64,
203 pub copies: u8,
204 pub manifest: ManifestPointers,
205 pub free_set: BlockReference,
206 pub collection_roots: BTreeMap<String, u64>,
207}
208
209impl Default for SuperblockHeader {
210 fn default() -> Self {
211 Self {
212 format_version: crate::api::REDDB_FORMAT_VERSION,
213 sequence: 0,
214 copies: DEFAULT_SUPERBLOCK_COPIES,
215 manifest: ManifestPointers::default(),
216 free_set: BlockReference::default(),
217 collection_roots: BTreeMap::new(),
218 }
219 }
220}
221
/// Kind of change recorded by a `ManifestEvent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ManifestEventKind {
    Insert,
    Update,
    Remove,
    Checkpoint,
}

/// One manifest entry: a change of `kind` to `object_key` in `collection`,
/// pointing at `block`.
#[derive(Debug, Clone)]
pub struct ManifestEvent {
    pub collection: String,
    pub object_key: String,
    pub kind: ManifestEventKind,
    pub block: BlockReference,
    // NOTE(review): presumably the snapshot range in which this entry is
    // visible, with `snapshot_max == None` meaning open-ended — confirm.
    pub snapshot_min: u64,
    pub snapshot_max: Option<u64>,
}
239
/// How compaction is scheduled; `PhysicalLayout::from_options` always selects
/// `Incremental`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionPolicy {
    Incremental,
    Manual,
}
245
/// WAL settings carried by `PhysicalLayout::wal`.
#[derive(Debug, Clone)]
pub struct WalPolicy {
    pub auto_checkpoint_pages: u32,
    pub fsync_on_commit: bool,
    pub ring_buffer_bytes: u64,
}

impl Default for WalPolicy {
    /// Defaults: checkpoint every 1000 pages, fsync on commit, and a 64 MiB
    /// ring buffer.
    fn default() -> Self {
        const RING_BUFFER_MIB: u64 = 64;
        WalPolicy {
            auto_checkpoint_pages: 1_000,
            fsync_on_commit: true,
            ring_buffer_bytes: RING_BUFFER_MIB << 20,
        }
    }
}
262
263#[derive(Debug, Clone)]
264pub struct GridLayout {
265 pub block_size: usize,
266 pub page_size: usize,
267 pub superblock_copies: u8,
268}
269
270impl Default for GridLayout {
271 fn default() -> Self {
272 Self {
273 block_size: DEFAULT_GRID_BLOCK_SIZE,
274 page_size: DEFAULT_PAGE_SIZE,
275 superblock_copies: DEFAULT_SUPERBLOCK_COPIES,
276 }
277 }
278}
279
280#[derive(Debug, Clone)]
281pub struct PhysicalLayout {
282 pub mode: StorageMode,
283 pub grid: GridLayout,
284 pub wal: WalPolicy,
285 pub compaction: CompactionPolicy,
286}
287
288impl PhysicalLayout {
289 pub fn from_options(options: &RedDBOptions) -> Self {
290 Self {
291 mode: options.mode,
292 grid: GridLayout::default(),
293 wal: WalPolicy {
294 auto_checkpoint_pages: options.auto_checkpoint_pages,
295 ..WalPolicy::default()
296 },
297 compaction: CompactionPolicy::Incremental,
298 }
299 }
300
301 pub fn is_persistent(&self) -> bool {
302 self.mode == StorageMode::Persistent
303 }
304}
305
/// Summary of a snapshot: its id, creation time, the superblock sequence it
/// was taken at, and collection/entity counts.
#[derive(Debug, Clone, Default)]
pub struct SnapshotDescriptor {
    pub snapshot_id: u64,
    // Unix epoch milliseconds, per the field name.
    pub created_at_unix_ms: u128,
    pub superblock_sequence: u64,
    pub collection_count: usize,
    pub total_entities: usize,
}
314
/// How a `CollectionContract` came into existence.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContractOrigin {
    Explicit,
    Implicit,
    Migrated,
}

impl ContractOrigin {
    /// Stable lowercase label for this origin.
    pub fn as_str(self) -> &'static str {
        match self {
            ContractOrigin::Migrated => "migrated",
            ContractOrigin::Implicit => "implicit",
            ContractOrigin::Explicit => "explicit",
        }
    }
}
331
/// One declared column inside a `CollectionContract`.
#[derive(Debug, Clone)]
pub struct DeclaredColumnContract {
    pub name: String,
    pub data_type: String,
    pub sql_type: Option<crate::storage::schema::SqlTypeName>,
    pub not_null: bool,
    // Default value kept as text; NOTE(review): presumably an unparsed
    // expression/literal — confirm how it is interpreted.
    pub default: Option<String>,
    // NOTE(review): meaning of the `u8` (level? codec id?) is not visible here.
    pub compress: Option<u8>,
    pub unique: bool,
    pub primary_key: bool,
    pub enum_variants: Vec<String>,
    pub array_element: Option<String>,
    pub decimal_precision: Option<u8>,
}
346
/// Persisted contract for a collection.
///
/// Covers its declared model and schema mode, where the contract came from,
/// version and timestamps, plus optional TTL, vector, metrics, session,
/// retention and column/table settings.
#[derive(Debug, Clone)]
pub struct CollectionContract {
    pub name: String,
    pub declared_model: crate::catalog::CollectionModel,
    pub schema_mode: crate::catalog::SchemaMode,
    pub origin: ContractOrigin,
    pub version: u32,
    // Unix epoch milliseconds, per the field names.
    pub created_at_unix_ms: u128,
    pub updated_at_unix_ms: u128,
    pub default_ttl_ms: Option<u64>,
    pub vector_dimension: Option<usize>,
    pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
    pub context_index_fields: Vec<String>,
    pub declared_columns: Vec<DeclaredColumnContract>,
    pub table_def: Option<crate::storage::schema::TableDef>,
    pub timestamps_enabled: bool,
    pub context_index_enabled: bool,
    // Metrics-collection settings; NOTE(review): semantics of each are defined
    // by the metrics subsystem, not visible here.
    pub metrics_raw_retention_ms: Option<u64>,
    pub metrics_rollup_policies: Vec<String>,
    pub metrics_tenant_identity: Option<String>,
    pub metrics_namespace: Option<String>,
    pub append_only: bool,
    pub subscriptions: Vec<crate::catalog::SubscriptionDescriptor>,
    pub analytics_config: Vec<crate::catalog::AnalyticsViewDescriptor>,
    // Session settings; NOTE(review): presumably the field used to group
    // sessions and the inactivity gap that splits them — confirm.
    pub session_key: Option<String>,
    pub session_gap_ms: Option<u64>,
    pub retention_duration_ms: Option<u64>,
}
424
/// Lifecycle state of a derived artifact.
///
/// `PhysicalIndexState::artifact_state` produces this from the index's
/// `build_state` string and `enabled` flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ArtifactState {
    Declared,
    Building,
    Ready,
    Disabled,
    Stale,
    Failed,
    RequiresRebuild,
}

impl ArtifactState {
    /// Maps a persisted `build_state` string plus the `enabled` flag to a state.
    ///
    /// A disabled artifact is always `Disabled`, whatever `s` says.
    /// Unrecognised strings fall back to `Declared`. Several intermediate
    /// pipeline labels all count as `Building`.
    pub fn from_build_state(s: &str, enabled: bool) -> Self {
        if enabled {
            match s {
                "ready" => ArtifactState::Ready,
                "stale" => ArtifactState::Stale,
                "failed" => ArtifactState::Failed,
                "requires_rebuild" | "requires-rebuild" => ArtifactState::RequiresRebuild,
                "building"
                | "catalog-derived"
                | "metadata-only"
                | "artifact-published"
                | "registry-loaded" => ArtifactState::Building,
                _ => ArtifactState::Declared,
            }
        } else {
            ArtifactState::Disabled
        }
    }

    /// Stable lowercase label. `RequiresRebuild` uses the underscore spelling.
    pub fn as_str(&self) -> &'static str {
        match self {
            ArtifactState::RequiresRebuild => "requires_rebuild",
            ArtifactState::Failed => "failed",
            ArtifactState::Stale => "stale",
            ArtifactState::Disabled => "disabled",
            ArtifactState::Ready => "ready",
            ArtifactState::Building => "building",
            ArtifactState::Declared => "declared",
        }
    }

    /// Only a `Ready` artifact may serve queries.
    pub fn is_queryable(&self) -> bool {
        *self == ArtifactState::Ready
    }

    /// True when a rebuild may be started from this state.
    pub fn can_rebuild(&self) -> bool {
        match self {
            ArtifactState::Declared
            | ArtifactState::Stale
            | ArtifactState::Failed
            | ArtifactState::RequiresRebuild => true,
            ArtifactState::Building | ArtifactState::Ready | ArtifactState::Disabled => false,
        }
    }

    /// True for states that an operator should look at.
    pub fn needs_attention(&self) -> bool {
        match self {
            ArtifactState::Stale | ArtifactState::Failed | ArtifactState::RequiresRebuild => true,
            ArtifactState::Declared
            | ArtifactState::Building
            | ArtifactState::Ready
            | ArtifactState::Disabled => false,
        }
    }
}

impl std::fmt::Display for ArtifactState {
    /// Writes the `as_str` label; width/fill flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
509
510#[derive(Debug, Clone)]
511pub struct PhysicalIndexState {
512 pub name: String,
513 pub kind: IndexKind,
514 pub collection: Option<String>,
515 pub enabled: bool,
516 pub entries: usize,
517 pub estimated_memory_bytes: u64,
518 pub last_refresh_ms: Option<u128>,
519 pub backend: String,
520 pub artifact_kind: Option<String>,
521 pub artifact_root_page: Option<u32>,
522 pub artifact_checksum: Option<u64>,
523 pub build_state: String,
524}
525
526impl PhysicalIndexState {
527 pub fn artifact_state(&self) -> ArtifactState {
529 ArtifactState::from_build_state(&self.build_state, self.enabled)
530 }
531}
532
/// Record of a named export.
///
/// Stores where its data and metadata were written and which
/// snapshot/superblock sequence it reflects.
#[derive(Debug, Clone)]
pub struct ExportDescriptor {
    pub name: String,
    // Unix epoch milliseconds, per the field name.
    pub created_at_unix_ms: u128,
    pub snapshot_id: Option<u64>,
    pub superblock_sequence: u64,
    pub data_path: String,
    pub metadata_path: String,
    pub collection_count: usize,
    pub total_entities: usize,
}
544
/// Persisted definition and status of a named graph projection.
#[derive(Debug, Clone)]
pub struct PhysicalGraphProjection {
    pub name: String,
    // Unix epoch milliseconds, per the field names.
    pub created_at_unix_ms: u128,
    pub updated_at_unix_ms: u128,
    // Free-form labels; NOTE(review): allowed values are not visible here.
    pub state: String,
    pub source: String,
    // Label/type filters selecting which nodes and edges are projected.
    pub node_labels: Vec<String>,
    pub node_types: Vec<String>,
    pub edge_labels: Vec<String>,
    pub last_materialized_sequence: Option<u64>,
}
557
/// Persisted record of an analytics job.
///
/// Holds its kind, state, optional target projection, timestamps, last run
/// sequence, and free-form string metadata.
#[derive(Debug, Clone)]
pub struct PhysicalAnalyticsJob {
    pub id: String,
    pub kind: String,
    pub state: String,
    pub projection: Option<String>,
    // Unix epoch milliseconds, per the field names.
    pub created_at_unix_ms: u128,
    pub updated_at_unix_ms: u128,
    pub last_run_sequence: Option<u64>,
    pub metadata: BTreeMap<String, String>,
}
569
/// Persisted definition of a named tree inside a collection.
#[derive(Debug, Clone)]
pub struct PhysicalTreeDefinition {
    pub collection: String,
    pub name: String,
    pub root_id: u64,
    pub default_max_children: usize,
    pub ordered_children: bool,
    // Stored as strings; NOTE(review): allowed values are defined elsewhere.
    pub ownership: String,
    pub auto_fix_mode: String,
    // Unix epoch milliseconds, per the field names.
    pub created_at_unix_ms: u128,
    pub updated_at_unix_ms: u128,
}
582
/// One time-range chunk of a `PhysicalHypertable`.
///
/// The chunk covers `[start_ns, end_ns_exclusive)`. `min_ts_ns`/`max_ts_ns`
/// record the observed timestamp extremes.
#[derive(Debug, Clone)]
pub struct PhysicalHypertableChunk {
    pub start_ns: u64,
    pub end_ns_exclusive: u64,
    pub row_count: u64,
    pub min_ts_ns: u64,
    pub max_ts_ns: u64,
    pub sealed: bool,
    // Per-chunk TTL; NOTE(review): presumably overrides the hypertable's
    // `default_ttl_ns` when set — confirm.
    pub ttl_override_ns: Option<u64>,
}
598
/// Persisted hypertable: a time column partitioned into chunks of
/// `chunk_interval_ns`, with an optional default TTL.
#[derive(Debug, Clone)]
pub struct PhysicalHypertable {
    pub name: String,
    pub time_column: String,
    pub chunk_interval_ns: u64,
    pub default_ttl_ns: Option<u64>,
    pub chunks: Vec<PhysicalHypertableChunk>,
}
612
/// Aggregate physical-metadata document.
///
/// Bundles the schema manifest, catalog snapshot, manifest events, index
/// states, projections, jobs, trees, TTL defaults, collection contracts,
/// hypertables, exports, superblock header and snapshots.
#[derive(Debug, Clone)]
pub struct PhysicalMetadataFile {
    pub protocol_version: String,
    // Unix epoch milliseconds, per the field name.
    pub generated_at_unix_ms: u128,
    // NOTE(review): presumably a `PhysicalMetadataSource::as_str` label — confirm.
    pub last_loaded_from: Option<String>,
    pub last_healed_at_unix_ms: Option<u128>,
    pub manifest: SchemaManifest,
    pub catalog: CatalogSnapshot,
    pub manifest_events: Vec<ManifestEvent>,
    pub indexes: Vec<PhysicalIndexState>,
    pub graph_projections: Vec<PhysicalGraphProjection>,
    pub analytics_jobs: Vec<PhysicalAnalyticsJob>,
    pub tree_definitions: Vec<PhysicalTreeDefinition>,
    // Keyed by collection name, per the field name.
    pub collection_ttl_defaults_ms: BTreeMap<String, u64>,
    pub collection_contracts: Vec<CollectionContract>,
    pub hypertables: Vec<PhysicalHypertable>,
    pub exports: Vec<ExportDescriptor>,
    pub superblock: SuperblockHeader,
    pub snapshots: Vec<SnapshotDescriptor>,
}
636
637mod helpers;
638mod json_codec;
639mod metadata_file;
640pub mod shm;
641
642pub use self::shm::{
643 provision_shm, read_shm_header, set_shm_provisioning_enabled, shm_path_for,
644 shm_provisioning_enabled, ShmHandle, ShmHeader, ShmProvisionState, SHM_FILE_SIZE,
645 SHM_HEADER_SIZE, SHM_MAGIC, SHM_VERSION,
646};
647
648use self::helpers::*;
649use self::json_codec::*;