1use std::fmt;
42use std::path::PathBuf;
43use std::time::Duration;
44
/// Durability level selected for the write-ahead log.
///
/// `Durable` and `GroupCommit` are the levels reported as production-safe by
/// [`DurabilityLevel::is_production_safe`]. `Periodic` and `NoSync` carry an
/// explicit `accept_data_loss_risk` flag and are rendered with an `UNSAFE:`
/// prefix by the `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityLevel {
    /// Rendered as "fsync per commit".
    Durable,

    /// Group-commit level; this is what [`DurabilityLevel::default_production`] returns.
    GroupCommit {
        /// Rendered as `batch=` in the `Display` output.
        /// NOTE(review): presumably the maximum number of commits per flush — confirm with the WAL writer.
        max_batch: usize,
        /// Rendered as `interval=` in the `Display` output.
        /// NOTE(review): presumably the longest wait before a flush is forced — confirm with the WAL writer.
        flush_interval: Duration,
    },

    /// Periodic sync; flagged `UNSAFE` in the `Display` output.
    Periodic {
        /// Rendered as `interval=` in the `Display` output.
        sync_interval: Duration,
        /// Explicit opt-in flag carried by the unsafe levels.
        accept_data_loss_risk: bool,
    },

    /// No sync at all; rendered as "testing only".
    NoSync {
        /// Explicit opt-in flag carried by the unsafe levels.
        accept_data_loss_risk: bool,
    },
}

impl DurabilityLevel {
    /// Returns `true` for `Durable` and `GroupCommit`, `false` for `Periodic` and `NoSync`.
    pub fn is_production_safe(&self) -> bool {
        match self {
            Self::Durable | Self::GroupCommit { .. } => true,
            Self::Periodic { .. } | Self::NoSync { .. } => false,
        }
    }

    /// Returns `true` for the levels that carry an `accept_data_loss_risk` flag
    /// (`Periodic` and `NoSync`).
    pub fn requires_risk_acceptance(&self) -> bool {
        match self {
            Self::Periodic { .. } | Self::NoSync { .. } => true,
            Self::Durable | Self::GroupCommit { .. } => false,
        }
    }

    /// Group commit with a batch of 128 and a 10 ms flush interval.
    pub fn default_production() -> Self {
        let flush_interval = Duration::from_millis(10);
        Self::GroupCommit {
            max_batch: 128,
            flush_interval,
        }
    }

    /// Returns [`DurabilityLevel::Durable`].
    pub fn safest() -> Self {
        Self::Durable
    }
}

impl Default for DurabilityLevel {
    /// Same value as [`DurabilityLevel::default_production`].
    fn default() -> Self {
        DurabilityLevel::default_production()
    }
}

impl fmt::Display for DurabilityLevel {
    /// Writes a one-line description; the two unsafe levels are prefixed with `UNSAFE:`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The enum is `Copy`, so matching on the value binds the fields directly.
        match *self {
            Self::Durable => f.write_str("Durable (fsync per commit)"),
            Self::GroupCommit {
                max_batch,
                flush_interval,
            } => write!(
                f,
                "GroupCommit (batch={}, interval={:?})",
                max_batch, flush_interval
            ),
            Self::Periodic { sync_interval, .. } => {
                write!(f, "UNSAFE:Periodic (interval={:?})", sync_interval)
            }
            Self::NoSync { .. } => f.write_str("UNSAFE:NoSync (testing only)"),
        }
    }
}
139
/// Error describing which durability contract was violated and why.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare errors directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DurabilityContractError {
    /// Human-readable description of the violation.
    pub message: String,
    /// Identifier of the violated contract, e.g. `"WAL_BEFORE_PAGE"`.
    pub contract_violated: &'static str,
}

impl fmt::Display for DurabilityContractError {
    /// Formats as `Durability contract '<id>' violated: <message>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            message,
            contract_violated,
        } = self;
        write!(
            f,
            "Durability contract '{}' violated: {}",
            contract_violated, message
        )
    }
}

impl std::error::Error for DurabilityContractError {}
158
/// WAL archiving settings.
///
/// Derives `PartialEq`/`Eq` (together with the two enums it is built from) so
/// that configurations can be compared, matching the other value types in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalArchiveConfig {
    /// Whether archiving is enabled; disabled in the default configuration.
    pub enabled: bool,
    /// Where archived WAL data is sent.
    pub destination: WalArchiveDestination,
    /// Condition that triggers archiving.
    pub trigger: WalArchiveTrigger,
    /// NOTE(review): presumably whether archived segments are compressed — confirm with the archiver.
    pub compress: bool,
    /// NOTE(review): presumably whether archived segments are verified — confirm with the archiver.
    pub verify: bool,
}

