Skip to main content

sochdb_storage/
durability_contract.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # Durability Contract Hardening
19//!
20//! This module formalizes and enforces durability guarantees for the storage layer.
21//! It ensures that:
//! - The `Periodic` and `NoSync` modes, which can lose committed data on a crash,
//!   are labeled UNSAFE and require explicit opt-in (`accept_data_loss_risk: true`)
//! - The default mode is `GroupCommit` (batched fsync with a bounded loss window);
//!   `Durable` (fsync after every commit, full ACID D) is available via
//!   `DurabilityLevel::safest()`
//! - ARIES recovery invariants are mechanically enforced
//! - WAL archiving hooks are available for PITR
//!
//! ## Durability Levels
//!
//! - `Durable`: fsync after every commit (safest)
//! - `GroupCommit`: fsync batched, bounded latency (default, production)
//! - `Periodic`: fsync on timer (UNSAFE - labeled clearly)
//! - `NoSync`: no fsync (UNSAFE - for testing only)
33//!
34//! ## ARIES Recovery Contract
35//!
36//! 1. **Write-Ahead Logging**: No page written to disk before its log record
37//! 2. **Force on Commit**: All log records for a txn forced before commit returns
38//! 3. **Steal**: Buffer pool can evict dirty pages before commit (with WAL guarantee)
39//! 4. **Recovery Phases**: Analysis → Redo → Undo (with CLRs for nested undo)
40
41use std::fmt;
42use std::path::{Path, PathBuf};
43use std::time::Duration;
44
/// Durability level for write operations.
///
/// `Durable` and `GroupCommit` are the production-safe levels. `Periodic` and
/// `NoSync` can lose committed transactions on a crash, so they carry an
/// `accept_data_loss_risk` flag that must be `true` before
/// [`DurabilityContract::new`] will accept them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityLevel {
    /// Every commit is immediately fsync'd (safest, slowest).
    ///
    /// Guarantee: commit returns ⇒ data is on stable storage.
    Durable,

    /// Commits are batched and fsync'd together (production default).
    ///
    /// Guarantee: commit returns ⇒ data will be on stable storage within
    /// `flush_interval`.
    /// Trade-off: up to `flush_interval` worth of committed data can be lost on
    /// a crash.
    GroupCommit {
        /// Maximum number of commits to batch together.
        max_batch: usize,
        /// Maximum time to wait before flushing a batch.
        flush_interval: Duration,
    },

    /// Fsync on a timer (UNSAFE - data loss possible).
    ///
    /// WARNING: up to `sync_interval` worth of committed transactions can be
    /// lost on a crash.
    Periodic {
        /// Interval between fsyncs.
        sync_interval: Duration,
        /// Explicit acknowledgement that this level is unsafe.
        accept_data_loss_risk: bool,
    },

    /// No fsync at all (UNSAFE - testing only).
    ///
    /// WARNING: ANY committed data can be lost on a crash.
    NoSync {
        /// Explicit acknowledgement that this level is unsafe.
        accept_data_loss_risk: bool,
    },
}
78
79impl DurabilityLevel {
80    /// Check if this level is safe for production
81    pub fn is_production_safe(&self) -> bool {
82        matches!(self, DurabilityLevel::Durable | DurabilityLevel::GroupCommit { .. })
83    }
84
85    /// Check if explicit risk acceptance is required
86    pub fn requires_risk_acceptance(&self) -> bool {
87        matches!(self, DurabilityLevel::Periodic { .. } | DurabilityLevel::NoSync { .. })
88    }
89
90    /// Get the default production-safe configuration
91    pub fn default_production() -> Self {
92        DurabilityLevel::GroupCommit {
93            max_batch: 128,
94            flush_interval: Duration::from_millis(10),
95        }
96    }
97
98    /// Get the safest configuration
99    pub fn safest() -> Self {
100        DurabilityLevel::Durable
101    }
102}
103
104impl Default for DurabilityLevel {
105    fn default() -> Self {
106        Self::default_production()
107    }
108}
109
110impl fmt::Display for DurabilityLevel {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        match self {
113            DurabilityLevel::Durable => write!(f, "Durable (fsync per commit)"),
114            DurabilityLevel::GroupCommit { max_batch, flush_interval } => {
115                write!(f, "GroupCommit (batch={}, interval={:?})", max_batch, flush_interval)
116            }
117            DurabilityLevel::Periodic { sync_interval, .. } => {
118                write!(f, "UNSAFE:Periodic (interval={:?})", sync_interval)
119            }
120            DurabilityLevel::NoSync { .. } => {
121                write!(f, "UNSAFE:NoSync (testing only)")
122            }
123        }
124    }
125}
126
/// Error produced when a durability-contract check fails.
///
/// `contract_violated` names the broken rule (this module uses
/// `"EXPLICIT_RISK_ACCEPTANCE"`, `"WAL_BEFORE_PAGE"` and `"FORCE_ON_COMMIT"`),
/// and `message` carries the specifics.
#[derive(Debug, Clone)]
pub struct DurabilityContractError {
    /// Human-readable description of the violation.
    pub message: String,
    /// Identifier of the violated contract rule.
    pub contract_violated: &'static str,
}

