1use std::collections::BTreeMap;
4use std::fs;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::api::{
10 CatalogSnapshot, CollectionStats, RedDBOptions, SchemaManifest, StorageMode,
11 REDDB_FORMAT_VERSION,
12};
13use crate::index::IndexKind;
14use crate::json::{from_slice, parse_json, to_vec};
15use crate::serde_json::{Map, Value as JsonValue};
16
/// Default grid block size in bytes (512 KiB).
pub const DEFAULT_GRID_BLOCK_SIZE: usize = 512 * 1024;
/// Default page size in bytes.
pub const DEFAULT_PAGE_SIZE: usize = 4096;
/// Default number of superblock copies.
pub const DEFAULT_SUPERBLOCK_COPIES: u8 = 4;
/// Protocol identifier string for physical metadata.
pub const PHYSICAL_METADATA_PROTOCOL_VERSION: &str = "reddb-physical-v1";
/// Extension of the binary physical-metadata file.
pub const PHYSICAL_METADATA_BINARY_EXTENSION: &str = "meta.rdbx";
/// Default manifest event history length.
pub const DEFAULT_MANIFEST_EVENT_HISTORY: usize = 256;
/// Default metadata journal retention.
pub const DEFAULT_METADATA_JOURNAL_RETENTION: usize = 32;
/// Fallback returned by [`seqn_journal_retention`] when no positive retention has been
/// configured programmatically or through `REDDB_SEQN_JOURNAL_RETENTION`.
pub const OPT_IN_METADATA_JOURNAL_RETENTION: usize = 4;

use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};

// Every feature flag below is a tri-state policy held in an `AtomicU8`:
// UNSET defers to an environment variable; ENABLED / DISABLED are programmatic
// overrides that take precedence over the environment. Each flag is loaded and
// stored as a standalone value with `Relaxed` ordering.
const FLAG_POLICY_UNSET: u8 = 0;
const FLAG_POLICY_ENABLED: u8 = 1;
const FLAG_POLICY_DISABLED: u8 = 2;

/// Returns `true` when `value` spells an affirmative flag (`1`, `true`, `yes`, `on`),
/// ignoring ASCII case and surrounding whitespace.
fn is_truthy_flag(value: &str) -> bool {
    let value = value.trim();
    ["1", "true", "yes", "on"]
        .iter()
        .any(|&accepted| value.eq_ignore_ascii_case(accepted))
}

/// Reads environment variable `name` as a boolean flag; unset or non-Unicode values are `false`.
fn env_var_truthy(name: &str) -> bool {
    std::env::var(name)
        .map(|v| is_truthy_flag(&v))
        .unwrap_or(false)
}

/// Encodes a programmatic override as a policy value.
fn flag_policy_value(enabled: bool) -> u8 {
    if enabled {
        FLAG_POLICY_ENABLED
    } else {
        FLAG_POLICY_DISABLED
    }
}

/// Resolves a tri-state policy: an explicit override wins, otherwise `env_var` decides.
fn resolve_flag_policy(policy: &AtomicU8, env_var: &str) -> bool {
    match policy.load(Ordering::Relaxed) {
        FLAG_POLICY_ENABLED => true,
        FLAG_POLICY_DISABLED => false,
        _ => env_var_truthy(env_var),
    }
}

static META_JSON_SIDECAR_POLICY: AtomicU8 = AtomicU8::new(FLAG_POLICY_UNSET);

/// Overrides the JSON-sidecar flag, taking precedence over `REDDB_META_JSON_SIDECAR`.
pub fn set_meta_json_sidecar_enabled(enabled: bool) {
    META_JSON_SIDECAR_POLICY.store(flag_policy_value(enabled), Ordering::Relaxed);
}

/// Whether the JSON sidecar is enabled: the programmatic override if set, otherwise
/// `REDDB_META_JSON_SIDECAR` (default `false`).
pub fn meta_json_sidecar_enabled() -> bool {
    resolve_flag_policy(&META_JSON_SIDECAR_POLICY, "REDDB_META_JSON_SIDECAR")
}

static SEQN_JOURNAL_POLICY: AtomicU8 = AtomicU8::new(FLAG_POLICY_UNSET);
/// Programmatic retention override; `0` means "not set".
static SEQN_JOURNAL_RETENTION: AtomicUsize = AtomicUsize::new(0);