impl Default for WalArchiveConfig {
    /// Archiving disabled; local path `/var/lib/sochdb/wal_archive`, trigger
    /// `SegmentFull`, `compress` and `verify` both on.
    fn default() -> Self {
        let destination =
            WalArchiveDestination::LocalPath(PathBuf::from("/var/lib/sochdb/wal_archive"));
        Self {
            enabled: false,
            destination,
            trigger: WalArchiveTrigger::SegmentFull,
            compress: true,
            verify: true,
        }
    }
}

/// Target for archived WAL data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WalArchiveDestination {
    /// A path on the local filesystem.
    LocalPath(PathBuf),
    /// NOTE(review): presumably a command line invoked per segment — confirm with the archiver.
    ExternalCommand(String),
    /// Object storage addressed by endpoint, bucket and key prefix.
    ObjectStorage {
        endpoint: String,
        bucket: String,
        prefix: String,
    },
}

/// Condition that triggers archiving.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalArchiveTrigger {
    /// Trigger on a full segment.
    SegmentFull,
    /// Trigger on a time interval.
    TimeBased(Duration),
    /// NOTE(review): presumably segment-full or `max_interval`, whichever comes first — confirm.
    Both { max_interval: Duration },
}
213
/// Checkpoint settings.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared, matching
/// [`CheckpointMode`], which already supports comparison.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CheckpointConfig {
    /// Checkpoint strategy.
    pub mode: CheckpointMode,
    /// NOTE(review): presumably an upper bound on how long one checkpoint may run — confirm.
    pub max_duration: Duration,
    /// NOTE(review): presumably WAL size in bytes that triggers a checkpoint
    /// (the default is `128 * 1024 * 1024`) — confirm.
    pub wal_size_trigger: u64,
    /// NOTE(review): presumably transaction count that triggers a checkpoint — confirm.
    pub txn_count_trigger: u64,
}

impl Default for CheckpointConfig {
    /// Incremental mode, 30 s max duration, `128 * 1024 * 1024` WAL size
    /// trigger and a 100 000 transaction trigger.
    fn default() -> Self {
        Self {
            mode: CheckpointMode::Incremental,
            max_duration: Duration::from_secs(30),
            wal_size_trigger: 128 * 1024 * 1024,
            txn_count_trigger: 100_000,
        }
    }
}