impl fmt::Display for DurabilityContractError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let DurabilityContractError {
            message,
            contract_violated,
        } = self;
        write!(
            f,
            "Durability contract '{}' violated: {}",
            contract_violated, message
        )
    }
}

impl std::error::Error for DurabilityContractError {}
141
/// WAL archiving configuration for point-in-time recovery (PITR).
#[derive(Debug, Clone)]
pub struct WalArchiveConfig {
    /// Enable WAL archiving.
    pub enabled: bool,
    /// Archive destination: a local path, an external command, or
    /// S3-compatible object storage.
    pub destination: WalArchiveDestination,
    /// Archive trigger: segment full, time-based, or whichever comes first.
    pub trigger: WalArchiveTrigger,
    /// Compress archived segments.
    pub compress: bool,
    /// Verify archived segments (read back and check).
    pub verify: bool,
}

impl Default for WalArchiveConfig {
    /// Archiving disabled, with destination `/var/lib/sochdb/wal_archive`, the
    /// `SegmentFull` trigger, and compression and verification both on.
    fn default() -> Self {
        Self {
            enabled: false,
            destination: WalArchiveDestination::LocalPath(PathBuf::from(
                "/var/lib/sochdb/wal_archive",
            )),
            trigger: WalArchiveTrigger::SegmentFull,
            compress: true,
            verify: true,
        }
    }
}

/// WAL archive destination.
#[derive(Debug, Clone)]
pub enum WalArchiveDestination {
    /// Local filesystem path.
    LocalPath(PathBuf),
    /// External command (receives segment path as argument).
    ExternalCommand(String),
    /// S3-compatible object storage, addressed by endpoint, bucket and key
    /// prefix (not a single bucket URL).
    ObjectStorage {
        /// Object-store endpoint.
        endpoint: String,
        /// Bucket name.
        bucket: String,
        /// Object key prefix.
        prefix: String,
    },
}

/// When to trigger WAL archiving.
#[derive(Debug, Clone, Copy)]
pub enum WalArchiveTrigger {
    /// Archive when a segment is full (default).
    SegmentFull,
    /// Archive on a timer (may archive partial segments).
    TimeBased(Duration),
    /// Archive on either condition, whichever happens first.
    Both {
        /// Maximum time between archives when the segment is not yet full.
        max_interval: Duration,
    },
}
196
/// Checkpoint tuning, intended to keep checkpoints from causing long write stalls.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Checkpoint strategy to use.
    pub mode: CheckpointMode,
    /// Upper bound on the time a checkpoint is allowed to take.
    pub max_duration: Duration,
    /// WAL size, in bytes, at which a checkpoint is triggered.
    pub wal_size_trigger: u64,
    /// Number of transactions after which a checkpoint is triggered.
    pub txn_count_trigger: u64,
}

impl Default for CheckpointConfig {
    /// Incremental checkpoints, a 30 s time budget, and triggers at 128 MiB of
    /// WAL or 100,000 transactions.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        CheckpointConfig {
            mode: CheckpointMode::Incremental,
            max_duration: Duration::from_secs(30),
            wal_size_trigger: 128 * MIB,
            txn_count_trigger: 100_000,
        }
    }
}