/// Overrides the seqn-journal flag, taking precedence over `REDDB_SEQN_JOURNAL`.
pub fn set_seqn_journal_enabled(enabled: bool) {
    SEQN_JOURNAL_POLICY.store(flag_policy_value(enabled), Ordering::Relaxed);
}

/// Whether the seqn journal is enabled: the programmatic override if set, otherwise
/// `REDDB_SEQN_JOURNAL` (default `false`).
pub fn seqn_journal_enabled() -> bool {
    resolve_flag_policy(&SEQN_JOURNAL_POLICY, "REDDB_SEQN_JOURNAL")
}

static FOLD_PAGER_META_POLICY: AtomicU8 = AtomicU8::new(FLAG_POLICY_UNSET);

/// Overrides the fold-pager-meta flag, taking precedence over `REDDB_FOLD_PAGER_META`.
pub fn set_fold_pager_meta_enabled(enabled: bool) {
    FOLD_PAGER_META_POLICY.store(flag_policy_value(enabled), Ordering::Relaxed);
}

/// Whether fold-pager-meta is enabled: the programmatic override if set, otherwise
/// `REDDB_FOLD_PAGER_META` (default `false`).
pub fn fold_pager_meta_enabled() -> bool {
    resolve_flag_policy(&FOLD_PAGER_META_POLICY, "REDDB_FOLD_PAGER_META")
}

static FOLD_DWB_INTO_WAL_POLICY: AtomicU8 = AtomicU8::new(FLAG_POLICY_UNSET);

/// Overrides the fold-DWB-into-WAL flag, taking precedence over `REDDB_FOLD_DWB_INTO_WAL`.
pub fn set_fold_dwb_into_wal_enabled(enabled: bool) {
    FOLD_DWB_INTO_WAL_POLICY.store(flag_policy_value(enabled), Ordering::Relaxed);
}

/// Whether fold-DWB-into-WAL is enabled: the programmatic override if set, otherwise
/// `REDDB_FOLD_DWB_INTO_WAL` (default `false`).
pub fn fold_dwb_into_wal_enabled() -> bool {
    resolve_flag_policy(&FOLD_DWB_INTO_WAL_POLICY, "REDDB_FOLD_DWB_INTO_WAL")
}

/// Sets the seqn-journal retention override. Passing `0` clears the override so
/// [`seqn_journal_retention`] falls back to the environment / default again.
pub fn set_seqn_journal_retention(retention: usize) {
    SEQN_JOURNAL_RETENTION.store(retention, Ordering::Relaxed);
}

/// Effective seqn-journal retention: the programmatic override when positive, else a
/// positive integer from `REDDB_SEQN_JOURNAL_RETENTION` (surrounding whitespace ignored),
/// else [`OPT_IN_METADATA_JOURNAL_RETENTION`].
pub fn seqn_journal_retention() -> usize {
    match SEQN_JOURNAL_RETENTION.load(Ordering::Relaxed) {
        0 => std::env::var("REDDB_SEQN_JOURNAL_RETENTION")
            .ok()
            .and_then(|v| v.trim().parse::<usize>().ok())
            .filter(|v| *v > 0)
            .unwrap_or(OPT_IN_METADATA_JOURNAL_RETENTION),
        explicit => explicit,
    }
}
170
/// Where a physical-metadata load came from: the binary file, the binary journal, or JSON.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhysicalMetadataSource {
    Binary,
    BinaryJournal,
    Json,
}

impl PhysicalMetadataSource {
    /// Stable lowercase (snake_case) label for this source.
    pub fn as_str(self) -> &'static str {
        match self {
            PhysicalMetadataSource::Json => "json",
            PhysicalMetadataSource::BinaryJournal => "binary_journal",
            PhysicalMetadataSource::Binary => "binary",
        }
    }
}
/// A block index paired with a checksum. The `Default` value is index 0, checksum 0.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockReference {
    pub index: u64,
    pub checksum: u128,
}

