sentinel_dbms/wal/ops.rs
1//! Core WAL operations and trait definitions.
2//!
3//! This module provides Write-Ahead Logging (WAL) operations for both Store and Collection
4//! entities. WAL ensures data durability and consistency by logging operations before
5//! they are applied to the main data store.
6//!
7//! # Architecture
8//!
9//! The WAL system is organized into two layers:
10//! - **Low-level operations** in the `sentinel-wal` crate handle raw WAL file management
11//! - **High-level operations** in this module provide trait-based interfaces for Store and
12//! Collection
13//!
14//! # Key Concepts
15//!
16//! - **Checkpoint**: Flushes accumulated WAL entries to the main data store and truncates the log
17//! - **Recovery**: Replays WAL entries to restore data consistency after a crash
18//! - **Verification**: Validates WAL integrity and consistency with the main data store
19//! - **Streaming**: Provides real-time access to WAL entries for monitoring and replication
20//!
21//! # Examples
22//!
23//! ## Basic WAL Operations on a Collection
24//!
25//! ```rust,no_run
26//! # use sentinel_dbms::{Store, Collection};
27//! # use futures::StreamExt;
28//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
29//! # let store = Store::new("/tmp/store", None).await?;
30//! # let collection = store.collection_with_config("users", None).await?;
31//! use sentinel_dbms::wal::ops::CollectionWalOps;
32//!
33//! // Insert some data
34//! collection.insert("user-123", serde_json::json!({"name": "Alice"})).await?;
35//!
36//! // Checkpoint the WAL to persist changes
37//! collection.checkpoint_wal().await?;
38//!
39//! // Get WAL statistics
40//! let size = collection.wal_size().await?;
41//! let count = collection.wal_entries_count().await?;
42//! println!("WAL size: {} bytes, entries: {}", size, count);
43//!
44//! // Stream WAL entries for monitoring
45//! let mut stream = collection.stream_wal_entries().await?;
46//! while let Some(entry) = stream.next().await {
47//! let entry = entry?;
48//! println!("WAL entry: {:?}", entry);
49//! }
50//! # Ok(())
51//! # }
52//! ```
53//!
54//! ## Store-level WAL Operations
55//!
56//! ```rust,no_run
57//! # use sentinel_dbms::Store;
58//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
59//! # let store = Store::new("/tmp/store", None).await?;
60//! use sentinel_dbms::wal::ops::StoreWalOps;
61//!
62//! // Checkpoint all collections
63//! store.checkpoint_all_collections().await?;
64//!
65//! // Verify all collections against their WALs
66//! let issues = store.verify_all_collections().await?;
67//! for (collection_name, collection_issues) in issues {
68//! println!(
69//! "Collection {} has {} issues",
70//! collection_name,
71//! collection_issues.len()
72//! );
73//! }
74//!
75//! // Recover all collections from WAL
76//! let recovery_stats = store.recover_all_collections().await?;
77//! for (collection_name, operations) in recovery_stats {
78//! println!(
79//! "Recovered {} operations for {}",
80//! operations, collection_name
81//! );
82//! }
83//! # Ok(())
84//! # }
85//! ```
86
87use std::{collections::HashMap, pin::Pin};
88
89use async_trait::async_trait;
90use futures::{Stream, StreamExt as _};
91use tracing::{debug, error, info, warn};
92use sentinel_wal::{
93 recover_from_wal_safe,
94 verify_wal_consistency,
95 LogEntry,
96 WalRecoveryResult,
97 WalVerificationIssue,
98 WalVerificationResult,
99};
100
101use crate::{store::operations::collection_with_config, Collection, Store};
102
103/// Extension trait for Store to add WAL operations.
104///
105/// This trait provides high-level WAL operations that work across all collections
106/// in a store. Operations are performed sequentially on each collection to ensure
107/// consistency and avoid resource conflicts.
108///
109/// # Thread Safety
110///
111/// All operations are async and can be called concurrently, but the trait implementations
112/// handle internal synchronization through the Store's collection management.
113#[async_trait]
114pub trait StoreWalOps {
115 /// Perform a checkpoint on all collections in the store.
116 ///
117 /// This operation iterates through all collections and checkpoints each one's WAL,
118 /// ensuring all pending operations are flushed to the main data store. This is
119 /// typically called during maintenance windows or before backups.
120 ///
121 /// # Returns
122 ///
123 /// Returns `Ok(())` on success, or an error if any collection fails to checkpoint.
124 ///
125 /// # Examples
126 ///
127 /// ```rust,no_run
128 /// # use sentinel_dbms::Store;
129 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
130 /// # let store = Store::new("/tmp/store", None).await?;
131 /// use sentinel_dbms::wal::ops::StoreWalOps;
132 ///
133 /// store.checkpoint_all_collections().await?;
134 /// println!("All collections checkpointed successfully");
135 /// # Ok(())
136 /// # }
137 /// ```
138 async fn checkpoint_all_collections(&self) -> crate::Result<()>;
139
140 /// Stream WAL entries from all collections in the store.
141 ///
142 /// Creates a unified stream that yields WAL entries from all collections,
143 /// prefixed with the collection name. This is useful for monitoring, auditing,
144 /// and replication across the entire store.
145 ///
146 /// # Returns
147 ///
148 /// Returns a stream yielding `(collection_name, LogEntry)` tuples, or an error
149 /// if the collection list cannot be retrieved.
150 ///
151 /// # Examples
152 ///
153 /// ```rust,no_run
154 /// # use sentinel_dbms::Store;
155 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
156 /// # let store = Store::new("/tmp/store", None).await?;
157 /// use sentinel_dbms::wal::ops::StoreWalOps;
158 /// use futures::StreamExt;
159 ///
160 /// let mut stream = store.stream_all_wal_entries().await?;
161 /// while let Some(result) = stream.next().await {
162 /// let (collection_name, entry) = result?;
163 /// println!("Collection {}: {:?}", collection_name, entry);
164 /// }
165 /// # Ok(())
166 /// # }
167 /// ```
168 async fn stream_all_wal_entries(
169 &self,
170 ) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<(String, LogEntry)>> + Send + 'static>>>;
171
172 /// Verify all collections against their WAL files.
173 ///
174 /// Performs consistency checks on all collections to ensure WAL entries match
175 /// the current state of documents. Returns a map of collection names to any
176 /// verification issues found.
177 ///
178 /// # Returns
179 ///
180 /// Returns a `HashMap` where keys are collection names and values are vectors
181 /// of `WalVerificationIssue`s. Collections with no issues are not included.
182 ///
183 /// # Examples
184 ///
185 /// ```rust,no_run
186 /// # use sentinel_dbms::Store;
187 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
188 /// # let store = Store::new("/tmp/store", None).await?;
189 /// use sentinel_dbms::wal::ops::StoreWalOps;
190 ///
191 /// let issues = store.verify_all_collections().await?;
192 /// if issues.is_empty() {
193 /// println!("All collections are consistent with their WALs");
194 /// }
195 /// else {
196 /// for (collection_name, collection_issues) in issues {
197 /// println!(
198 /// "Collection {} has {} issues:",
199 /// collection_name,
200 /// collection_issues.len()
201 /// );
202 /// for issue in collection_issues {
203 /// println!(" - {}", issue.description);
204 /// }
205 /// }
206 /// }
207 /// # Ok(())
208 /// # }
209 /// ```
210 async fn verify_all_collections(&self) -> crate::Result<HashMap<String, Vec<WalVerificationIssue>>>;
211
212 /// Recover all collections from their WAL files.
213 ///
214 /// Performs crash recovery on all collections by replaying WAL entries.
215 /// This is typically called during store initialization after an unclean shutdown.
216 ///
217 /// # Returns
218 ///
219 /// Returns a `HashMap` where keys are collection names and values are the number
220 /// of operations recovered for each collection.
221 ///
222 /// # Examples
223 ///
224 /// ```rust,no_run
225 /// # use sentinel_dbms::Store;
226 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
227 /// # let store = Store::new("/tmp/store", None).await?;
228 /// use sentinel_dbms::wal::ops::StoreWalOps;
229 ///
230 /// let recovery_stats = store.recover_all_collections().await?;
231 /// let total_operations: usize = recovery_stats.values().sum();
232 /// println!(
233 /// "Recovered {} operations across {} collections",
234 /// total_operations,
235 /// recovery_stats.len()
236 /// );
237 /// # Ok(())
238 /// # }
239 /// ```
240 async fn recover_all_collections(&self) -> crate::Result<HashMap<String, usize>>;
241}
242
243/// Extension trait for Collection to add WAL operations.
244///
245/// This trait provides WAL operations specific to individual collections,
246/// including checkpointing, verification, recovery, and monitoring capabilities.
247///
248/// # Thread Safety
249///
250/// All operations are async and work with the collection's internal locking mechanisms.
251#[async_trait]
252pub trait CollectionWalOps {
253 /// Perform a checkpoint on this collection's WAL.
254 ///
255 /// Flushes all pending WAL entries to the main data store and truncates the WAL file.
256 /// This operation ensures durability and can help manage WAL file size.
257 ///
258 /// # Returns
259 ///
260 /// Returns `Ok(())` on success, or an error if the checkpoint operation fails.
261 ///
262 /// # Examples
263 ///
264 /// ```rust,no_run
265 /// # use sentinel_dbms::{Store, Collection};
266 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
267 /// # let store = Store::new("/tmp/store", None).await?;
268 /// # let collection = store.collection_with_config("users", None).await?;
269 /// use sentinel_dbms::wal::ops::CollectionWalOps;
270 ///
271 /// // Perform operations
272 /// collection.insert("user-123", serde_json::json!({"name": "Alice"})).await?;
273 /// collection.update("user-123", serde_json::json!({"name": "Alice", "age": 30})).await?;
274 ///
275 /// // Checkpoint to persist changes
276 /// collection.checkpoint_wal().await?;
277 /// println!("WAL checkpoint completed");
278 /// # Ok(())
279 /// # }
280 /// ```
281 async fn checkpoint_wal(&self) -> crate::Result<()>;
282
283 /// Stream WAL entries for this collection.
284 ///
285 /// Creates a stream that yields all current WAL entries for the collection.
286 /// This is useful for monitoring recent operations, auditing, or replication.
287 ///
288 /// # Returns
289 ///
290 /// Returns a stream yielding `LogEntry` items, or an error if WAL access fails.
291 ///
292 /// # Examples
293 ///
294 /// ```rust,no_run
295 /// # use sentinel_dbms::{Store, Collection};
296 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
297 /// # let store = Store::new("/tmp/store", None).await?;
298 /// # let collection = store.collection_with_config("users", None).await?;
299 /// use sentinel_dbms::wal::ops::CollectionWalOps;
300 /// use futures::StreamExt;
301 ///
302 /// let mut stream = collection.stream_wal_entries().await?;
303 /// let mut operation_count = 0;
304 /// while let Some(result) = stream.next().await {
305 /// let entry = result?;
306 /// operation_count += 1;
307 /// println!("Operation {}: {:?}", operation_count, entry.entry_type);
308 /// }
309 /// println!("Total operations in WAL: {}", operation_count);
310 /// # Ok(())
311 /// # }
312 /// ```
313 async fn stream_wal_entries(self) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<LogEntry>> + Send>>>;
314
315 /// Verify this collection against its WAL file.
316 ///
317 /// Performs consistency checks to ensure WAL entries match the current state
318 /// of documents in the collection. This helps detect corruption or inconsistencies.
319 ///
320 /// # Returns
321 ///
322 /// Returns a `WalVerificationResult` containing verification statistics and any issues found.
323 ///
324 /// # Examples
325 ///
326 /// ```rust,no_run
327 /// # use sentinel_dbms::{Store, Collection};
328 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
329 /// # let store = Store::new("/tmp/store", None).await?;
330 /// # let collection = store.collection_with_config("users", None).await?;
331 /// use sentinel_dbms::wal::ops::CollectionWalOps;
332 ///
333 /// let result = collection.verify_against_wal().await?;
334 /// println!(
335 /// "Verification result: {}",
336 /// if result.passed { "PASSED" } else { "FAILED" }
337 /// );
338 /// println!("Entries processed: {}", result.entries_processed);
339 /// println!("Issues found: {}", result.issues.len());
340 ///
341 /// for issue in &result.issues {
342 /// println!("Issue: {}", issue.description);
343 /// }
344 /// # Ok(())
345 /// # }
346 /// ```
347 async fn verify_against_wal(&self) -> crate::Result<WalVerificationResult>;
348
349 /// Recover this collection from its WAL file.
350 ///
351 /// Replays WAL entries to restore the collection to a consistent state after
352 /// a crash or unclean shutdown. This operation is safe and will not overwrite
353 /// newer data.
354 ///
355 /// # Returns
356 ///
357 /// Returns a `WalRecoveryResult` with recovery statistics and any failures encountered.
358 ///
359 /// # Examples
360 ///
361 /// ```rust,no_run
362 /// # use sentinel_dbms::{Store, Collection};
363 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
364 /// # let store = Store::new("/tmp/store", None).await?;
365 /// # let collection = store.collection_with_config("users", None).await?;
366 /// use sentinel_dbms::wal::ops::CollectionWalOps;
367 ///
368 /// let result = collection.recover_from_wal().await?;
369 /// println!("Recovery completed:");
370 /// println!(" Operations recovered: {}", result.recovered_operations);
371 /// println!(" Operations skipped: {}", result.skipped_operations);
372 /// println!(" Operations failed: {}", result.failed_operations);
373 ///
374 /// if !result.failures.is_empty() {
375 /// println!("Recovery failures:");
376 /// for failure in &result.failures {
377 /// println!(" - {:?}", failure);
378 /// }
379 /// }
380 /// # Ok(())
381 /// # }
382 /// ```
383 async fn recover_from_wal(&self) -> crate::Result<WalRecoveryResult>;
384
385 /// Get the current WAL size in bytes.
386 ///
387 /// Returns the size of the WAL file on disk. This can be used to monitor
388 /// WAL growth and determine when checkpointing might be beneficial.
389 ///
390 /// # Returns
391 ///
392 /// Returns the WAL file size in bytes, or 0 if no WAL is configured.
393 ///
394 /// # Examples
395 ///
396 /// ```rust,no_run
397 /// # use sentinel_dbms::{Store, Collection};
398 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
399 /// # let store = Store::new("/tmp/store", None).await?;
400 /// # let collection = store.collection_with_config("users", None).await?;
401 /// use sentinel_dbms::wal::ops::CollectionWalOps;
402 ///
403 /// let size_bytes = collection.wal_size().await?;
404 /// let size_mb = size_bytes as f64 / (1024.0 * 1024.0);
405 /// println!("WAL size: {:.2} MB", size_mb);
406 ///
407 /// if size_bytes > 100 * 1024 * 1024 {
408 /// // 100 MB
409 /// println!("WAL is getting large, consider checkpointing");
410 /// collection.checkpoint_wal().await?;
411 /// }
412 /// # Ok(())
413 /// # }
414 /// ```
415 async fn wal_size(&self) -> crate::Result<u64>;
416
417 /// Get the number of entries in the WAL.
418 ///
419 /// Returns the count of operations logged in the WAL. This can be used to
420 /// monitor operation frequency and determine checkpoint timing.
421 ///
422 /// # Returns
423 ///
424 /// Returns the number of WAL entries, or 0 if no WAL is configured.
425 ///
426 /// # Examples
427 ///
428 /// ```rust,no_run
429 /// # use sentinel_dbms::{Store, Collection};
430 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
431 /// # let store = Store::new("/tmp/store", None).await?;
432 /// # let collection = store.collection_with_config("users", None).await?;
433 /// use sentinel_dbms::wal::ops::CollectionWalOps;
434 ///
435 /// let count = collection.wal_entries_count().await?;
436 /// println!("WAL contains {} entries", count);
437 ///
438 /// if count > 1000 {
439 /// println!("Many pending operations, checkpointing...");
440 /// collection.checkpoint_wal().await?;
441 /// }
442 /// # Ok(())
443 /// # }
444 /// ```
445 async fn wal_entries_count(&self) -> crate::Result<usize>;
446}
447
448#[async_trait]
449impl StoreWalOps for Store {
450 async fn checkpoint_all_collections(&self) -> crate::Result<()> {
451 let collections = self.list_collections().await?;
452 info!("Starting checkpoint for {} collections", collections.len());
453
454 for collection_name in collections {
455 debug!("Checkpointing collection: {}", collection_name);
456 let collection = collection_with_config(self, &collection_name, None).await?;
457 CollectionWalOps::checkpoint_wal(&collection).await?;
458 }
459
460 info!("Checkpoint completed for all collections");
461 Ok(())
462 }
463
464 async fn stream_all_wal_entries(
465 &self,
466 ) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<(String, LogEntry)>> + Send>>> {
467 let collection_names = self.list_collections().await?;
468 debug!(
469 "Streaming WAL entries from {} collections",
470 collection_names.len()
471 );
472
473 // Collect all collections to avoid borrowing self in the stream
474 let mut collections = Vec::new();
475 for name in collection_names {
476 match collection_with_config(self, &name, None).await {
477 Ok(collection) => collections.push(collection),
478 Err(e) => warn!("Failed to load collection {}: {}", name, e),
479 }
480 }
481
482 let stream_of_streams = futures::stream::iter(collections).filter_map(|collection| {
483 async move {
484 let name = collection.name().to_owned();
485 CollectionWalOps::stream_wal_entries(collection)
486 .await
487 .map_or_else(
488 |_| None,
489 |stream| Some(stream.map(move |entry| entry.map(|e| (name.clone(), e)))),
490 )
491 }
492 });
493
494 let combined_stream = stream_of_streams.flatten();
495 Ok(Box::pin(combined_stream))
496 }
497
498 async fn verify_all_collections(&self) -> crate::Result<HashMap<String, Vec<WalVerificationIssue>>> {
499 let collections = self.list_collections().await?;
500 info!(
501 "Starting WAL verification for {} collections",
502 collections.len()
503 );
504
505 let mut results = HashMap::new();
506 let mut total_issues: usize = 0;
507
508 for collection_name in collections {
509 debug!("Verifying collection: {}", collection_name);
510 let collection = collection_with_config(self, &collection_name, None).await?;
511 match CollectionWalOps::verify_against_wal(&collection).await {
512 Ok(verification_result) => {
513 if !verification_result.issues.is_empty() {
514 let issue_count = verification_result.issues.len();
515 total_issues = total_issues
516 .checked_add(issue_count)
517 .unwrap_or(total_issues);
518 results.insert(collection_name.clone(), verification_result.issues);
519 warn!(
520 "Collection {} has {} verification issues",
521 collection_name, issue_count
522 );
523 }
524 else {
525 debug!("Collection {} verification passed", collection_name);
526 }
527 },
528 Err(e) => {
529 error!("Failed to verify collection {}: {}", collection_name, e);
530 results.insert(
531 collection_name.clone(),
532 vec![WalVerificationIssue {
533 transaction_id: "unknown".to_owned(),
534 document_id: "unknown".to_owned(),
535 description: format!("Verification failed: {}", e),
536 is_critical: true,
537 }],
538 );
539 total_issues = total_issues.checked_add(1).unwrap_or(total_issues);
540 },
541 }
542 }
543
544 if total_issues > 0 {
545 warn!(
546 "WAL verification completed with {} total issues across {} collections",
547 total_issues,
548 results.len()
549 );
550 }
551 else {
552 info!("WAL verification completed successfully - no issues found");
553 }
554
555 Ok(results)
556 }
557
558 async fn recover_all_collections(&self) -> crate::Result<HashMap<String, usize>> {
559 let collections = self.list_collections().await?;
560 info!(
561 "Starting WAL recovery for {} collections",
562 collections.len()
563 );
564
565 let mut results = HashMap::new();
566 let mut total_operations: usize = 0;
567
568 for collection_name in collections {
569 debug!("Recovering collection: {}", collection_name);
570 let collection = collection_with_config(self, &collection_name, None).await?;
571 match CollectionWalOps::recover_from_wal(&collection).await {
572 Ok(recovery_result) => {
573 let operations = recovery_result.recovered_operations;
574 results.insert(collection_name.clone(), operations);
575 total_operations = total_operations
576 .checked_add(operations)
577 .unwrap_or(total_operations);
578 if operations > 0 {
579 info!(
580 "Recovered {} operations for collection {}",
581 operations, collection_name
582 );
583 }
584 else {
585 debug!("No recovery needed for collection {}", collection_name);
586 }
587 },
588 Err(e) => {
589 error!("Failed to recover collection {}: {}", collection_name, e);
590 return Err(e);
591 },
592 }
593 }
594
595 info!(
596 "WAL recovery completed - {} total operations recovered across {} collections",
597 total_operations,
598 results.len()
599 );
600 Ok(results)
601 }
602}
603
604#[async_trait]
605impl CollectionWalOps for Collection {
606 async fn checkpoint_wal(&self) -> crate::Result<()> {
607 if let Some(wal) = self.wal_manager.as_ref() {
608 debug!("Starting WAL checkpoint for collection {}", self.name());
609 wal.checkpoint().await?;
610 info!("WAL checkpoint completed for collection {}", self.name());
611 }
612 else {
613 debug!("No WAL manager configured for collection {}", self.name());
614 }
615 Ok(())
616 }
617
618 async fn stream_wal_entries(self) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<LogEntry>> + Send>>> {
619 self.wal_manager.as_ref().map_or_else(
620 || {
621 debug!(
622 "No WAL manager configured for collection {}, returning empty stream",
623 self.name()
624 );
625 Ok(Box::pin(
626 futures::stream::empty::<std::result::Result<LogEntry, sentinel_wal::WalError>>()
627 .map(|r| r.map_err(Into::into)),
628 )
629 as Pin<
630 Box<dyn Stream<Item = crate::Result<LogEntry>> + Send>,
631 >)
632 },
633 |wal| {
634 let name = self.name().to_owned();
635 debug!("Streaming WAL entries for collection {}", name);
636 let stream = wal
637 .stream_entries()
638 .map(|result| result.map_err(crate::error::SentinelError::from));
639 let _wal = wal.clone();
640 // let stream = wal.stream_entries().map(|result| result.map_err(|e|
641 // crate::error::SentinelError::from(e)));
642 let stream = Box::pin(stream) as Pin<Box<dyn Stream<Item = crate::Result<LogEntry>> + Send + 'static>>;
643 Ok(stream)
644 },
645 )
646 }
647
648 async fn verify_against_wal(&self) -> crate::Result<WalVerificationResult> {
649 if let Some(wal) = self.wal_manager.as_ref() {
650 debug!("Starting WAL verification for collection {}", self.name());
651 let result = verify_wal_consistency(wal, self).await?;
652 if result.passed {
653 info!(
654 "WAL verification passed for collection {} ({} entries processed)",
655 self.name(),
656 result.entries_processed
657 );
658 }
659 else {
660 warn!(
661 "WAL verification failed for collection {}: {} issues found",
662 self.name(),
663 result.issues.len()
664 );
665 for issue in &result.issues {
666 warn!(" Verification issue: {}", issue.description);
667 }
668 }
669 Ok(result)
670 }
671 else {
672 debug!(
673 "No WAL manager configured for collection {}, skipping verification",
674 self.name()
675 );
676 Ok(WalVerificationResult {
677 issues: vec![],
678 passed: true,
679 entries_processed: 0,
680 affected_documents: 0,
681 })
682 }
683 }
684
685 async fn recover_from_wal(&self) -> crate::Result<WalRecoveryResult> {
686 if let Some(wal) = self.wal_manager.as_ref() {
687 info!("Starting WAL recovery for collection {}", self.name());
688 let result = recover_from_wal_safe(wal, self).await?;
689 info!(
690 "WAL recovery completed for collection {}: {} operations recovered, {} skipped, {} failed",
691 self.name(),
692 result.recovered_operations,
693 result.skipped_operations,
694 result.failed_operations
695 );
696
697 if !result.failures.is_empty() {
698 warn!("Recovery failures for collection {}:", self.name());
699 for failure in &result.failures {
700 warn!(" - {:?}", failure);
701 }
702 }
703 Ok(result)
704 }
705 else {
706 debug!(
707 "No WAL manager configured for collection {}, skipping recovery",
708 self.name()
709 );
710 Ok(WalRecoveryResult {
711 recovered_operations: 0,
712 skipped_operations: 0,
713 failed_operations: 0,
714 failures: vec![],
715 })
716 }
717 }
718
719 async fn wal_size(&self) -> crate::Result<u64> {
720 if let Some(wal) = self.wal_manager.as_ref() {
721 let size = wal.size().await?;
722 debug!("WAL size for collection {}: {} bytes", self.name(), size);
723 Ok(size)
724 }
725 else {
726 debug!("No WAL manager configured for collection {}", self.name());
727 Ok(0)
728 }
729 }
730
731 async fn wal_entries_count(&self) -> crate::Result<usize> {
732 if let Some(wal) = self.wal_manager.as_ref() {
733 let count = wal.entries_count().await?;
734 debug!(
735 "WAL entries count for collection {}: {}",
736 self.name(),
737 count
738 );
739 Ok(count)
740 }
741 else {
742 debug!("No WAL manager configured for collection {}", self.name());
743 Ok(0)
744 }
745 }
746}
747
748#[cfg(test)]
749mod tests {
750 use tempfile::tempdir;
751 use sentinel_wal::StoreWalConfig;
752
753 use super::*;
754 use crate::Store;
755
756 /// Helper to create a test store with a collection
757 async fn create_test_store_with_collection() -> (tempfile::TempDir, Store, String) {
758 let temp_dir = tempdir().unwrap();
759 let store = Store::new_with_config(
760 temp_dir.path().to_path_buf(),
761 None,
762 StoreWalConfig::default(),
763 )
764 .await
765 .unwrap();
766 let collection_name = "test_wal_collection".to_string();
767 let _ = collection_with_config(&store, &collection_name, None)
768 .await
769 .unwrap();
770 (temp_dir, store, collection_name)
771 }
772
773 // ============ CollectionWalOps Tests ============
774
775 #[tokio::test]
776 async fn test_checkpoint_wal_with_wal_manager() {
777 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
778 let collection = collection_with_config(&store, &collection_name, None)
779 .await
780 .unwrap();
781
782 // Insert some data to create WAL entries
783 collection
784 .insert("doc-1", serde_json::json!({"test": 1}))
785 .await
786 .unwrap();
787 collection
788 .insert("doc-2", serde_json::json!({"test": 2}))
789 .await
790 .unwrap();
791
792 // Checkpoint should succeed
793 let result = collection.checkpoint_wal().await;
794 assert!(result.is_ok());
795 }
796
797 #[tokio::test]
798 async fn test_checkpoint_wal_without_wal_manager() {
799 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
800 let collection = collection_with_config(&store, &collection_name, None)
801 .await
802 .unwrap();
803
804 // Create collection without WAL config - checkpoint should still succeed (no-op)
805 // The default collection may not have a WAL manager configured
806 let result = collection.checkpoint_wal().await;
807 assert!(result.is_ok());
808 }
809
810 #[tokio::test]
811 async fn test_stream_wal_entries_with_data() {
812 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
813 let collection = collection_with_config(&store, &collection_name, None)
814 .await
815 .unwrap();
816
817 // Insert some data
818 collection
819 .insert("doc-1", serde_json::json!({"name": "Test1"}))
820 .await
821 .unwrap();
822 collection
823 .insert("doc-2", serde_json::json!({"name": "Test2"}))
824 .await
825 .unwrap();
826
827 // Stream entries
828 let stream = collection.stream_wal_entries().await.unwrap();
829
830 // Collect entries
831 let entries: Vec<_> = stream.collect().await;
832 assert!(!entries.is_empty());
833
834 // Each entry should be Ok
835 for entry in entries {
836 assert!(entry.is_ok());
837 }
838 }
839
840 #[tokio::test]
841 async fn test_stream_wal_entries_empty() {
842 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
843 let collection = collection_with_config(&store, &collection_name, None)
844 .await
845 .unwrap();
846
847 // Stream on empty collection
848 let stream = collection.stream_wal_entries().await.unwrap();
849 let entries: Vec<_> = stream.collect().await;
850
851 // May be empty or have entries depending on WAL config
852 // Just verify it doesn't error
853 assert!(entries.is_empty() || entries.iter().all(|e| e.is_ok()));
854 }
855
856 #[tokio::test]
857 async fn test_verify_against_wal() {
858 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
859 let collection = collection_with_config(&store, &collection_name, None)
860 .await
861 .unwrap();
862
863 // Insert some data
864 collection
865 .insert("doc-1", serde_json::json!({"verify": true}))
866 .await
867 .unwrap();
868
869 // Verify should succeed
870 let result = collection.verify_against_wal().await;
871 assert!(result.is_ok());
872
873 let verification = result.unwrap();
874 // If there are issues, verification didn't fully pass
875 assert!(verification.passed || verification.issues.is_empty());
876 }
877
878 #[tokio::test]
879 async fn test_recover_from_wal() {
880 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
881 let collection = collection_with_config(&store, &collection_name, None)
882 .await
883 .unwrap();
884
885 // Insert some data
886 collection
887 .insert("doc-to-recover", serde_json::json!({"data": "test"}))
888 .await
889 .unwrap();
890
891 // Recovery should succeed (even if no recovery needed)
892 let result = collection.recover_from_wal().await;
893 assert!(result.is_ok());
894 }
895
896 #[tokio::test]
897 async fn test_wal_size() {
898 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
899 let collection = collection_with_config(&store, &collection_name, None)
900 .await
901 .unwrap();
902
903 // Get initial size
904 let initial_size = collection.wal_size().await.unwrap();
905
906 // Insert data
907 collection
908 .insert("doc-for-size", serde_json::json!({"size": "test data"}))
909 .await
910 .unwrap();
911
912 // Get size after insert
913 let new_size = collection.wal_size().await.unwrap();
914 assert!(new_size >= initial_size);
915 }
916
917 #[tokio::test]
918 async fn test_wal_entries_count() {
919 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
920 let collection = collection_with_config(&store, &collection_name, None)
921 .await
922 .unwrap();
923
924 // Get initial count
925 let initial_count = collection.wal_entries_count().await.unwrap();
926
927 // Insert some data
928 collection
929 .insert("doc-1", serde_json::json!({"count": 1}))
930 .await
931 .unwrap();
932 collection
933 .insert("doc-2", serde_json::json!({"count": 2}))
934 .await
935 .unwrap();
936
937 // Get count after inserts
938 let new_count = collection.wal_entries_count().await.unwrap();
939 assert!(new_count >= initial_count);
940 }
941
942 // ============ StoreWalOps Tests ============
943
944 #[tokio::test]
945 async fn test_checkpoint_all_collections() {
946 let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
947
948 // Insert some data
949 let collection = collection_with_config(&store, "test1", None).await.unwrap();
950 collection
951 .insert("doc-1", serde_json::json!({"test": 1}))
952 .await
953 .unwrap();
954
955 let collection2 = collection_with_config(&store, "test2", None).await.unwrap();
956 collection2
957 .insert("doc-2", serde_json::json!({"test": 2}))
958 .await
959 .unwrap();
960
961 // Checkpoint all should succeed
962 let result = store.checkpoint_all_collections().await;
963 assert!(result.is_ok());
964 }
965
966 #[tokio::test]
967 async fn test_stream_all_wal_entries() {
968 let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
969
970 // Insert some data in multiple collections
971 let collection1 = collection_with_config(&store, "stream-collection-1", None)
972 .await
973 .unwrap();
974 collection1
975 .insert("doc-1", serde_json::json!({"stream": 1}))
976 .await
977 .unwrap();
978
979 let collection2 = collection_with_config(&store, "stream-collection-2", None)
980 .await
981 .unwrap();
982 collection2
983 .insert("doc-2", serde_json::json!({"stream": 2}))
984 .await
985 .unwrap();
986
987 // Stream all entries
988 let stream = store.stream_all_wal_entries().await.unwrap();
989 let entries: Vec<_> = stream.collect().await;
990
991 // Should have entries from both collections
992 // Each entry is (collection_name, LogEntry)
993 for entry in entries {
994 assert!(entry.is_ok());
995 let (_name, log_entry) = entry.unwrap();
996 // Verify it's a valid log entry
997 assert!(!log_entry.document_id.is_empty());
998 }
999 }
1000
1001 #[tokio::test]
1002 async fn test_verify_all_collections() {
1003 let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1004
1005 // Create multiple collections with data
1006 let collection1 = collection_with_config(&store, "verify-1", None)
1007 .await
1008 .unwrap();
1009 collection1
1010 .insert("doc-1", serde_json::json!({"verify": 1}))
1011 .await
1012 .unwrap();
1013
1014 let collection2 = collection_with_config(&store, "verify-2", None)
1015 .await
1016 .unwrap();
1017 collection2
1018 .insert("doc-2", serde_json::json!({"verify": 2}))
1019 .await
1020 .unwrap();
1021
1022 // Verify all should succeed
1023 let result = store.verify_all_collections().await;
1024 assert!(result.is_ok());
1025
1026 let issues = result.unwrap();
1027 // Both collections should have passed verification
1028 for (name, collection_issues) in &issues {
1029 for issue in collection_issues {
1030 assert!(!issue.is_critical || issue.description.is_empty());
1031 }
1032 // Debug output
1033 if !collection_issues.is_empty() {
1034 eprintln!("Collection {} has {} issues", name, collection_issues.len());
1035 }
1036 }
1037 }
1038
1039 #[tokio::test]
1040 async fn test_recover_all_collections() {
1041 let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1042
1043 // Create collections with data
1044 let collection1 = collection_with_config(&store, "recover-1", None)
1045 .await
1046 .unwrap();
1047 collection1
1048 .insert("doc-1", serde_json::json!({"recover": 1}))
1049 .await
1050 .unwrap();
1051
1052 let collection2 = collection_with_config(&store, "recover-2", None)
1053 .await
1054 .unwrap();
1055 collection2
1056 .insert("doc-2", serde_json::json!({"recover": 2}))
1057 .await
1058 .unwrap();
1059
1060 // Recover all should succeed
1061 let result = store.recover_all_collections().await;
1062 assert!(result.is_ok());
1063
1064 let recovery_stats = result.unwrap();
1065 // Should have entries for the test collections (may include others)
1066 assert!(
1067 recovery_stats.len() >= 2,
1068 "Expected at least 2 collections, got {}",
1069 recovery_stats.len()
1070 );
1071
1072 for (name, operations) in &recovery_stats {
1073 eprintln!("Collection {} recovered {} operations", name, operations);
1074 }
1075 }
1076
1077 // ============ Edge Case Tests ============
1078
1079 #[tokio::test]
1080 async fn test_wal_operations_on_empty_store() {
1081 let temp_dir = tempdir().unwrap();
1082 let store = Store::new_with_config(
1083 temp_dir.path().to_path_buf(),
1084 None,
1085 StoreWalConfig::default(),
1086 )
1087 .await
1088 .unwrap();
1089
1090 // Verify empty store
1091 let result = store.verify_all_collections().await;
1092 assert!(result.is_ok());
1093 let issues = result.unwrap();
1094 assert!(issues.is_empty());
1095
1096 // Recover empty store
1097 let result = store.recover_all_collections().await;
1098 assert!(result.is_ok());
1099 let stats = result.unwrap();
1100 assert!(stats.is_empty());
1101
1102 // Stream empty store
1103 let stream = store.stream_all_wal_entries().await.unwrap();
1104 let entries: Vec<_> = stream.collect().await;
1105 assert!(entries.is_empty());
1106 }
1107
1108 #[tokio::test]
1109 async fn test_checkpoint_empty_collection() {
1110 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1111 let collection = collection_with_config(&store, &collection_name, None)
1112 .await
1113 .unwrap();
1114
1115 // Checkpoint empty collection should succeed
1116 let result = collection.checkpoint_wal().await;
1117 assert!(result.is_ok());
1118 }
1119
1120 // ============ Edge Case Tests - Additional Coverage ============
1121
1122 #[tokio::test]
1123 async fn test_wal_ops_stream_entries_with_verify_all() {
1124 // Test streaming entries with verify_all option
1125 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1126 let collection = collection_with_config(&store, &collection_name, None)
1127 .await
1128 .unwrap();
1129
1130 // Insert some data
1131 collection
1132 .insert("verify-doc", serde_json::json!({"verify": "test"}))
1133 .await
1134 .unwrap();
1135
1136 // Stream and verify entries
1137 let stream = store.stream_all_wal_entries().await.unwrap();
1138 let entries: Vec<_> = stream.collect().await;
1139
1140 // Should have entries that can be verified
1141 assert!(!entries.is_empty());
1142 for entry in entries {
1143 // Each entry is a Result, so we need to handle both ok and err cases
1144 if let Ok(result) = entry {
1145 assert!(!result.1.document_id.is_empty());
1146 }
1147 }
1148 }
1149
1150 #[tokio::test]
1151 async fn test_wal_ops_verify_collection_with_no_wal_manager() {
1152 // Test verify_against_wal when no WAL manager is configured
1153 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1154 let collection = collection_with_config(&store, &collection_name, None)
1155 .await
1156 .unwrap();
1157
1158 // Verify should return empty result (passed) when no WAL
1159 let result = collection.verify_against_wal().await;
1160 assert!(result.is_ok());
1161
1162 let verification = result.unwrap();
1163 assert!(verification.passed);
1164 assert_eq!(verification.entries_processed, 0);
1165 }
1166
1167 #[tokio::test]
1168 async fn test_wal_ops_recover_from_wal_with_no_wal_manager() {
1169 // Test recover_from_wal when no WAL manager is configured
1170 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1171 let collection = collection_with_config(&store, &collection_name, None)
1172 .await
1173 .unwrap();
1174
1175 // Recovery should return empty result when no WAL
1176 let result = collection.recover_from_wal().await;
1177 assert!(result.is_ok());
1178
1179 let recovery = result.unwrap();
1180 assert_eq!(recovery.recovered_operations, 0);
1181 assert_eq!(recovery.skipped_operations, 0);
1182 assert_eq!(recovery.failed_operations, 0);
1183 }
1184
1185 #[tokio::test]
1186 async fn test_wal_ops_wal_size_with_no_wal_manager() {
1187 // Test wal_size when no WAL manager is configured
1188 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1189 let collection = collection_with_config(&store, &collection_name, None)
1190 .await
1191 .unwrap();
1192
1193 // Size should return 0 when no WAL
1194 let size = collection.wal_size().await.unwrap();
1195 assert_eq!(size, 0);
1196 }
1197
1198 #[tokio::test]
1199 async fn test_wal_ops_wal_entries_count_with_no_wal_manager() {
1200 // Test wal_entries_count when no WAL manager is configured
1201 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1202 let collection = collection_with_config(&store, &collection_name, None)
1203 .await
1204 .unwrap();
1205
1206 // Count should return 0 when no WAL
1207 let count = collection.wal_entries_count().await.unwrap();
1208 assert_eq!(count, 0);
1209 }
1210
1211 #[tokio::test]
1212 async fn test_wal_ops_stream_wal_entries_with_no_wal_manager() {
1213 // Test stream_wal_entries when no WAL manager is configured
1214 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1215 let collection = collection_with_config(&store, &collection_name, None)
1216 .await
1217 .unwrap();
1218
1219 // Stream should return empty stream when no WAL
1220 let stream = collection.stream_wal_entries().await.unwrap();
1221 let entries: Vec<_> = stream.collect().await;
1222
1223 assert!(entries.is_empty());
1224 }
1225
1226 #[tokio::test]
1227 async fn test_wal_ops_verify_all_with_mixed_collections() {
1228 // Test verify_all_collections with multiple collections
1229 let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1230
1231 // Create collections with varying data
1232 for i in 0 .. 3 {
1233 let collection = collection_with_config(&store, &format!("verify-multi-{}", i), None)
1234 .await
1235 .unwrap();
1236 collection
1237 .insert(&format!("doc-{}", i), serde_json::json!({"index": i}))
1238 .await
1239 .unwrap();
1240 }
1241
1242 // Verify all should succeed
1243 let result = store.verify_all_collections().await;
1244 assert!(result.is_ok());
1245
1246 let issues = result.unwrap();
1247 // All collections should have passed verification (no critical issues)
1248 for (name, collection_issues) in &issues {
1249 for issue in collection_issues {
1250 assert!(
1251 !issue.is_critical || issue.description.is_empty(),
1252 "Collection {} has critical issue: {}",
1253 name,
1254 issue.description
1255 );
1256 }
1257 }
1258 }
1259
1260 #[tokio::test]
1261 async fn test_wal_ops_checkpoint_all_with_empty_store() {
1262 // Test checkpoint_all_collections on empty store
1263 let temp_dir = tempdir().unwrap();
1264 let store = Store::new_with_config(
1265 temp_dir.path().to_path_buf(),
1266 None,
1267 StoreWalConfig::default(),
1268 )
1269 .await
1270 .unwrap();
1271
1272 // Checkpoint all should succeed on empty store
1273 let result = store.checkpoint_all_collections().await;
1274 assert!(result.is_ok());
1275 }
1276
1277 #[tokio::test]
1278 async fn test_wal_ops_recover_all_with_empty_store() {
1279 // Test recover_all_collections on empty store
1280 let temp_dir = tempdir().unwrap();
1281 let store = Store::new_with_config(
1282 temp_dir.path().to_path_buf(),
1283 None,
1284 StoreWalConfig::default(),
1285 )
1286 .await
1287 .unwrap();
1288
1289 // Recover all should succeed on empty store
1290 let result = store.recover_all_collections().await;
1291 assert!(result.is_ok());
1292
1293 let stats = result.unwrap();
1294 assert!(stats.is_empty());
1295 }
1296
1297 // ============ Additional "No WAL Manager" Branch Tests ============
1298
1299 #[tokio::test]
1300 async fn test_wal_ops_checkpoint_with_verification_options() {
1301 // Test checkpoint_wal with custom verification options
1302 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1303 let collection = collection_with_config(&store, &collection_name, None)
1304 .await
1305 .unwrap();
1306
1307 // Insert some data
1308 collection
1309 .insert("doc-1", serde_json::json!({"test": 1}))
1310 .await
1311 .unwrap();
1312
1313 // Checkpoint should work with verification enabled
1314 let result = collection.checkpoint_wal().await;
1315 assert!(result.is_ok());
1316 }
1317
1318 #[tokio::test]
1319 async fn test_wal_ops_stream_all_with_no_collections() {
1320 // Test stream_all_wal_entries when there are no collections
1321 let temp_dir = tempdir().unwrap();
1322 let store = Store::new_with_config(
1323 temp_dir.path().to_path_buf(),
1324 None,
1325 StoreWalConfig::default(),
1326 )
1327 .await
1328 .unwrap();
1329
1330 // Stream should return empty stream when no collections
1331 let stream = store.stream_all_wal_entries().await.unwrap();
1332 let entries: Vec<_> = stream.collect().await;
1333
1334 assert!(entries.is_empty());
1335 }
1336
1337 #[tokio::test]
1338 async fn test_wal_ops_verify_all_with_no_issues() {
1339 // Test verify_all_collections when all collections pass verification
1340 let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1341
1342 // Create a collection and insert data
1343 let collection = collection_with_config(&store, "verify-pass", None)
1344 .await
1345 .unwrap();
1346 collection
1347 .insert("doc-1", serde_json::json!({"verify": true}))
1348 .await
1349 .unwrap();
1350
1351 // Verify all - should return empty map (no issues)
1352 let result = store.verify_all_collections().await;
1353 assert!(result.is_ok());
1354
1355 let issues = result.unwrap();
1356 // Collections with no issues should not be in the map
1357 assert!(issues.get("verify-pass").is_none() || issues.get("verify-pass").map_or(true, |v| v.is_empty()));
1358 }
1359
1360 #[tokio::test]
1361 async fn test_wal_ops_recover_all_with_partial_failures() {
1362 // Test recover_all_collections handles partial failures gracefully
1363 let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1364
1365 // Create a collection with data
1366 let collection = collection_with_config(&store, "recover-test", None)
1367 .await
1368 .unwrap();
1369 collection
1370 .insert("doc-1", serde_json::json!({"recover": true}))
1371 .await
1372 .unwrap();
1373
1374 // Recovery should work even if some collections fail
1375 let result = store.recover_all_collections().await;
1376 assert!(result.is_ok());
1377
1378 let stats = result.unwrap();
1379 // Should have stats for our collection
1380 if let Some(_count) = stats.get("recover-test") {
1381 // Stats are present
1382 }
1383 }
1384
1385 #[tokio::test]
1386 async fn test_wal_ops_stream_entries_with_large_wal() {
1387 // Test streaming many WAL entries
1388 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1389 let collection = collection_with_config(&store, &collection_name, None)
1390 .await
1391 .unwrap();
1392
1393 // Insert many documents to create a larger WAL
1394 for i in 0 .. 50 {
1395 collection
1396 .insert(&format!("doc-{}", i), serde_json::json!({"index": i}))
1397 .await
1398 .unwrap();
1399 }
1400
1401 // Stream entries
1402 let stream = collection.stream_wal_entries().await.unwrap();
1403 let entries: Vec<_> = stream.collect().await;
1404
1405 // Should have streamed all entries
1406 assert_eq!(entries.len(), 50);
1407 for entry in entries {
1408 assert!(entry.is_ok());
1409 }
1410 }
1411
1412 #[tokio::test]
1413 async fn test_wal_ops_verify_with_empty_wal() {
1414 // Test verify_against_wal on collection with empty WAL
1415 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1416 let collection = collection_with_config(&store, &collection_name, None)
1417 .await
1418 .unwrap();
1419
1420 // Insert a document (creates WAL entry)
1421 collection
1422 .insert("doc-1", serde_json::json!({"test": 1}))
1423 .await
1424 .unwrap();
1425
1426 // Verify should pass for valid state
1427 let result = collection.verify_against_wal().await;
1428 assert!(result.is_ok());
1429 }
1430
1431 #[tokio::test]
1432 async fn test_wal_ops_wal_entries_count_after_rotation() {
1433 // Test wal_entries_count after potential WAL rotation
1434 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1435 let collection = collection_with_config(&store, &collection_name, None)
1436 .await
1437 .unwrap();
1438
1439 // Get initial count
1440 let initial_count = collection.wal_entries_count().await.unwrap();
1441
1442 // Insert some data
1443 for i in 0 .. 5 {
1444 collection
1445 .insert(&format!("doc-{}", i), serde_json::json!({"index": i}))
1446 .await
1447 .unwrap();
1448 }
1449
1450 // Get count after inserts
1451 let new_count = collection.wal_entries_count().await.unwrap();
1452 assert!(new_count >= initial_count + 5);
1453 }
1454
1455 #[tokio::test]
1456 async fn test_wal_ops_checkpoint_preserves_data() {
1457 // Test that checkpoint preserves data integrity
1458 let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1459 let collection = collection_with_config(&store, &collection_name, None)
1460 .await
1461 .unwrap();
1462
1463 // Insert data
1464 collection
1465 .insert("doc-1", serde_json::json!({"name": "Test", "value": 42}))
1466 .await
1467 .unwrap();
1468
1469 // Checkpoint
1470 collection.checkpoint_wal().await.unwrap();
1471
1472 // Data should still be accessible
1473 let doc = collection.get("doc-1").await.unwrap();
1474 assert!(doc.is_some());
1475 assert_eq!(doc.unwrap().data()["value"], 42);
1476 }
1477
1478 #[tokio::test]
1479 async fn test_wal_ops_stream_all_with_mixed_collections() {
1480 // Test stream_all_wal_entries with collections that have different WAL states
1481 let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1482
1483 // Create collections with different amounts of data
1484 for i in 0 .. 3 {
1485 let collection = collection_with_config(&store, &format!("stream-mixed-{}", i), None)
1486 .await
1487 .unwrap();
1488
1489 // Insert varying numbers of documents
1490 for j in 0 .. i + 1 {
1491 collection
1492 .insert(
1493 &format!("doc-{}", j),
1494 serde_json::json!({"collection": i, "doc": j}),
1495 )
1496 .await
1497 .unwrap();
1498 }
1499 }
1500
1501 // Stream all entries - should get entries from all collections
1502 let stream = store.stream_all_wal_entries().await.unwrap();
1503 let entries: Vec<_> = stream.collect().await;
1504
1505 // Should have entries from all 3 collections (1 + 2 + 3 = 6 entries)
1506 assert_eq!(entries.len(), 6);
1507
1508 // Verify entries come from different collections
1509 let collections: std::collections::HashSet<String> = entries
1510 .into_iter()
1511 .filter_map(|e| e.ok())
1512 .map(|(name, _)| name)
1513 .collect();
1514
1515 assert_eq!(collections.len(), 3);
1516 }
1517}