Skip to main content

sochdb_storage/
durability_contract.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # Durability Contract Hardening
19//!
20//! This module formalizes and enforces durability guarantees for the storage layer.
21//! It ensures that:
22//! - Any mode that allows data loss is explicitly labeled and requires opt-in
//! - The default mode is `GroupCommit` (batched fsync, production-safe); `Durable` gives strict fsync-after-commit (ACID D)
24//! - ARIES recovery invariants are mechanically enforced
25//! - WAL archiving hooks are available for PITR
26//!
27//! ## Durability Levels
28//!
//! - `Durable`: fsync after every commit (safest)
//! - `GroupCommit`: fsync batched, bounded latency (default, production)
31//! - `Periodic`: fsync on timer (UNSAFE - labeled clearly)
32//! - `NoSync`: no fsync (UNSAFE - for testing only)
33//!
34//! ## ARIES Recovery Contract
35//!
36//! 1. **Write-Ahead Logging**: No page written to disk before its log record
37//! 2. **Force on Commit**: All log records for a txn forced before commit returns
38//! 3. **Steal**: Buffer pool can evict dirty pages before commit (with WAL guarantee)
39//! 4. **Recovery Phases**: Analysis → Redo → Undo (with CLRs for nested undo)
40
41use std::fmt;
42use std::path::PathBuf;
43use std::time::Duration;
44
/// How aggressively committed writes are forced to stable storage.
///
/// The two unsafe variants carry an `accept_data_loss_risk` flag so that the
/// caller has to spell out the acknowledgement when building the value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DurabilityLevel {
    /// fsync on every single commit (safest, slowest).
    ///
    /// Guarantee: once commit returns, the data is on stable storage.
    Durable,

    /// Batch several commits and fsync them together (production default).
    ///
    /// Guarantee: once commit returns, the data reaches stable storage within
    /// `flush_interval`.
    /// Trade-off: a crash can lose up to `flush_interval` worth of commits.
    GroupCommit {
        /// Upper bound on the number of commits in one batch.
        max_batch: usize,
        /// Longest wait before a pending batch is flushed.
        flush_interval: Duration,
    },

    /// fsync driven by a timer (UNSAFE - data loss possible).
    ///
    /// WARNING: up to `sync_interval` of committed transactions can be lost.
    Periodic {
        /// Time between two syncs.
        sync_interval: Duration,
        /// Caller's acknowledgement that this level is unsafe.
        accept_data_loss_risk: bool,
    },

    /// Never fsync (UNSAFE - testing only).
    ///
    /// WARNING: ANY committed data can be lost on crash.
    NoSync {
        /// Caller's acknowledgement that this level is unsafe.
        accept_data_loss_risk: bool,
    },
}
78
79impl DurabilityLevel {
80    /// Check if this level is safe for production
81    pub fn is_production_safe(&self) -> bool {
82        matches!(
83            self,
84            DurabilityLevel::Durable | DurabilityLevel::GroupCommit { .. }
85        )
86    }
87
88    /// Check if explicit risk acceptance is required
89    pub fn requires_risk_acceptance(&self) -> bool {
90        matches!(
91            self,
92            DurabilityLevel::Periodic { .. } | DurabilityLevel::NoSync { .. }
93        )
94    }
95
96    /// Get the default production-safe configuration
97    pub fn default_production() -> Self {
98        DurabilityLevel::GroupCommit {
99            max_batch: 128,
100            flush_interval: Duration::from_millis(10),
101        }
102    }
103
104    /// Get the safest configuration
105    pub fn safest() -> Self {
106        DurabilityLevel::Durable
107    }
108}
109
110impl Default for DurabilityLevel {
111    fn default() -> Self {
112        Self::default_production()
113    }
114}
115
116impl fmt::Display for DurabilityLevel {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        match self {
119            DurabilityLevel::Durable => write!(f, "Durable (fsync per commit)"),
120            DurabilityLevel::GroupCommit {
121                max_batch,
122                flush_interval,
123            } => {
124                write!(
125                    f,
126                    "GroupCommit (batch={}, interval={:?})",
127                    max_batch, flush_interval
128                )
129            }
130            DurabilityLevel::Periodic { sync_interval, .. } => {
131                write!(f, "UNSAFE:Periodic (interval={:?})", sync_interval)
132            }
133            DurabilityLevel::NoSync { .. } => {
134                write!(f, "UNSAFE:NoSync (testing only)")
135            }
136        }
137    }
138}
139
/// Error reported when a durability contract check fails.
#[derive(Clone, Debug)]
pub struct DurabilityContractError {
    /// Human-readable explanation of what went wrong.
    pub message: String,
    /// Fixed tag naming the contract that was broken
    /// (for example `"WAL_BEFORE_PAGE"`).
    pub contract_violated: &'static str,
}
146
147impl fmt::Display for DurabilityContractError {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        write!(
150            f,
151            "Durability contract '{}' violated: {}",
152            self.contract_violated, self.message
153        )
154    }
155}
156
157impl std::error::Error for DurabilityContractError {}
158
159/// WAL archiving configuration for PITR
160#[derive(Debug, Clone)]
161pub struct WalArchiveConfig {
162    /// Enable WAL archiving
163    pub enabled: bool,
164    /// Archive destination path (local) or command
165    pub destination: WalArchiveDestination,
166    /// Archive trigger: segment full or time-based
167    pub trigger: WalArchiveTrigger,
168    /// Compress archived segments
169    pub compress: bool,
170    /// Verify archived segments (read back and check)
171    pub verify: bool,
172}
173
174impl Default for WalArchiveConfig {
175    fn default() -> Self {
176        Self {
177            enabled: false,
178            destination: WalArchiveDestination::LocalPath(PathBuf::from(
179                "/var/lib/sochdb/wal_archive",
180            )),
181            trigger: WalArchiveTrigger::SegmentFull,
182            compress: true,
183            verify: true,
184        }
185    }
186}
187
/// Where archived WAL segments are sent.
#[derive(Clone, Debug)]
pub enum WalArchiveDestination {
    /// A directory on the local filesystem.
    LocalPath(PathBuf),
    /// An external command (receives the segment path as its argument).
    ExternalCommand(String),
    /// S3-compatible object storage (bucket URL).
    ObjectStorage {
        /// Endpoint of the object-storage service.
        endpoint: String,
        /// Bucket that receives the segments.
        bucket: String,
        /// Key prefix used for archived segments.
        prefix: String,
    },
}
202
/// Condition that causes a WAL segment to be archived.
#[derive(Clone, Copy, Debug)]
pub enum WalArchiveTrigger {
    /// Archive once the segment is full (default).
    SegmentFull,
    /// Archive on a timer; partial segments may be archived.
    TimeBased(Duration),
    /// Archive on whichever comes first: segment full or `max_interval` elapsed.
    Both { max_interval: Duration },
}
213
214/// Checkpoint configuration for avoiding long write stalls
215#[derive(Debug, Clone)]
216pub struct CheckpointConfig {
217    /// Checkpoint mode
218    pub mode: CheckpointMode,
219    /// Maximum time allowed for checkpoint
220    pub max_duration: Duration,
221    /// Trigger checkpoint at this WAL size
222    pub wal_size_trigger: u64,
223    /// Trigger checkpoint at this many transactions
224    pub txn_count_trigger: u64,
225}
226
227impl Default for CheckpointConfig {
228    fn default() -> Self {
229        Self {
230            mode: CheckpointMode::Incremental,
231            max_duration: Duration::from_secs(30),
232            wal_size_trigger: 128 * 1024 * 1024, // 128MB
233            txn_count_trigger: 100_000,
234        }
235    }
236}
237
/// Strategy used when taking a checkpoint.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CheckpointMode {
    /// Write all dirty pages in one go; can cause stalls.
    Full,
    /// Spread the writes over time.
    Incremental,
    /// Copy-on-write snapshot: no stalls, at the cost of more space.
    CopyOnWrite,
}
248
249/// Durability contract that enforces ARIES invariants
250pub struct DurabilityContract {
251    /// Configured durability level
252    pub level: DurabilityLevel,
253    /// WAL archive configuration
254    pub archive: WalArchiveConfig,
255    /// Checkpoint configuration
256    pub checkpoint: CheckpointConfig,
257    /// Require WAL before page flush (invariant)
258    wal_before_page: bool,
259    /// Force log on commit (invariant)
260    force_on_commit: bool,
261}
262
263impl DurabilityContract {
264    /// Create a new durability contract with safe defaults
265    pub fn new(level: DurabilityLevel) -> Result<Self, DurabilityContractError> {
266        // Validate unsafe levels have risk acceptance
267        match &level {
268            DurabilityLevel::Periodic {
269                accept_data_loss_risk,
270                ..
271            }
272            | DurabilityLevel::NoSync {
273                accept_data_loss_risk,
274                ..
275            } => {
276                if !accept_data_loss_risk {
277                    return Err(DurabilityContractError {
278                        message: format!(
279                            "Unsafe durability level '{}' requires explicit accept_data_loss_risk=true",
280                            level
281                        ),
282                        contract_violated: "EXPLICIT_RISK_ACCEPTANCE",
283                    });
284                }
285            }
286            _ => {}
287        }
288
289        Ok(Self {
290            level,
291            archive: WalArchiveConfig::default(),
292            checkpoint: CheckpointConfig::default(),
293            wal_before_page: true,
294            force_on_commit: true,
295        })
296    }
297
298    /// Create with production defaults
299    pub fn production() -> Self {
300        Self {
301            level: DurabilityLevel::default_production(),
302            archive: WalArchiveConfig::default(),
303            checkpoint: CheckpointConfig::default(),
304            wal_before_page: true,
305            force_on_commit: true,
306        }
307    }
308
309    /// Enable WAL archiving
310    pub fn with_archive(mut self, config: WalArchiveConfig) -> Self {
311        self.archive = config;
312        self
313    }
314
315    /// Configure checkpointing
316    pub fn with_checkpoint(mut self, config: CheckpointConfig) -> Self {
317        self.checkpoint = config;
318        self
319    }
320
321    /// Validate that a page flush respects WAL protocol
322    ///
323    /// ARIES invariant: Page cannot be flushed until its log record is durable
324    pub fn validate_page_flush(
325        &self,
326        page_lsn: u64,
327        flushed_lsn: u64,
328    ) -> Result<(), DurabilityContractError> {
329        if self.wal_before_page && page_lsn > flushed_lsn {
330            return Err(DurabilityContractError {
331                message: format!(
332                    "Page LSN {} exceeds flushed WAL LSN {} - would violate WAL protocol",
333                    page_lsn, flushed_lsn
334                ),
335                contract_violated: "WAL_BEFORE_PAGE",
336            });
337        }
338        Ok(())
339    }
340
341    /// Validate that a commit respects force-on-commit
342    ///
343    /// ARIES invariant: Commit record must be durable before returning
344    pub fn validate_commit(
345        &self,
346        commit_lsn: u64,
347        flushed_lsn: u64,
348    ) -> Result<(), DurabilityContractError> {
349        if self.force_on_commit && commit_lsn > flushed_lsn {
350            return Err(DurabilityContractError {
351                message: format!(
352                    "Commit LSN {} not yet flushed (flushed: {}) - would violate force-on-commit",
353                    commit_lsn, flushed_lsn
354                ),
355                contract_violated: "FORCE_ON_COMMIT",
356            });
357        }
358        Ok(())
359    }
360
361    /// Get maximum data loss window based on durability level
362    pub fn max_data_loss_window(&self) -> Option<Duration> {
363        match &self.level {
364            DurabilityLevel::Durable => None, // No data loss possible
365            DurabilityLevel::GroupCommit { flush_interval, .. } => Some(*flush_interval),
366            DurabilityLevel::Periodic { sync_interval, .. } => Some(*sync_interval),
367            DurabilityLevel::NoSync { .. } => None, // Unbounded - all data at risk
368        }
369    }
370
371    /// Check if this contract is suitable for production
372    pub fn is_production_ready(&self) -> bool {
373        self.level.is_production_safe() && self.wal_before_page && self.force_on_commit
374    }
375
376    /// Generate a human-readable description of guarantees
377    pub fn describe_guarantees(&self) -> String {
378        let mut desc = Vec::new();
379
380        desc.push(format!("Durability: {}", self.level));
381
382        if self.wal_before_page {
383            desc.push("WAL-before-page: ENFORCED".to_string());
384        } else {
385            desc.push("WAL-before-page: DISABLED (UNSAFE)".to_string());
386        }
387
388        if self.force_on_commit {
389            desc.push("Force-on-commit: ENFORCED".to_string());
390        } else {
391            desc.push("Force-on-commit: DISABLED (UNSAFE)".to_string());
392        }
393
394        if let Some(window) = self.max_data_loss_window() {
395            desc.push(format!("Max data loss window: {:?}", window));
396        } else if matches!(self.level, DurabilityLevel::NoSync { .. }) {
397            desc.push("Max data loss window: UNBOUNDED (all data at risk)".to_string());
398        } else {
399            desc.push("Max data loss window: None (fully durable)".to_string());
400        }
401
402        if self.archive.enabled {
403            desc.push("WAL archiving: ENABLED".to_string());
404        }
405
406        match self.checkpoint.mode {
407            CheckpointMode::Full => desc.push("Checkpoint: Full (may cause stalls)".to_string()),
408            CheckpointMode::Incremental => desc.push("Checkpoint: Incremental".to_string()),
409            CheckpointMode::CopyOnWrite => desc.push("Checkpoint: Copy-on-write".to_string()),
410        }
411
412        desc.join("\n")
413    }
414}
415
416impl Default for DurabilityContract {
417    fn default() -> Self {
418        Self::production()
419    }
420}
421
/// A point that point-in-time recovery (PITR) can restore to.
#[derive(Clone, Debug)]
pub struct RecoveryPoint {
    /// Log sequence number of this recovery point.
    pub lsn: u64,
    /// Timestamp of this recovery point (unit and epoch are not specified here).
    pub timestamp: u64,
    /// Checkpoint ID when this point is a checkpoint, otherwise `None`.
    pub checkpoint_id: Option<u64>,
    /// Free-form description.
    pub description: String,
}
434
435/// WAL segment metadata for archiving
436#[derive(Debug, Clone)]
437pub struct WalSegmentMetadata {
438    /// Segment number
439    pub segment_number: u64,
440    /// Start LSN
441    pub start_lsn: u64,
442    /// End LSN
443    pub end_lsn: u64,
444    /// Size in bytes
445    pub size_bytes: u64,
446    /// CRC32 checksum
447    pub checksum: u32,
448    /// Is segment complete (closed)
449    pub is_complete: bool,
450    /// Archive status
451    pub archive_status: ArchiveStatus,
452}
453
/// Where a WAL segment stands in the archiving process.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ArchiveStatus {
    /// Archiving has not started yet.
    Pending,
    /// Archiving is currently running.
    InProgress,
    /// The segment was archived successfully.
    Archived,
    /// Archiving the segment failed.
    Failed,
}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Contract tag reported by `DurabilityContract::new` for `level`, or
    /// `None` when construction succeeds. Going through `.err()` keeps the
    /// helper independent of whether the contract type implements `Debug`.
    fn rejection_tag(level: DurabilityLevel) -> Option<&'static str> {
        DurabilityContract::new(level)
            .err()
            .map(|e| e.contract_violated)
    }

    #[test]
    fn test_durability_level_defaults() {
        let level = DurabilityLevel::default();
        assert!(level.is_production_safe());
        assert!(!level.requires_risk_acceptance());
    }

    #[test]
    fn test_unsafe_requires_acknowledgement() {
        let result = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: false,
        });
        assert!(result.is_err());

        let result = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        });
        assert!(result.is_ok());
    }

    /// The `Periodic` arm of the risk check must behave like `NoSync`, and
    /// the rejection must carry the documented contract tag.
    #[test]
    fn test_periodic_requires_acknowledgement() {
        let interval = Duration::from_secs(1);

        assert_eq!(
            rejection_tag(DurabilityLevel::Periodic {
                sync_interval: interval,
                accept_data_loss_risk: false,
            }),
            Some("EXPLICIT_RISK_ACCEPTANCE")
        );
        assert_eq!(
            rejection_tag(DurabilityLevel::Periodic {
                sync_interval: interval,
                accept_data_loss_risk: true,
            }),
            None
        );
    }

    /// Safe levels never need an acknowledgement.
    #[test]
    fn test_safe_levels_always_accepted() {
        assert_eq!(rejection_tag(DurabilityLevel::Durable), None);
        assert_eq!(rejection_tag(DurabilityLevel::default_production()), None);
    }

    #[test]
    fn test_wal_before_page_validation() {
        let contract = DurabilityContract::production();

        // Valid: page LSN <= flushed LSN
        assert!(contract.validate_page_flush(100, 100).is_ok());
        assert!(contract.validate_page_flush(50, 100).is_ok());

        // Invalid: page LSN > flushed LSN
        let err = contract
            .validate_page_flush(150, 100)
            .expect_err("page LSN ahead of the flushed WAL must be rejected");
        assert_eq!(err.contract_violated, "WAL_BEFORE_PAGE");
    }

    #[test]
    fn test_force_on_commit_validation() {
        let contract = DurabilityContract::production();

        // Valid: commit LSN <= flushed LSN
        assert!(contract.validate_commit(100, 100).is_ok());
        assert!(contract.validate_commit(50, 100).is_ok());

        // Invalid: commit LSN > flushed LSN
        let err = contract
            .validate_commit(150, 100)
            .expect_err("unflushed commit LSN must be rejected");
        assert_eq!(err.contract_violated, "FORCE_ON_COMMIT");
    }

    #[test]
    fn test_data_loss_window() {
        let durable = DurabilityContract::new(DurabilityLevel::Durable).unwrap();
        assert!(durable.max_data_loss_window().is_none());

        let group = DurabilityContract::new(DurabilityLevel::GroupCommit {
            max_batch: 100,
            flush_interval: Duration::from_millis(10),
        })
        .unwrap();
        assert_eq!(
            group.max_data_loss_window(),
            Some(Duration::from_millis(10))
        );
    }

    /// `Periodic` reports its sync interval; `NoSync` reports `None`, which
    /// `describe_guarantees` must render as unbounded rather than durable.
    #[test]
    fn test_data_loss_window_unsafe_levels() {
        let periodic = DurabilityContract::new(DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(2),
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert_eq!(
            periodic.max_data_loss_window(),
            Some(Duration::from_secs(2))
        );

        let no_sync = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert!(no_sync.max_data_loss_window().is_none());
        assert!(no_sync
            .describe_guarantees()
            .contains("Max data loss window: UNBOUNDED (all data at risk)"));
    }

    #[test]
    fn test_production_ready_check() {
        let contract = DurabilityContract::production();
        assert!(contract.is_production_ready());

        let unsafe_contract = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert!(!unsafe_contract.is_production_ready());
    }
}