Skip to main content

sentinel_wal/
verification.rs

//! WAL verification functionality.
//!
//! This module checks a write-ahead log for internal consistency and compares the document
//! states it implies with the documents currently on disk. It verifies that:
//! 1. WAL operations form valid sequences (well-formed data payloads, no duplicate inserts,
//!    begin/commit structure on committed transactions);
//! 2. the final document states obtained by replaying the WAL match the documents on disk;
//! 3. no corrupted or unreadable entries exist.
8
9use std::collections::HashMap;
10
11use futures::StreamExt as _;
12use serde::{Deserialize, Serialize};
13
14use crate::{EntryType, LogEntry, Result, WalDocumentOps, WalManager};
15
16/// Issues found during WAL verification
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct WalVerificationIssue {
19    /// Transaction ID where the issue occurred
20    pub transaction_id: String,
21    /// Document ID affected
22    pub document_id:    String,
23    /// Description of the issue
24    pub description:    String,
25    /// Whether this is a critical issue
26    pub is_critical:    bool,
27}
28
29/// Result of WAL verification
30#[derive(Debug)]
31pub struct WalVerificationResult {
32    /// Issues found during verification
33    pub issues:             Vec<WalVerificationIssue>,
34    /// Whether verification passed (no critical issues)
35    pub passed:             bool,
36    /// Number of WAL entries processed
37    pub entries_processed:  u64,
38    /// Number of documents that would be affected by WAL replay
39    pub affected_documents: u64,
40}
41
42/// Verify WAL consistency and final state against disk
43///
44/// This function:
45/// 1. Replays all WAL entries to compute final expected states
46/// 2. Compares final WAL states with actual disk states
47/// 3. Checks for WAL internal consistency
48#[allow(clippy::arithmetic_side_effects, reason = "counter increment in loop")]
49pub async fn verify_wal_consistency<D>(wal: &WalManager, document_ops: &D) -> Result<WalVerificationResult>
50where
51    D: WalDocumentOps + Sync,
52{
53    let mut issues = Vec::new();
54    let mut wal_states = HashMap::new(); // document_id -> final_data
55    let mut active_transactions = HashMap::new(); // txn_id -> operations
56    let mut entries_processed = 0;
57
58    let stream = wal.stream_entries();
59    let mut pinned_stream = std::pin::pin!(stream);
60    while let Some(entry_result) = pinned_stream.next().await {
61        match entry_result {
62            Ok(entry) => {
63                entries_processed += 1;
64                if let Some(issue) =
65                    verify_wal_entry_consistency(&entry, &mut wal_states, &mut active_transactions).await?
66                {
67                    issues.push(issue);
68                }
69            },
70            Err(e) => {
71                issues.push(WalVerificationIssue {
72                    transaction_id: "unknown".to_owned(),
73                    document_id:    "unknown".to_owned(),
74                    description:    format!("Failed to read WAL entry: {}", e),
75                    is_critical:    true,
76                });
77            },
78        }
79    }
80
81    // Check that final WAL states match disk states
82    for doc_id in wal_states.keys() {
83        match document_ops.get_document(doc_id).await {
84            Ok(Some(existing_doc)) => {
85                // Compare WAL state with disk state
86                if let Some(wal_doc) = wal_states.get(doc_id) &&
87                    existing_doc != *wal_doc
88                {
89                    issues.push(WalVerificationIssue {
90                        transaction_id: "final_check".to_owned(),
91                        document_id:    doc_id.clone(),
92                        description:    format!("Document {} data mismatch between WAL and disk", doc_id),
93                        is_critical:    true,
94                    });
95                }
96            },
97            Ok(None) => {
98                issues.push(WalVerificationIssue {
99                    transaction_id: "final_check".to_owned(),
100                    document_id:    doc_id.clone(),
101                    description:    format!("Document {} exists in WAL but not on disk", doc_id),
102                    is_critical:    true,
103                });
104            },
105            Err(e) => {
106                issues.push(WalVerificationIssue {
107                    transaction_id: "final_check".to_owned(),
108                    document_id:    doc_id.clone(),
109                    description:    format!("Failed to read document {} from disk: {}", doc_id, e),
110                    is_critical:    true,
111                });
112            },
113        }
114    }
115
116    let passed = !issues.iter().any(|issue| issue.is_critical);
117
118    Ok(WalVerificationResult {
119        issues,
120        passed,
121        entries_processed: entries_processed as u64,
122        affected_documents: wal_states.len() as u64,
123    })
124}
125
126/// Verify a single WAL entry for consistency
127async fn verify_wal_entry_consistency(
128    entry: &LogEntry,
129    wal_states: &mut HashMap<String, serde_json::Value>,
130    active_transactions: &mut HashMap<String, Vec<LogEntry>>,
131) -> Result<Option<WalVerificationIssue>> {
132    let txn_id = entry.transaction_id_str();
133    let doc_id = entry.document_id_str();
134
135    // Track transaction operations
136    active_transactions
137        .entry(txn_id.to_owned())
138        .or_insert_with(Vec::new)
139        .push(entry.clone());
140
141    match entry.entry_type {
142        EntryType::Begin => {
143            // Transaction begin - should not have data
144            if entry.data.is_some() {
145                return Ok(Some(WalVerificationIssue {
146                    transaction_id: txn_id.to_owned(),
147                    document_id:    doc_id.to_owned(),
148                    description:    "Transaction begin entry should not contain data".to_owned(),
149                    is_critical:    false,
150                }));
151            }
152        },
153        EntryType::Insert => {
154            if let Some(data_str) = entry.data.as_ref() {
155                match serde_json::from_str(data_str) {
156                    Ok(data) => {
157                        // Check if document already exists in WAL state
158                        if wal_states.contains_key(doc_id) {
159                            return Ok(Some(WalVerificationIssue {
160                                transaction_id: txn_id.to_owned(),
161                                document_id:    doc_id.to_owned(),
162                                description:    format!("Document {} already exists in WAL state", doc_id),
163                                is_critical:    true,
164                            }));
165                        }
166                        wal_states.insert(doc_id.to_owned(), data);
167                    },
168                    Err(e) => {
169                        return Ok(Some(WalVerificationIssue {
170                            transaction_id: txn_id.to_owned(),
171                            document_id:    doc_id.to_owned(),
172                            description:    format!("Invalid JSON data in insert operation: {}", e),
173                            is_critical:    true,
174                        }));
175                    },
176                }
177            }
178            else {
179                return Ok(Some(WalVerificationIssue {
180                    transaction_id: txn_id.to_owned(),
181                    document_id:    doc_id.to_owned(),
182                    description:    "Insert operation missing data".to_owned(),
183                    is_critical:    true,
184                }));
185            }
186        },
187        EntryType::Update => {
188            if let Some(data_str) = entry.data.as_ref() {
189                match serde_json::from_str(data_str) {
190                    Ok(data) => {
191                        // For partial WAL verification, updates can reference documents
192                        // that were inserted in earlier WAL segments not being verified.
193                        // We allow this but track it as a potential issue.
194                        // Note: The insert below is common to both branches
195                        wal_states.insert(doc_id.to_owned(), data);
196                    },
197                    Err(e) => {
198                        return Ok(Some(WalVerificationIssue {
199                            transaction_id: txn_id.to_owned(),
200                            document_id:    doc_id.to_owned(),
201                            description:    format!("Invalid JSON data in update operation: {}", e),
202                            is_critical:    true,
203                        }));
204                    },
205                }
206            }
207            else {
208                return Ok(Some(WalVerificationIssue {
209                    transaction_id: txn_id.to_owned(),
210                    document_id:    doc_id.to_owned(),
211                    description:    "Update operation missing data".to_owned(),
212                    is_critical:    true,
213                }));
214            }
215        },
216        EntryType::Delete => {
217            // For partial WAL verification, deletes can reference documents
218            // that were inserted in earlier WAL segments not being verified.
219            // We allow this - the document may exist on disk.
220            wal_states.remove(doc_id);
221        },
222        EntryType::Commit => {
223            // Transaction commit - validate the transaction
224            if let Some(ops) = active_transactions.get(txn_id) &&
225                let Some(issue) = verify_transaction_consistency(ops).await?
226            {
227                return Ok(Some(issue));
228            }
229            active_transactions.remove(txn_id);
230        },
231        EntryType::Rollback => {
232            // Transaction rollback - undo all operations in this transaction
233            if let Some(ops) = active_transactions.remove(txn_id) {
234                for op in ops.iter().rev() {
235                    match op.entry_type {
236                        EntryType::Insert => {
237                            wal_states.remove(op.document_id_str());
238                        },
239                        EntryType::Update => {
240                            // For rollback, we'd need to track previous states
241                            // For now, mark as issue since we can't reliably rollback
242                            return Ok(Some(WalVerificationIssue {
243                                transaction_id: txn_id.to_owned(),
244                                document_id:    doc_id.to_owned(),
245                                description:    "Transaction rollback not fully supported in verification".to_owned(),
246                                is_critical:    false,
247                            }));
248                        },
249                        EntryType::Delete => {
250                            // For delete rollback, we'd need to restore previous state
251                            // Mark as issue
252                            return Ok(Some(WalVerificationIssue {
253                                transaction_id: txn_id.to_owned(),
254                                document_id:    doc_id.to_owned(),
255                                description:    "Transaction rollback for delete not supported".to_owned(),
256                                is_critical:    false,
257                            }));
258                        },
259                        EntryType::Begin | EntryType::Commit | EntryType::Rollback => {},
260                    }
261                }
262            }
263        },
264    }
265
266    Ok(None)
267}
268
269/// Verify transaction consistency
270#[allow(
271    clippy::expect_used,
272    reason = "ops is guaranteed to be non-empty in this context"
273)]
274async fn verify_transaction_consistency(ops: &[LogEntry]) -> Result<Option<WalVerificationIssue>> {
275    // Check that transaction has proper begin/commit structure
276    let has_begin = ops.iter().any(|op| op.entry_type == EntryType::Begin);
277    let has_commit = ops.iter().any(|op| op.entry_type == EntryType::Commit);
278
279    if !has_begin {
280        let first_op = ops.first().expect("ops is non-empty");
281        return Ok(Some(WalVerificationIssue {
282            transaction_id: first_op.transaction_id_str().to_owned(),
283            document_id:    "transaction".to_owned(),
284            description:    "Transaction missing begin entry".to_owned(),
285            is_critical:    false,
286        }));
287    }
288
289    if !has_commit {
290        let first_op = ops.first().expect("ops is non-empty");
291        return Ok(Some(WalVerificationIssue {
292            transaction_id: first_op.transaction_id_str().to_owned(),
293            document_id:    "transaction".to_owned(),
294            description:    "Transaction missing commit entry".to_owned(),
295            is_critical:    false,
296        }));
297    }
298
299    Ok(None)
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EntryType, LogEntry};

    /// Build a test entry in collection `test` with no data payload.
    fn create_test_entry(entry_type: EntryType, doc_id: &str, txn_id: &str) -> LogEntry {
        use crate::entry::{FixedBytes256, FixedBytes32};
        LogEntry {
            entry_type,
            collection: FixedBytes256::from(b"test" as &[u8]),
            document_id: FixedBytes256::from(doc_id.as_bytes()),
            transaction_id: FixedBytes32::from(txn_id.as_bytes()),
            data: None,
            timestamp: chrono::Utc::now().timestamp_millis() as u64,
        }
    }

    #[tokio::test]
    async fn test_verify_transaction_consistency_valid() {
        let ops = vec![
            create_test_entry(EntryType::Begin, "doc1", "txn1"),
            create_test_entry(EntryType::Insert, "doc1", "txn1"),
            create_test_entry(EntryType::Commit, "doc1", "txn1"),
        ];

        let result = verify_transaction_consistency(&ops).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_verify_transaction_consistency_missing_begin() {
        let ops = vec![
            create_test_entry(EntryType::Insert, "doc1", "txn1"),
            create_test_entry(EntryType::Commit, "doc1", "txn1"),
        ];

        let result = verify_transaction_consistency(&ops).await.unwrap();
        assert!(result.is_some());
        let issue = result.unwrap();
        assert!(issue.description.contains("missing begin"));
        assert!(!issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_transaction_consistency_missing_commit() {
        let ops = vec![
            create_test_entry(EntryType::Begin, "doc1", "txn1"),
            create_test_entry(EntryType::Insert, "doc1", "txn1"),
        ];

        let result = verify_transaction_consistency(&ops).await.unwrap();
        assert!(result.is_some());
        let issue = result.unwrap();
        assert!(issue.description.contains("missing commit"));
        assert!(!issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_insert() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let mut entry = create_test_entry(EntryType::Insert, "doc1", "txn1");
        entry.data = Some(r#"{"name": "test"}"#.to_string());

        let result = verify_wal_entry_consistency(&entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_none());
        assert!(wal_states.contains_key("doc1"));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_update() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        // First insert
        let mut insert_entry = create_test_entry(EntryType::Insert, "doc1", "txn1");
        insert_entry.data = Some(r#"{"name": "test"}"#.to_string());
        verify_wal_entry_consistency(&insert_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        // Then update
        let mut update_entry = create_test_entry(EntryType::Update, "doc1", "txn2");
        update_entry.data = Some(r#"{"updated": true}"#.to_string());

        let result = verify_wal_entry_consistency(&update_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_none());
        // The update payload replaces the inserted state.
        assert_eq!(wal_states.get("doc1"), Some(&serde_json::json!({"updated": true})));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_delete() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        // First insert, with data so the document really enters the WAL state
        let mut insert_entry = create_test_entry(EntryType::Insert, "doc1", "txn1");
        insert_entry.data = Some(r#"{"name": "test"}"#.to_string());
        let result = verify_wal_entry_consistency(&insert_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_none());
        assert!(wal_states.contains_key("doc1"));

        // Then delete
        let delete_entry = create_test_entry(EntryType::Delete, "doc1", "txn2");

        let result = verify_wal_entry_consistency(&delete_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_none());
        assert!(!wal_states.contains_key("doc1"));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_begin_with_data() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let mut begin_entry = create_test_entry(EntryType::Begin, "doc1", "txn1");
        begin_entry.data = Some(r#"{"unexpected": "data"}"#.to_string());

        let result = verify_wal_entry_consistency(&begin_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_some());
        let issue = result.unwrap();
        assert_eq!(issue.transaction_id, "txn1");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("should not contain data"));
        assert!(!issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_insert_invalid_json() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let mut insert_entry = create_test_entry(EntryType::Insert, "doc1", "txn1");
        insert_entry.data = Some(r#"{"invalid": json}"#.to_string());

        let result = verify_wal_entry_consistency(&insert_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_some());
        let issue = result.unwrap();
        assert_eq!(issue.transaction_id, "txn1");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("Invalid JSON data"));
        assert!(issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_insert_no_data() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let mut insert_entry = create_test_entry(EntryType::Insert, "doc1", "txn1");
        insert_entry.data = None;

        let result = verify_wal_entry_consistency(&insert_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_some());
        let issue = result.unwrap();
        assert_eq!(issue.transaction_id, "txn1");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("missing data"));
        assert!(issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_update_invalid_json() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let mut update_entry = create_test_entry(EntryType::Update, "doc1", "txn1");
        update_entry.data = Some(r#"{"invalid": json}"#.to_string());

        let result = verify_wal_entry_consistency(&update_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_some());
        let issue = result.unwrap();
        assert_eq!(issue.transaction_id, "txn1");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("Invalid JSON data"));
        assert!(issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_insert_duplicate() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        // First insert
        let mut insert_entry1 = create_test_entry(EntryType::Insert, "doc1", "txn1");
        insert_entry1.data = Some(r#"{"name": "test"}"#.to_string());
        verify_wal_entry_consistency(&insert_entry1, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        // Second insert of same document
        let mut insert_entry2 = create_test_entry(EntryType::Insert, "doc1", "txn2");
        insert_entry2.data = Some(r#"{"name": "test2"}"#.to_string());

        let result = verify_wal_entry_consistency(&insert_entry2, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_some());
        let issue = result.unwrap();
        assert_eq!(issue.transaction_id, "txn2");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("already exists"));
        assert!(issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_update_nonexistent_doc() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let mut update_entry = create_test_entry(EntryType::Update, "nonexistent", "txn1");
        update_entry.data = Some(r#"{"name": "updated"}"#.to_string());

        // Update on nonexistent is actually valid (creates document-like state)
        let result = verify_wal_entry_consistency(&update_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        // This should be None since update is allowed on nonexistent
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_delete_nonexistent_doc() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let delete_entry = create_test_entry(EntryType::Delete, "nonexistent", "txn1");

        // Delete on nonexistent is actually valid
        let result = verify_wal_entry_consistency(&delete_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        // This should be None since delete is allowed on nonexistent
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_delete_after_insert() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        // First insert
        let mut insert_entry = create_test_entry(EntryType::Insert, "doc1", "txn1");
        insert_entry.data = Some(r#"{"name": "test"}"#.to_string());
        verify_wal_entry_consistency(&insert_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        // Then delete
        let delete_entry = create_test_entry(EntryType::Delete, "doc1", "txn2");
        let result = verify_wal_entry_consistency(&delete_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_none()); // Should be valid
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_rollback_issue() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        // Begin first
        let begin_entry = create_test_entry(EntryType::Begin, "doc1", "txn1");
        verify_wal_entry_consistency(&begin_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        // Try to rollback without operations
        let rollback_entry = create_test_entry(EntryType::Rollback, "doc1", "txn1");
        let result = verify_wal_entry_consistency(&rollback_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_none()); // Rollback after begin without operations should be fine
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_rollback_undoes_insert() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let begin_entry = create_test_entry(EntryType::Begin, "doc1", "txn1");
        verify_wal_entry_consistency(&begin_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        let mut insert_entry = create_test_entry(EntryType::Insert, "doc1", "txn1");
        insert_entry.data = Some(r#"{"name": "test"}"#.to_string());
        verify_wal_entry_consistency(&insert_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(wal_states.contains_key("doc1"));

        let rollback_entry = create_test_entry(EntryType::Rollback, "doc1", "txn1");
        let result = verify_wal_entry_consistency(&rollback_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_none());
        // The rolled-back insert must not survive in the replayed state.
        assert!(!wal_states.contains_key("doc1"));
        assert!(!active_transactions.contains_key("txn1"));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_rollback_update_reported() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let begin_entry = create_test_entry(EntryType::Begin, "doc1", "txn1");
        verify_wal_entry_consistency(&begin_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        let mut update_entry = create_test_entry(EntryType::Update, "doc1", "txn1");
        update_entry.data = Some(r#"{"v": 2}"#.to_string());
        verify_wal_entry_consistency(&update_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        let rollback_entry = create_test_entry(EntryType::Rollback, "doc1", "txn1");
        let result = verify_wal_entry_consistency(&rollback_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        let issue = result.expect("rolling back an update should be reported");
        assert_eq!(issue.transaction_id, "txn1");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("rollback"));
        assert!(!issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_commit_missing_begin() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let mut insert_entry = create_test_entry(EntryType::Insert, "doc1", "txn1");
        insert_entry.data = Some(r#"{"name": "test"}"#.to_string());
        verify_wal_entry_consistency(&insert_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        let commit_entry = create_test_entry(EntryType::Commit, "doc1", "txn1");
        let result = verify_wal_entry_consistency(&commit_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        let issue = result.expect("commit without begin should be reported");
        assert!(issue.description.contains("missing begin"));
        assert!(!issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_multiple_updates() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        // Insert
        let mut insert_entry = create_test_entry(EntryType::Insert, "doc1", "txn1");
        insert_entry.data = Some(r#"{"v": 1}"#.to_string());
        verify_wal_entry_consistency(&insert_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        // Update 1
        let mut update1 = create_test_entry(EntryType::Update, "doc1", "txn2");
        update1.data = Some(r#"{"v": 2}"#.to_string());
        verify_wal_entry_consistency(&update1, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        // Update 2
        let mut update2 = create_test_entry(EntryType::Update, "doc1", "txn3");
        update2.data = Some(r#"{"v": 3}"#.to_string());
        let result = verify_wal_entry_consistency(&update2, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_none()); // Should be valid
        // The last update wins.
        assert_eq!(wal_states.get("doc1"), Some(&serde_json::json!({"v": 3})));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_commit_valid() {
        let mut wal_states = std::collections::HashMap::new();
        let mut active_transactions = std::collections::HashMap::new();

        let begin_entry = create_test_entry(EntryType::Begin, "doc1", "txn1");
        verify_wal_entry_consistency(&begin_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();

        let commit_entry = create_test_entry(EntryType::Commit, "doc1", "txn1");
        let result = verify_wal_entry_consistency(&commit_entry, &mut wal_states, &mut active_transactions)
            .await
            .unwrap();
        assert!(result.is_none()); // Commit should be fine after begin
    }
}