/// Checkpoint strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointMode {
    /// Described as "may cause stalls" by `DurabilityContract::describe_guarantees`.
    Full,
    /// The default mode.
    Incremental,
    /// Described as "Copy-on-write" by `DurabilityContract::describe_guarantees`.
    CopyOnWrite,
}
248
249pub struct DurabilityContract {
251 pub level: DurabilityLevel,
253 pub archive: WalArchiveConfig,
255 pub checkpoint: CheckpointConfig,
257 wal_before_page: bool,
259 force_on_commit: bool,
261}
262
263impl DurabilityContract {
264 pub fn new(level: DurabilityLevel) -> Result<Self, DurabilityContractError> {
266 match &level {
268 DurabilityLevel::Periodic {
269 accept_data_loss_risk,
270 ..
271 }
272 | DurabilityLevel::NoSync {
273 accept_data_loss_risk,
274 ..
275 } => {
276 if !accept_data_loss_risk {
277 return Err(DurabilityContractError {
278 message: format!(
279 "Unsafe durability level '{}' requires explicit accept_data_loss_risk=true",
280 level
281 ),
282 contract_violated: "EXPLICIT_RISK_ACCEPTANCE",
283 });
284 }
285 }
286 _ => {}
287 }
288
289 Ok(Self {
290 level,
291 archive: WalArchiveConfig::default(),
292 checkpoint: CheckpointConfig::default(),
293 wal_before_page: true,
294 force_on_commit: true,
295 })
296 }
297
298 pub fn production() -> Self {
300 Self {
301 level: DurabilityLevel::default_production(),
302 archive: WalArchiveConfig::default(),
303 checkpoint: CheckpointConfig::default(),
304 wal_before_page: true,
305 force_on_commit: true,
306 }
307 }
308
309 pub fn with_archive(mut self, config: WalArchiveConfig) -> Self {
311 self.archive = config;
312 self
313 }
314
315 pub fn with_checkpoint(mut self, config: CheckpointConfig) -> Self {
317 self.checkpoint = config;
318 self
319 }
320
321 pub fn validate_page_flush(
325 &self,
326 page_lsn: u64,
327 flushed_lsn: u64,
328 ) -> Result<(), DurabilityContractError> {
329 if self.wal_before_page && page_lsn > flushed_lsn {
330 return Err(DurabilityContractError {
331 message: format!(
332 "Page LSN {} exceeds flushed WAL LSN {} - would violate WAL protocol",
333 page_lsn, flushed_lsn
334 ),
335 contract_violated: "WAL_BEFORE_PAGE",
336 });
337 }
338 Ok(())
339 }
340
341 pub fn validate_commit(
345 &self,
346 commit_lsn: u64,
347 flushed_lsn: u64,
348 ) -> Result<(), DurabilityContractError> {
349 if self.force_on_commit && commit_lsn > flushed_lsn {
350 return Err(DurabilityContractError {
351 message: format!(
352 "Commit LSN {} not yet flushed (flushed: {}) - would violate force-on-commit",
353 commit_lsn, flushed_lsn
354 ),
355 contract_violated: "FORCE_ON_COMMIT",
356 });
357 }
358 Ok(())
359 }
360
361 pub fn max_data_loss_window(&self) -> Option<Duration> {
363 match &self.level {
364 DurabilityLevel::Durable => None, DurabilityLevel::GroupCommit { flush_interval, .. } => Some(*flush_interval),
366 DurabilityLevel::Periodic { sync_interval, .. } => Some(*sync_interval),
367 DurabilityLevel::NoSync { .. } => None, }
369 }
370
371 pub fn is_production_ready(&self) -> bool {
373 self.level.is_production_safe() && self.wal_before_page && self.force_on_commit
374 }
375
376 pub fn describe_guarantees(&self) -> String {
378 let mut desc = Vec::new();
379
380 desc.push(format!("Durability: {}", self.level));
381
382 if self.wal_before_page {
383 desc.push("WAL-before-page: ENFORCED".to_string());
384 } else {
385 desc.push("WAL-before-page: DISABLED (UNSAFE)".to_string());
386 }
387
388 if self.force_on_commit {
389 desc.push("Force-on-commit: ENFORCED".to_string());
390 } else {
391 desc.push("Force-on-commit: DISABLED (UNSAFE)".to_string());
392 }
393
394 if let Some(window) = self.max_data_loss_window() {
395 desc.push(format!("Max data loss window: {:?}", window));
396 } else if matches!(self.level, DurabilityLevel::NoSync { .. }) {
397 desc.push("Max data loss window: UNBOUNDED (all data at risk)".to_string());
398 } else {
399 desc.push("Max data loss window: None (fully durable)".to_string());
400 }
401
402 if self.archive.enabled {
403 desc.push("WAL archiving: ENABLED".to_string());
404 }
405
406 match self.checkpoint.mode {
407 CheckpointMode::Full => desc.push("Checkpoint: Full (may cause stalls)".to_string()),
408 CheckpointMode::Incremental => desc.push("Checkpoint: Incremental".to_string()),
409 CheckpointMode::CopyOnWrite => desc.push("Checkpoint: Copy-on-write".to_string()),
410 }
411
412 desc.join("\n")
413 }
414}
415
416impl Default for DurabilityContract {
417 fn default() -> Self {
418 Self::production()
419 }
420}
421
/// A point in the WAL identified by its LSN.
///
/// Derives `PartialEq`/`Eq` so recovery points can be compared, matching the
/// other plain value types in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecoveryPoint {
    /// Log sequence number of this point.
    pub lsn: u64,
    /// NOTE(review): unit and epoch are not established in this file — confirm with the producer.
    pub timestamp: u64,
    /// Identifier of an associated checkpoint, if any.
    pub checkpoint_id: Option<u64>,
    /// Free-form description.
    pub description: String,
}
434
/// Metadata describing one WAL segment.
///
/// Derives `PartialEq`/`Eq` so segment metadata can be compared;
/// [`ArchiveStatus`] already supports comparison.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalSegmentMetadata {
    /// Number of this segment.
    pub segment_number: u64,
    /// First LSN of the segment.
    /// NOTE(review): whether `start_lsn`/`end_lsn` are inclusive is not established here — confirm.
    pub start_lsn: u64,
    /// Last LSN of the segment (see the note on `start_lsn`).
    pub end_lsn: u64,
    /// Segment size in bytes.
    pub size_bytes: u64,
    /// NOTE(review): checksum algorithm is not established in this file — confirm with the writer.
    pub checksum: u32,
    /// Whether the segment is complete.
    pub is_complete: bool,
    /// Archiving state of the segment.
    pub archive_status: ArchiveStatus,
}

