1use std::fmt;
42use std::path::{Path, PathBuf};
43use std::time::Duration;
44
/// Durability level that a [`DurabilityContract`] is built around.
///
/// `Durable` and `GroupCommit` are the levels reported as production-safe by
/// [`DurabilityLevel::is_production_safe`]. `Periodic` and `NoSync` each carry
/// an `accept_data_loss_risk` flag, and [`DurabilityContract::new`] refuses
/// them unless that flag is `true`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityLevel {
    /// Strongest level; displayed as "fsync per commit" and reported with no
    /// data-loss window.
    Durable,

    /// Group commit: flushing is governed by a batch limit and an interval.
    GroupCommit {
        /// Upper bound on the batch size (presumably a number of commits —
        /// confirm against the WAL writer).
        max_batch: usize,
        /// Flush interval; reported as this level's maximum data-loss window.
        flush_interval: Duration,
    },

    /// Periodic syncing; displayed with an `UNSAFE:` prefix.
    Periodic {
        /// Sync interval; reported as this level's maximum data-loss window.
        sync_interval: Duration,
        /// Must be `true` for [`DurabilityContract::new`] to accept the level.
        accept_data_loss_risk: bool,
    },

    /// No syncing; displayed as `UNSAFE:NoSync (testing only)`.
    NoSync {
        /// Must be `true` for [`DurabilityContract::new`] to accept the level.
        accept_data_loss_risk: bool,
    },
}
78
79impl DurabilityLevel {
80 pub fn is_production_safe(&self) -> bool {
82 matches!(self, DurabilityLevel::Durable | DurabilityLevel::GroupCommit { .. })
83 }
84
85 pub fn requires_risk_acceptance(&self) -> bool {
87 matches!(self, DurabilityLevel::Periodic { .. } | DurabilityLevel::NoSync { .. })
88 }
89
90 pub fn default_production() -> Self {
92 DurabilityLevel::GroupCommit {
93 max_batch: 128,
94 flush_interval: Duration::from_millis(10),
95 }
96 }
97
98 pub fn safest() -> Self {
100 DurabilityLevel::Durable
101 }
102}
103
104impl Default for DurabilityLevel {
105 fn default() -> Self {
106 Self::default_production()
107 }
108}
109
110impl fmt::Display for DurabilityLevel {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 match self {
113 DurabilityLevel::Durable => write!(f, "Durable (fsync per commit)"),
114 DurabilityLevel::GroupCommit { max_batch, flush_interval } => {
115 write!(f, "GroupCommit (batch={}, interval={:?})", max_batch, flush_interval)
116 }
117 DurabilityLevel::Periodic { sync_interval, .. } => {
118 write!(f, "UNSAFE:Periodic (interval={:?})", sync_interval)
119 }
120 DurabilityLevel::NoSync { .. } => {
121 write!(f, "UNSAFE:NoSync (testing only)")
122 }
123 }
124 }
125}
126
/// Error describing a violated durability contract.
///
/// Produced by [`DurabilityContract::new`] and by the `validate_*` checks.
#[derive(Debug, Clone)]
pub struct DurabilityContractError {
    /// Human-readable explanation of what went wrong.
    pub message: String,
    /// Fixed identifier of the violated rule, e.g. `"WAL_BEFORE_PAGE"`,
    /// `"FORCE_ON_COMMIT"` or `"EXPLICIT_RISK_ACCEPTANCE"`.
    pub contract_violated: &'static str,
}
133
134impl fmt::Display for DurabilityContractError {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 write!(f, "Durability contract '{}' violated: {}", self.contract_violated, self.message)
137 }
138}
139
140impl std::error::Error for DurabilityContractError {}
141
142#[derive(Debug, Clone)]
144pub struct WalArchiveConfig {
145 pub enabled: bool,
147 pub destination: WalArchiveDestination,
149 pub trigger: WalArchiveTrigger,
151 pub compress: bool,
153 pub verify: bool,
155}
156
157impl Default for WalArchiveConfig {
158 fn default() -> Self {
159 Self {
160 enabled: false,
161 destination: WalArchiveDestination::LocalPath(PathBuf::from("/var/lib/sochdb/wal_archive")),
162 trigger: WalArchiveTrigger::SegmentFull,
163 compress: true,
164 verify: true,
165 }
166 }
167}
168
/// Target that archived WAL is sent to.
#[derive(Debug, Clone)]
pub enum WalArchiveDestination {
    /// A path on the local filesystem.
    LocalPath(PathBuf),
    /// A command string (presumably executed per archived segment — confirm
    /// against the archiver).
    ExternalCommand(String),
    /// An object store addressed by endpoint, bucket and key prefix.
    ObjectStorage {
        /// Endpoint of the object-storage service.
        endpoint: String,
        /// Bucket name.
        bucket: String,
        /// Prefix within the bucket.
        prefix: String,
    },
}
183
/// Condition that triggers WAL archiving.
#[derive(Debug, Clone, Copy)]
pub enum WalArchiveTrigger {
    /// Trigger when a segment is full.
    SegmentFull,
    /// Trigger on a timer with the given interval.
    TimeBased(Duration),
    /// Combination of the two (presumably whichever happens first — confirm
    /// against the archiver).
    Both {
        /// Upper bound on the time between archive runs.
        max_interval: Duration,
    },
}
196
197#[derive(Debug, Clone)]
199pub struct CheckpointConfig {
200 pub mode: CheckpointMode,
202 pub max_duration: Duration,
204 pub wal_size_trigger: u64,
206 pub txn_count_trigger: u64,
208}
209
210impl Default for CheckpointConfig {
211 fn default() -> Self {
212 Self {
213 mode: CheckpointMode::Incremental,
214 max_duration: Duration::from_secs(30),
215 wal_size_trigger: 128 * 1024 * 1024, txn_count_trigger: 100_000,
217 }
218 }
219}
220
/// Checkpoint strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointMode {
    /// Full checkpoint; described by `describe_guarantees` as one that
    /// "may cause stalls".
    Full,
    /// Incremental checkpoint; the default mode.
    Incremental,
    /// Copy-on-write checkpoint.
    CopyOnWrite,
}
231
232pub struct DurabilityContract {
234 pub level: DurabilityLevel,
236 pub archive: WalArchiveConfig,
238 pub checkpoint: CheckpointConfig,
240 wal_before_page: bool,
242 force_on_commit: bool,
244}
245
246impl DurabilityContract {
247 pub fn new(level: DurabilityLevel) -> Result<Self, DurabilityContractError> {
249 match &level {
251 DurabilityLevel::Periodic { accept_data_loss_risk, .. }
252 | DurabilityLevel::NoSync { accept_data_loss_risk, .. } => {
253 if !accept_data_loss_risk {
254 return Err(DurabilityContractError {
255 message: format!(
256 "Unsafe durability level '{}' requires explicit accept_data_loss_risk=true",
257 level
258 ),
259 contract_violated: "EXPLICIT_RISK_ACCEPTANCE",
260 });
261 }
262 }
263 _ => {}
264 }
265
266 Ok(Self {
267 level,
268 archive: WalArchiveConfig::default(),
269 checkpoint: CheckpointConfig::default(),
270 wal_before_page: true,
271 force_on_commit: true,
272 })
273 }
274
275 pub fn production() -> Self {
277 Self {
278 level: DurabilityLevel::default_production(),
279 archive: WalArchiveConfig::default(),
280 checkpoint: CheckpointConfig::default(),
281 wal_before_page: true,
282 force_on_commit: true,
283 }
284 }
285
286 pub fn with_archive(mut self, config: WalArchiveConfig) -> Self {
288 self.archive = config;
289 self
290 }
291
292 pub fn with_checkpoint(mut self, config: CheckpointConfig) -> Self {
294 self.checkpoint = config;
295 self
296 }
297
298 pub fn validate_page_flush(&self, page_lsn: u64, flushed_lsn: u64) -> Result<(), DurabilityContractError> {
302 if self.wal_before_page && page_lsn > flushed_lsn {
303 return Err(DurabilityContractError {
304 message: format!(
305 "Page LSN {} exceeds flushed WAL LSN {} - would violate WAL protocol",
306 page_lsn, flushed_lsn
307 ),
308 contract_violated: "WAL_BEFORE_PAGE",
309 });
310 }
311 Ok(())
312 }
313
314 pub fn validate_commit(&self, commit_lsn: u64, flushed_lsn: u64) -> Result<(), DurabilityContractError> {
318 if self.force_on_commit && commit_lsn > flushed_lsn {
319 return Err(DurabilityContractError {
320 message: format!(
321 "Commit LSN {} not yet flushed (flushed: {}) - would violate force-on-commit",
322 commit_lsn, flushed_lsn
323 ),
324 contract_violated: "FORCE_ON_COMMIT",
325 });
326 }
327 Ok(())
328 }
329
330 pub fn max_data_loss_window(&self) -> Option<Duration> {
332 match &self.level {
333 DurabilityLevel::Durable => None, DurabilityLevel::GroupCommit { flush_interval, .. } => Some(*flush_interval),
335 DurabilityLevel::Periodic { sync_interval, .. } => Some(*sync_interval),
336 DurabilityLevel::NoSync { .. } => None, }
338 }
339
340 pub fn is_production_ready(&self) -> bool {
342 self.level.is_production_safe() && self.wal_before_page && self.force_on_commit
343 }
344
345 pub fn describe_guarantees(&self) -> String {
347 let mut desc = Vec::new();
348
349 desc.push(format!("Durability: {}", self.level));
350
351 if self.wal_before_page {
352 desc.push("WAL-before-page: ENFORCED".to_string());
353 } else {
354 desc.push("WAL-before-page: DISABLED (UNSAFE)".to_string());
355 }
356
357 if self.force_on_commit {
358 desc.push("Force-on-commit: ENFORCED".to_string());
359 } else {
360 desc.push("Force-on-commit: DISABLED (UNSAFE)".to_string());
361 }
362
363 if let Some(window) = self.max_data_loss_window() {
364 desc.push(format!("Max data loss window: {:?}", window));
365 } else if matches!(self.level, DurabilityLevel::NoSync { .. }) {
366 desc.push("Max data loss window: UNBOUNDED (all data at risk)".to_string());
367 } else {
368 desc.push("Max data loss window: None (fully durable)".to_string());
369 }
370
371 if self.archive.enabled {
372 desc.push("WAL archiving: ENABLED".to_string());
373 }
374
375 match self.checkpoint.mode {
376 CheckpointMode::Full => desc.push("Checkpoint: Full (may cause stalls)".to_string()),
377 CheckpointMode::Incremental => desc.push("Checkpoint: Incremental".to_string()),
378 CheckpointMode::CopyOnWrite => desc.push("Checkpoint: Copy-on-write".to_string()),
379 }
380
381 desc.join("\n")
382 }
383}
384
385impl Default for DurabilityContract {
386 fn default() -> Self {
387 Self::production()
388 }
389}
390
/// A point in the log that recovery can refer to.
#[derive(Debug, Clone)]
pub struct RecoveryPoint {
    /// Log sequence number of the point.
    pub lsn: u64,
    /// Time of the point (unit and epoch are not fixed by this type —
    /// confirm with the code that produces it).
    pub timestamp: u64,
    /// Identifier of an associated checkpoint, if there is one.
    pub checkpoint_id: Option<u64>,
    /// Free-form description.
    pub description: String,
}
403
404#[derive(Debug, Clone)]
406pub struct WalSegmentMetadata {
407 pub segment_number: u64,
409 pub start_lsn: u64,
411 pub end_lsn: u64,
413 pub size_bytes: u64,
415 pub checksum: u32,
417 pub is_complete: bool,
419 pub archive_status: ArchiveStatus,
421}
422
/// Archiving state of a WAL segment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArchiveStatus {
    /// Not archived yet.
    Pending,
    /// Archiving is under way.
    InProgress,
    /// Archiving finished.
    Archived,
    /// Archiving failed.
    Failed,
}
435
#[cfg(test)]
mod tests {
    use super::*;

