Skip to main content

sochdb_storage/
pitr.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! # Point-in-Time Recovery (PITR)
//!
//! Enables recovery to any point in time using:
//! - Continuous WAL archiving
//! - Periodic base snapshots
//! - Log shipping to object storage (S3/GCS/Azure destinations are declared
//!   but not yet implemented; only local-path archiving works today)
//!
//! ## Architecture
//!
//! ```text
//! ┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
//! │   Live WAL      │────▶│  WAL Archiver    │────▶│  Object Storage │
//! └─────────────────┘     └──────────────────┘     │  (S3/GCS/Azure) │
//!                                                   └─────────────────┘
//!                                                           │
//! ┌─────────────────┐     ┌──────────────────┐              ▼
//! │  Base Snapshot  │────▶│  Snapshot Store  │────▶┌─────────────────┐
//! └─────────────────┘     └──────────────────┘     │    Recovery     │
//!                                                   │    Target       │
//!                                                   └─────────────────┘
//! ```
//!
//! ## Recovery Point Objective (RPO)
//!
//! - With synchronous archiving: RPO = 0 (no data loss)
//! - With asynchronous archiving (the default): RPO = archive interval (60 seconds by default)
44
45use std::collections::BTreeMap;
46use std::path::{Path, PathBuf};
47use std::sync::atomic::{AtomicU64, Ordering};
48use std::sync::Arc;
49use std::time::{Duration, SystemTime, UNIX_EPOCH};
50
51use parking_lot::RwLock;
52
/// Log Sequence Number (LSN) used by point-in-time recovery.
///
/// A newtype over `u64`. LSNs are totally ordered, so two WAL positions can be
/// compared directly to tell which one comes first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Lsn(pub u64);

impl Lsn {
    /// Invalid/zero LSN.
    pub const INVALID: Lsn = Lsn(0);

    /// Wraps a raw LSN value.
    pub fn new(value: u64) -> Self {
        Lsn(value)
    }

    /// Returns the raw `u64` value.
    pub fn value(&self) -> u64 {
        self.0
    }