/// Archiving state of a WAL segment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArchiveStatus {
    Pending,
    InProgress,
    Archived,
    Failed,
}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Extracts the violated-contract identifier without requiring the `Ok`
    /// type to implement `Debug` (which `unwrap_err` would).
    fn violated<T>(result: Result<T, DurabilityContractError>) -> Option<&'static str> {
        result.err().map(|e| e.contract_violated)
    }

    #[test]
    fn test_durability_level_defaults() {
        let level = DurabilityLevel::default();
        assert!(level.is_production_safe());
        assert!(!level.requires_risk_acceptance());
    }

    #[test]
    fn test_unsafe_requires_acknowledgement() {
        let result = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: false,
        });
        assert!(result.is_err());

        let result = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        });
        assert!(result.is_ok());
    }

    #[test]
    fn test_periodic_requires_acknowledgement() {
        let rejected = DurabilityContract::new(DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(1),
            accept_data_loss_risk: false,
        });
        assert_eq!(violated(rejected), Some("EXPLICIT_RISK_ACCEPTANCE"));

        let accepted = DurabilityContract::new(DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(1),
            accept_data_loss_risk: true,
        });
        assert!(accepted.is_ok());
    }

    #[test]
    fn test_wal_before_page_validation() {
        let contract = DurabilityContract::production();

        // Page LSN at or below the flushed LSN is allowed.
        assert!(contract.validate_page_flush(100, 100).is_ok());
        assert!(contract.validate_page_flush(50, 100).is_ok());

        // Page LSN ahead of the flushed LSN is rejected.
        assert_eq!(
            violated(contract.validate_page_flush(150, 100)),
            Some("WAL_BEFORE_PAGE")
        );
    }

    #[test]
    fn test_force_on_commit_validation() {
        let contract = DurabilityContract::production();

        // Commit LSN at or below the flushed LSN is allowed.
        assert!(contract.validate_commit(100, 100).is_ok());
        assert!(contract.validate_commit(50, 100).is_ok());

        // Commit LSN ahead of the flushed LSN is rejected.
        assert_eq!(
            violated(contract.validate_commit(150, 100)),
            Some("FORCE_ON_COMMIT")
        );
    }

    #[test]
    fn test_data_loss_window() {
        let durable = DurabilityContract::new(DurabilityLevel::Durable).unwrap();
        assert!(durable.max_data_loss_window().is_none());

        let group = DurabilityContract::new(DurabilityLevel::GroupCommit {
            max_batch: 100,
            flush_interval: Duration::from_millis(10),
        })
        .unwrap();
        assert_eq!(
            group.max_data_loss_window(),
            Some(Duration::from_millis(10))
        );

        let periodic = DurabilityContract::new(DurabilityLevel::Periodic {
            sync_interval: Duration::from_secs(5),
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert_eq!(
            periodic.max_data_loss_window(),
            Some(Duration::from_secs(5))
        );

        // NoSync reports no bounded window.
        let no_sync = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert!(no_sync.max_data_loss_window().is_none());
    }

    #[test]
    fn test_production_ready_check() {
        let contract = DurabilityContract::production();
        assert!(contract.is_production_ready());

        let unsafe_contract = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        })
        .unwrap();
        assert!(!unsafe_contract.is_production_ready());
    }

    #[test]
    fn test_describe_guarantees_production() {
        let text = DurabilityContract::production().describe_guarantees();
        let lines: Vec<&str> = text.lines().collect();
        assert_eq!(
            lines,
            [
                "Durability: GroupCommit (batch=128, interval=10ms)",
                "WAL-before-page: ENFORCED",
                "Force-on-commit: ENFORCED",
                "Max data loss window: 10ms",
                "Checkpoint: Incremental",
            ]
        );
    }

    #[test]
    fn test_describe_guarantees_variants() {
        let durable = DurabilityContract::new(DurabilityLevel::Durable)
            .unwrap()
            .describe_guarantees();
        assert!(durable.contains("Max data loss window: None (fully durable)"));

        let no_sync = DurabilityContract::new(DurabilityLevel::NoSync {
            accept_data_loss_risk: true,
        })
        .unwrap()
        .describe_guarantees();
        assert!(no_sync.contains("Max data loss window: UNBOUNDED (all data at risk)"));

        let customised = DurabilityContract::production()
            .with_archive(WalArchiveConfig {
                enabled: true,
                ..WalArchiveConfig::default()
            })
            .with_checkpoint(CheckpointConfig {
                mode: CheckpointMode::Full,
                ..CheckpointConfig::default()
            })
            .describe_guarantees();
        assert!(customised.contains("WAL archiving: ENABLED"));
        assert!(customised.contains("Checkpoint: Full (may cause stalls)"));
    }
}