Skip to main content

sentinel_wal/
recovery.rs

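//! WAL recovery functionality.
//!
//! This module rebuilds collection state by replaying WAL entries. Unlike a
//! naive replay of every entry, the safe recovery path
//! ([`recover_from_wal_safe`]):
//! 1. Only applies operations whose effect is not already present in the
//!    current document state
//! 2. Skips conflicting operations (e.g. an insert for a document that already
//!    exists) instead of failing
//! 3. Is intended to be idempotent (safe to run multiple times)
//!
//! [`recover_from_wal_force`] is a more aggressive alternative that overwrites
//! conflicting state instead of skipping it.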
8
9use std::collections::HashMap;
10
11use futures::StreamExt as _;
12use serde::{Deserialize, Serialize};
13use tracing::{debug, warn};
14
15use crate::{EntryType, LogEntry, Result, WalDocumentOps, WalManager};
16
17/// Result of WAL recovery operation
18#[derive(Debug)]
19pub struct WalRecoveryResult {
20    /// Number of operations successfully recovered
21    pub recovered_operations: usize,
22    /// Operations that were skipped (already applied)
23    pub skipped_operations:   usize,
24    /// Operations that failed to recover
25    pub failed_operations:    usize,
26    /// Detailed failure reasons
27    pub failures:             Vec<WalRecoveryFailure>,
28}
29
30/// Details of a recovery failure
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct WalRecoveryFailure {
33    /// Transaction ID of the failed operation
34    pub transaction_id: String,
35    /// Document ID affected
36    pub document_id:    String,
37    /// Type of operation that failed
38    pub operation_type: String,
39    /// Reason for failure
40    pub reason:         String,
41}
42
43/// Recover collection state from WAL entries
44///
45/// This function replays WAL entries to restore the collection to its
46/// correct state. It only applies operations that haven't been applied yet
47/// and handles conflicts gracefully.
48#[allow(
49    clippy::arithmetic_side_effects,
50    reason = "safe counter increments in recovery"
51)]
52pub async fn recover_from_wal_safe<D>(wal: &WalManager, document_ops: &D) -> Result<WalRecoveryResult>
53where
54    D: WalDocumentOps,
55{
56    // Enable recovery mode to skip WAL logging during replay
57    document_ops.set_recovery_mode(true);
58
59    let result = async {
60        let mut recovered = 0;
61        let mut skipped = 0;
62        let mut failed = 0;
63        let mut failures = Vec::new();
64
65        // Track applied operations to avoid duplicates
66        let mut applied_operations = HashMap::new(); // (doc_id, txn_id) -> applied
67
68        let stream = wal.stream_entries();
69        let mut pinned_stream = std::pin::pin!(stream);
70        while let Some(entry_result) = pinned_stream.next().await {
71            match entry_result {
72                Ok(entry) => {
73                    let key = (
74                        entry.document_id_str().to_owned(),
75                        entry.transaction_id_str().to_owned(),
76                    );
77
78                    // Skip if this operation was already applied
79                    if applied_operations.contains_key(&key) {
80                        skipped += 1;
81                        continue;
82                    }
83
84                    match replay_wal_entry_safe(&entry, document_ops).await {
85                        Ok(true) => {
86                            recovered += 1;
87                            applied_operations.insert(key, true);
88                        },
89                        Ok(false) => {
90                            skipped += 1;
91                            applied_operations.insert(key, true);
92                        },
93                        Err(e) => {
94                            failed += 1;
95                            failures.push(WalRecoveryFailure {
96                                transaction_id: entry.transaction_id_str().to_owned(),
97                                document_id:    entry.document_id_str().to_owned(),
98                                operation_type: format!("{:?}", entry.entry_type),
99                                reason:         format!("{}", e),
100                            });
101                        },
102                    }
103                },
104                Err(e) => {
105                    failed += 1;
106                    failures.push(WalRecoveryFailure {
107                        transaction_id: "unknown".to_owned(),
108                        document_id:    "unknown".to_owned(),
109                        operation_type: "read".to_owned(),
110                        reason:         format!("Failed to read WAL entry: {}", e),
111                    });
112                },
113            }
114        }
115
116        debug!(
117            "WAL recovery completed: {} recovered, {} skipped, {} failed",
118            recovered, skipped, failed
119        );
120
121        Ok(WalRecoveryResult {
122            recovered_operations: recovered,
123            skipped_operations: skipped,
124            failed_operations: failed,
125            failures,
126        })
127    }
128    .await;
129
130    // Disable recovery mode
131    document_ops.set_recovery_mode(false);
132
133    result
134}
135
136/// Safely replay a single WAL entry
137///
138/// Returns:
139/// - Ok(true) if operation was applied
140/// - Ok(false) if operation was skipped (already applied or conflict)
141/// - Err(_) if operation failed
142async fn replay_wal_entry_safe<D>(entry: &LogEntry, document_ops: &D) -> Result<bool>
143where
144    D: WalDocumentOps,
145{
146    match entry.entry_type {
147        EntryType::Insert => {
148            if let Some(data_str) = entry.data.as_ref() {
149                // Parse the JSON data
150                let data: serde_json::Value = serde_json::from_str(data_str)
151                    .map_err(|e| crate::error::WalError::Serialization(format!("Invalid JSON in WAL insert: {}", e)))?;
152
153                // Check if document already exists
154                match document_ops.get_document(entry.document_id_str()).await {
155                    Ok(Some(_)) => {
156                        // Document already exists, skip insert
157                        debug!(
158                            "Skipping insert for existing document {}",
159                            entry.document_id_str()
160                        );
161                        Ok(false)
162                    },
163                    Ok(None) => {
164                        // Document doesn't exist, apply insert
165                        document_ops
166                            .apply_operation(&EntryType::Insert, entry.document_id_str(), Some(data))
167                            .await?;
168                        Ok(true)
169                    },
170                    Err(e) => {
171                        // Error checking document, fail operation
172                        Err(e)
173                    },
174                }
175            }
176            else {
177                warn!(
178                    "WAL insert entry missing data for document {}",
179                    entry.document_id_str()
180                );
181                Ok(false)
182            }
183        },
184        EntryType::Update => {
185            if let Some(data_str) = entry.data.as_ref() {
186                // Parse the JSON data
187                let data: serde_json::Value = serde_json::from_str(data_str)
188                    .map_err(|e| crate::error::WalError::Serialization(format!("Invalid JSON in WAL update: {}", e)))?;
189
190                // Check if document exists
191                match document_ops.get_document(entry.document_id_str()).await {
192                    Ok(Some(existing_doc)) => {
193                        // Document exists, check if update is needed
194                        if existing_doc != data {
195                            document_ops
196                                .apply_operation(&EntryType::Update, entry.document_id_str(), Some(data))
197                                .await?;
198                            Ok(true)
199                        }
200                        else {
201                            Ok(false)
202                        }
203                    },
204                    Ok(None) => {
205                        // Document doesn't exist, this is an error for update
206                        warn!(
207                            "Skipping update for non-existent document {}",
208                            entry.document_id_str()
209                        );
210                        Ok(false)
211                    },
212                    Err(e) => {
213                        // Error checking document, fail operation
214                        Err(e)
215                    },
216                }
217            }
218            else {
219                warn!(
220                    "WAL update entry missing data for document {}",
221                    entry.document_id_str()
222                );
223                Ok(false)
224            }
225        },
226        EntryType::Delete => {
227            // Check if document exists
228            match document_ops.get_document(entry.document_id_str()).await {
229                Ok(Some(_)) => {
230                    // Document exists, apply delete
231                    document_ops
232                        .apply_operation(&EntryType::Delete, entry.document_id_str(), None)
233                        .await?;
234                    Ok(true)
235                },
236                Ok(None) => {
237                    // Document doesn't exist, skip delete
238                    debug!(
239                        "Skipping delete for non-existent document {}",
240                        entry.document_id_str()
241                    );
242                    Ok(false)
243                },
244                Err(e) => {
245                    // Error checking document, fail operation
246                    Err(e)
247                },
248            }
249        },
250        // Transaction control entries don't affect document state
251        EntryType::Begin | EntryType::Commit | EntryType::Rollback => Ok(false),
252    }
253}
254
255/// Recover collection from WAL with conflict resolution
256///
257/// This is a more aggressive recovery that attempts to resolve conflicts
258/// by overwriting conflicting states.
259#[allow(
260    clippy::arithmetic_side_effects,
261    reason = "safe counter increments in recovery"
262)]
263pub async fn recover_from_wal_force<D>(wal: &WalManager, document_ops: &D) -> Result<WalRecoveryResult>
264where
265    D: WalDocumentOps,
266{
267    let mut recovered = 0;
268    let mut skipped = 0;
269    let mut failed = 0;
270    let mut failures = Vec::new();
271
272    let stream = wal.stream_entries();
273    let mut pinned_stream = std::pin::pin!(stream);
274    while let Some(entry_result) = pinned_stream.next().await {
275        match entry_result {
276            Ok(entry) => {
277                match replay_wal_entry_force(&entry, document_ops).await {
278                    Ok(applied) => {
279                        if applied {
280                            recovered += 1;
281                        }
282                        else {
283                            skipped += 1;
284                        }
285                    },
286                    Err(e) => {
287                        failed += 1;
288                        failures.push(WalRecoveryFailure {
289                            transaction_id: entry.transaction_id_str().to_owned(),
290                            document_id:    entry.document_id_str().to_owned(),
291                            operation_type: format!("{:?}", entry.entry_type),
292                            reason:         format!("{}", e),
293                        });
294                    },
295                }
296            },
297            Err(e) => {
298                failed += 1;
299                failures.push(WalRecoveryFailure {
300                    transaction_id: "unknown".to_owned(),
301                    document_id:    "unknown".to_owned(),
302                    operation_type: "read".to_owned(),
303                    reason:         format!("Failed to read WAL entry: {}", e),
304                });
305            },
306        }
307    }
308
309    debug!(
310        "Forced WAL recovery completed: {} recovered, {} skipped, {} failed",
311        recovered, skipped, failed
312    );
313
314    Ok(WalRecoveryResult {
315        recovered_operations: recovered,
316        skipped_operations: skipped,
317        failed_operations: failed,
318        failures,
319    })
320}
321
322/// Force replay a WAL entry (overwrites conflicts)
323async fn replay_wal_entry_force<D>(entry: &LogEntry, document_ops: &D) -> Result<bool>
324where
325    D: WalDocumentOps,
326{
327    match entry.entry_type {
328        EntryType::Insert | EntryType::Update => {
329            if let Some(data_str) = entry.data.as_ref() {
330                let data: serde_json::Value = serde_json::from_str(data_str)
331                    .map_err(|e| crate::error::WalError::Serialization(format!("Invalid JSON in WAL entry: {}", e)))?;
332
333                // For force recovery, always apply the operation
334                document_ops
335                    .apply_operation(&entry.entry_type, entry.document_id_str(), Some(data))
336                    .await?;
337                Ok(true)
338            }
339            else {
340                Ok(false)
341            }
342        },
343        EntryType::Delete => {
344            // Force delete (ignore if document doesn't exist)
345            match document_ops
346                .apply_operation(&EntryType::Delete, entry.document_id_str(), None)
347                .await
348            {
349                Ok(_) => Ok(true),
350                Err(crate::error::WalError::Io {
351                    ..
352                }) => Ok(false), // Assume not found
353                Err(e) => Err(e),
354            }
355        },
356        // Transaction control entries don't affect document state
357        EntryType::Begin | EntryType::Commit | EntryType::Rollback => Ok(false),
358    }
359}
360
361#[cfg(test)]
362mod tests {
363    use std::{collections::HashMap, sync::Mutex};
364
365    use super::*;
366    use crate::{EntryType, LogEntry};
367
368    // Mock implementation of WalDocumentOps for testing
369    struct MockDocumentOps {
370        documents: Mutex<HashMap<String, serde_json::Value>>,
371    }
372
373    impl MockDocumentOps {
374        fn new() -> Self {
375            Self {
376                documents: Mutex::new(HashMap::new()),
377            }
378        }
379    }
380
381    #[async_trait::async_trait]
382    impl WalDocumentOps for MockDocumentOps {
383        async fn get_document(&self, id: &str) -> Result<Option<serde_json::Value>> {
384            Ok(self.documents.lock().unwrap().get(id).cloned())
385        }
386
387        async fn apply_operation(
388            &self,
389            operation: &EntryType,
390            id: &str,
391            data: Option<serde_json::Value>,
392        ) -> Result<()> {
393            let mut docs = self.documents.lock().unwrap();
394            match operation {
395                EntryType::Insert | EntryType::Update => {
396                    if let Some(data) = data {
397                        docs.insert(id.to_string(), data);
398                    }
399                },
400                EntryType::Delete => {
401                    docs.remove(id);
402                },
403                _ => {}, // No-op for other operations
404            }
405            Ok(())
406        }
407    }
408
409    fn create_test_entry(entry_type: EntryType, doc_id: &str, data: Option<&str>) -> LogEntry {
410        use crate::entry::{FixedBytes256, FixedBytes32};
411        LogEntry {
412            entry_type,
413            collection: FixedBytes256::from(b"test" as &[u8]),
414            document_id: FixedBytes256::from(doc_id.as_bytes()),
415            transaction_id: FixedBytes32::from(b"txn-123" as &[u8]),
416            data: data.map(|s| s.to_string()),
417            timestamp: chrono::Utc::now().timestamp_millis() as u64,
418        }
419    }
420
421    #[tokio::test]
422    async fn test_replay_wal_entry_safe_insert_new_document() {
423        let ops = MockDocumentOps::new();
424        let entry = create_test_entry(EntryType::Insert, "doc1", Some(r#"{"name": "test"}"#));
425
426        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
427        assert!(result);
428
429        let doc = ops.get_document("doc1").await.unwrap();
430        assert_eq!(doc.unwrap()["name"], "test");
431    }
432
433    #[tokio::test]
434    async fn test_replay_wal_entry_safe_insert_existing_document() {
435        let ops = MockDocumentOps::new();
436        // Pre-insert document
437        ops.apply_operation(
438            &EntryType::Insert,
439            "doc1",
440            Some(serde_json::json!({"name": "existing"})),
441        )
442        .await
443        .unwrap();
444
445        let entry = create_test_entry(EntryType::Insert, "doc1", Some(r#"{"name": "duplicate"}"#));
446
447        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
448        assert!(!result); // Should be skipped
449
450        let doc = ops.get_document("doc1").await.unwrap();
451        assert_eq!(doc.unwrap()["name"], "existing"); // Should remain unchanged
452    }
453
454    #[tokio::test]
455    async fn test_replay_wal_entry_safe_update_existing_document() {
456        let ops = MockDocumentOps::new();
457        // Pre-insert document
458        ops.apply_operation(
459            &EntryType::Insert,
460            "doc1",
461            Some(serde_json::json!({"name": "old"})),
462        )
463        .await
464        .unwrap();
465
466        let entry = create_test_entry(EntryType::Update, "doc1", Some(r#"{"name": "updated"}"#));
467
468        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
469        assert!(result);
470
471        let doc = ops.get_document("doc1").await.unwrap();
472        assert_eq!(doc.unwrap()["name"], "updated");
473    }
474
475    #[tokio::test]
476    async fn test_replay_wal_entry_safe_update_nonexistent_document() {
477        let ops = MockDocumentOps::new();
478        let entry = create_test_entry(EntryType::Update, "doc1", Some(r#"{"name": "updated"}"#));
479
480        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
481        assert!(!result); // Should be skipped
482    }
483
484    #[tokio::test]
485    async fn test_replay_wal_entry_safe_delete_existing_document() {
486        let ops = MockDocumentOps::new();
487        // Pre-insert document
488        ops.apply_operation(
489            &EntryType::Insert,
490            "doc1",
491            Some(serde_json::json!({"name": "test"})),
492        )
493        .await
494        .unwrap();
495
496        let entry = create_test_entry(EntryType::Delete, "doc1", None);
497
498        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
499        assert!(result);
500
501        let doc = ops.get_document("doc1").await.unwrap();
502        assert!(doc.is_none());
503    }
504
505    #[tokio::test]
506    async fn test_replay_wal_entry_safe_delete_nonexistent_document() {
507        let ops = MockDocumentOps::new();
508        let entry = create_test_entry(EntryType::Delete, "doc1", None);
509
510        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
511        assert!(!result); // Should be skipped
512    }
513
514    #[tokio::test]
515    async fn test_replay_wal_entry_safe_transaction_control() {
516        let ops = MockDocumentOps::new();
517        let entry = create_test_entry(EntryType::Begin, "doc1", None);
518
519        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
520        assert!(!result); // Transaction control should be skipped
521    }
522
523    #[tokio::test]
524    async fn test_replay_wal_entry_safe_invalid_json() {
525        let ops = MockDocumentOps::new();
526        let entry = create_test_entry(EntryType::Insert, "doc1", Some("invalid json"));
527
528        let result = replay_wal_entry_safe(&entry, &ops).await;
529        assert!(result.is_err());
530    }
531
532    // ============ Additional Error Path and Edge Case Tests ============
533
534    #[tokio::test]
535    async fn test_recover_wal_safe_stream_error_handling() {
536        // Test that stream errors are handled gracefully
537        use tempfile::tempdir;
538
539        use crate::{EntryType, LogEntry, WalConfig, WalManager};
540
541        let temp_dir = tempdir().unwrap();
542        let wal_path = temp_dir.path().join("test_stream_error.wal");
543
544        let wal = WalManager::new(wal_path.clone(), WalConfig::default())
545            .await
546            .unwrap();
547
548        // Write one valid entry
549        wal.write_entry(LogEntry::new(
550            EntryType::Insert,
551            "users".to_string(),
552            "user-1".to_string(),
553            Some(serde_json::json!({"name": "Alice"})),
554        ))
555        .await
556        .unwrap();
557
558        let ops = MockDocumentOps::new();
559        let result = recover_from_wal_safe(&wal, &ops).await.unwrap();
560
561        // Should recover the valid entry
562        assert_eq!(result.recovered_operations, 1);
563        assert_eq!(result.failed_operations, 0);
564    }
565
566    #[tokio::test]
567    async fn test_replay_wal_safe_doc_read_error() {
568        // Test that errors when reading document are propagated correctly
569        struct ErrorDocumentOps;
570
571        #[async_trait::async_trait]
572        impl WalDocumentOps for ErrorDocumentOps {
573            async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> {
574                Err(crate::WalError::Io(std::io::Error::new(
575                    std::io::ErrorKind::Other,
576                    "test error",
577                )))
578            }
579
580            async fn apply_operation(
581                &self,
582                _operation: &EntryType,
583                _id: &str,
584                _data: Option<serde_json::Value>,
585            ) -> Result<()> {
586                Ok(())
587            }
588        }
589
590        let ops = ErrorDocumentOps;
591        let entry = create_test_entry(EntryType::Update, "doc1", Some(r#"{"name": "test"}"#));
592
593        let result = replay_wal_entry_safe(&entry, &ops).await;
594        assert!(result.is_err());
595    }
596
597    #[tokio::test]
598    async fn test_recover_wal_safe_many_duplicates() {
599        // Test idempotency with many duplicate transaction IDs
600        use tempfile::tempdir;
601
602        use crate::{EntryType, LogEntry, WalConfig, WalManager};
603
604        let temp_dir = tempdir().unwrap();
605        let wal_path = temp_dir.path().join("test_many_duplicates.wal");
606
607        let wal = WalManager::new(wal_path.clone(), WalConfig::default())
608            .await
609            .unwrap();
610
611        // Write the same operation multiple times with same transaction ID
612        for _ in 0 .. 5 {
613            wal.write_entry(LogEntry::new(
614                EntryType::Insert,
615                "users".to_string(),
616                "user-1".to_string(),
617                Some(serde_json::json!({"name": "Alice"})),
618            ))
619            .await
620            .unwrap();
621        }
622
623        let ops = MockDocumentOps::new();
624        let result = recover_from_wal_safe(&wal, &ops).await.unwrap();
625
626        // First one recovered, rest skipped due to duplicate detection
627        assert_eq!(result.recovered_operations, 1);
628        assert_eq!(result.skipped_operations, 4);
629    }
630
631    #[tokio::test]
632    async fn test_replay_wal_force_io_error_on_delete() {
633        // Test force delete handles IO errors gracefully
634        struct IoErrorDocumentOps;
635
636        #[async_trait::async_trait]
637        impl WalDocumentOps for IoErrorDocumentOps {
638            async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> { Ok(None) }
639
640            async fn apply_operation(
641                &self,
642                operation: &EntryType,
643                _id: &str,
644                _data: Option<serde_json::Value>,
645            ) -> Result<()> {
646                if *operation == EntryType::Delete {
647                    // Simulate IO error on delete
648                    Err(crate::WalError::Io(std::io::Error::new(
649                        std::io::ErrorKind::PermissionDenied,
650                        "permission denied",
651                    )))
652                }
653                else {
654                    Ok(())
655                }
656            }
657        }
658
659        let ops = IoErrorDocumentOps;
660        let entry = create_test_entry(EntryType::Delete, "doc1", None);
661
662        // Force replay should handle IO error
663        let result = replay_wal_entry_force(&entry, &ops).await;
664        assert!(result.is_ok() || result.unwrap_err().to_string().contains("permission"));
665    }
666
667    #[tokio::test]
668    async fn test_wal_recovery_failure_special_chars() {
669        // Test WalRecoveryFailure with special characters in fields
670        let failure = WalRecoveryFailure {
671            transaction_id: "txn-特殊字符".to_string(),
672            document_id:    "doc-with-quotes".to_string(),
673            operation_type: "Insert".to_string(),
674            reason:         "Error with\nnewlines\tand tabs".to_string(),
675        };
676
677        // Serialize and deserialize
678        let json = serde_json::to_string(&failure).unwrap();
679        let deserialized: WalRecoveryFailure = serde_json::from_str(&json).unwrap();
680
681        assert_eq!(failure.transaction_id, deserialized.transaction_id);
682        assert_eq!(failure.document_id, deserialized.document_id);
683        assert_eq!(failure.reason, deserialized.reason);
684    }
685
686    #[tokio::test]
687    async fn test_recover_wal_safe_mixed_ops() {
688        // Test recovery with mix of recovered and skipped operations
689        use tempfile::tempdir;
690
691        use crate::{EntryType, LogEntry, WalConfig, WalManager};
692
693        let temp_dir = tempdir().unwrap();
694        let wal_path = temp_dir.path().join("test_mixed.wal");
695
696        let wal = WalManager::new(wal_path.clone(), WalConfig::default())
697            .await
698            .unwrap();
699
700        // Write multiple operations
701        wal.write_entry(LogEntry::new(
702            EntryType::Insert,
703            "users".to_string(),
704            "user-1".to_string(),
705            Some(serde_json::json!({"name": "Alice"})),
706        ))
707        .await
708        .unwrap();
709
710        wal.write_entry(LogEntry::new(
711            EntryType::Insert,
712            "users".to_string(),
713            "user-2".to_string(),
714            Some(serde_json::json!({"name": "Bob"})),
715        ))
716        .await
717        .unwrap();
718
719        wal.write_entry(LogEntry::new(
720            EntryType::Delete,
721            "users".to_string(),
722            "user-1".to_string(),
723            None,
724        ))
725        .await
726        .unwrap();
727
728        let ops = MockDocumentOps::new();
729        let result = recover_from_wal_safe(&wal, &ops).await.unwrap();
730
731        // All 3 operations should be processed
732        assert_eq!(result.recovered_operations, 3);
733    }
734
735    #[tokio::test]
736    async fn test_recover_wal_force_txn_boundaries() {
737        // Test force recovery respects transaction boundaries correctly
738        use tempfile::tempdir;
739
740        use crate::{EntryType, LogEntry, WalConfig, WalManager};
741
742        let temp_dir = tempdir().unwrap();
743        let wal_path = temp_dir.path().join("test_txn_boundaries.wal");
744
745        let wal = WalManager::new(wal_path.clone(), WalConfig::default())
746            .await
747            .unwrap();
748
749        // Write a complete transaction
750        wal.write_entry(LogEntry::new(
751            EntryType::Begin,
752            "users".to_string(),
753            "txn-100".to_string(),
754            None,
755        ))
756        .await
757        .unwrap();
758
759        wal.write_entry(LogEntry::new(
760            EntryType::Insert,
761            "users".to_string(),
762            "user-new".to_string(),
763            Some(serde_json::json!({"name": "New User"})),
764        ))
765        .await
766        .unwrap();
767
768        wal.write_entry(LogEntry::new(
769            EntryType::Commit,
770            "users".to_string(),
771            "txn-100".to_string(),
772            None,
773        ))
774        .await
775        .unwrap();
776
777        let ops = MockDocumentOps::new();
778        let result = recover_from_wal_force(&wal, &ops).await.unwrap();
779
780        // Force recovery should apply insert (1 recovered), skip transaction controls (2 skipped)
781        assert_eq!(result.recovered_operations, 1);
782        assert_eq!(result.skipped_operations, 2);
783    }
784
785    #[tokio::test]
786    async fn test_replay_wal_safe_insert_fail_error() {
787        // Test that duplicate key errors during apply are handled
788        struct FailOnInsertDocumentOps;
789
790        #[async_trait::async_trait]
791        impl WalDocumentOps for FailOnInsertDocumentOps {
792            async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> { Ok(None) }
793
794            async fn apply_operation(
795                &self,
796                operation: &EntryType,
797                id: &str,
798                _data: Option<serde_json::Value>,
799            ) -> Result<()> {
800                if *operation == EntryType::Insert && id == "fail-doc" {
801                    Err(crate::WalError::Io(std::io::Error::new(
802                        std::io::ErrorKind::Other,
803                        "insert failed",
804                    )))
805                }
806                else {
807                    Ok(())
808                }
809            }
810        }
811
812        let ops = FailOnInsertDocumentOps;
813        let entry = create_test_entry(EntryType::Insert, "fail-doc", Some(r#"{"name": "test"}"#));
814
815        let result = replay_wal_entry_safe(&entry, &ops).await;
816        assert!(result.is_err());
817    }
818
819    #[tokio::test]
820    async fn test_wal_recovery_result_zero_check() {
821        // Test WalRecoveryResult with all zeros
822        let result = WalRecoveryResult {
823            recovered_operations: 0,
824            skipped_operations:   0,
825            failed_operations:    0,
826            failures:             vec![],
827        };
828
829        assert_eq!(result.recovered_operations, 0);
830        assert_eq!(result.skipped_operations, 0);
831        assert_eq!(result.failed_operations, 0);
832        assert!(result.failures.is_empty());
833    }
834
835    #[tokio::test]
836    async fn test_recover_wal_safe_partial_update() {
837        // Test update that doesn't actually change the document
838        use tempfile::tempdir;
839
840        use crate::{EntryType, LogEntry, WalConfig, WalManager};
841
842        let temp_dir = tempdir().unwrap();
843        let wal_path = temp_dir.path().join("test_partial_update.wal");
844
845        let wal = WalManager::new(wal_path.clone(), WalConfig::default())
846            .await
847            .unwrap();
848
849        // First insert a document
850        wal.write_entry(LogEntry::new(
851            EntryType::Insert,
852            "users".to_string(),
853            "user-1".to_string(),
854            Some(serde_json::json!({"name": "Alice", "age": 30, "city": "NYC"})),
855        ))
856        .await
857        .unwrap();
858
859        // Update with same data (should be skipped in safe recovery)
860        wal.write_entry(LogEntry::new(
861            EntryType::Update,
862            "users".to_string(),
863            "user-1".to_string(),
864            Some(serde_json::json!({"name": "Alice", "age": 30, "city": "NYC"})), // Same data
865        ))
866        .await
867        .unwrap();
868
869        let ops = MockDocumentOps::new();
870        let result = recover_from_wal_safe(&wal, &ops).await.unwrap();
871
872        // 1 recovered (insert), 1 skipped (update with no change)
873        assert_eq!(result.recovered_operations, 1);
874        assert_eq!(result.skipped_operations, 1);
875    }
876
877    #[tokio::test]
878    async fn test_replay_wal_entry_safe_insert_no_data() {
879        // Test insert entry with missing data
880        let ops = MockDocumentOps::new();
881        let entry = create_test_entry(EntryType::Insert, "doc1", None);
882
883        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
884        assert!(!result); // Should be skipped
885    }
886
887    #[tokio::test]
888    async fn test_replay_wal_entry_safe_update_no_data() {
889        // Test update entry with missing data
890        let ops = MockDocumentOps::new();
891        let entry = create_test_entry(EntryType::Update, "doc1", None);
892
893        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
894        assert!(!result); // Should be skipped
895    }
896
897    #[tokio::test]
898    async fn test_replay_wal_entry_safe_commit() {
899        // Test commit entry skips correctly
900        let ops = MockDocumentOps::new();
901        let entry = create_test_entry(EntryType::Commit, "doc1", None);
902
903        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
904        assert!(!result);
905    }
906
907    #[tokio::test]
908    async fn test_replay_wal_entry_safe_rollback() {
909        // Test rollback entry skips correctly
910        let ops = MockDocumentOps::new();
911        let entry = create_test_entry(EntryType::Rollback, "doc1", None);
912
913        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
914        assert!(!result);
915    }
916
917    #[tokio::test]
918    async fn test_replay_wal_entry_force_insert_no_data() {
919        // Test force insert with missing data
920        let ops = MockDocumentOps::new();
921        let entry = create_test_entry(EntryType::Insert, "doc1", None);
922
923        let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
924        assert!(!result); // Should return false for missing data
925    }
926
927    #[tokio::test]
928    async fn test_replay_wal_entry_force_begin() {
929        // Test force replay of begin entry
930        let ops = MockDocumentOps::new();
931        let entry = create_test_entry(EntryType::Begin, "doc1", None);
932
933        let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
934        assert!(!result);
935    }
936
937    #[tokio::test]
938    async fn test_replay_wal_entry_force_commit() {
939        // Test force replay of commit entry
940        let ops = MockDocumentOps::new();
941        let entry = create_test_entry(EntryType::Commit, "doc1", None);
942
943        let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
944        assert!(!result);
945    }
946
947    #[tokio::test]
948    async fn test_replay_wal_entry_force_rollback() {
949        // Test force replay of rollback entry
950        let ops = MockDocumentOps::new();
951        let entry = create_test_entry(EntryType::Rollback, "doc1", None);
952
953        let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
954        assert!(!result);
955    }
956
957    #[tokio::test]
958    async fn test_replay_wal_entry_force_update() {
959        // Test force update
960        let ops = MockDocumentOps::new();
961        ops.apply_operation(
962            &EntryType::Insert,
963            "doc1",
964            Some(serde_json::json!({"version": 1})),
965        )
966        .await
967        .unwrap();
968
969        let entry = create_test_entry(EntryType::Update, "doc1", Some(r#"{"version": 2}"#));
970
971        let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
972        assert!(result);
973
974        let doc = ops.get_document("doc1").await.unwrap();
975        assert_eq!(doc.unwrap()["version"], 2);
976    }
977
978    #[tokio::test]
979    async fn test_recover_wal_force_no_errors() {
980        // Test force recovery without errors
981        use tempfile::tempdir;
982
983        use crate::{EntryType, LogEntry, WalConfig, WalManager};
984
985        let temp_dir = tempdir().unwrap();
986        let wal_path = temp_dir.path().join("test_force_no_errors.wal");
987
988        let wal = WalManager::new(wal_path.clone(), WalConfig::default())
989            .await
990            .unwrap();
991
992        // Write various operations
993        wal.write_entry(LogEntry::new(
994            EntryType::Insert,
995            "users".to_string(),
996            "user-1".to_string(),
997            Some(serde_json::json!({"name": "Alice"})),
998        ))
999        .await
1000        .unwrap();
1001
1002        wal.write_entry(LogEntry::new(
1003            EntryType::Update,
1004            "users".to_string(),
1005            "user-1".to_string(),
1006            Some(serde_json::json!({"name": "Alice Updated"})),
1007        ))
1008        .await
1009        .unwrap();
1010
1011        wal.write_entry(LogEntry::new(
1012            EntryType::Delete,
1013            "users".to_string(),
1014            "user-1".to_string(),
1015            None,
1016        ))
1017        .await
1018        .unwrap();
1019
1020        let ops = MockDocumentOps::new();
1021        let result = recover_from_wal_force(&wal, &ops).await.unwrap();
1022
1023        assert_eq!(result.recovered_operations, 3);
1024        assert_eq!(result.failed_operations, 0);
1025    }
1026
1027    #[tokio::test]
1028    async fn test_recover_wal_safe_error_in_apply() {
1029        // Test safe recovery when apply_operation fails
1030        struct FailingDocumentOps {
1031            fail_on_insert: bool,
1032        }
1033
1034        #[async_trait::async_trait]
1035        impl WalDocumentOps for FailingDocumentOps {
1036            async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> { Ok(None) }
1037
1038            async fn apply_operation(
1039                &self,
1040                operation: &EntryType,
1041                _id: &str,
1042                _data: Option<serde_json::Value>,
1043            ) -> Result<()> {
1044                if self.fail_on_insert && *operation == EntryType::Insert {
1045                    Err(crate::WalError::Io(std::io::Error::new(
1046                        std::io::ErrorKind::Other,
1047                        "apply failed",
1048                    )))
1049                }
1050                else {
1051                    Ok(())
1052                }
1053            }
1054        }
1055
1056        let ops = FailingDocumentOps {
1057            fail_on_insert: true,
1058        };
1059        let entry = create_test_entry(EntryType::Insert, "doc1", Some(r#"{"name": "test"}"#));
1060
1061        let result = replay_wal_entry_safe(&entry, &ops).await;
1062        assert!(result.is_err());
1063    }
1064
1065    #[tokio::test]
1066    async fn test_recover_wal_safe_all_failures() {
1067        // Test recovery result with all failures
1068        use tempfile::tempdir;
1069
1070        use crate::{EntryType, LogEntry, WalConfig, WalManager};
1071
1072        let temp_dir = tempdir().unwrap();
1073        let wal_path = temp_dir.path().join("test_all_failures.wal");
1074
1075        let wal = WalManager::new(wal_path.clone(), WalConfig::default())
1076            .await
1077            .unwrap();
1078
1079        wal.write_entry(LogEntry::new(
1080            EntryType::Insert,
1081            "users".to_string(),
1082            "user-1".to_string(),
1083            Some(serde_json::json!({"name": "Alice"})),
1084        ))
1085        .await
1086        .unwrap();
1087
1088        struct FailAllDocumentOps;
1089
1090        #[async_trait::async_trait]
1091        impl WalDocumentOps for FailAllDocumentOps {
1092            async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> {
1093                Err(crate::WalError::Io(std::io::Error::new(
1094                    std::io::ErrorKind::Other,
1095                    "always fail",
1096                )))
1097            }
1098
1099            async fn apply_operation(
1100                &self,
1101                _operation: &EntryType,
1102                _id: &str,
1103                _data: Option<serde_json::Value>,
1104            ) -> Result<()> {
1105                Ok(())
1106            }
1107        }
1108
1109        let ops = FailAllDocumentOps;
1110        let result = recover_from_wal_safe(&wal, &ops).await.unwrap();
1111
1112        assert_eq!(result.failed_operations, 1);
1113        assert!(!result.failures.is_empty());
1114    }
1115
1116    #[tokio::test]
1117    async fn test_replay_wal_entry_safe_update_same_data() {
1118        // Test update with same data returns false
1119        let ops = MockDocumentOps::new();
1120        let data = serde_json::json!({"name": "test", "age": 25});
1121
1122        ops.apply_operation(&EntryType::Insert, "doc1", Some(data.clone()))
1123            .await
1124            .unwrap();
1125
1126        let entry = create_test_entry(
1127            EntryType::Update,
1128            "doc1",
1129            Some(r#"{"name": "test", "age": 25}"#),
1130        );
1131
1132        let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
1133        assert!(!result); // Should be skipped because data is the same
1134    }
1135
1136    #[tokio::test]
1137    async fn test_replay_wal_entry_force_delete_nonexistent() {
1138        // Test force delete of nonexistent document returns false for IO error
1139        struct DeleteFailsDocumentOps;
1140
1141        #[async_trait::async_trait]
1142        impl WalDocumentOps for DeleteFailsDocumentOps {
1143            async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> { Ok(None) }
1144
1145            async fn apply_operation(
1146                &self,
1147                _operation: &EntryType,
1148                _id: &str,
1149                _data: Option<serde_json::Value>,
1150            ) -> Result<()> {
1151                Err(crate::WalError::Io(std::io::Error::new(
1152                    std::io::ErrorKind::NotFound,
1153                    "not found",
1154                )))
1155            }
1156        }
1157
1158        let ops = DeleteFailsDocumentOps;
1159        let entry = create_test_entry(EntryType::Delete, "doc1", None);
1160
1161        let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
1162        assert!(!result); // IO error treated as false
1163    }
1164
1165    #[tokio::test]
1166    async fn test_recover_wal_safe_update_invalid_json() {
1167        // Test update with invalid JSON
1168        let ops = MockDocumentOps::new();
1169        let entry = create_test_entry(EntryType::Update, "doc1", Some("not valid json"));
1170
1171        let result = replay_wal_entry_safe(&entry, &ops).await;
1172        assert!(result.is_err());
1173    }
1174
1175    #[tokio::test]
1176    async fn test_recover_wal_force_update_no_data() {
1177        // Test force update with no data
1178        let ops = MockDocumentOps::new();
1179        let entry = create_test_entry(EntryType::Update, "doc1", None);
1180
1181        let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
1182        assert!(!result);
1183    }
1184
1185    #[tokio::test]
1186    async fn test_recover_wal_force_delete_success() {
1187        // Test force delete with success
1188        let ops = MockDocumentOps::new();
1189        ops.apply_operation(
1190            &EntryType::Insert,
1191            "doc1",
1192            Some(serde_json::json!({"name": "test"})),
1193        )
1194        .await
1195        .unwrap();
1196
1197        let entry = create_test_entry(EntryType::Delete, "doc1", None);
1198
1199        let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
1200        assert!(result);
1201
1202        let doc = ops.get_document("doc1").await.unwrap();
1203        assert!(doc.is_none());
1204    }
1205
1206    #[tokio::test]
1207    async fn test_recover_wal_force_with_apply_error() {
1208        // Test force recovery when apply_operation returns non-IO error
1209        struct CustomErrorOps;
1210
1211        #[async_trait::async_trait]
1212        impl WalDocumentOps for CustomErrorOps {
1213            async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> { Ok(None) }
1214
1215            async fn apply_operation(
1216                &self,
1217                _operation: &EntryType,
1218                _id: &str,
1219                _data: Option<serde_json::Value>,
1220            ) -> Result<()> {
1221                Err(crate::WalError::Serialization("custom error".to_string()))
1222            }
1223        }
1224
1225        let ops = CustomErrorOps;
1226        let entry = create_test_entry(EntryType::Insert, "doc1", Some(r#"{"name": "test"}"#));
1227
1228        let result = replay_wal_entry_force(&entry, &ops).await;
1229        assert!(result.is_err());
1230    }
1231}