/// References to the oldest and newest manifest blocks; both default to
/// `BlockReference::default()`.
#[derive(Default, Debug, Clone)]
pub struct ManifestPointers {
    pub oldest: BlockReference,
    pub newest: BlockReference,
}
198
199#[derive(Debug, Clone)]
200pub struct SuperblockHeader {
201 pub format_version: u32,
202 pub sequence: u64,
203 pub copies: u8,
204 pub manifest: ManifestPointers,
205 pub free_set: BlockReference,
206 pub collection_roots: BTreeMap<String, u64>,
207}
208
209impl Default for SuperblockHeader {
210 fn default() -> Self {
211 Self {
212 format_version: crate::api::REDDB_FORMAT_VERSION,
213 sequence: 0,
214 copies: DEFAULT_SUPERBLOCK_COPIES,
215 manifest: ManifestPointers::default(),
216 free_set: BlockReference::default(),
217 collection_roots: BTreeMap::new(),
218 }
219 }
220}
221
/// Kind of change recorded by a manifest event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ManifestEventKind {
    Insert,
    Update,
    Remove,
    Checkpoint,
}
229
230#[derive(Debug, Clone)]
231pub struct ManifestEvent {
232 pub collection: String,
233 pub object_key: String,
234 pub kind: ManifestEventKind,
235 pub block: BlockReference,
236 pub snapshot_min: u64,
237 pub snapshot_max: Option<u64>,
238}
239
/// How compaction is scheduled: incrementally or only on manual request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionPolicy {
    Incremental,
    Manual,
}
245
/// Write-ahead-log tuning knobs.
#[derive(Debug, Clone)]
pub struct WalPolicy {
    pub auto_checkpoint_pages: u32,
    pub fsync_on_commit: bool,
    pub ring_buffer_bytes: u64,
}

impl Default for WalPolicy {
    /// 1000-page auto-checkpoint threshold, fsync on commit, 64 MiB ring buffer.
    fn default() -> Self {
        let ring_buffer_bytes: u64 = 64 << 20;
        WalPolicy {
            auto_checkpoint_pages: 1000,
            fsync_on_commit: true,
            ring_buffer_bytes,
        }
    }
}
262
263#[derive(Debug, Clone)]
264pub struct GridLayout {
265 pub block_size: usize,
266 pub page_size: usize,
267 pub superblock_copies: u8,
268}
269
270impl Default for GridLayout {
271 fn default() -> Self {
272 Self {
273 block_size: DEFAULT_GRID_BLOCK_SIZE,
274 page_size: DEFAULT_PAGE_SIZE,
275 superblock_copies: DEFAULT_SUPERBLOCK_COPIES,
276 }
277 }
278}
279
280#[derive(Debug, Clone)]
281pub struct PhysicalLayout {
282 pub mode: StorageMode,
283 pub grid: GridLayout,
284 pub wal: WalPolicy,
285 pub compaction: CompactionPolicy,
286}
287
288impl PhysicalLayout {
289 pub fn from_options(options: &RedDBOptions) -> Self {
290 Self {
291 mode: options.mode,
292 grid: GridLayout::default(),
293 wal: WalPolicy {
294 auto_checkpoint_pages: options.auto_checkpoint_pages,
295 ..WalPolicy::default()
296 },
297 compaction: CompactionPolicy::Incremental,
298 }
299 }
300
301 pub fn is_persistent(&self) -> bool {
302 self.mode == StorageMode::Persistent
303 }
304}
305
/// Summary of a snapshot.
///
/// Records the snapshot identifier, creation time (Unix milliseconds), the associated
/// superblock sequence, and collection/entity counts. `Default` zeroes every field.
#[derive(Default, Debug, Clone)]
pub struct SnapshotDescriptor {
    pub snapshot_id: u64,
    pub created_at_unix_ms: u128,
    pub superblock_sequence: u64,
    pub collection_count: usize,
    pub total_entities: usize,
}
314
/// How a collection contract came to exist: explicitly declared, implicitly created,
/// or migrated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContractOrigin {
    Explicit,
    Implicit,
    Migrated,
}