    /// Contract for `NoSync` with the data-loss risk acknowledged.
    fn acknowledged_no_sync() -> DurabilityContract {
        DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        })
        .expect("acknowledged NoSync must be accepted")
    }

    #[test]
    fn test_durability_level_defaults() {
        let level = DurabilityLevel::default();
        assert!(level.is_production_safe());
        assert!(!level.requires_risk_acceptance());
    }

    #[test]
    fn test_unsafe_requires_acknowledgement() {
        let rejected = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: false,
        });
        // `.err()` rather than `unwrap_err()`: the latter needs the Ok type
        // to implement `Debug`.
        let err = rejected
            .err()
            .expect("NoSync without acknowledgement must be rejected");
        assert_eq!(err.contract_violated, "EXPLICIT_RISK_ACCEPTANCE");

        let accepted = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        });
        assert!(accepted.is_ok());
    }

    #[test]
    fn test_periodic_requires_acknowledgement() {
        let periodic = |accepted: bool| DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(1),
            accept_data_loss_risk: accepted,
        };

        let err = DurabilityContract::new(periodic(false))
            .err()
            .expect("Periodic without acknowledgement must be rejected");
        assert_eq!(err.contract_violated, "EXPLICIT_RISK_ACCEPTANCE");

        assert!(DurabilityContract::new(periodic(true)).is_ok());
    }

    #[test]
    fn test_wal_before_page_validation() {
        let contract = DurabilityContract::production();

        // A page at or behind the flushed WAL position may be written.
        assert!(contract.validate_page_flush(100, 100).is_ok());
        assert!(contract.validate_page_flush(50, 100).is_ok());

        // A page ahead of the flushed WAL position may not.
        assert!(contract.validate_page_flush(150, 100).is_err());
    }

    #[test]
    fn test_force_on_commit_validation() {
        let contract = DurabilityContract::production();

        // A commit at or behind the flushed WAL position is acceptable.
        assert!(contract.validate_commit(100, 100).is_ok());
        assert!(contract.validate_commit(50, 100).is_ok());

        // A commit ahead of the flushed WAL position is not.
        assert!(contract.validate_commit(150, 100).is_err());
    }

    #[test]
    fn test_data_loss_window() {
        let durable = DurabilityContract::new(DurabilityLevel::Durable).unwrap();
        assert!(durable.max_data_loss_window().is_none());

        let group = DurabilityContract::new(DurabilityLevel::GroupCommit {
            max_batch: 100,
            flush_interval: Duration::from_millis(10),
        })
        .unwrap();
        assert_eq!(group.max_data_loss_window(), Some(Duration::from_millis(10)));

        let periodic = DurabilityContract::new(DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(5),
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert_eq!(periodic.max_data_loss_window(), Some(Duration::from_secs(5)));

        // NoSync has no bounded window, so it also reports `None`.
        assert!(acknowledged_no_sync().max_data_loss_window().is_none());
    }

    #[test]
    fn test_production_ready_check() {
        let contract = DurabilityContract::production();
        assert!(contract.is_production_ready());

        assert!(!acknowledged_no_sync().is_production_ready());
    }
}