/// Strategy used when writing a checkpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointMode {
    /// Write every dirty page at once; can cause stalls.
    Full,
    /// Spread checkpoint writes out over time.
    Incremental,
    /// Copy-on-write snapshot; avoids stalls at the cost of extra space.
    CopyOnWrite,
}
231
232/// Durability contract that enforces ARIES invariants
233pub struct DurabilityContract {
234    /// Configured durability level
235    pub level: DurabilityLevel,
236    /// WAL archive configuration
237    pub archive: WalArchiveConfig,
238    /// Checkpoint configuration
239    pub checkpoint: CheckpointConfig,
240    /// Require WAL before page flush (invariant)
241    wal_before_page: bool,
242    /// Force log on commit (invariant)
243    force_on_commit: bool,
244}
245
246impl DurabilityContract {
247    /// Create a new durability contract with safe defaults
248    pub fn new(level: DurabilityLevel) -> Result<Self, DurabilityContractError> {
249        // Validate unsafe levels have risk acceptance
250        match &level {
251            DurabilityLevel::Periodic { accept_data_loss_risk, .. } 
252            | DurabilityLevel::NoSync { accept_data_loss_risk, .. } => {
253                if !accept_data_loss_risk {
254                    return Err(DurabilityContractError {
255                        message: format!(
256                            "Unsafe durability level '{}' requires explicit accept_data_loss_risk=true",
257                            level
258                        ),
259                        contract_violated: "EXPLICIT_RISK_ACCEPTANCE",
260                    });
261                }
262            }
263            _ => {}
264        }
265
266        Ok(Self {
267            level,
268            archive: WalArchiveConfig::default(),
269            checkpoint: CheckpointConfig::default(),
270            wal_before_page: true,
271            force_on_commit: true,
272        })
273    }
274
275    /// Create with production defaults
276    pub fn production() -> Self {
277        Self {
278            level: DurabilityLevel::default_production(),
279            archive: WalArchiveConfig::default(),
280            checkpoint: CheckpointConfig::default(),
281            wal_before_page: true,
282            force_on_commit: true,
283        }
284    }
285
286    /// Enable WAL archiving
287    pub fn with_archive(mut self, config: WalArchiveConfig) -> Self {
288        self.archive = config;
289        self
290    }
291
292    /// Configure checkpointing
293    pub fn with_checkpoint(mut self, config: CheckpointConfig) -> Self {
294        self.checkpoint = config;
295        self
296    }
297
298    /// Validate that a page flush respects WAL protocol
299    /// 
300    /// ARIES invariant: Page cannot be flushed until its log record is durable
301    pub fn validate_page_flush(&self, page_lsn: u64, flushed_lsn: u64) -> Result<(), DurabilityContractError> {
302        if self.wal_before_page && page_lsn > flushed_lsn {
303            return Err(DurabilityContractError {
304                message: format!(
305                    "Page LSN {} exceeds flushed WAL LSN {} - would violate WAL protocol",
306                    page_lsn, flushed_lsn
307                ),
308                contract_violated: "WAL_BEFORE_PAGE",
309            });
310        }
311        Ok(())
312    }
313
314    /// Validate that a commit respects force-on-commit
315    ///
316    /// ARIES invariant: Commit record must be durable before returning
317    pub fn validate_commit(&self, commit_lsn: u64, flushed_lsn: u64) -> Result<(), DurabilityContractError> {
318        if self.force_on_commit && commit_lsn > flushed_lsn {
319            return Err(DurabilityContractError {
320                message: format!(
321                    "Commit LSN {} not yet flushed (flushed: {}) - would violate force-on-commit",
322                    commit_lsn, flushed_lsn
323                ),
324                contract_violated: "FORCE_ON_COMMIT",
325            });
326        }
327        Ok(())
328    }
329
330    /// Get maximum data loss window based on durability level
331    pub fn max_data_loss_window(&self) -> Option<Duration> {
332        match &self.level {
333            DurabilityLevel::Durable => None, // No data loss possible
334            DurabilityLevel::GroupCommit { flush_interval, .. } => Some(*flush_interval),
335            DurabilityLevel::Periodic { sync_interval, .. } => Some(*sync_interval),
336            DurabilityLevel::NoSync { .. } => None, // Unbounded - all data at risk
337        }
338    }
339
340    /// Check if this contract is suitable for production
341    pub fn is_production_ready(&self) -> bool {
342        self.level.is_production_safe() && self.wal_before_page && self.force_on_commit
343    }
344
345    /// Generate a human-readable description of guarantees
346    pub fn describe_guarantees(&self) -> String {
347        let mut desc = Vec::new();
348
349        desc.push(format!("Durability: {}", self.level));
350
351        if self.wal_before_page {
352            desc.push("WAL-before-page: ENFORCED".to_string());
353        } else {
354            desc.push("WAL-before-page: DISABLED (UNSAFE)".to_string());
355        }
356
357        if self.force_on_commit {
358            desc.push("Force-on-commit: ENFORCED".to_string());
359        } else {
360            desc.push("Force-on-commit: DISABLED (UNSAFE)".to_string());
361        }
362
363        if let Some(window) = self.max_data_loss_window() {
364            desc.push(format!("Max data loss window: {:?}", window));
365        } else if matches!(self.level, DurabilityLevel::NoSync { .. }) {
366            desc.push("Max data loss window: UNBOUNDED (all data at risk)".to_string());
367        } else {
368            desc.push("Max data loss window: None (fully durable)".to_string());
369        }
370
371        if self.archive.enabled {
372            desc.push("WAL archiving: ENABLED".to_string());
373        }
374
375        match self.checkpoint.mode {
376            CheckpointMode::Full => desc.push("Checkpoint: Full (may cause stalls)".to_string()),
377            CheckpointMode::Incremental => desc.push("Checkpoint: Incremental".to_string()),
378            CheckpointMode::CopyOnWrite => desc.push("Checkpoint: Copy-on-write".to_string()),
379        }
380
381        desc.join("\n")
382    }
383}
384
385impl Default for DurabilityContract {
386    fn default() -> Self {
387        Self::production()
388    }
389}
390
/// A point the database can be restored to during point-in-time recovery (PITR).
#[derive(Debug, Clone)]
pub struct RecoveryPoint {
    /// LSN identifying this recovery point.
    pub lsn: u64,
    /// Time at which the recovery point was taken.
    /// NOTE(review): the unit and epoch are not specified in this module — confirm.
    pub timestamp: u64,
    /// Checkpoint ID, set only when this recovery point is a checkpoint.
    pub checkpoint_id: Option<u64>,
    /// Free-form description.
    pub description: String,
}
403
/// Metadata describing a single WAL segment, used for archiving.
#[derive(Debug, Clone)]
pub struct WalSegmentMetadata {
    /// Segment sequence number.
    pub segment_number: u64,
    /// Starting LSN of the segment.
    pub start_lsn: u64,
    /// Ending LSN of the segment.
    /// NOTE(review): inclusive vs. exclusive is not established in this module.
    pub end_lsn: u64,
    /// Segment size in bytes.
    pub size_bytes: u64,
    /// CRC32 checksum of the segment.
    pub checksum: u32,
    /// Whether the segment is complete (closed).
    pub is_complete: bool,
    /// Current archiving state of the segment.
    pub archive_status: ArchiveStatus,
}

