1use std::collections::HashMap;
10
11use futures::StreamExt as _;
12use serde::{Deserialize, Serialize};
13
14use crate::{EntryType, LogEntry, Result, WalDocumentOps, WalManager};
15
/// A single problem found while verifying the write-ahead log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalVerificationIssue {
    /// Transaction the issue belongs to. The verifier also uses the
    /// placeholders `"unknown"` (entry could not be read) and `"final_check"`
    /// (issue found while comparing replayed state with stored documents).
    pub transaction_id: String,
    /// Document the issue refers to. The verifier also uses the placeholders
    /// `"unknown"` and `"transaction"` when the issue is not tied to a
    /// specific document.
    pub document_id: String,
    /// Human-readable explanation of the problem.
    pub description: String,
    /// Whether this issue causes the overall verification to fail.
    pub is_critical: bool,
}
28
/// Summary returned by [`verify_wal_consistency`].
#[derive(Debug)]
pub struct WalVerificationResult {
    /// Every issue collected during verification, critical or not.
    pub issues: Vec<WalVerificationIssue>,
    /// `true` when none of the collected issues is critical.
    pub passed: bool,
    /// Number of WAL entries that were read successfully and verified.
    pub entries_processed: u64,
    /// Number of documents present in the replayed WAL state once the whole
    /// log has been processed.
    pub affected_documents: u64,
}
41
42#[allow(clippy::arithmetic_side_effects, reason = "counter increment in loop")]
49pub async fn verify_wal_consistency<D>(wal: &WalManager, document_ops: &D) -> Result<WalVerificationResult>
50where
51 D: WalDocumentOps + Sync,
52{
53 let mut issues = Vec::new();
54 let mut wal_states = HashMap::new(); let mut active_transactions = HashMap::new(); let mut entries_processed = 0;
57
58 let stream = wal.stream_entries();
59 let mut pinned_stream = std::pin::pin!(stream);
60 while let Some(entry_result) = pinned_stream.next().await {
61 match entry_result {
62 Ok(entry) => {
63 entries_processed += 1;
64 if let Some(issue) =
65 verify_wal_entry_consistency(&entry, &mut wal_states, &mut active_transactions).await?
66 {
67 issues.push(issue);
68 }
69 },
70 Err(e) => {
71 issues.push(WalVerificationIssue {
72 transaction_id: "unknown".to_owned(),
73 document_id: "unknown".to_owned(),
74 description: format!("Failed to read WAL entry: {}", e),
75 is_critical: true,
76 });
77 },
78 }
79 }
80
81 for doc_id in wal_states.keys() {
83 match document_ops.get_document(doc_id).await {
84 Ok(Some(existing_doc)) => {
85 if let Some(wal_doc) = wal_states.get(doc_id) &&
87 existing_doc != *wal_doc
88 {
89 issues.push(WalVerificationIssue {
90 transaction_id: "final_check".to_owned(),
91 document_id: doc_id.clone(),
92 description: format!("Document {} data mismatch between WAL and disk", doc_id),
93 is_critical: true,
94 });
95 }
96 },
97 Ok(None) => {
98 issues.push(WalVerificationIssue {
99 transaction_id: "final_check".to_owned(),
100 document_id: doc_id.clone(),
101 description: format!("Document {} exists in WAL but not on disk", doc_id),
102 is_critical: true,
103 });
104 },
105 Err(e) => {
106 issues.push(WalVerificationIssue {
107 transaction_id: "final_check".to_owned(),
108 document_id: doc_id.clone(),
109 description: format!("Failed to read document {} from disk: {}", doc_id, e),
110 is_critical: true,
111 });
112 },
113 }
114 }
115
116 let passed = !issues.iter().any(|issue| issue.is_critical);
117
118 Ok(WalVerificationResult {
119 issues,
120 passed,
121 entries_processed: entries_processed as u64,
122 affected_documents: wal_states.len() as u64,
123 })
124}
125
126async fn verify_wal_entry_consistency(
128 entry: &LogEntry,
129 wal_states: &mut HashMap<String, serde_json::Value>,
130 active_transactions: &mut HashMap<String, Vec<LogEntry>>,
131) -> Result<Option<WalVerificationIssue>> {
132 let txn_id = entry.transaction_id_str();
133 let doc_id = entry.document_id_str();
134
135 active_transactions
137 .entry(txn_id.to_owned())
138 .or_insert_with(Vec::new)
139 .push(entry.clone());
140
141 match entry.entry_type {
142 EntryType::Begin => {
143 if entry.data.is_some() {
145 return Ok(Some(WalVerificationIssue {
146 transaction_id: txn_id.to_owned(),
147 document_id: doc_id.to_owned(),
148 description: "Transaction begin entry should not contain data".to_owned(),
149 is_critical: false,
150 }));
151 }
152 },
153 EntryType::Insert => {
154 if let Some(data_str) = entry.data.as_ref() {
155 match serde_json::from_str(data_str) {
156 Ok(data) => {
157 if wal_states.contains_key(doc_id) {
159 return Ok(Some(WalVerificationIssue {
160 transaction_id: txn_id.to_owned(),
161 document_id: doc_id.to_owned(),
162 description: format!("Document {} already exists in WAL state", doc_id),
163 is_critical: true,
164 }));
165 }
166 wal_states.insert(doc_id.to_owned(), data);
167 },
168 Err(e) => {
169 return Ok(Some(WalVerificationIssue {
170 transaction_id: txn_id.to_owned(),
171 document_id: doc_id.to_owned(),
172 description: format!("Invalid JSON data in insert operation: {}", e),
173 is_critical: true,
174 }));
175 },
176 }
177 }
178 else {
179 return Ok(Some(WalVerificationIssue {
180 transaction_id: txn_id.to_owned(),
181 document_id: doc_id.to_owned(),
182 description: "Insert operation missing data".to_owned(),
183 is_critical: true,
184 }));
185 }
186 },
187 EntryType::Update => {
188 if let Some(data_str) = entry.data.as_ref() {
189 match serde_json::from_str(data_str) {
190 Ok(data) => {
191 wal_states.insert(doc_id.to_owned(), data);
196 },
197 Err(e) => {
198 return Ok(Some(WalVerificationIssue {
199 transaction_id: txn_id.to_owned(),
200 document_id: doc_id.to_owned(),
201 description: format!("Invalid JSON data in update operation: {}", e),
202 is_critical: true,
203 }));
204 },
205 }
206 }
207 else {
208 return Ok(Some(WalVerificationIssue {
209 transaction_id: txn_id.to_owned(),
210 document_id: doc_id.to_owned(),
211 description: "Update operation missing data".to_owned(),
212 is_critical: true,
213 }));
214 }
215 },
216 EntryType::Delete => {
217 wal_states.remove(doc_id);
221 },
222 EntryType::Commit => {
223 if let Some(ops) = active_transactions.get(txn_id) &&
225 let Some(issue) = verify_transaction_consistency(ops).await?
226 {
227 return Ok(Some(issue));
228 }
229 active_transactions.remove(txn_id);
230 },
231 EntryType::Rollback => {
232 if let Some(ops) = active_transactions.remove(txn_id) {
234 for op in ops.iter().rev() {
235 match op.entry_type {
236 EntryType::Insert => {
237 wal_states.remove(op.document_id_str());
238 },
239 EntryType::Update => {
240 return Ok(Some(WalVerificationIssue {
243 transaction_id: txn_id.to_owned(),
244 document_id: doc_id.to_owned(),
245 description: "Transaction rollback not fully supported in verification".to_owned(),
246 is_critical: false,
247 }));
248 },
249 EntryType::Delete => {
250 return Ok(Some(WalVerificationIssue {
253 transaction_id: txn_id.to_owned(),
254 document_id: doc_id.to_owned(),
255 description: "Transaction rollback for delete not supported".to_owned(),
256 is_critical: false,
257 }));
258 },
259 EntryType::Begin | EntryType::Commit | EntryType::Rollback => {},
260 }
261 }
262 }
263 },
264 }
265
266 Ok(None)
267}
268
269#[allow(
271 clippy::expect_used,
272 reason = "ops is guaranteed to be non-empty in this context"
273)]
274async fn verify_transaction_consistency(ops: &[LogEntry]) -> Result<Option<WalVerificationIssue>> {
275 let has_begin = ops.iter().any(|op| op.entry_type == EntryType::Begin);
277 let has_commit = ops.iter().any(|op| op.entry_type == EntryType::Commit);
278
279 if !has_begin {
280 let first_op = ops.first().expect("ops is non-empty");
281 return Ok(Some(WalVerificationIssue {
282 transaction_id: first_op.transaction_id_str().to_owned(),
283 document_id: "transaction".to_owned(),
284 description: "Transaction missing begin entry".to_owned(),
285 is_critical: false,
286 }));
287 }
288
289 if !has_commit {
290 let first_op = ops.first().expect("ops is non-empty");
291 return Ok(Some(WalVerificationIssue {
292 transaction_id: first_op.transaction_id_str().to_owned(),
293 document_id: "transaction".to_owned(),
294 description: "Transaction missing commit entry".to_owned(),
295 is_critical: false,
296 }));
297 }
298
299 Ok(None)
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EntryType, LogEntry};

    /// Replayed document state, as threaded through the verifier.
    type WalStates = std::collections::HashMap<String, serde_json::Value>;
    /// Entries collected per open transaction, as threaded through the verifier.
    type ActiveTransactions = std::collections::HashMap<String, Vec<LogEntry>>;

    /// Builds a data-less entry of the given type in the `test` collection.
    fn create_test_entry(entry_type: EntryType, doc_id: &str, txn_id: &str) -> LogEntry {
        use crate::entry::{FixedBytes256, FixedBytes32};
        LogEntry {
            entry_type,
            collection: FixedBytes256::from(b"test" as &[u8]),
            document_id: FixedBytes256::from(doc_id.as_bytes()),
            transaction_id: FixedBytes32::from(txn_id.as_bytes()),
            data: None,
            timestamp: chrono::Utc::now().timestamp_millis() as u64,
        }
    }

    /// Builds an entry like [`create_test_entry`] carrying `data` as payload.
    fn create_test_entry_with_data(entry_type: EntryType, doc_id: &str, txn_id: &str, data: &str) -> LogEntry {
        let mut entry = create_test_entry(entry_type, doc_id, txn_id);
        entry.data = Some(data.to_owned());
        entry
    }

    /// Returns empty verifier state for a fresh test.
    fn empty_state() -> (WalStates, ActiveTransactions) {
        (WalStates::new(), ActiveTransactions::new())
    }

    /// Runs one entry through the verifier, unwrapping the outer `Result`.
    async fn apply(
        entry: &LogEntry,
        wal_states: &mut WalStates,
        active_transactions: &mut ActiveTransactions,
    ) -> Option<WalVerificationIssue> {
        verify_wal_entry_consistency(entry, wal_states, active_transactions)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_verify_transaction_consistency_valid() {
        let ops = vec![
            create_test_entry(EntryType::Begin, "doc1", "txn1"),
            create_test_entry(EntryType::Insert, "doc1", "txn1"),
            create_test_entry(EntryType::Commit, "doc1", "txn1"),
        ];

        let result = verify_transaction_consistency(&ops).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_verify_transaction_consistency_missing_begin() {
        let ops = vec![
            create_test_entry(EntryType::Insert, "doc1", "txn1"),
            create_test_entry(EntryType::Commit, "doc1", "txn1"),
        ];

        let result = verify_transaction_consistency(&ops).await.unwrap();
        assert!(result.is_some());
        let issue = result.unwrap();
        assert!(issue.description.contains("missing begin"));
        assert!(!issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_transaction_consistency_missing_commit() {
        let ops = vec![
            create_test_entry(EntryType::Begin, "doc1", "txn1"),
            create_test_entry(EntryType::Insert, "doc1", "txn1"),
        ];

        let result = verify_transaction_consistency(&ops).await.unwrap();
        assert!(result.is_some());
        let issue = result.unwrap();
        assert!(issue.description.contains("missing commit"));
        assert!(!issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_insert() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let entry = create_test_entry_with_data(EntryType::Insert, "doc1", "txn1", r#"{"name": "test"}"#);

        let result = apply(&entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_none());
        assert!(wal_states.contains_key("doc1"));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_update() {
        let (mut wal_states, mut active_transactions) = empty_state();

        // Seed the document so the update has something to replace.
        let insert_entry = create_test_entry_with_data(EntryType::Insert, "doc1", "txn1", r#"{"name": "test"}"#);
        apply(&insert_entry, &mut wal_states, &mut active_transactions).await;

        let update_entry = create_test_entry_with_data(EntryType::Update, "doc1", "txn2", r#"{"updated": true}"#);

        let result = apply(&update_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_none());
        assert_eq!(wal_states.get("doc1"), Some(&serde_json::json!({"updated": true})));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_delete() {
        let (mut wal_states, mut active_transactions) = empty_state();

        // The seed insert must carry data; a data-less insert is rejected and
        // would leave nothing to delete, making the final assertion vacuous.
        let insert_entry = create_test_entry_with_data(EntryType::Insert, "doc1", "txn1", r#"{"name": "test"}"#);
        let seeded = apply(&insert_entry, &mut wal_states, &mut active_transactions).await;
        assert!(seeded.is_none());
        assert!(wal_states.contains_key("doc1"));

        let delete_entry = create_test_entry(EntryType::Delete, "doc1", "txn2");

        let result = apply(&delete_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_none());
        assert!(!wal_states.contains_key("doc1"));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_begin_with_data() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let begin_entry = create_test_entry_with_data(EntryType::Begin, "doc1", "txn1", r#"{"unexpected": "data"}"#);

        let result = apply(&begin_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_some());
        let issue = result.unwrap();
        assert_eq!(issue.transaction_id, "txn1");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("should not contain data"));
        assert!(!issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_insert_invalid_json() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let insert_entry = create_test_entry_with_data(EntryType::Insert, "doc1", "txn1", r#"{"invalid": json}"#);

        let result = apply(&insert_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_some());
        let issue = result.unwrap();
        assert_eq!(issue.transaction_id, "txn1");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("Invalid JSON data"));
        assert!(issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_insert_no_data() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let mut insert_entry = create_test_entry(EntryType::Insert, "doc1", "txn1");
        insert_entry.data = None;

        let result = apply(&insert_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_some());
        let issue = result.unwrap();
        assert_eq!(issue.transaction_id, "txn1");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("missing data"));
        assert!(issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_update_invalid_json() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let update_entry = create_test_entry_with_data(EntryType::Update, "doc1", "txn1", r#"{"invalid": json}"#);

        let result = apply(&update_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_some());
        let issue = result.unwrap();
        assert_eq!(issue.transaction_id, "txn1");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("Invalid JSON data"));
        assert!(issue.is_critical);
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_insert_duplicate() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let insert_entry1 = create_test_entry_with_data(EntryType::Insert, "doc1", "txn1", r#"{"name": "test"}"#);
        apply(&insert_entry1, &mut wal_states, &mut active_transactions).await;

        let insert_entry2 = create_test_entry_with_data(EntryType::Insert, "doc1", "txn2", r#"{"name": "test2"}"#);

        let result = apply(&insert_entry2, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_some());
        let issue = result.unwrap();
        assert_eq!(issue.transaction_id, "txn2");
        assert_eq!(issue.document_id, "doc1");
        assert!(issue.description.contains("already exists"));
        assert!(issue.is_critical);
        // The rejected duplicate must not overwrite the first insert.
        assert_eq!(wal_states.get("doc1"), Some(&serde_json::json!({"name": "test"})));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_update_nonexistent_doc() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let update_entry =
            create_test_entry_with_data(EntryType::Update, "nonexistent", "txn1", r#"{"name": "updated"}"#);

        // Updating a document the log never inserted is tolerated and the
        // document is recorded in the replayed state.
        let result = apply(&update_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_none());
        assert!(wal_states.contains_key("nonexistent"));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_delete_nonexistent_doc() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let delete_entry = create_test_entry(EntryType::Delete, "nonexistent", "txn1");

        // Deleting a document the log never inserted is tolerated.
        let result = apply(&delete_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_none());
        assert!(wal_states.is_empty());
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_delete_after_insert() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let insert_entry = create_test_entry_with_data(EntryType::Insert, "doc1", "txn1", r#"{"name": "test"}"#);
        apply(&insert_entry, &mut wal_states, &mut active_transactions).await;

        let delete_entry = create_test_entry(EntryType::Delete, "doc1", "txn2");
        let result = apply(&delete_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_none());
        assert!(!wal_states.contains_key("doc1"));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_rollback_issue() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let begin_entry = create_test_entry(EntryType::Begin, "doc1", "txn1");
        apply(&begin_entry, &mut wal_states, &mut active_transactions).await;

        // A rollback of a transaction without data operations raises nothing
        // and closes the transaction.
        let rollback_entry = create_test_entry(EntryType::Rollback, "doc1", "txn1");
        let result = apply(&rollback_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_none());
        assert!(active_transactions.is_empty());
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_multiple_updates() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let insert_entry = create_test_entry_with_data(EntryType::Insert, "doc1", "txn1", r#"{"v": 1}"#);
        apply(&insert_entry, &mut wal_states, &mut active_transactions).await;

        let update1 = create_test_entry_with_data(EntryType::Update, "doc1", "txn2", r#"{"v": 2}"#);
        apply(&update1, &mut wal_states, &mut active_transactions).await;

        let update2 = create_test_entry_with_data(EntryType::Update, "doc1", "txn3", r#"{"v": 3}"#);
        let result = apply(&update2, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_none());
        // The last update wins.
        assert_eq!(wal_states.get("doc1"), Some(&serde_json::json!({"v": 3})));
    }

    #[tokio::test]
    async fn test_verify_wal_entry_consistency_commit_valid() {
        let (mut wal_states, mut active_transactions) = empty_state();

        let begin_entry = create_test_entry(EntryType::Begin, "doc1", "txn1");
        apply(&begin_entry, &mut wal_states, &mut active_transactions).await;

        let commit_entry = create_test_entry(EntryType::Commit, "doc1", "txn1");
        let result = apply(&commit_entry, &mut wal_states, &mut active_transactions).await;
        assert!(result.is_none());
        // A committed transaction is no longer tracked as active.
        assert!(!active_transactions.contains_key("txn1"));
    }
}