    /// Returns the LSN `offset` positions after `self`; `self` is not modified.
    ///
    /// # Panics
    ///
    /// Panics if the result would exceed `u64::MAX`. A plain `+` would panic
    /// only in debug builds and silently wrap in release builds, yielding an
    /// LSN that sorts *before* `self` and breaking WAL ordering.
    #[must_use]
    pub fn advance(&self, offset: u64) -> Self {
        let next = self
            .0
            .checked_add(offset)
            .expect("LSN overflow: advance() exceeded u64::MAX");
        Lsn(next)
    }
}
76
77/// Recovery target specification
78#[derive(Debug, Clone)]
79pub enum RecoveryTarget {
80    /// Recover to latest available state
81    Latest,
82    /// Recover to specific LSN
83    Lsn(Lsn),
84    /// Recover to specific timestamp
85    Timestamp(u64),
86    /// Recover to named restore point
87    RestorePoint(String),
88    /// Recover to specific transaction ID
89    TransactionId(u64),
90}
91
92/// WAL segment for archival
93#[derive(Debug, Clone)]
94pub struct WalSegment {
95    /// Segment number
96    pub segment_id: u64,
97    /// Starting LSN
98    pub start_lsn: Lsn,
99    /// Ending LSN
100    pub end_lsn: Lsn,
101    /// File path
102    pub path: PathBuf,
103    /// Segment size in bytes
104    pub size_bytes: u64,
105    /// CRC32 checksum
106    pub checksum: u32,
107    /// Creation timestamp
108    pub created_at: u64,
109    /// Is segment archived?
110    pub archived: bool,
111}
112
/// Where archived WAL segments are shipped.
///
/// Only `LocalPath` is handled by the archiver today; the cloud variants are
/// accepted in configuration but archiving to them reports "not implemented".
#[derive(Clone, Debug)]
pub enum ArchiveDestination {
    /// A directory on the local filesystem.
    LocalPath(PathBuf),
    /// An Amazon S3 bucket.
    S3 {
        bucket: String,
        prefix: String,
        region: String,
    },
    /// A Google Cloud Storage bucket.
    Gcs {
        bucket: String,
        prefix: String,
    },
    /// An Azure Blob Storage container.
    Azure {
        container: String,
        prefix: String,
    },
}
135
136/// WAL archiver configuration
137#[derive(Debug, Clone)]
138pub struct WalArchiverConfig {
139    /// Archive destination
140    pub destination: ArchiveDestination,
141    /// Archive interval (for async mode)
142    pub archive_interval: Duration,
143    /// Whether to use synchronous archiving (impacts write latency)
144    pub synchronous: bool,
145    /// Compression algorithm
146    pub compression: CompressionAlgorithm,
147    /// Encryption key (if enabled)
148    pub encryption_key: Option<Vec<u8>>,
149    /// Retention period for archived WAL
150    pub retention: Duration,
151    /// Maximum concurrent uploads
152    pub max_concurrent_uploads: usize,
153}
154
155impl Default for WalArchiverConfig {
156    fn default() -> Self {
157        Self {
158            destination: ArchiveDestination::LocalPath(PathBuf::from("/var/sochdb/wal_archive")),
159            archive_interval: Duration::from_secs(60),
160            synchronous: false,
161            compression: CompressionAlgorithm::Zstd,
162            encryption_key: None,
163            retention: Duration::from_secs(7 * 24 * 3600), // 7 days
164            max_concurrent_uploads: 4,
165        }
166    }
167}
168
/// Compression applied to WAL segments when they are archived.
///
/// The local-path archiver currently copies segments verbatim, so this
/// setting is not yet acted on.
#[derive(Clone, Copy, Debug)]
pub enum CompressionAlgorithm {
    /// Store segments uncompressed.
    None,
    /// LZ4 compression.
    Lz4,
    /// Zstandard compression (the configured default).
    Zstd,
    /// Snappy compression.
    Snappy,
}
177
178/// WAL archiver state
179pub struct WalArchiver {
180    config: WalArchiverConfig,
181    /// Current archive LSN (all WAL up to this LSN is archived)
182    archive_lsn: AtomicU64,
183    /// Segments pending archive
184    pending_segments: RwLock<Vec<WalSegment>>,
185    /// Archive history
186    archive_history: RwLock<BTreeMap<u64, WalSegment>>,
187}
188
189impl WalArchiver {
190    /// Create a new WAL archiver
191    pub fn new(config: WalArchiverConfig) -> Self {
192        Self {
193            config,
194            archive_lsn: AtomicU64::new(0),
195            pending_segments: RwLock::new(Vec::new()),
196            archive_history: RwLock::new(BTreeMap::new()),
197        }
198    }
199
200    /// Queue a segment for archiving
201    pub fn queue_segment(&self, segment: WalSegment) {
202        self.pending_segments.write().push(segment);
203    }
204
205    /// Get current archive LSN
206    pub fn archive_lsn(&self) -> Lsn {
207        Lsn(self.archive_lsn.load(Ordering::Acquire))
208    }
209
210    /// Archive pending segments (called periodically or on demand)
211    pub fn archive_pending(&self) -> Result<usize, PitrError> {
212        let segments: Vec<_> = {
213            let mut pending = self.pending_segments.write();
214            std::mem::take(&mut *pending)
215        };
216
217        if segments.is_empty() {
218            return Ok(0);
219        }
220
221        let count = segments.len();
222
223        for segment in segments {
224            self.archive_segment(&segment)?;
225
226            // Update archive LSN
227            let new_lsn = segment.end_lsn.value();
228            self.archive_lsn.fetch_max(new_lsn, Ordering::Release);
229
230            // Add to history
231            self.archive_history.write().insert(segment.segment_id, segment);
232        }
233
234        Ok(count)
235    }
236
237    /// Archive a single segment
238    fn archive_segment(&self, segment: &WalSegment) -> Result<(), PitrError> {
239        match &self.config.destination {
240            ArchiveDestination::LocalPath(path) => {
241                let dest = path.join(format!("wal_{:016x}", segment.segment_id));
242                // In production, this would copy with compression
243                std::fs::copy(&segment.path, &dest)
244                    .map_err(|e| PitrError::ArchiveFailed(e.to_string()))?;
245            }
246            ArchiveDestination::S3 { .. } |
247            ArchiveDestination::Gcs { .. } |
248            ArchiveDestination::Azure { .. } => {
249                // Would use appropriate SDK
250                return Err(PitrError::NotImplemented("Cloud storage archiving".into()));
251            }
252        }
253
254        Ok(())
255    }
256
257    /// Get segments needed for recovery to target LSN
258    pub fn segments_for_recovery(&self, target: Lsn) -> Vec<WalSegment> {
259        self.archive_history
260            .read()
261            .values()
262            .filter(|s| s.end_lsn >= target)
263            .cloned()
264            .collect()
265    }
266}
267
268/// Base snapshot for PITR
269#[derive(Debug, Clone)]
270pub struct BaseSnapshot {
271    /// Snapshot ID
272    pub id: String,
273    /// Creation timestamp
274    pub created_at: u64,
275    /// LSN at snapshot time
276    pub lsn: Lsn,
277    /// Snapshot size in bytes
278    pub size_bytes: u64,
279    /// Storage location
280    pub location: PathBuf,
281    /// Checksum
282    pub checksum: String,
283}
284
285/// PITR recovery orchestrator
286pub struct PitrRecovery {
287    /// Base snapshot to start from
288    base_snapshot: Option<BaseSnapshot>,
289    /// WAL segments to replay
290    wal_segments: Vec<WalSegment>,
291    /// Recovery target
292    target: RecoveryTarget,
293    /// Current recovery LSN
294    current_lsn: Lsn,
295    /// Recovery statistics
296    stats: RecoveryStats,
297}
298
/// Counters describing the progress of a recovery run.
#[derive(Default, Debug)]
pub struct RecoveryStats {
    /// Bytes copied out of the base snapshot.
    pub base_bytes_restored: u64,
    /// Bytes of WAL replayed on top of the snapshot.
    pub wal_bytes_replayed: u64,
    /// Number of WAL records replayed.
    pub wal_records_replayed: u64,
    /// Time spent restoring the base snapshot.
    pub base_restore_time: Duration,
    /// Time spent replaying WAL.
    pub wal_replay_time: Duration,
    /// Whether replay reached the recovery target.
    pub target_lsn_reached: bool,
}
315
316impl PitrRecovery {
317    /// Create a new PITR recovery
318    pub fn new(target: RecoveryTarget) -> Self {
319        Self {
320            base_snapshot: None,
321            wal_segments: Vec::new(),
322            target,
323            current_lsn: Lsn::INVALID,
324            stats: RecoveryStats::default(),
325        }
326    }
327
328    /// Set base snapshot
329    pub fn with_base_snapshot(mut self, snapshot: BaseSnapshot) -> Self {
330        self.current_lsn = snapshot.lsn;
331        self.base_snapshot = Some(snapshot);
332        self
333    }
334
335    /// Add WAL segments for replay
336    pub fn with_wal_segments(mut self, segments: Vec<WalSegment>) -> Self {
337        self.wal_segments = segments;
338        self
339    }
340
341    /// Calculate if recovery is feasible
342    pub fn validate(&self) -> Result<(), PitrError> {
343        // Check if we have a base snapshot
344        if self.base_snapshot.is_none() {
345            return Err(PitrError::NoBaseSnapshot);
346        }
347
348        // Check if WAL chain is complete
349        let base = self.base_snapshot.as_ref().unwrap();
350        let mut expected_lsn = base.lsn;
351
352        for segment in &self.wal_segments {
353            if segment.start_lsn > expected_lsn {
354                return Err(PitrError::WalGap {
355                    expected: expected_lsn,
356                    found: segment.start_lsn,
357                });
358            }
359            expected_lsn = segment.end_lsn;
360        }
361
362        // Check if we can reach target
363        match &self.target {
364            RecoveryTarget::Lsn(target_lsn) => {
365                if *target_lsn > expected_lsn {
366                    return Err(PitrError::TargetUnreachable {
367                        target: *target_lsn,
368                        available: expected_lsn,
369                    });
370                }
371            }
372            _ => {}
373        }
374
375        Ok(())
376    }
377
378    /// Get recovery statistics
379    pub fn stats(&self) -> &RecoveryStats {
380        &self.stats
381    }
382
383    /// Get current recovery LSN
384    pub fn current_lsn(&self) -> Lsn {
385        self.current_lsn
386    }
387}
388
389/// Restore point (named bookmark for PITR)
390#[derive(Debug, Clone)]
391pub struct RestorePoint {
392    /// Restore point name
393    pub name: String,
394    /// LSN at creation
395    pub lsn: Lsn,
396    /// Creation timestamp
397    pub created_at: u64,
398    /// Optional description
399    pub description: Option<String>,
400}
401
402/// PITR error types
403#[derive(Debug)]
404pub enum PitrError {
405    /// Archive operation failed
406    ArchiveFailed(String),
407    /// No base snapshot available
408    NoBaseSnapshot,
409    /// Gap in WAL chain
410    WalGap { expected: Lsn, found: Lsn },
411    /// Target cannot be reached
412    TargetUnreachable { target: Lsn, available: Lsn },
413    /// Feature not implemented
414    NotImplemented(String),
415    /// I/O error
416    Io(std::io::Error),
417}
418
419impl std::fmt::Display for PitrError {
420    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
421        match self {
422            PitrError::ArchiveFailed(msg) => write!(f, "Archive failed: {}", msg),
423            PitrError::NoBaseSnapshot => write!(f, "No base snapshot available for recovery"),
424            PitrError::WalGap { expected, found } => {
425                write!(f, "WAL gap: expected LSN {:?}, found {:?}", expected, found)
426            }
427            PitrError::TargetUnreachable { target, available } => {
428                write!(
429                    f,
430                    "Recovery target {:?} unreachable, only have WAL up to {:?}",
431                    target, available
432                )
433            }
434            PitrError::NotImplemented(feature) => write!(f, "Not implemented: {}", feature),
435            PitrError::Io(e) => write!(f, "I/O error: {}", e),
436        }
437    }
438}
439
440impl std::error::Error for PitrError {}
441
442impl From<std::io::Error> for PitrError {
443    fn from(e: std::io::Error) -> Self {
444        PitrError::Io(e)
445    }
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a WAL segment with placeholder metadata for fields the tests ignore.
    fn segment(segment_id: u64, start: u64, end: u64, archived: bool) -> WalSegment {
        WalSegment {
            segment_id,
            start_lsn: Lsn::new(start),
            end_lsn: Lsn::new(end),
            path: PathBuf::from(format!("/tmp/wal_{}", segment_id)),
            size_bytes: 1024,
            checksum: 0,
            created_at: 0,
            archived,
        }
    }

    /// Builds a base snapshot taken at `lsn`.
    fn snapshot(lsn: u64) -> BaseSnapshot {
        BaseSnapshot {
            id: "snap1".to_string(),
            created_at: 0,
            lsn: Lsn::new(lsn),
            size_bytes: 1024,
            location: PathBuf::from("/tmp/snap1"),
            checksum: "abc".to_string(),
        }
    }

    #[test]
    fn test_lsn_ordering() {
        let lsn1 = Lsn::new(100);
        let lsn2 = Lsn::new(200);

        assert!(lsn1 < lsn2);
        assert_eq!(lsn1.advance(100), lsn2);
    }

    #[test]
    fn test_wal_archiver_queue() {
        let archiver = WalArchiver::new(WalArchiverConfig::default());

        archiver.queue_segment(segment(1, 0, 1000, false));
        assert_eq!(archiver.pending_segments.read().len(), 1);
        // Queuing alone must not advance the archive LSN.
        assert_eq!(archiver.archive_lsn(), Lsn::INVALID);
    }

    #[test]
    fn test_archive_pending_empty_queue() {
        let archiver = WalArchiver::new(WalArchiverConfig::default());
        assert_eq!(archiver.archive_pending().unwrap(), 0);
    }

    #[test]
    fn test_pitr_recovery_validation_no_snapshot() {
        let recovery = PitrRecovery::new(RecoveryTarget::Latest);
        assert!(matches!(recovery.validate(), Err(PitrError::NoBaseSnapshot)));
    }

    #[test]
    fn test_pitr_recovery_validation_wal_gap() {
        // The snapshot is at LSN 100 but the first segment only starts at 200.
        let recovery = PitrRecovery::new(RecoveryTarget::Latest)
            .with_base_snapshot(snapshot(100))
            .with_wal_segments(vec![segment(1, 200, 300, true)]);

        assert!(matches!(recovery.validate(), Err(PitrError::WalGap { .. })));
    }

    #[test]
    fn test_pitr_recovery_validation_success() {
        let recovery = PitrRecovery::new(RecoveryTarget::Lsn(Lsn::new(150)))
            .with_base_snapshot(snapshot(100))
            .with_wal_segments(vec![segment(1, 100, 200, true)]);

        assert!(recovery.validate().is_ok());
    }

    #[test]
    fn test_pitr_recovery_validation_target_unreachable() {
        let recovery = PitrRecovery::new(RecoveryTarget::Lsn(Lsn::new(300)))
            .with_base_snapshot(snapshot(100))
            .with_wal_segments(vec![segment(1, 100, 200, true)]);

        assert!(matches!(
            recovery.validate(),
            Err(PitrError::TargetUnreachable { target, available })
                if target == Lsn::new(300) && available == Lsn::new(200)
        ));
        assert_eq!(recovery.current_lsn(), Lsn::new(100));
    }

    #[test]
    fn test_pitr_error_display() {
        assert_eq!(
            PitrError::NoBaseSnapshot.to_string(),
            "No base snapshot available for recovery"
        );
        assert_eq!(
            PitrError::WalGap { expected: Lsn::new(100), found: Lsn::new(200) }.to_string(),
            "WAL gap: expected LSN Lsn(100), found Lsn(200)"
        );
    }
}