/// Archiving state of a WAL segment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArchiveStatus {
    /// Not archived yet.
    Pending,
    /// Archiving has started but not finished.
    InProgress,
    /// Archived successfully.
    Archived,
    /// Archiving failed.
    Failed,
}
435
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_durability_level_defaults() {
        let level = DurabilityLevel::default();
        assert!(level.is_production_safe());
        assert!(!level.requires_risk_acceptance());
    }

    #[test]
    fn test_unsafe_requires_acknowledgement() {
        let result = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: false,
        });
        assert!(result.is_err());

        let result = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        });
        assert!(result.is_ok());
    }

    #[test]
    fn test_periodic_requires_acknowledgement() {
        let rejected = DurabilityContract::new(DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(1),
            accept_data_loss_risk: false,
        });
        let err = rejected
            .err()
            .expect("Periodic without risk acceptance must be rejected");
        assert_eq!(err.contract_violated, "EXPLICIT_RISK_ACCEPTANCE");
        assert!(err.to_string().contains("UNSAFE:Periodic"));

        let accepted = DurabilityContract::new(DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(1),
            accept_data_loss_risk: true,
        });
        assert!(accepted.is_ok());
    }

    #[test]
    fn test_wal_before_page_validation() {
        let contract = DurabilityContract::production();

        // Valid: page LSN <= flushed LSN
        assert!(contract.validate_page_flush(100, 100).is_ok());
        assert!(contract.validate_page_flush(50, 100).is_ok());

        // Invalid: page LSN > flushed LSN
        let err = contract
            .validate_page_flush(150, 100)
            .err()
            .expect("page ahead of the WAL must be rejected");
        assert_eq!(err.contract_violated, "WAL_BEFORE_PAGE");
    }

    #[test]
    fn test_force_on_commit_validation() {
        let contract = DurabilityContract::production();

        // Valid: commit LSN <= flushed LSN
        assert!(contract.validate_commit(100, 100).is_ok());
        assert!(contract.validate_commit(50, 100).is_ok());

        // Invalid: commit LSN > flushed LSN
        let err = contract
            .validate_commit(150, 100)
            .err()
            .expect("unflushed commit must be rejected");
        assert_eq!(err.contract_violated, "FORCE_ON_COMMIT");
    }

    #[test]
    fn test_data_loss_window() {
        let durable = DurabilityContract::new(DurabilityLevel::Durable).unwrap();
        assert!(durable.max_data_loss_window().is_none());

        let group = DurabilityContract::new(DurabilityLevel::GroupCommit {
            max_batch: 100,
            flush_interval: Duration::from_millis(10),
        })
        .unwrap();
        assert_eq!(group.max_data_loss_window(), Some(Duration::from_millis(10)));

        let periodic = DurabilityContract::new(DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(2),
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert_eq!(periodic.max_data_loss_window(), Some(Duration::from_secs(2)));

        // NoSync also reports `None`, but it means "unbounded", not "no loss".
        let nosync = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert!(nosync.max_data_loss_window().is_none());
    }

    #[test]
    fn test_production_ready_check() {
        let contract = DurabilityContract::production();
        assert!(contract.is_production_ready());

        let unsafe_contract = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert!(!unsafe_contract.is_production_ready());

        let periodic_contract = DurabilityContract::new(DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(1),
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert!(!periodic_contract.is_production_ready());
    }

    #[test]
    fn test_describe_guarantees() {
        let durable = DurabilityContract::new(DurabilityLevel::Durable).unwrap();
        let text = durable.describe_guarantees();
        assert!(text.contains("Durability: Durable (fsync per commit)"));
        assert!(text.contains("WAL-before-page: ENFORCED"));
        assert!(text.contains("Force-on-commit: ENFORCED"));
        assert!(text.contains("Max data loss window: None (fully durable)"));
        assert!(text.contains("Checkpoint: Incremental"));
        assert!(!text.contains("WAL archiving: ENABLED"));

        let nosync = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert!(nosync
            .describe_guarantees()
            .contains("Max data loss window: UNBOUNDED (all data at risk)"));

        let archived = DurabilityContract::production().with_archive(WalArchiveConfig {
            enabled: true,
            ..WalArchiveConfig::default()
        });
        let text = archived.describe_guarantees();
        assert!(text.contains("WAL archiving: ENABLED"));
        assert!(text.contains("Max data loss window: 10ms"));
    }

    #[test]
    fn test_display_marks_unsafe_levels() {
        assert!(!DurabilityLevel::safest().to_string().starts_with("UNSAFE"));
        assert!(!DurabilityLevel::default().to_string().starts_with("UNSAFE"));
        assert!(DurabilityLevel::NoSync {
            accept_data_loss_risk: true
        }
        .to_string()
        .starts_with("UNSAFE:"));
        assert!(DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(1),
            accept_data_loss_risk: true
        }
        .to_string()
        .starts_with("UNSAFE:"));
    }
}