1use std::collections::HashMap;
10
11use futures::StreamExt as _;
12use serde::{Deserialize, Serialize};
13use tracing::{debug, warn};
14
15use crate::{EntryType, LogEntry, Result, WalDocumentOps, WalManager};
16
/// Summary of one WAL recovery pass.
///
/// Returned by `recover_from_wal_safe` and `recover_from_wal_force`. Every WAL entry
/// seen during the pass increments exactly one of the three counters.
#[derive(Debug)]
pub struct WalRecoveryResult {
    /// Number of entries whose replay reported that an operation was applied.
    pub recovered_operations: usize,
    /// Number of entries that were processed without applying anything.
    pub skipped_operations: usize,
    /// Number of entries that could not be read from the WAL or whose replay returned an error.
    pub failed_operations: usize,
    /// One record per failed entry, in the order the failures were encountered.
    pub failures: Vec<WalRecoveryFailure>,
}
29
/// Description of a single WAL entry that failed during recovery.
///
/// Serializable so that failure reports can be round-tripped through serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalRecoveryFailure {
    /// Transaction id of the failed entry, or `"unknown"` when the entry could not be read.
    pub transaction_id: String,
    /// Document id of the failed entry, or `"unknown"` when the entry could not be read.
    pub document_id: String,
    /// `Debug` rendering of the entry type, or `"read"` when the entry could not be read.
    pub operation_type: String,
    /// Human-readable error text describing why the entry failed.
    pub reason: String,
}
42
43#[allow(
49 clippy::arithmetic_side_effects,
50 reason = "safe counter increments in recovery"
51)]
52pub async fn recover_from_wal_safe<D>(wal: &WalManager, document_ops: &D) -> Result<WalRecoveryResult>
53where
54 D: WalDocumentOps,
55{
56 document_ops.set_recovery_mode(true);
58
59 let result = async {
60 let mut recovered = 0;
61 let mut skipped = 0;
62 let mut failed = 0;
63 let mut failures = Vec::new();
64
65 let mut applied_operations = HashMap::new(); let stream = wal.stream_entries();
69 let mut pinned_stream = std::pin::pin!(stream);
70 while let Some(entry_result) = pinned_stream.next().await {
71 match entry_result {
72 Ok(entry) => {
73 let key = (
74 entry.document_id_str().to_owned(),
75 entry.transaction_id_str().to_owned(),
76 );
77
78 if applied_operations.contains_key(&key) {
80 skipped += 1;
81 continue;
82 }
83
84 match replay_wal_entry_safe(&entry, document_ops).await {
85 Ok(true) => {
86 recovered += 1;
87 applied_operations.insert(key, true);
88 },
89 Ok(false) => {
90 skipped += 1;
91 applied_operations.insert(key, true);
92 },
93 Err(e) => {
94 failed += 1;
95 failures.push(WalRecoveryFailure {
96 transaction_id: entry.transaction_id_str().to_owned(),
97 document_id: entry.document_id_str().to_owned(),
98 operation_type: format!("{:?}", entry.entry_type),
99 reason: format!("{}", e),
100 });
101 },
102 }
103 },
104 Err(e) => {
105 failed += 1;
106 failures.push(WalRecoveryFailure {
107 transaction_id: "unknown".to_owned(),
108 document_id: "unknown".to_owned(),
109 operation_type: "read".to_owned(),
110 reason: format!("Failed to read WAL entry: {}", e),
111 });
112 },
113 }
114 }
115
116 debug!(
117 "WAL recovery completed: {} recovered, {} skipped, {} failed",
118 recovered, skipped, failed
119 );
120
121 Ok(WalRecoveryResult {
122 recovered_operations: recovered,
123 skipped_operations: skipped,
124 failed_operations: failed,
125 failures,
126 })
127 }
128 .await;
129
130 document_ops.set_recovery_mode(false);
132
133 result
134}
135
136async fn replay_wal_entry_safe<D>(entry: &LogEntry, document_ops: &D) -> Result<bool>
143where
144 D: WalDocumentOps,
145{
146 match entry.entry_type {
147 EntryType::Insert => {
148 if let Some(data_str) = entry.data.as_ref() {
149 let data: serde_json::Value = serde_json::from_str(data_str)
151 .map_err(|e| crate::error::WalError::Serialization(format!("Invalid JSON in WAL insert: {}", e)))?;
152
153 match document_ops.get_document(entry.document_id_str()).await {
155 Ok(Some(_)) => {
156 debug!(
158 "Skipping insert for existing document {}",
159 entry.document_id_str()
160 );
161 Ok(false)
162 },
163 Ok(None) => {
164 document_ops
166 .apply_operation(&EntryType::Insert, entry.document_id_str(), Some(data))
167 .await?;
168 Ok(true)
169 },
170 Err(e) => {
171 Err(e)
173 },
174 }
175 }
176 else {
177 warn!(
178 "WAL insert entry missing data for document {}",
179 entry.document_id_str()
180 );
181 Ok(false)
182 }
183 },
184 EntryType::Update => {
185 if let Some(data_str) = entry.data.as_ref() {
186 let data: serde_json::Value = serde_json::from_str(data_str)
188 .map_err(|e| crate::error::WalError::Serialization(format!("Invalid JSON in WAL update: {}", e)))?;
189
190 match document_ops.get_document(entry.document_id_str()).await {
192 Ok(Some(existing_doc)) => {
193 if existing_doc != data {
195 document_ops
196 .apply_operation(&EntryType::Update, entry.document_id_str(), Some(data))
197 .await?;
198 Ok(true)
199 }
200 else {
201 Ok(false)
202 }
203 },
204 Ok(None) => {
205 warn!(
207 "Skipping update for non-existent document {}",
208 entry.document_id_str()
209 );
210 Ok(false)
211 },
212 Err(e) => {
213 Err(e)
215 },
216 }
217 }
218 else {
219 warn!(
220 "WAL update entry missing data for document {}",
221 entry.document_id_str()
222 );
223 Ok(false)
224 }
225 },
226 EntryType::Delete => {
227 match document_ops.get_document(entry.document_id_str()).await {
229 Ok(Some(_)) => {
230 document_ops
232 .apply_operation(&EntryType::Delete, entry.document_id_str(), None)
233 .await?;
234 Ok(true)
235 },
236 Ok(None) => {
237 debug!(
239 "Skipping delete for non-existent document {}",
240 entry.document_id_str()
241 );
242 Ok(false)
243 },
244 Err(e) => {
245 Err(e)
247 },
248 }
249 },
250 EntryType::Begin | EntryType::Commit | EntryType::Rollback => Ok(false),
252 }
253}
254
255#[allow(
260 clippy::arithmetic_side_effects,
261 reason = "safe counter increments in recovery"
262)]
263pub async fn recover_from_wal_force<D>(wal: &WalManager, document_ops: &D) -> Result<WalRecoveryResult>
264where
265 D: WalDocumentOps,
266{
267 let mut recovered = 0;
268 let mut skipped = 0;
269 let mut failed = 0;
270 let mut failures = Vec::new();
271
272 let stream = wal.stream_entries();
273 let mut pinned_stream = std::pin::pin!(stream);
274 while let Some(entry_result) = pinned_stream.next().await {
275 match entry_result {
276 Ok(entry) => {
277 match replay_wal_entry_force(&entry, document_ops).await {
278 Ok(applied) => {
279 if applied {
280 recovered += 1;
281 }
282 else {
283 skipped += 1;
284 }
285 },
286 Err(e) => {
287 failed += 1;
288 failures.push(WalRecoveryFailure {
289 transaction_id: entry.transaction_id_str().to_owned(),
290 document_id: entry.document_id_str().to_owned(),
291 operation_type: format!("{:?}", entry.entry_type),
292 reason: format!("{}", e),
293 });
294 },
295 }
296 },
297 Err(e) => {
298 failed += 1;
299 failures.push(WalRecoveryFailure {
300 transaction_id: "unknown".to_owned(),
301 document_id: "unknown".to_owned(),
302 operation_type: "read".to_owned(),
303 reason: format!("Failed to read WAL entry: {}", e),
304 });
305 },
306 }
307 }
308
309 debug!(
310 "Forced WAL recovery completed: {} recovered, {} skipped, {} failed",
311 recovered, skipped, failed
312 );
313
314 Ok(WalRecoveryResult {
315 recovered_operations: recovered,
316 skipped_operations: skipped,
317 failed_operations: failed,
318 failures,
319 })
320}
321
322async fn replay_wal_entry_force<D>(entry: &LogEntry, document_ops: &D) -> Result<bool>
324where
325 D: WalDocumentOps,
326{
327 match entry.entry_type {
328 EntryType::Insert | EntryType::Update => {
329 if let Some(data_str) = entry.data.as_ref() {
330 let data: serde_json::Value = serde_json::from_str(data_str)
331 .map_err(|e| crate::error::WalError::Serialization(format!("Invalid JSON in WAL entry: {}", e)))?;
332
333 document_ops
335 .apply_operation(&entry.entry_type, entry.document_id_str(), Some(data))
336 .await?;
337 Ok(true)
338 }
339 else {
340 Ok(false)
341 }
342 },
343 EntryType::Delete => {
344 match document_ops
346 .apply_operation(&EntryType::Delete, entry.document_id_str(), None)
347 .await
348 {
349 Ok(_) => Ok(true),
350 Err(crate::error::WalError::Io {
351 ..
352 }) => Ok(false), Err(e) => Err(e),
354 }
355 },
356 EntryType::Begin | EntryType::Commit | EntryType::Rollback => Ok(false),
358 }
359}
360
361#[cfg(test)]
362mod tests {
363 use std::{collections::HashMap, sync::Mutex};
364
365 use super::*;
366 use crate::{EntryType, LogEntry};
367
368 struct MockDocumentOps {
370 documents: Mutex<HashMap<String, serde_json::Value>>,
371 }
372
373 impl MockDocumentOps {
374 fn new() -> Self {
375 Self {
376 documents: Mutex::new(HashMap::new()),
377 }
378 }
379 }
380
381 #[async_trait::async_trait]
382 impl WalDocumentOps for MockDocumentOps {
383 async fn get_document(&self, id: &str) -> Result<Option<serde_json::Value>> {
384 Ok(self.documents.lock().unwrap().get(id).cloned())
385 }
386
387 async fn apply_operation(
388 &self,
389 operation: &EntryType,
390 id: &str,
391 data: Option<serde_json::Value>,
392 ) -> Result<()> {
393 let mut docs = self.documents.lock().unwrap();
394 match operation {
395 EntryType::Insert | EntryType::Update => {
396 if let Some(data) = data {
397 docs.insert(id.to_string(), data);
398 }
399 },
400 EntryType::Delete => {
401 docs.remove(id);
402 },
403 _ => {}, }
405 Ok(())
406 }
407 }
408
409 fn create_test_entry(entry_type: EntryType, doc_id: &str, data: Option<&str>) -> LogEntry {
410 use crate::entry::{FixedBytes256, FixedBytes32};
411 LogEntry {
412 entry_type,
413 collection: FixedBytes256::from(b"test" as &[u8]),
414 document_id: FixedBytes256::from(doc_id.as_bytes()),
415 transaction_id: FixedBytes32::from(b"txn-123" as &[u8]),
416 data: data.map(|s| s.to_string()),
417 timestamp: chrono::Utc::now().timestamp_millis() as u64,
418 }
419 }
420
421 #[tokio::test]
422 async fn test_replay_wal_entry_safe_insert_new_document() {
423 let ops = MockDocumentOps::new();
424 let entry = create_test_entry(EntryType::Insert, "doc1", Some(r#"{"name": "test"}"#));
425
426 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
427 assert!(result);
428
429 let doc = ops.get_document("doc1").await.unwrap();
430 assert_eq!(doc.unwrap()["name"], "test");
431 }
432
433 #[tokio::test]
434 async fn test_replay_wal_entry_safe_insert_existing_document() {
435 let ops = MockDocumentOps::new();
436 ops.apply_operation(
438 &EntryType::Insert,
439 "doc1",
440 Some(serde_json::json!({"name": "existing"})),
441 )
442 .await
443 .unwrap();
444
445 let entry = create_test_entry(EntryType::Insert, "doc1", Some(r#"{"name": "duplicate"}"#));
446
447 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
448 assert!(!result); let doc = ops.get_document("doc1").await.unwrap();
451 assert_eq!(doc.unwrap()["name"], "existing"); }
453
454 #[tokio::test]
455 async fn test_replay_wal_entry_safe_update_existing_document() {
456 let ops = MockDocumentOps::new();
457 ops.apply_operation(
459 &EntryType::Insert,
460 "doc1",
461 Some(serde_json::json!({"name": "old"})),
462 )
463 .await
464 .unwrap();
465
466 let entry = create_test_entry(EntryType::Update, "doc1", Some(r#"{"name": "updated"}"#));
467
468 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
469 assert!(result);
470
471 let doc = ops.get_document("doc1").await.unwrap();
472 assert_eq!(doc.unwrap()["name"], "updated");
473 }
474
475 #[tokio::test]
476 async fn test_replay_wal_entry_safe_update_nonexistent_document() {
477 let ops = MockDocumentOps::new();
478 let entry = create_test_entry(EntryType::Update, "doc1", Some(r#"{"name": "updated"}"#));
479
480 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
481 assert!(!result); }
483
484 #[tokio::test]
485 async fn test_replay_wal_entry_safe_delete_existing_document() {
486 let ops = MockDocumentOps::new();
487 ops.apply_operation(
489 &EntryType::Insert,
490 "doc1",
491 Some(serde_json::json!({"name": "test"})),
492 )
493 .await
494 .unwrap();
495
496 let entry = create_test_entry(EntryType::Delete, "doc1", None);
497
498 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
499 assert!(result);
500
501 let doc = ops.get_document("doc1").await.unwrap();
502 assert!(doc.is_none());
503 }
504
505 #[tokio::test]
506 async fn test_replay_wal_entry_safe_delete_nonexistent_document() {
507 let ops = MockDocumentOps::new();
508 let entry = create_test_entry(EntryType::Delete, "doc1", None);
509
510 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
511 assert!(!result); }
513
514 #[tokio::test]
515 async fn test_replay_wal_entry_safe_transaction_control() {
516 let ops = MockDocumentOps::new();
517 let entry = create_test_entry(EntryType::Begin, "doc1", None);
518
519 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
520 assert!(!result); }
522
523 #[tokio::test]
524 async fn test_replay_wal_entry_safe_invalid_json() {
525 let ops = MockDocumentOps::new();
526 let entry = create_test_entry(EntryType::Insert, "doc1", Some("invalid json"));
527
528 let result = replay_wal_entry_safe(&entry, &ops).await;
529 assert!(result.is_err());
530 }
531
532 #[tokio::test]
535 async fn test_recover_wal_safe_stream_error_handling() {
536 use tempfile::tempdir;
538
539 use crate::{EntryType, LogEntry, WalConfig, WalManager};
540
541 let temp_dir = tempdir().unwrap();
542 let wal_path = temp_dir.path().join("test_stream_error.wal");
543
544 let wal = WalManager::new(wal_path.clone(), WalConfig::default())
545 .await
546 .unwrap();
547
548 wal.write_entry(LogEntry::new(
550 EntryType::Insert,
551 "users".to_string(),
552 "user-1".to_string(),
553 Some(serde_json::json!({"name": "Alice"})),
554 ))
555 .await
556 .unwrap();
557
558 let ops = MockDocumentOps::new();
559 let result = recover_from_wal_safe(&wal, &ops).await.unwrap();
560
561 assert_eq!(result.recovered_operations, 1);
563 assert_eq!(result.failed_operations, 0);
564 }
565
566 #[tokio::test]
567 async fn test_replay_wal_safe_doc_read_error() {
568 struct ErrorDocumentOps;
570
571 #[async_trait::async_trait]
572 impl WalDocumentOps for ErrorDocumentOps {
573 async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> {
574 Err(crate::WalError::Io(std::io::Error::new(
575 std::io::ErrorKind::Other,
576 "test error",
577 )))
578 }
579
580 async fn apply_operation(
581 &self,
582 _operation: &EntryType,
583 _id: &str,
584 _data: Option<serde_json::Value>,
585 ) -> Result<()> {
586 Ok(())
587 }
588 }
589
590 let ops = ErrorDocumentOps;
591 let entry = create_test_entry(EntryType::Update, "doc1", Some(r#"{"name": "test"}"#));
592
593 let result = replay_wal_entry_safe(&entry, &ops).await;
594 assert!(result.is_err());
595 }
596
597 #[tokio::test]
598 async fn test_recover_wal_safe_many_duplicates() {
599 use tempfile::tempdir;
601
602 use crate::{EntryType, LogEntry, WalConfig, WalManager};
603
604 let temp_dir = tempdir().unwrap();
605 let wal_path = temp_dir.path().join("test_many_duplicates.wal");
606
607 let wal = WalManager::new(wal_path.clone(), WalConfig::default())
608 .await
609 .unwrap();
610
611 for _ in 0 .. 5 {
613 wal.write_entry(LogEntry::new(
614 EntryType::Insert,
615 "users".to_string(),
616 "user-1".to_string(),
617 Some(serde_json::json!({"name": "Alice"})),
618 ))
619 .await
620 .unwrap();
621 }
622
623 let ops = MockDocumentOps::new();
624 let result = recover_from_wal_safe(&wal, &ops).await.unwrap();
625
626 assert_eq!(result.recovered_operations, 1);
628 assert_eq!(result.skipped_operations, 4);
629 }
630
631 #[tokio::test]
632 async fn test_replay_wal_force_io_error_on_delete() {
633 struct IoErrorDocumentOps;
635
636 #[async_trait::async_trait]
637 impl WalDocumentOps for IoErrorDocumentOps {
638 async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> { Ok(None) }
639
640 async fn apply_operation(
641 &self,
642 operation: &EntryType,
643 _id: &str,
644 _data: Option<serde_json::Value>,
645 ) -> Result<()> {
646 if *operation == EntryType::Delete {
647 Err(crate::WalError::Io(std::io::Error::new(
649 std::io::ErrorKind::PermissionDenied,
650 "permission denied",
651 )))
652 }
653 else {
654 Ok(())
655 }
656 }
657 }
658
659 let ops = IoErrorDocumentOps;
660 let entry = create_test_entry(EntryType::Delete, "doc1", None);
661
662 let result = replay_wal_entry_force(&entry, &ops).await;
664 assert!(result.is_ok() || result.unwrap_err().to_string().contains("permission"));
665 }
666
667 #[tokio::test]
668 async fn test_wal_recovery_failure_special_chars() {
669 let failure = WalRecoveryFailure {
671 transaction_id: "txn-特殊字符".to_string(),
672 document_id: "doc-with-quotes".to_string(),
673 operation_type: "Insert".to_string(),
674 reason: "Error with\nnewlines\tand tabs".to_string(),
675 };
676
677 let json = serde_json::to_string(&failure).unwrap();
679 let deserialized: WalRecoveryFailure = serde_json::from_str(&json).unwrap();
680
681 assert_eq!(failure.transaction_id, deserialized.transaction_id);
682 assert_eq!(failure.document_id, deserialized.document_id);
683 assert_eq!(failure.reason, deserialized.reason);
684 }
685
686 #[tokio::test]
687 async fn test_recover_wal_safe_mixed_ops() {
688 use tempfile::tempdir;
690
691 use crate::{EntryType, LogEntry, WalConfig, WalManager};
692
693 let temp_dir = tempdir().unwrap();
694 let wal_path = temp_dir.path().join("test_mixed.wal");
695
696 let wal = WalManager::new(wal_path.clone(), WalConfig::default())
697 .await
698 .unwrap();
699
700 wal.write_entry(LogEntry::new(
702 EntryType::Insert,
703 "users".to_string(),
704 "user-1".to_string(),
705 Some(serde_json::json!({"name": "Alice"})),
706 ))
707 .await
708 .unwrap();
709
710 wal.write_entry(LogEntry::new(
711 EntryType::Insert,
712 "users".to_string(),
713 "user-2".to_string(),
714 Some(serde_json::json!({"name": "Bob"})),
715 ))
716 .await
717 .unwrap();
718
719 wal.write_entry(LogEntry::new(
720 EntryType::Delete,
721 "users".to_string(),
722 "user-1".to_string(),
723 None,
724 ))
725 .await
726 .unwrap();
727
728 let ops = MockDocumentOps::new();
729 let result = recover_from_wal_safe(&wal, &ops).await.unwrap();
730
731 assert_eq!(result.recovered_operations, 3);
733 }
734
735 #[tokio::test]
736 async fn test_recover_wal_force_txn_boundaries() {
737 use tempfile::tempdir;
739
740 use crate::{EntryType, LogEntry, WalConfig, WalManager};
741
742 let temp_dir = tempdir().unwrap();
743 let wal_path = temp_dir.path().join("test_txn_boundaries.wal");
744
745 let wal = WalManager::new(wal_path.clone(), WalConfig::default())
746 .await
747 .unwrap();
748
749 wal.write_entry(LogEntry::new(
751 EntryType::Begin,
752 "users".to_string(),
753 "txn-100".to_string(),
754 None,
755 ))
756 .await
757 .unwrap();
758
759 wal.write_entry(LogEntry::new(
760 EntryType::Insert,
761 "users".to_string(),
762 "user-new".to_string(),
763 Some(serde_json::json!({"name": "New User"})),
764 ))
765 .await
766 .unwrap();
767
768 wal.write_entry(LogEntry::new(
769 EntryType::Commit,
770 "users".to_string(),
771 "txn-100".to_string(),
772 None,
773 ))
774 .await
775 .unwrap();
776
777 let ops = MockDocumentOps::new();
778 let result = recover_from_wal_force(&wal, &ops).await.unwrap();
779
780 assert_eq!(result.recovered_operations, 1);
782 assert_eq!(result.skipped_operations, 2);
783 }
784
785 #[tokio::test]
786 async fn test_replay_wal_safe_insert_fail_error() {
787 struct FailOnInsertDocumentOps;
789
790 #[async_trait::async_trait]
791 impl WalDocumentOps for FailOnInsertDocumentOps {
792 async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> { Ok(None) }
793
794 async fn apply_operation(
795 &self,
796 operation: &EntryType,
797 id: &str,
798 _data: Option<serde_json::Value>,
799 ) -> Result<()> {
800 if *operation == EntryType::Insert && id == "fail-doc" {
801 Err(crate::WalError::Io(std::io::Error::new(
802 std::io::ErrorKind::Other,
803 "insert failed",
804 )))
805 }
806 else {
807 Ok(())
808 }
809 }
810 }
811
812 let ops = FailOnInsertDocumentOps;
813 let entry = create_test_entry(EntryType::Insert, "fail-doc", Some(r#"{"name": "test"}"#));
814
815 let result = replay_wal_entry_safe(&entry, &ops).await;
816 assert!(result.is_err());
817 }
818
819 #[tokio::test]
820 async fn test_wal_recovery_result_zero_check() {
821 let result = WalRecoveryResult {
823 recovered_operations: 0,
824 skipped_operations: 0,
825 failed_operations: 0,
826 failures: vec![],
827 };
828
829 assert_eq!(result.recovered_operations, 0);
830 assert_eq!(result.skipped_operations, 0);
831 assert_eq!(result.failed_operations, 0);
832 assert!(result.failures.is_empty());
833 }
834
835 #[tokio::test]
836 async fn test_recover_wal_safe_partial_update() {
837 use tempfile::tempdir;
839
840 use crate::{EntryType, LogEntry, WalConfig, WalManager};
841
842 let temp_dir = tempdir().unwrap();
843 let wal_path = temp_dir.path().join("test_partial_update.wal");
844
845 let wal = WalManager::new(wal_path.clone(), WalConfig::default())
846 .await
847 .unwrap();
848
849 wal.write_entry(LogEntry::new(
851 EntryType::Insert,
852 "users".to_string(),
853 "user-1".to_string(),
854 Some(serde_json::json!({"name": "Alice", "age": 30, "city": "NYC"})),
855 ))
856 .await
857 .unwrap();
858
859 wal.write_entry(LogEntry::new(
861 EntryType::Update,
862 "users".to_string(),
863 "user-1".to_string(),
864 Some(serde_json::json!({"name": "Alice", "age": 30, "city": "NYC"})), ))
866 .await
867 .unwrap();
868
869 let ops = MockDocumentOps::new();
870 let result = recover_from_wal_safe(&wal, &ops).await.unwrap();
871
872 assert_eq!(result.recovered_operations, 1);
874 assert_eq!(result.skipped_operations, 1);
875 }
876
877 #[tokio::test]
878 async fn test_replay_wal_entry_safe_insert_no_data() {
879 let ops = MockDocumentOps::new();
881 let entry = create_test_entry(EntryType::Insert, "doc1", None);
882
883 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
884 assert!(!result); }
886
887 #[tokio::test]
888 async fn test_replay_wal_entry_safe_update_no_data() {
889 let ops = MockDocumentOps::new();
891 let entry = create_test_entry(EntryType::Update, "doc1", None);
892
893 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
894 assert!(!result); }
896
897 #[tokio::test]
898 async fn test_replay_wal_entry_safe_commit() {
899 let ops = MockDocumentOps::new();
901 let entry = create_test_entry(EntryType::Commit, "doc1", None);
902
903 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
904 assert!(!result);
905 }
906
907 #[tokio::test]
908 async fn test_replay_wal_entry_safe_rollback() {
909 let ops = MockDocumentOps::new();
911 let entry = create_test_entry(EntryType::Rollback, "doc1", None);
912
913 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
914 assert!(!result);
915 }
916
917 #[tokio::test]
918 async fn test_replay_wal_entry_force_insert_no_data() {
919 let ops = MockDocumentOps::new();
921 let entry = create_test_entry(EntryType::Insert, "doc1", None);
922
923 let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
924 assert!(!result); }
926
927 #[tokio::test]
928 async fn test_replay_wal_entry_force_begin() {
929 let ops = MockDocumentOps::new();
931 let entry = create_test_entry(EntryType::Begin, "doc1", None);
932
933 let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
934 assert!(!result);
935 }
936
937 #[tokio::test]
938 async fn test_replay_wal_entry_force_commit() {
939 let ops = MockDocumentOps::new();
941 let entry = create_test_entry(EntryType::Commit, "doc1", None);
942
943 let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
944 assert!(!result);
945 }
946
947 #[tokio::test]
948 async fn test_replay_wal_entry_force_rollback() {
949 let ops = MockDocumentOps::new();
951 let entry = create_test_entry(EntryType::Rollback, "doc1", None);
952
953 let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
954 assert!(!result);
955 }
956
957 #[tokio::test]
958 async fn test_replay_wal_entry_force_update() {
959 let ops = MockDocumentOps::new();
961 ops.apply_operation(
962 &EntryType::Insert,
963 "doc1",
964 Some(serde_json::json!({"version": 1})),
965 )
966 .await
967 .unwrap();
968
969 let entry = create_test_entry(EntryType::Update, "doc1", Some(r#"{"version": 2}"#));
970
971 let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
972 assert!(result);
973
974 let doc = ops.get_document("doc1").await.unwrap();
975 assert_eq!(doc.unwrap()["version"], 2);
976 }
977
978 #[tokio::test]
979 async fn test_recover_wal_force_no_errors() {
980 use tempfile::tempdir;
982
983 use crate::{EntryType, LogEntry, WalConfig, WalManager};
984
985 let temp_dir = tempdir().unwrap();
986 let wal_path = temp_dir.path().join("test_force_no_errors.wal");
987
988 let wal = WalManager::new(wal_path.clone(), WalConfig::default())
989 .await
990 .unwrap();
991
992 wal.write_entry(LogEntry::new(
994 EntryType::Insert,
995 "users".to_string(),
996 "user-1".to_string(),
997 Some(serde_json::json!({"name": "Alice"})),
998 ))
999 .await
1000 .unwrap();
1001
1002 wal.write_entry(LogEntry::new(
1003 EntryType::Update,
1004 "users".to_string(),
1005 "user-1".to_string(),
1006 Some(serde_json::json!({"name": "Alice Updated"})),
1007 ))
1008 .await
1009 .unwrap();
1010
1011 wal.write_entry(LogEntry::new(
1012 EntryType::Delete,
1013 "users".to_string(),
1014 "user-1".to_string(),
1015 None,
1016 ))
1017 .await
1018 .unwrap();
1019
1020 let ops = MockDocumentOps::new();
1021 let result = recover_from_wal_force(&wal, &ops).await.unwrap();
1022
1023 assert_eq!(result.recovered_operations, 3);
1024 assert_eq!(result.failed_operations, 0);
1025 }
1026
1027 #[tokio::test]
1028 async fn test_recover_wal_safe_error_in_apply() {
1029 struct FailingDocumentOps {
1031 fail_on_insert: bool,
1032 }
1033
1034 #[async_trait::async_trait]
1035 impl WalDocumentOps for FailingDocumentOps {
1036 async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> { Ok(None) }
1037
1038 async fn apply_operation(
1039 &self,
1040 operation: &EntryType,
1041 _id: &str,
1042 _data: Option<serde_json::Value>,
1043 ) -> Result<()> {
1044 if self.fail_on_insert && *operation == EntryType::Insert {
1045 Err(crate::WalError::Io(std::io::Error::new(
1046 std::io::ErrorKind::Other,
1047 "apply failed",
1048 )))
1049 }
1050 else {
1051 Ok(())
1052 }
1053 }
1054 }
1055
1056 let ops = FailingDocumentOps {
1057 fail_on_insert: true,
1058 };
1059 let entry = create_test_entry(EntryType::Insert, "doc1", Some(r#"{"name": "test"}"#));
1060
1061 let result = replay_wal_entry_safe(&entry, &ops).await;
1062 assert!(result.is_err());
1063 }
1064
1065 #[tokio::test]
1066 async fn test_recover_wal_safe_all_failures() {
1067 use tempfile::tempdir;
1069
1070 use crate::{EntryType, LogEntry, WalConfig, WalManager};
1071
1072 let temp_dir = tempdir().unwrap();
1073 let wal_path = temp_dir.path().join("test_all_failures.wal");
1074
1075 let wal = WalManager::new(wal_path.clone(), WalConfig::default())
1076 .await
1077 .unwrap();
1078
1079 wal.write_entry(LogEntry::new(
1080 EntryType::Insert,
1081 "users".to_string(),
1082 "user-1".to_string(),
1083 Some(serde_json::json!({"name": "Alice"})),
1084 ))
1085 .await
1086 .unwrap();
1087
1088 struct FailAllDocumentOps;
1089
1090 #[async_trait::async_trait]
1091 impl WalDocumentOps for FailAllDocumentOps {
1092 async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> {
1093 Err(crate::WalError::Io(std::io::Error::new(
1094 std::io::ErrorKind::Other,
1095 "always fail",
1096 )))
1097 }
1098
1099 async fn apply_operation(
1100 &self,
1101 _operation: &EntryType,
1102 _id: &str,
1103 _data: Option<serde_json::Value>,
1104 ) -> Result<()> {
1105 Ok(())
1106 }
1107 }
1108
1109 let ops = FailAllDocumentOps;
1110 let result = recover_from_wal_safe(&wal, &ops).await.unwrap();
1111
1112 assert_eq!(result.failed_operations, 1);
1113 assert!(!result.failures.is_empty());
1114 }
1115
1116 #[tokio::test]
1117 async fn test_replay_wal_entry_safe_update_same_data() {
1118 let ops = MockDocumentOps::new();
1120 let data = serde_json::json!({"name": "test", "age": 25});
1121
1122 ops.apply_operation(&EntryType::Insert, "doc1", Some(data.clone()))
1123 .await
1124 .unwrap();
1125
1126 let entry = create_test_entry(
1127 EntryType::Update,
1128 "doc1",
1129 Some(r#"{"name": "test", "age": 25}"#),
1130 );
1131
1132 let result = replay_wal_entry_safe(&entry, &ops).await.unwrap();
1133 assert!(!result); }
1135
1136 #[tokio::test]
1137 async fn test_replay_wal_entry_force_delete_nonexistent() {
1138 struct DeleteFailsDocumentOps;
1140
1141 #[async_trait::async_trait]
1142 impl WalDocumentOps for DeleteFailsDocumentOps {
1143 async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> { Ok(None) }
1144
1145 async fn apply_operation(
1146 &self,
1147 _operation: &EntryType,
1148 _id: &str,
1149 _data: Option<serde_json::Value>,
1150 ) -> Result<()> {
1151 Err(crate::WalError::Io(std::io::Error::new(
1152 std::io::ErrorKind::NotFound,
1153 "not found",
1154 )))
1155 }
1156 }
1157
1158 let ops = DeleteFailsDocumentOps;
1159 let entry = create_test_entry(EntryType::Delete, "doc1", None);
1160
1161 let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
1162 assert!(!result); }
1164
1165 #[tokio::test]
1166 async fn test_recover_wal_safe_update_invalid_json() {
1167 let ops = MockDocumentOps::new();
1169 let entry = create_test_entry(EntryType::Update, "doc1", Some("not valid json"));
1170
1171 let result = replay_wal_entry_safe(&entry, &ops).await;
1172 assert!(result.is_err());
1173 }
1174
1175 #[tokio::test]
1176 async fn test_recover_wal_force_update_no_data() {
1177 let ops = MockDocumentOps::new();
1179 let entry = create_test_entry(EntryType::Update, "doc1", None);
1180
1181 let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
1182 assert!(!result);
1183 }
1184
1185 #[tokio::test]
1186 async fn test_recover_wal_force_delete_success() {
1187 let ops = MockDocumentOps::new();
1189 ops.apply_operation(
1190 &EntryType::Insert,
1191 "doc1",
1192 Some(serde_json::json!({"name": "test"})),
1193 )
1194 .await
1195 .unwrap();
1196
1197 let entry = create_test_entry(EntryType::Delete, "doc1", None);
1198
1199 let result = replay_wal_entry_force(&entry, &ops).await.unwrap();
1200 assert!(result);
1201
1202 let doc = ops.get_document("doc1").await.unwrap();
1203 assert!(doc.is_none());
1204 }
1205
1206 #[tokio::test]
1207 async fn test_recover_wal_force_with_apply_error() {
1208 struct CustomErrorOps;
1210
1211 #[async_trait::async_trait]
1212 impl WalDocumentOps for CustomErrorOps {
1213 async fn get_document(&self, _id: &str) -> Result<Option<serde_json::Value>> { Ok(None) }
1214
1215 async fn apply_operation(
1216 &self,
1217 _operation: &EntryType,
1218 _id: &str,
1219 _data: Option<serde_json::Value>,
1220 ) -> Result<()> {
1221 Err(crate::WalError::Serialization("custom error".to_string()))
1222 }
1223 }
1224
1225 let ops = CustomErrorOps;
1226 let entry = create_test_entry(EntryType::Insert, "doc1", Some(r#"{"name": "test"}"#));
1227
1228 let result = replay_wal_entry_force(&entry, &ops).await;
1229 assert!(result.is_err());
1230 }
1231}