1use std::collections::BTreeMap;
46use std::path::{Path, PathBuf};
47use std::sync::atomic::{AtomicU64, Ordering};
48use std::sync::Arc;
49use std::time::{Duration, SystemTime, UNIX_EPOCH};
50
51use parking_lot::RwLock;
52
/// Log sequence number: a position in the write-ahead log.
///
/// LSNs are totally ordered (derived `Ord`), so they can be compared and used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Lsn(pub u64);

impl Lsn {
    /// Sentinel LSN `0`, used where no real LSN has been established yet.
    pub const INVALID: Lsn = Lsn(0);

    /// Wraps a raw LSN value.
    pub const fn new(value: u64) -> Self {
        Lsn(value)
    }

    /// Returns the raw LSN value.
    pub const fn value(&self) -> u64 {
        self.0
    }

    /// Returns the LSN `offset` positions after `self`.
    ///
    /// # Panics
    ///
    /// Panics if the result would overflow `u64`. A plain `+` panics only in debug
    /// builds and silently wraps in release builds, which could produce
    /// [`Lsn::INVALID`] (or any smaller LSN) and break LSN ordering.
    pub fn advance(&self, offset: u64) -> Self {
        match self.0.checked_add(offset) {
            Some(next) => Lsn(next),
            None => panic!("LSN overflow: {} + {} exceeds u64::MAX", self.0, offset),
        }
    }
}
76
77#[derive(Debug, Clone)]
79pub enum RecoveryTarget {
80 Latest,
82 Lsn(Lsn),
84 Timestamp(u64),
86 RestorePoint(String),
88 TransactionId(u64),
90}
91
92#[derive(Debug, Clone)]
94pub struct WalSegment {
95 pub segment_id: u64,
97 pub start_lsn: Lsn,
99 pub end_lsn: Lsn,
101 pub path: PathBuf,
103 pub size_bytes: u64,
105 pub checksum: u32,
107 pub created_at: u64,
109 pub archived: bool,
111}
112
/// Storage location that archived WAL segments are shipped to.
///
/// Only `LocalPath` is handled by `WalArchiver` in this file; the cloud
/// variants currently yield a "not implemented" error.
#[derive(Clone, Debug)]
pub enum ArchiveDestination {
    /// Directory on the local filesystem.
    LocalPath(PathBuf),
    /// Amazon S3 bucket.
    S3 {
        /// Bucket name.
        bucket: String,
        /// Presumably an object-key prefix within the bucket.
        prefix: String,
        /// AWS region of the bucket.
        region: String,
    },
    /// Google Cloud Storage bucket.
    Gcs {
        /// Bucket name.
        bucket: String,
        /// Presumably an object-key prefix within the bucket.
        prefix: String,
    },
    /// Azure Blob Storage container.
    Azure {
        /// Container name.
        container: String,
        /// Presumably a blob-name prefix within the container.
        prefix: String,
    },
}
135
136#[derive(Debug, Clone)]
138pub struct WalArchiverConfig {
139 pub destination: ArchiveDestination,
141 pub archive_interval: Duration,
143 pub synchronous: bool,
145 pub compression: CompressionAlgorithm,
147 pub encryption_key: Option<Vec<u8>>,
149 pub retention: Duration,
151 pub max_concurrent_uploads: usize,
153}
154
155impl Default for WalArchiverConfig {
156 fn default() -> Self {
157 Self {
158 destination: ArchiveDestination::LocalPath(PathBuf::from("/var/sochdb/wal_archive")),
159 archive_interval: Duration::from_secs(60),
160 synchronous: false,
161 compression: CompressionAlgorithm::Zstd,
162 encryption_key: None,
163 retention: Duration::from_secs(7 * 24 * 3600), max_concurrent_uploads: 4,
165 }
166 }
167}
168
/// Compression codec selectable for archived WAL data.
///
/// NOTE(review): `WalArchiver` in this file does not apply the configured codec yet.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionAlgorithm {
    /// No compression.
    None,
    /// LZ4.
    Lz4,
    /// Zstandard.
    Zstd,
    /// Snappy.
    Snappy,
}
177
178pub struct WalArchiver {
180 config: WalArchiverConfig,
181 archive_lsn: AtomicU64,
183 pending_segments: RwLock<Vec<WalSegment>>,
185 archive_history: RwLock<BTreeMap<u64, WalSegment>>,
187}
188
189impl WalArchiver {
190 pub fn new(config: WalArchiverConfig) -> Self {
192 Self {
193 config,
194 archive_lsn: AtomicU64::new(0),
195 pending_segments: RwLock::new(Vec::new()),
196 archive_history: RwLock::new(BTreeMap::new()),
197 }
198 }
199
200 pub fn queue_segment(&self, segment: WalSegment) {
202 self.pending_segments.write().push(segment);
203 }
204
205 pub fn archive_lsn(&self) -> Lsn {
207 Lsn(self.archive_lsn.load(Ordering::Acquire))
208 }
209
210 pub fn archive_pending(&self) -> Result<usize, PitrError> {
212 let segments: Vec<_> = {
213 let mut pending = self.pending_segments.write();
214 std::mem::take(&mut *pending)
215 };
216
217 if segments.is_empty() {
218 return Ok(0);
219 }
220
221 let count = segments.len();
222
223 for segment in segments {
224 self.archive_segment(&segment)?;
225
226 let new_lsn = segment.end_lsn.value();
228 self.archive_lsn.fetch_max(new_lsn, Ordering::Release);
229
230 self.archive_history.write().insert(segment.segment_id, segment);
232 }
233
234 Ok(count)
235 }
236
237 fn archive_segment(&self, segment: &WalSegment) -> Result<(), PitrError> {
239 match &self.config.destination {
240 ArchiveDestination::LocalPath(path) => {
241 let dest = path.join(format!("wal_{:016x}", segment.segment_id));
242 std::fs::copy(&segment.path, &dest)
244 .map_err(|e| PitrError::ArchiveFailed(e.to_string()))?;
245 }
246 ArchiveDestination::S3 { .. } |
247 ArchiveDestination::Gcs { .. } |
248 ArchiveDestination::Azure { .. } => {
249 return Err(PitrError::NotImplemented("Cloud storage archiving".into()));
251 }
252 }
253
254 Ok(())
255 }
256
257 pub fn segments_for_recovery(&self, target: Lsn) -> Vec<WalSegment> {
259 self.archive_history
260 .read()
261 .values()
262 .filter(|s| s.end_lsn >= target)
263 .cloned()
264 .collect()
265 }
266}
267
268#[derive(Debug, Clone)]
270pub struct BaseSnapshot {
271 pub id: String,
273 pub created_at: u64,
275 pub lsn: Lsn,
277 pub size_bytes: u64,
279 pub location: PathBuf,
281 pub checksum: String,
283}
284
285pub struct PitrRecovery {
287 base_snapshot: Option<BaseSnapshot>,
289 wal_segments: Vec<WalSegment>,
291 target: RecoveryTarget,
293 current_lsn: Lsn,
295 stats: RecoveryStats,
297}
298
/// Counters describing the progress of a recovery run.
#[derive(Default, Debug)]
pub struct RecoveryStats {
    /// Bytes restored from the base snapshot.
    pub base_bytes_restored: u64,
    /// Bytes of WAL replayed.
    pub wal_bytes_replayed: u64,
    /// Number of WAL records replayed.
    pub wal_records_replayed: u64,
    /// Time spent restoring the base snapshot.
    pub base_restore_time: Duration,
    /// Time spent replaying WAL.
    pub wal_replay_time: Duration,
    /// Whether the recovery target LSN was reached.
    pub target_lsn_reached: bool,
}
315
316impl PitrRecovery {
317 pub fn new(target: RecoveryTarget) -> Self {
319 Self {
320 base_snapshot: None,
321 wal_segments: Vec::new(),
322 target,
323 current_lsn: Lsn::INVALID,
324 stats: RecoveryStats::default(),
325 }
326 }
327
328 pub fn with_base_snapshot(mut self, snapshot: BaseSnapshot) -> Self {
330 self.current_lsn = snapshot.lsn;
331 self.base_snapshot = Some(snapshot);
332 self
333 }
334
335 pub fn with_wal_segments(mut self, segments: Vec<WalSegment>) -> Self {
337 self.wal_segments = segments;
338 self
339 }
340
341 pub fn validate(&self) -> Result<(), PitrError> {
343 if self.base_snapshot.is_none() {
345 return Err(PitrError::NoBaseSnapshot);
346 }
347
348 let base = self.base_snapshot.as_ref().unwrap();
350 let mut expected_lsn = base.lsn;
351
352 for segment in &self.wal_segments {
353 if segment.start_lsn > expected_lsn {
354 return Err(PitrError::WalGap {
355 expected: expected_lsn,
356 found: segment.start_lsn,
357 });
358 }
359 expected_lsn = segment.end_lsn;
360 }
361
362 match &self.target {
364 RecoveryTarget::Lsn(target_lsn) => {
365 if *target_lsn > expected_lsn {
366 return Err(PitrError::TargetUnreachable {
367 target: *target_lsn,
368 available: expected_lsn,
369 });
370 }
371 }
372 _ => {}
373 }
374
375 Ok(())
376 }
377
378 pub fn stats(&self) -> &RecoveryStats {
380 &self.stats
381 }
382
383 pub fn current_lsn(&self) -> Lsn {
385 self.current_lsn
386 }
387}
388
389#[derive(Debug, Clone)]
391pub struct RestorePoint {
392 pub name: String,
394 pub lsn: Lsn,
396 pub created_at: u64,
398 pub description: Option<String>,
400}
401
402#[derive(Debug)]
404pub enum PitrError {
405 ArchiveFailed(String),
407 NoBaseSnapshot,
409 WalGap { expected: Lsn, found: Lsn },
411 TargetUnreachable { target: Lsn, available: Lsn },
413 NotImplemented(String),
415 Io(std::io::Error),
417}
418
419impl std::fmt::Display for PitrError {
420 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
421 match self {
422 PitrError::ArchiveFailed(msg) => write!(f, "Archive failed: {}", msg),
423 PitrError::NoBaseSnapshot => write!(f, "No base snapshot available for recovery"),
424 PitrError::WalGap { expected, found } => {
425 write!(f, "WAL gap: expected LSN {:?}, found {:?}", expected, found)
426 }
427 PitrError::TargetUnreachable { target, available } => {
428 write!(
429 f,
430 "Recovery target {:?} unreachable, only have WAL up to {:?}",
431 target, available
432 )
433 }
434 PitrError::NotImplemented(feature) => write!(f, "Not implemented: {}", feature),
435 PitrError::Io(e) => write!(f, "I/O error: {}", e),
436 }
437 }
438}
439
// Marks `PitrError` as a standard error type. `source()` keeps its default
// (`None`): the `Io` variant's `Display` output already embeds the inner error's text.
impl std::error::Error for PitrError {}
441
442impl From<std::io::Error> for PitrError {
443 fn from(e: std::io::Error) -> Self {
444 PitrError::Io(e)
445 }
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Base snapshot at `lsn` with fixed placeholder metadata.
    fn snapshot_at(lsn: u64) -> BaseSnapshot {
        BaseSnapshot {
            id: "snap1".to_string(),
            created_at: 0,
            lsn: Lsn::new(lsn),
            size_bytes: 1024,
            location: PathBuf::from("/tmp/snap1"),
            checksum: "abc".to_string(),
        }
    }

    /// Segment with id 1 spanning `start`..`end`, flagged archived, placeholder metadata.
    fn segment(start: u64, end: u64) -> WalSegment {
        WalSegment {
            segment_id: 1,
            start_lsn: Lsn::new(start),
            end_lsn: Lsn::new(end),
            path: PathBuf::from("/tmp/wal_1"),
            size_bytes: 1024,
            checksum: 0,
            created_at: 0,
            archived: true,
        }
    }

    #[test]
    fn test_lsn_ordering() {
        let lsn1 = Lsn::new(100);
        let lsn2 = Lsn::new(200);

        assert!(lsn1 < lsn2);
        assert_eq!(lsn1.advance(100), lsn2);
    }

    #[test]
    fn test_wal_archiver_queue() {
        let config = WalArchiverConfig::default();
        let archiver = WalArchiver::new(config);

        let segment = WalSegment {
            segment_id: 1,
            start_lsn: Lsn::new(0),
            end_lsn: Lsn::new(1000),
            path: PathBuf::from("/tmp/wal_1"),
            size_bytes: 1024,
            checksum: 0x12345678,
            created_at: 0,
            archived: false,
        };

        archiver.queue_segment(segment);
        assert_eq!(archiver.pending_segments.read().len(), 1);
    }

    #[test]
    fn test_archive_pending_empty_queue() {
        let archiver = WalArchiver::new(WalArchiverConfig::default());
        assert!(matches!(archiver.archive_pending(), Ok(0)));
        assert_eq!(archiver.archive_lsn(), Lsn::INVALID);
    }

    #[test]
    fn test_archive_pending_cloud_destination_not_implemented() {
        let config = WalArchiverConfig {
            destination: ArchiveDestination::S3 {
                bucket: "bucket".to_string(),
                prefix: "wal".to_string(),
                region: "us-east-1".to_string(),
            },
            ..WalArchiverConfig::default()
        };
        let archiver = WalArchiver::new(config);
        archiver.queue_segment(segment(0, 1000));

        assert!(matches!(
            archiver.archive_pending(),
            Err(PitrError::NotImplemented(_))
        ));
        assert_eq!(archiver.archive_lsn(), Lsn::INVALID);
    }

    #[test]
    fn test_pitr_recovery_validation_no_snapshot() {
        let recovery = PitrRecovery::new(RecoveryTarget::Latest);
        assert!(matches!(recovery.validate(), Err(PitrError::NoBaseSnapshot)));
    }

    #[test]
    fn test_pitr_recovery_validation_wal_gap() {
        let recovery = PitrRecovery::new(RecoveryTarget::Latest)
            .with_base_snapshot(snapshot_at(100))
            .with_wal_segments(vec![segment(200, 300)]);

        assert!(matches!(recovery.validate(), Err(PitrError::WalGap { .. })));
    }

    #[test]
    fn test_pitr_recovery_validation_success() {
        let recovery = PitrRecovery::new(RecoveryTarget::Lsn(Lsn::new(150)))
            .with_base_snapshot(snapshot_at(100))
            .with_wal_segments(vec![segment(100, 200)]);

        assert!(recovery.validate().is_ok());
    }

    #[test]
    fn test_pitr_recovery_validation_target_unreachable() {
        let recovery = PitrRecovery::new(RecoveryTarget::Lsn(Lsn::new(250)))
            .with_base_snapshot(snapshot_at(100))
            .with_wal_segments(vec![segment(100, 200)]);

        match recovery.validate() {
            Err(PitrError::TargetUnreachable { target, available }) => {
                assert_eq!(target, Lsn::new(250));
                assert_eq!(available, Lsn::new(200));
            }
            other => panic!("expected TargetUnreachable, got {:?}", other),
        }
    }
}