impl ContractOrigin {
    /// Lowercase label for this origin.
    pub fn as_str(self) -> &'static str {
        match self {
            ContractOrigin::Migrated => "migrated",
            ContractOrigin::Implicit => "implicit",
            ContractOrigin::Explicit => "explicit",
        }
    }
}
331
332#[derive(Debug, Clone)]
333pub struct DeclaredColumnContract {
334 pub name: String,
335 pub data_type: String,
336 pub sql_type: Option<crate::storage::schema::SqlTypeName>,
337 pub not_null: bool,
338 pub default: Option<String>,
339 pub compress: Option<u8>,
340 pub unique: bool,
341 pub primary_key: bool,
342 pub enum_variants: Vec<String>,
343 pub array_element: Option<String>,
344 pub decimal_precision: Option<u8>,
345}
346
347#[derive(Debug, Clone)]
348pub struct CollectionContract {
349 pub name: String,
350 pub declared_model: crate::catalog::CollectionModel,
351 pub schema_mode: crate::catalog::SchemaMode,
352 pub origin: ContractOrigin,
353 pub version: u32,
354 pub created_at_unix_ms: u128,
355 pub updated_at_unix_ms: u128,
356 pub default_ttl_ms: Option<u64>,
357 pub vector_dimension: Option<usize>,
358 pub vector_metric: Option<crate::storage::engine::distance::DistanceMetric>,
359 pub context_index_fields: Vec<String>,
360 pub declared_columns: Vec<DeclaredColumnContract>,
361 pub table_def: Option<crate::storage::schema::TableDef>,
362 pub timestamps_enabled: bool,
368 pub context_index_enabled: bool,
380 pub metrics_raw_retention_ms: Option<u64>,
383 pub metrics_rollup_policies: Vec<String>,
385 pub metrics_tenant_identity: Option<String>,
388 pub metrics_namespace: Option<String>,
391 pub append_only: bool,
399 pub subscriptions: Vec<crate::catalog::SubscriptionDescriptor>,
402 pub session_key: Option<String>,
407 pub session_gap_ms: Option<u64>,
413 pub retention_duration_ms: Option<u64>,
419}
420
/// Lifecycle state of an index artifact, derived from its build-state string and
/// enabled flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ArtifactState {
    Declared,
    Building,
    Ready,
    Disabled,
    Stale,
    Failed,
    RequiresRebuild,
}

impl ArtifactState {
    /// Maps a build-state string to a state.
    ///
    /// A disabled artifact is always `Disabled`, whatever `s` says. Unrecognized strings
    /// map to `Declared`.
    pub fn from_build_state(s: &str, enabled: bool) -> Self {
        match (enabled, s) {
            (false, _) => ArtifactState::Disabled,
            (_, "ready") => ArtifactState::Ready,
            (_, "building")
            | (_, "catalog-derived")
            | (_, "metadata-only")
            | (_, "artifact-published")
            | (_, "registry-loaded") => ArtifactState::Building,
            (_, "stale") => ArtifactState::Stale,
            (_, "failed") => ArtifactState::Failed,
            (_, "requires_rebuild") | (_, "requires-rebuild") => ArtifactState::RequiresRebuild,
            _ => ArtifactState::Declared,
        }
    }

    /// Lowercase (snake_case) label; also used by the `Display` impl.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ArtifactState::Declared => "declared",
            ArtifactState::Building => "building",
            ArtifactState::Ready => "ready",
            ArtifactState::Disabled => "disabled",
            ArtifactState::Stale => "stale",
            ArtifactState::Failed => "failed",
            ArtifactState::RequiresRebuild => "requires_rebuild",
        }
    }

    /// Only `Ready` artifacts are queryable.
    pub fn is_queryable(&self) -> bool {
        *self == ArtifactState::Ready
    }

    /// `true` for `Declared`, `Stale`, `Failed` and `RequiresRebuild`.
    pub fn can_rebuild(&self) -> bool {
        match *self {
            ArtifactState::Declared
            | ArtifactState::Stale
            | ArtifactState::Failed
            | ArtifactState::RequiresRebuild => true,
            ArtifactState::Building | ArtifactState::Ready | ArtifactState::Disabled => false,
        }
    }

    /// `true` for `Failed`, `RequiresRebuild` and `Stale`.
    pub fn needs_attention(&self) -> bool {
        match *self {
            ArtifactState::Failed | ArtifactState::RequiresRebuild | ArtifactState::Stale => true,
            ArtifactState::Declared
            | ArtifactState::Building
            | ArtifactState::Ready
            | ArtifactState::Disabled => false,
        }
    }
}

impl std::fmt::Display for ArtifactState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}
505
506#[derive(Debug, Clone)]
507pub struct PhysicalIndexState {
508 pub name: String,
509 pub kind: IndexKind,
510 pub collection: Option<String>,
511 pub enabled: bool,
512 pub entries: usize,
513 pub estimated_memory_bytes: u64,
514 pub last_refresh_ms: Option<u128>,
515 pub backend: String,
516 pub artifact_kind: Option<String>,
517 pub artifact_root_page: Option<u32>,
518 pub artifact_checksum: Option<u64>,
519 pub build_state: String,
520}
521
522impl PhysicalIndexState {
523 pub fn artifact_state(&self) -> ArtifactState {
525 ArtifactState::from_build_state(&self.build_state, self.enabled)
526 }
527}
528
/// Record of a named export.
///
/// Holds the creation time (Unix ms), optional source snapshot, superblock sequence,
/// data and metadata paths, and collection/entity counts.
#[derive(Debug, Clone)]
pub struct ExportDescriptor {
    pub name: String,
    pub created_at_unix_ms: u128,
    pub snapshot_id: Option<u64>,
    pub superblock_sequence: u64,
    pub data_path: String,
    pub metadata_path: String,
    pub collection_count: usize,
    pub total_entities: usize,
}
540
/// Persisted definition of a named graph projection.
///
/// Holds timestamps (Unix ms), state and source strings, the node label/type and edge
/// label filters, and the last materialized sequence (if any).
#[derive(Debug, Clone)]
pub struct PhysicalGraphProjection {
    pub name: String,
    pub created_at_unix_ms: u128,
    pub updated_at_unix_ms: u128,
    pub state: String,
    pub source: String,
    pub node_labels: Vec<String>,
    pub node_types: Vec<String>,
    pub edge_labels: Vec<String>,
    pub last_materialized_sequence: Option<u64>,
}
553
/// Persisted record of an analytics job.
///
/// Holds the job id, kind, state, optional projection name, timestamps (Unix ms), last
/// run sequence, and free-form string metadata.
#[derive(Debug, Clone)]
pub struct PhysicalAnalyticsJob {
    pub id: String,
    pub kind: String,
    pub state: String,
    pub projection: Option<String>,
    pub created_at_unix_ms: u128,
    pub updated_at_unix_ms: u128,
    pub last_run_sequence: Option<u64>,
    pub metadata: BTreeMap<String, String>,
}
565
/// Persisted definition of a named tree within a collection.
///
/// Holds the root id, child-count default and ordering flag, ownership and auto-fix mode
/// strings, and timestamps (Unix ms).
#[derive(Debug, Clone)]
pub struct PhysicalTreeDefinition {
    pub collection: String,
    pub name: String,
    pub root_id: u64,
    pub default_max_children: usize,
    pub ordered_children: bool,
    pub ownership: String,
    pub auto_fix_mode: String,
    pub created_at_unix_ms: u128,
    pub updated_at_unix_ms: u128,
}
578
579#[derive(Debug, Clone)]
580pub struct PhysicalMetadataFile {
581 pub protocol_version: String,
582 pub generated_at_unix_ms: u128,
583 pub last_loaded_from: Option<String>,
584 pub last_healed_at_unix_ms: Option<u128>,
585 pub manifest: SchemaManifest,
586 pub catalog: CatalogSnapshot,
587 pub manifest_events: Vec<ManifestEvent>,
588 pub indexes: Vec<PhysicalIndexState>,
589 pub graph_projections: Vec<PhysicalGraphProjection>,
590 pub analytics_jobs: Vec<PhysicalAnalyticsJob>,
591 pub tree_definitions: Vec<PhysicalTreeDefinition>,
592 pub collection_ttl_defaults_ms: BTreeMap<String, u64>,
593 pub collection_contracts: Vec<CollectionContract>,
594 pub exports: Vec<ExportDescriptor>,
595 pub superblock: SuperblockHeader,
596 pub snapshots: Vec<SnapshotDescriptor>,
597}
598
599mod helpers;
600mod json_codec;
601mod metadata_file;
602pub mod shm;
603
604pub use self::shm::{
605 provision_shm, read_shm_header, set_shm_provisioning_enabled, shm_path_for,
606 shm_provisioning_enabled, ShmHandle, ShmHeader, ShmProvisionState, SHM_FILE_SIZE,
607 SHM_HEADER_SIZE, SHM_MAGIC, SHM_VERSION,
608};
609
610use self::helpers::*;
611use self::json_codec::*;