Skip to main content

sentinel_dbms/wal/
ops.rs

1//! Core WAL operations and trait definitions.
2//!
3//! This module provides Write-Ahead Logging (WAL) operations for both Store and Collection
4//! entities. WAL ensures data durability and consistency by logging operations before
5//! they are applied to the main data store.
6//!
7//! # Architecture
8//!
9//! The WAL system is organized into two layers:
10//! - **Low-level operations** in the `sentinel-wal` crate handle raw WAL file management
11//! - **High-level operations** in this module provide trait-based interfaces for Store and
12//!   Collection
13//!
14//! # Key Concepts
15//!
16//! - **Checkpoint**: Flushes accumulated WAL entries to the main data store and truncates the log
17//! - **Recovery**: Replays WAL entries to restore data consistency after a crash
18//! - **Verification**: Validates WAL integrity and consistency with the main data store
19//! - **Streaming**: Provides real-time access to WAL entries for monitoring and replication
20//!
21//! # Examples
22//!
23//! ## Basic WAL Operations on a Collection
24//!
25//! ```rust,no_run
26//! # use sentinel_dbms::{Store, Collection};
27//! # use futures::StreamExt;
28//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
29//! # let store = Store::new("/tmp/store", None).await?;
30//! # let collection = store.collection_with_config("users", None).await?;
31//! use sentinel_dbms::wal::ops::CollectionWalOps;
32//!
33//! // Insert some data
34//! collection.insert("user-123", serde_json::json!({"name": "Alice"})).await?;
35//!
36//! // Checkpoint the WAL to persist changes
37//! collection.checkpoint_wal().await?;
38//!
39//! // Get WAL statistics
40//! let size = collection.wal_size().await?;
41//! let count = collection.wal_entries_count().await?;
42//! println!("WAL size: {} bytes, entries: {}", size, count);
43//!
44//! // Stream WAL entries for monitoring
45//! let mut stream = collection.stream_wal_entries().await?;
46//! while let Some(entry) = stream.next().await {
47//!     let entry = entry?;
48//!     println!("WAL entry: {:?}", entry);
49//! }
50//! # Ok(())
51//! # }
52//! ```
53//!
54//! ## Store-level WAL Operations
55//!
56//! ```rust,no_run
57//! # use sentinel_dbms::Store;
58//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
59//! # let store = Store::new("/tmp/store", None).await?;
60//! use sentinel_dbms::wal::ops::StoreWalOps;
61//!
62//! // Checkpoint all collections
63//! store.checkpoint_all_collections().await?;
64//!
65//! // Verify all collections against their WALs
66//! let issues = store.verify_all_collections().await?;
67//! for (collection_name, collection_issues) in issues {
68//!     println!(
69//!         "Collection {} has {} issues",
70//!         collection_name,
71//!         collection_issues.len()
72//!     );
73//! }
74//!
75//! // Recover all collections from WAL
76//! let recovery_stats = store.recover_all_collections().await?;
77//! for (collection_name, operations) in recovery_stats {
78//!     println!(
79//!         "Recovered {} operations for {}",
80//!         operations, collection_name
81//!     );
82//! }
83//! # Ok(())
84//! # }
85//! ```
86
87use std::{collections::HashMap, pin::Pin};
88
89use async_trait::async_trait;
90use futures::{Stream, StreamExt as _};
91use tracing::{debug, error, info, warn};
92use sentinel_wal::{
93    recover_from_wal_safe,
94    verify_wal_consistency,
95    LogEntry,
96    WalRecoveryResult,
97    WalVerificationIssue,
98    WalVerificationResult,
99};
100
101use crate::{store::operations::collection_with_config, Collection, Store};
102
103/// Extension trait for Store to add WAL operations.
104///
105/// This trait provides high-level WAL operations that work across all collections
106/// in a store. Operations are performed sequentially on each collection to ensure
107/// consistency and avoid resource conflicts.
108///
109/// # Thread Safety
110///
111/// All operations are async and can be called concurrently, but the trait implementations
112/// handle internal synchronization through the Store's collection management.
113#[async_trait]
114pub trait StoreWalOps {
115    /// Perform a checkpoint on all collections in the store.
116    ///
117    /// This operation iterates through all collections and checkpoints each one's WAL,
118    /// ensuring all pending operations are flushed to the main data store. This is
119    /// typically called during maintenance windows or before backups.
120    ///
121    /// # Returns
122    ///
123    /// Returns `Ok(())` on success, or an error if any collection fails to checkpoint.
124    ///
125    /// # Examples
126    ///
127    /// ```rust,no_run
128    /// # use sentinel_dbms::Store;
129    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
130    /// # let store = Store::new("/tmp/store", None).await?;
131    /// use sentinel_dbms::wal::ops::StoreWalOps;
132    ///
133    /// store.checkpoint_all_collections().await?;
134    /// println!("All collections checkpointed successfully");
135    /// # Ok(())
136    /// # }
137    /// ```
138    async fn checkpoint_all_collections(&self) -> crate::Result<()>;
139
140    /// Stream WAL entries from all collections in the store.
141    ///
142    /// Creates a unified stream that yields WAL entries from all collections,
143    /// prefixed with the collection name. This is useful for monitoring, auditing,
144    /// and replication across the entire store.
145    ///
146    /// # Returns
147    ///
148    /// Returns a stream yielding `(collection_name, LogEntry)` tuples, or an error
149    /// if the collection list cannot be retrieved.
150    ///
151    /// # Examples
152    ///
153    /// ```rust,no_run
154    /// # use sentinel_dbms::Store;
155    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
156    /// # let store = Store::new("/tmp/store", None).await?;
157    /// use sentinel_dbms::wal::ops::StoreWalOps;
158    /// use futures::StreamExt;
159    ///
160    /// let mut stream = store.stream_all_wal_entries().await?;
161    /// while let Some(result) = stream.next().await {
162    ///     let (collection_name, entry) = result?;
163    ///     println!("Collection {}: {:?}", collection_name, entry);
164    /// }
165    /// # Ok(())
166    /// # }
167    /// ```
168    async fn stream_all_wal_entries(
169        &self,
170    ) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<(String, LogEntry)>> + Send + 'static>>>;
171
172    /// Verify all collections against their WAL files.
173    ///
174    /// Performs consistency checks on all collections to ensure WAL entries match
175    /// the current state of documents. Returns a map of collection names to any
176    /// verification issues found.
177    ///
178    /// # Returns
179    ///
180    /// Returns a `HashMap` where keys are collection names and values are vectors
181    /// of `WalVerificationIssue`s. Collections with no issues are not included.
182    ///
183    /// # Examples
184    ///
185    /// ```rust,no_run
186    /// # use sentinel_dbms::Store;
187    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
188    /// # let store = Store::new("/tmp/store", None).await?;
189    /// use sentinel_dbms::wal::ops::StoreWalOps;
190    ///
191    /// let issues = store.verify_all_collections().await?;
192    /// if issues.is_empty() {
193    ///     println!("All collections are consistent with their WALs");
194    /// }
195    /// else {
196    ///     for (collection_name, collection_issues) in issues {
197    ///         println!(
198    ///             "Collection {} has {} issues:",
199    ///             collection_name,
200    ///             collection_issues.len()
201    ///         );
202    ///         for issue in collection_issues {
203    ///             println!("  - {}", issue.description);
204    ///         }
205    ///     }
206    /// }
207    /// # Ok(())
208    /// # }
209    /// ```
210    async fn verify_all_collections(&self) -> crate::Result<HashMap<String, Vec<WalVerificationIssue>>>;
211
212    /// Recover all collections from their WAL files.
213    ///
214    /// Performs crash recovery on all collections by replaying WAL entries.
215    /// This is typically called during store initialization after an unclean shutdown.
216    ///
217    /// # Returns
218    ///
219    /// Returns a `HashMap` where keys are collection names and values are the number
220    /// of operations recovered for each collection.
221    ///
222    /// # Examples
223    ///
224    /// ```rust,no_run
225    /// # use sentinel_dbms::Store;
226    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
227    /// # let store = Store::new("/tmp/store", None).await?;
228    /// use sentinel_dbms::wal::ops::StoreWalOps;
229    ///
230    /// let recovery_stats = store.recover_all_collections().await?;
231    /// let total_operations: usize = recovery_stats.values().sum();
232    /// println!(
233    ///     "Recovered {} operations across {} collections",
234    ///     total_operations,
235    ///     recovery_stats.len()
236    /// );
237    /// # Ok(())
238    /// # }
239    /// ```
240    async fn recover_all_collections(&self) -> crate::Result<HashMap<String, usize>>;
241}
242
243/// Extension trait for Collection to add WAL operations.
244///
245/// This trait provides WAL operations specific to individual collections,
246/// including checkpointing, verification, recovery, and monitoring capabilities.
247///
248/// # Thread Safety
249///
250/// All operations are async and work with the collection's internal locking mechanisms.
251#[async_trait]
252pub trait CollectionWalOps {
253    /// Perform a checkpoint on this collection's WAL.
254    ///
255    /// Flushes all pending WAL entries to the main data store and truncates the WAL file.
256    /// This operation ensures durability and can help manage WAL file size.
257    ///
258    /// # Returns
259    ///
260    /// Returns `Ok(())` on success, or an error if the checkpoint operation fails.
261    ///
262    /// # Examples
263    ///
264    /// ```rust,no_run
265    /// # use sentinel_dbms::{Store, Collection};
266    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
267    /// # let store = Store::new("/tmp/store", None).await?;
268    /// # let collection = store.collection_with_config("users", None).await?;
269    /// use sentinel_dbms::wal::ops::CollectionWalOps;
270    ///
271    /// // Perform operations
272    /// collection.insert("user-123", serde_json::json!({"name": "Alice"})).await?;
273    /// collection.update("user-123", serde_json::json!({"name": "Alice", "age": 30})).await?;
274    ///
275    /// // Checkpoint to persist changes
276    /// collection.checkpoint_wal().await?;
277    /// println!("WAL checkpoint completed");
278    /// # Ok(())
279    /// # }
280    /// ```
281    async fn checkpoint_wal(&self) -> crate::Result<()>;
282
283    /// Stream WAL entries for this collection.
284    ///
285    /// Creates a stream that yields all current WAL entries for the collection.
286    /// This is useful for monitoring recent operations, auditing, or replication.
287    ///
288    /// # Returns
289    ///
290    /// Returns a stream yielding `LogEntry` items, or an error if WAL access fails.
291    ///
292    /// # Examples
293    ///
294    /// ```rust,no_run
295    /// # use sentinel_dbms::{Store, Collection};
296    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
297    /// # let store = Store::new("/tmp/store", None).await?;
298    /// # let collection = store.collection_with_config("users", None).await?;
299    /// use sentinel_dbms::wal::ops::CollectionWalOps;
300    /// use futures::StreamExt;
301    ///
302    /// let mut stream = collection.stream_wal_entries().await?;
303    /// let mut operation_count = 0;
304    /// while let Some(result) = stream.next().await {
305    ///     let entry = result?;
306    ///     operation_count += 1;
307    ///     println!("Operation {}: {:?}", operation_count, entry.entry_type);
308    /// }
309    /// println!("Total operations in WAL: {}", operation_count);
310    /// # Ok(())
311    /// # }
312    /// ```
313    async fn stream_wal_entries(self) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<LogEntry>> + Send>>>;
314
315    /// Verify this collection against its WAL file.
316    ///
317    /// Performs consistency checks to ensure WAL entries match the current state
318    /// of documents in the collection. This helps detect corruption or inconsistencies.
319    ///
320    /// # Returns
321    ///
322    /// Returns a `WalVerificationResult` containing verification statistics and any issues found.
323    ///
324    /// # Examples
325    ///
326    /// ```rust,no_run
327    /// # use sentinel_dbms::{Store, Collection};
328    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
329    /// # let store = Store::new("/tmp/store", None).await?;
330    /// # let collection = store.collection_with_config("users", None).await?;
331    /// use sentinel_dbms::wal::ops::CollectionWalOps;
332    ///
333    /// let result = collection.verify_against_wal().await?;
334    /// println!(
335    ///     "Verification result: {}",
336    ///     if result.passed { "PASSED" } else { "FAILED" }
337    /// );
338    /// println!("Entries processed: {}", result.entries_processed);
339    /// println!("Issues found: {}", result.issues.len());
340    ///
341    /// for issue in &result.issues {
342    ///     println!("Issue: {}", issue.description);
343    /// }
344    /// # Ok(())
345    /// # }
346    /// ```
347    async fn verify_against_wal(&self) -> crate::Result<WalVerificationResult>;
348
349    /// Recover this collection from its WAL file.
350    ///
351    /// Replays WAL entries to restore the collection to a consistent state after
352    /// a crash or unclean shutdown. This operation is safe and will not overwrite
353    /// newer data.
354    ///
355    /// # Returns
356    ///
357    /// Returns a `WalRecoveryResult` with recovery statistics and any failures encountered.
358    ///
359    /// # Examples
360    ///
361    /// ```rust,no_run
362    /// # use sentinel_dbms::{Store, Collection};
363    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
364    /// # let store = Store::new("/tmp/store", None).await?;
365    /// # let collection = store.collection_with_config("users", None).await?;
366    /// use sentinel_dbms::wal::ops::CollectionWalOps;
367    ///
368    /// let result = collection.recover_from_wal().await?;
369    /// println!("Recovery completed:");
370    /// println!("  Operations recovered: {}", result.recovered_operations);
371    /// println!("  Operations skipped: {}", result.skipped_operations);
372    /// println!("  Operations failed: {}", result.failed_operations);
373    ///
374    /// if !result.failures.is_empty() {
375    ///     println!("Recovery failures:");
376    ///     for failure in &result.failures {
377    ///         println!("  - {:?}", failure);
378    ///     }
379    /// }
380    /// # Ok(())
381    /// # }
382    /// ```
383    async fn recover_from_wal(&self) -> crate::Result<WalRecoveryResult>;
384
385    /// Get the current WAL size in bytes.
386    ///
387    /// Returns the size of the WAL file on disk. This can be used to monitor
388    /// WAL growth and determine when checkpointing might be beneficial.
389    ///
390    /// # Returns
391    ///
392    /// Returns the WAL file size in bytes, or 0 if no WAL is configured.
393    ///
394    /// # Examples
395    ///
396    /// ```rust,no_run
397    /// # use sentinel_dbms::{Store, Collection};
398    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
399    /// # let store = Store::new("/tmp/store", None).await?;
400    /// # let collection = store.collection_with_config("users", None).await?;
401    /// use sentinel_dbms::wal::ops::CollectionWalOps;
402    ///
403    /// let size_bytes = collection.wal_size().await?;
404    /// let size_mb = size_bytes as f64 / (1024.0 * 1024.0);
405    /// println!("WAL size: {:.2} MB", size_mb);
406    ///
407    /// if size_bytes > 100 * 1024 * 1024 {
408    ///     // 100 MB
409    ///     println!("WAL is getting large, consider checkpointing");
410    ///     collection.checkpoint_wal().await?;
411    /// }
412    /// # Ok(())
413    /// # }
414    /// ```
415    async fn wal_size(&self) -> crate::Result<u64>;
416
417    /// Get the number of entries in the WAL.
418    ///
419    /// Returns the count of operations logged in the WAL. This can be used to
420    /// monitor operation frequency and determine checkpoint timing.
421    ///
422    /// # Returns
423    ///
424    /// Returns the number of WAL entries, or 0 if no WAL is configured.
425    ///
426    /// # Examples
427    ///
428    /// ```rust,no_run
429    /// # use sentinel_dbms::{Store, Collection};
430    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
431    /// # let store = Store::new("/tmp/store", None).await?;
432    /// # let collection = store.collection_with_config("users", None).await?;
433    /// use sentinel_dbms::wal::ops::CollectionWalOps;
434    ///
435    /// let count = collection.wal_entries_count().await?;
436    /// println!("WAL contains {} entries", count);
437    ///
438    /// if count > 1000 {
439    ///     println!("Many pending operations, checkpointing...");
440    ///     collection.checkpoint_wal().await?;
441    /// }
442    /// # Ok(())
443    /// # }
444    /// ```
445    async fn wal_entries_count(&self) -> crate::Result<usize>;
446}
447
448#[async_trait]
449impl StoreWalOps for Store {
450    async fn checkpoint_all_collections(&self) -> crate::Result<()> {
451        let collections = self.list_collections().await?;
452        info!("Starting checkpoint for {} collections", collections.len());
453
454        for collection_name in collections {
455            debug!("Checkpointing collection: {}", collection_name);
456            let collection = collection_with_config(self, &collection_name, None).await?;
457            CollectionWalOps::checkpoint_wal(&collection).await?;
458        }
459
460        info!("Checkpoint completed for all collections");
461        Ok(())
462    }
463
464    async fn stream_all_wal_entries(
465        &self,
466    ) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<(String, LogEntry)>> + Send>>> {
467        let collection_names = self.list_collections().await?;
468        debug!(
469            "Streaming WAL entries from {} collections",
470            collection_names.len()
471        );
472
473        // Collect all collections to avoid borrowing self in the stream
474        let mut collections = Vec::new();
475        for name in collection_names {
476            match collection_with_config(self, &name, None).await {
477                Ok(collection) => collections.push(collection),
478                Err(e) => warn!("Failed to load collection {}: {}", name, e),
479            }
480        }
481
482        let stream_of_streams = futures::stream::iter(collections).filter_map(|collection| {
483            async move {
484                let name = collection.name().to_owned();
485                CollectionWalOps::stream_wal_entries(collection)
486                    .await
487                    .map_or_else(
488                        |_| None,
489                        |stream| Some(stream.map(move |entry| entry.map(|e| (name.clone(), e)))),
490                    )
491            }
492        });
493
494        let combined_stream = stream_of_streams.flatten();
495        Ok(Box::pin(combined_stream))
496    }
497
498    async fn verify_all_collections(&self) -> crate::Result<HashMap<String, Vec<WalVerificationIssue>>> {
499        let collections = self.list_collections().await?;
500        info!(
501            "Starting WAL verification for {} collections",
502            collections.len()
503        );
504
505        let mut results = HashMap::new();
506        let mut total_issues: usize = 0;
507
508        for collection_name in collections {
509            debug!("Verifying collection: {}", collection_name);
510            let collection = collection_with_config(self, &collection_name, None).await?;
511            match CollectionWalOps::verify_against_wal(&collection).await {
512                Ok(verification_result) => {
513                    if !verification_result.issues.is_empty() {
514                        let issue_count = verification_result.issues.len();
515                        total_issues = total_issues
516                            .checked_add(issue_count)
517                            .unwrap_or(total_issues);
518                        results.insert(collection_name.clone(), verification_result.issues);
519                        warn!(
520                            "Collection {} has {} verification issues",
521                            collection_name, issue_count
522                        );
523                    }
524                    else {
525                        debug!("Collection {} verification passed", collection_name);
526                    }
527                },
528                Err(e) => {
529                    error!("Failed to verify collection {}: {}", collection_name, e);
530                    results.insert(
531                        collection_name.clone(),
532                        vec![WalVerificationIssue {
533                            transaction_id: "unknown".to_owned(),
534                            document_id:    "unknown".to_owned(),
535                            description:    format!("Verification failed: {}", e),
536                            is_critical:    true,
537                        }],
538                    );
539                    total_issues = total_issues.checked_add(1).unwrap_or(total_issues);
540                },
541            }
542        }
543
544        if total_issues > 0 {
545            warn!(
546                "WAL verification completed with {} total issues across {} collections",
547                total_issues,
548                results.len()
549            );
550        }
551        else {
552            info!("WAL verification completed successfully - no issues found");
553        }
554
555        Ok(results)
556    }
557
558    async fn recover_all_collections(&self) -> crate::Result<HashMap<String, usize>> {
559        let collections = self.list_collections().await?;
560        info!(
561            "Starting WAL recovery for {} collections",
562            collections.len()
563        );
564
565        let mut results = HashMap::new();
566        let mut total_operations: usize = 0;
567
568        for collection_name in collections {
569            debug!("Recovering collection: {}", collection_name);
570            let collection = collection_with_config(self, &collection_name, None).await?;
571            match CollectionWalOps::recover_from_wal(&collection).await {
572                Ok(recovery_result) => {
573                    let operations = recovery_result.recovered_operations;
574                    results.insert(collection_name.clone(), operations);
575                    total_operations = total_operations
576                        .checked_add(operations)
577                        .unwrap_or(total_operations);
578                    if operations > 0 {
579                        info!(
580                            "Recovered {} operations for collection {}",
581                            operations, collection_name
582                        );
583                    }
584                    else {
585                        debug!("No recovery needed for collection {}", collection_name);
586                    }
587                },
588                Err(e) => {
589                    error!("Failed to recover collection {}: {}", collection_name, e);
590                    return Err(e);
591                },
592            }
593        }
594
595        info!(
596            "WAL recovery completed - {} total operations recovered across {} collections",
597            total_operations,
598            results.len()
599        );
600        Ok(results)
601    }
602}
603
604#[async_trait]
605impl CollectionWalOps for Collection {
606    async fn checkpoint_wal(&self) -> crate::Result<()> {
607        if let Some(wal) = self.wal_manager.as_ref() {
608            debug!("Starting WAL checkpoint for collection {}", self.name());
609            wal.checkpoint().await?;
610            info!("WAL checkpoint completed for collection {}", self.name());
611        }
612        else {
613            debug!("No WAL manager configured for collection {}", self.name());
614        }
615        Ok(())
616    }
617
618    async fn stream_wal_entries(self) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<LogEntry>> + Send>>> {
619        self.wal_manager.as_ref().map_or_else(
620            || {
621                debug!(
622                    "No WAL manager configured for collection {}, returning empty stream",
623                    self.name()
624                );
625                Ok(Box::pin(
626                    futures::stream::empty::<std::result::Result<LogEntry, sentinel_wal::WalError>>()
627                        .map(|r| r.map_err(Into::into)),
628                )
629                    as Pin<
630                        Box<dyn Stream<Item = crate::Result<LogEntry>> + Send>,
631                    >)
632            },
633            |wal| {
634                let name = self.name().to_owned();
635                debug!("Streaming WAL entries for collection {}", name);
636                let stream = wal
637                    .stream_entries()
638                    .map(|result| result.map_err(crate::error::SentinelError::from));
639                let _wal = wal.clone();
640                // let stream = wal.stream_entries().map(|result| result.map_err(|e|
641                // crate::error::SentinelError::from(e)));
642                let stream = Box::pin(stream) as Pin<Box<dyn Stream<Item = crate::Result<LogEntry>> + Send + 'static>>;
643                Ok(stream)
644            },
645        )
646    }
647
648    async fn verify_against_wal(&self) -> crate::Result<WalVerificationResult> {
649        if let Some(wal) = self.wal_manager.as_ref() {
650            debug!("Starting WAL verification for collection {}", self.name());
651            let result = verify_wal_consistency(wal, self).await?;
652            if result.passed {
653                info!(
654                    "WAL verification passed for collection {} ({} entries processed)",
655                    self.name(),
656                    result.entries_processed
657                );
658            }
659            else {
660                warn!(
661                    "WAL verification failed for collection {}: {} issues found",
662                    self.name(),
663                    result.issues.len()
664                );
665                for issue in &result.issues {
666                    warn!("  Verification issue: {}", issue.description);
667                }
668            }
669            Ok(result)
670        }
671        else {
672            debug!(
673                "No WAL manager configured for collection {}, skipping verification",
674                self.name()
675            );
676            Ok(WalVerificationResult {
677                issues:             vec![],
678                passed:             true,
679                entries_processed:  0,
680                affected_documents: 0,
681            })
682        }
683    }
684
685    async fn recover_from_wal(&self) -> crate::Result<WalRecoveryResult> {
686        if let Some(wal) = self.wal_manager.as_ref() {
687            info!("Starting WAL recovery for collection {}", self.name());
688            let result = recover_from_wal_safe(wal, self).await?;
689            info!(
690                "WAL recovery completed for collection {}: {} operations recovered, {} skipped, {} failed",
691                self.name(),
692                result.recovered_operations,
693                result.skipped_operations,
694                result.failed_operations
695            );
696
697            if !result.failures.is_empty() {
698                warn!("Recovery failures for collection {}:", self.name());
699                for failure in &result.failures {
700                    warn!("  - {:?}", failure);
701                }
702            }
703            Ok(result)
704        }
705        else {
706            debug!(
707                "No WAL manager configured for collection {}, skipping recovery",
708                self.name()
709            );
710            Ok(WalRecoveryResult {
711                recovered_operations: 0,
712                skipped_operations:   0,
713                failed_operations:    0,
714                failures:             vec![],
715            })
716        }
717    }
718
719    async fn wal_size(&self) -> crate::Result<u64> {
720        if let Some(wal) = self.wal_manager.as_ref() {
721            let size = wal.size().await?;
722            debug!("WAL size for collection {}: {} bytes", self.name(), size);
723            Ok(size)
724        }
725        else {
726            debug!("No WAL manager configured for collection {}", self.name());
727            Ok(0)
728        }
729    }
730
731    async fn wal_entries_count(&self) -> crate::Result<usize> {
732        if let Some(wal) = self.wal_manager.as_ref() {
733            let count = wal.entries_count().await?;
734            debug!(
735                "WAL entries count for collection {}: {}",
736                self.name(),
737                count
738            );
739            Ok(count)
740        }
741        else {
742            debug!("No WAL manager configured for collection {}", self.name());
743            Ok(0)
744        }
745    }
746}
747
748#[cfg(test)]
749mod tests {
750    use tempfile::tempdir;
751    use sentinel_wal::StoreWalConfig;
752
753    use super::*;
754    use crate::Store;
755
756    /// Helper to create a test store with a collection
757    async fn create_test_store_with_collection() -> (tempfile::TempDir, Store, String) {
758        let temp_dir = tempdir().unwrap();
759        let store = Store::new_with_config(
760            temp_dir.path().to_path_buf(),
761            None,
762            StoreWalConfig::default(),
763        )
764        .await
765        .unwrap();
766        let collection_name = "test_wal_collection".to_string();
767        let _ = collection_with_config(&store, &collection_name, None)
768            .await
769            .unwrap();
770        (temp_dir, store, collection_name)
771    }
772
773    // ============ CollectionWalOps Tests ============
774
775    #[tokio::test]
776    async fn test_checkpoint_wal_with_wal_manager() {
777        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
778        let collection = collection_with_config(&store, &collection_name, None)
779            .await
780            .unwrap();
781
782        // Insert some data to create WAL entries
783        collection
784            .insert("doc-1", serde_json::json!({"test": 1}))
785            .await
786            .unwrap();
787        collection
788            .insert("doc-2", serde_json::json!({"test": 2}))
789            .await
790            .unwrap();
791
792        // Checkpoint should succeed
793        let result = collection.checkpoint_wal().await;
794        assert!(result.is_ok());
795    }
796
797    #[tokio::test]
798    async fn test_checkpoint_wal_without_wal_manager() {
799        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
800        let collection = collection_with_config(&store, &collection_name, None)
801            .await
802            .unwrap();
803
804        // Create collection without WAL config - checkpoint should still succeed (no-op)
805        // The default collection may not have a WAL manager configured
806        let result = collection.checkpoint_wal().await;
807        assert!(result.is_ok());
808    }
809
810    #[tokio::test]
811    async fn test_stream_wal_entries_with_data() {
812        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
813        let collection = collection_with_config(&store, &collection_name, None)
814            .await
815            .unwrap();
816
817        // Insert some data
818        collection
819            .insert("doc-1", serde_json::json!({"name": "Test1"}))
820            .await
821            .unwrap();
822        collection
823            .insert("doc-2", serde_json::json!({"name": "Test2"}))
824            .await
825            .unwrap();
826
827        // Stream entries
828        let stream = collection.stream_wal_entries().await.unwrap();
829
830        // Collect entries
831        let entries: Vec<_> = stream.collect().await;
832        assert!(!entries.is_empty());
833
834        // Each entry should be Ok
835        for entry in entries {
836            assert!(entry.is_ok());
837        }
838    }
839
840    #[tokio::test]
841    async fn test_stream_wal_entries_empty() {
842        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
843        let collection = collection_with_config(&store, &collection_name, None)
844            .await
845            .unwrap();
846
847        // Stream on empty collection
848        let stream = collection.stream_wal_entries().await.unwrap();
849        let entries: Vec<_> = stream.collect().await;
850
851        // May be empty or have entries depending on WAL config
852        // Just verify it doesn't error
853        assert!(entries.is_empty() || entries.iter().all(|e| e.is_ok()));
854    }
855
856    #[tokio::test]
857    async fn test_verify_against_wal() {
858        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
859        let collection = collection_with_config(&store, &collection_name, None)
860            .await
861            .unwrap();
862
863        // Insert some data
864        collection
865            .insert("doc-1", serde_json::json!({"verify": true}))
866            .await
867            .unwrap();
868
869        // Verify should succeed
870        let result = collection.verify_against_wal().await;
871        assert!(result.is_ok());
872
873        let verification = result.unwrap();
874        // If there are issues, verification didn't fully pass
875        assert!(verification.passed || verification.issues.is_empty());
876    }
877
878    #[tokio::test]
879    async fn test_recover_from_wal() {
880        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
881        let collection = collection_with_config(&store, &collection_name, None)
882            .await
883            .unwrap();
884
885        // Insert some data
886        collection
887            .insert("doc-to-recover", serde_json::json!({"data": "test"}))
888            .await
889            .unwrap();
890
891        // Recovery should succeed (even if no recovery needed)
892        let result = collection.recover_from_wal().await;
893        assert!(result.is_ok());
894    }
895
896    #[tokio::test]
897    async fn test_wal_size() {
898        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
899        let collection = collection_with_config(&store, &collection_name, None)
900            .await
901            .unwrap();
902
903        // Get initial size
904        let initial_size = collection.wal_size().await.unwrap();
905
906        // Insert data
907        collection
908            .insert("doc-for-size", serde_json::json!({"size": "test data"}))
909            .await
910            .unwrap();
911
912        // Get size after insert
913        let new_size = collection.wal_size().await.unwrap();
914        assert!(new_size >= initial_size);
915    }
916
917    #[tokio::test]
918    async fn test_wal_entries_count() {
919        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
920        let collection = collection_with_config(&store, &collection_name, None)
921            .await
922            .unwrap();
923
924        // Get initial count
925        let initial_count = collection.wal_entries_count().await.unwrap();
926
927        // Insert some data
928        collection
929            .insert("doc-1", serde_json::json!({"count": 1}))
930            .await
931            .unwrap();
932        collection
933            .insert("doc-2", serde_json::json!({"count": 2}))
934            .await
935            .unwrap();
936
937        // Get count after inserts
938        let new_count = collection.wal_entries_count().await.unwrap();
939        assert!(new_count >= initial_count);
940    }
941
942    // ============ StoreWalOps Tests ============
943
944    #[tokio::test]
945    async fn test_checkpoint_all_collections() {
946        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
947
948        // Insert some data
949        let collection = collection_with_config(&store, "test1", None).await.unwrap();
950        collection
951            .insert("doc-1", serde_json::json!({"test": 1}))
952            .await
953            .unwrap();
954
955        let collection2 = collection_with_config(&store, "test2", None).await.unwrap();
956        collection2
957            .insert("doc-2", serde_json::json!({"test": 2}))
958            .await
959            .unwrap();
960
961        // Checkpoint all should succeed
962        let result = store.checkpoint_all_collections().await;
963        assert!(result.is_ok());
964    }
965
966    #[tokio::test]
967    async fn test_stream_all_wal_entries() {
968        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
969
970        // Insert some data in multiple collections
971        let collection1 = collection_with_config(&store, "stream-collection-1", None)
972            .await
973            .unwrap();
974        collection1
975            .insert("doc-1", serde_json::json!({"stream": 1}))
976            .await
977            .unwrap();
978
979        let collection2 = collection_with_config(&store, "stream-collection-2", None)
980            .await
981            .unwrap();
982        collection2
983            .insert("doc-2", serde_json::json!({"stream": 2}))
984            .await
985            .unwrap();
986
987        // Stream all entries
988        let stream = store.stream_all_wal_entries().await.unwrap();
989        let entries: Vec<_> = stream.collect().await;
990
991        // Should have entries from both collections
992        // Each entry is (collection_name, LogEntry)
993        for entry in entries {
994            assert!(entry.is_ok());
995            let (_name, log_entry) = entry.unwrap();
996            // Verify it's a valid log entry
997            assert!(!log_entry.document_id.is_empty());
998        }
999    }
1000
1001    #[tokio::test]
1002    async fn test_verify_all_collections() {
1003        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1004
1005        // Create multiple collections with data
1006        let collection1 = collection_with_config(&store, "verify-1", None)
1007            .await
1008            .unwrap();
1009        collection1
1010            .insert("doc-1", serde_json::json!({"verify": 1}))
1011            .await
1012            .unwrap();
1013
1014        let collection2 = collection_with_config(&store, "verify-2", None)
1015            .await
1016            .unwrap();
1017        collection2
1018            .insert("doc-2", serde_json::json!({"verify": 2}))
1019            .await
1020            .unwrap();
1021
1022        // Verify all should succeed
1023        let result = store.verify_all_collections().await;
1024        assert!(result.is_ok());
1025
1026        let issues = result.unwrap();
1027        // Both collections should have passed verification
1028        for (name, collection_issues) in &issues {
1029            for issue in collection_issues {
1030                assert!(!issue.is_critical || issue.description.is_empty());
1031            }
1032            // Debug output
1033            if !collection_issues.is_empty() {
1034                eprintln!("Collection {} has {} issues", name, collection_issues.len());
1035            }
1036        }
1037    }
1038
1039    #[tokio::test]
1040    async fn test_recover_all_collections() {
1041        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1042
1043        // Create collections with data
1044        let collection1 = collection_with_config(&store, "recover-1", None)
1045            .await
1046            .unwrap();
1047        collection1
1048            .insert("doc-1", serde_json::json!({"recover": 1}))
1049            .await
1050            .unwrap();
1051
1052        let collection2 = collection_with_config(&store, "recover-2", None)
1053            .await
1054            .unwrap();
1055        collection2
1056            .insert("doc-2", serde_json::json!({"recover": 2}))
1057            .await
1058            .unwrap();
1059
1060        // Recover all should succeed
1061        let result = store.recover_all_collections().await;
1062        assert!(result.is_ok());
1063
1064        let recovery_stats = result.unwrap();
1065        // Should have entries for the test collections (may include others)
1066        assert!(
1067            recovery_stats.len() >= 2,
1068            "Expected at least 2 collections, got {}",
1069            recovery_stats.len()
1070        );
1071
1072        for (name, operations) in &recovery_stats {
1073            eprintln!("Collection {} recovered {} operations", name, operations);
1074        }
1075    }
1076
1077    // ============ Edge Case Tests ============
1078
1079    #[tokio::test]
1080    async fn test_wal_operations_on_empty_store() {
1081        let temp_dir = tempdir().unwrap();
1082        let store = Store::new_with_config(
1083            temp_dir.path().to_path_buf(),
1084            None,
1085            StoreWalConfig::default(),
1086        )
1087        .await
1088        .unwrap();
1089
1090        // Verify empty store
1091        let result = store.verify_all_collections().await;
1092        assert!(result.is_ok());
1093        let issues = result.unwrap();
1094        assert!(issues.is_empty());
1095
1096        // Recover empty store
1097        let result = store.recover_all_collections().await;
1098        assert!(result.is_ok());
1099        let stats = result.unwrap();
1100        assert!(stats.is_empty());
1101
1102        // Stream empty store
1103        let stream = store.stream_all_wal_entries().await.unwrap();
1104        let entries: Vec<_> = stream.collect().await;
1105        assert!(entries.is_empty());
1106    }
1107
1108    #[tokio::test]
1109    async fn test_checkpoint_empty_collection() {
1110        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1111        let collection = collection_with_config(&store, &collection_name, None)
1112            .await
1113            .unwrap();
1114
1115        // Checkpoint empty collection should succeed
1116        let result = collection.checkpoint_wal().await;
1117        assert!(result.is_ok());
1118    }
1119
1120    // ============ Edge Case Tests - Additional Coverage ============
1121
1122    #[tokio::test]
1123    async fn test_wal_ops_stream_entries_with_verify_all() {
1124        // Test streaming entries with verify_all option
1125        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1126        let collection = collection_with_config(&store, &collection_name, None)
1127            .await
1128            .unwrap();
1129
1130        // Insert some data
1131        collection
1132            .insert("verify-doc", serde_json::json!({"verify": "test"}))
1133            .await
1134            .unwrap();
1135
1136        // Stream and verify entries
1137        let stream = store.stream_all_wal_entries().await.unwrap();
1138        let entries: Vec<_> = stream.collect().await;
1139
1140        // Should have entries that can be verified
1141        assert!(!entries.is_empty());
1142        for entry in entries {
1143            // Each entry is a Result, so we need to handle both ok and err cases
1144            if let Ok(result) = entry {
1145                assert!(!result.1.document_id.is_empty());
1146            }
1147        }
1148    }
1149
1150    #[tokio::test]
1151    async fn test_wal_ops_verify_collection_with_no_wal_manager() {
1152        // Test verify_against_wal when no WAL manager is configured
1153        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1154        let collection = collection_with_config(&store, &collection_name, None)
1155            .await
1156            .unwrap();
1157
1158        // Verify should return empty result (passed) when no WAL
1159        let result = collection.verify_against_wal().await;
1160        assert!(result.is_ok());
1161
1162        let verification = result.unwrap();
1163        assert!(verification.passed);
1164        assert_eq!(verification.entries_processed, 0);
1165    }
1166
1167    #[tokio::test]
1168    async fn test_wal_ops_recover_from_wal_with_no_wal_manager() {
1169        // Test recover_from_wal when no WAL manager is configured
1170        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1171        let collection = collection_with_config(&store, &collection_name, None)
1172            .await
1173            .unwrap();
1174
1175        // Recovery should return empty result when no WAL
1176        let result = collection.recover_from_wal().await;
1177        assert!(result.is_ok());
1178
1179        let recovery = result.unwrap();
1180        assert_eq!(recovery.recovered_operations, 0);
1181        assert_eq!(recovery.skipped_operations, 0);
1182        assert_eq!(recovery.failed_operations, 0);
1183    }
1184
1185    #[tokio::test]
1186    async fn test_wal_ops_wal_size_with_no_wal_manager() {
1187        // Test wal_size when no WAL manager is configured
1188        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1189        let collection = collection_with_config(&store, &collection_name, None)
1190            .await
1191            .unwrap();
1192
1193        // Size should return 0 when no WAL
1194        let size = collection.wal_size().await.unwrap();
1195        assert_eq!(size, 0);
1196    }
1197
1198    #[tokio::test]
1199    async fn test_wal_ops_wal_entries_count_with_no_wal_manager() {
1200        // Test wal_entries_count when no WAL manager is configured
1201        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1202        let collection = collection_with_config(&store, &collection_name, None)
1203            .await
1204            .unwrap();
1205
1206        // Count should return 0 when no WAL
1207        let count = collection.wal_entries_count().await.unwrap();
1208        assert_eq!(count, 0);
1209    }
1210
1211    #[tokio::test]
1212    async fn test_wal_ops_stream_wal_entries_with_no_wal_manager() {
1213        // Test stream_wal_entries when no WAL manager is configured
1214        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1215        let collection = collection_with_config(&store, &collection_name, None)
1216            .await
1217            .unwrap();
1218
1219        // Stream should return empty stream when no WAL
1220        let stream = collection.stream_wal_entries().await.unwrap();
1221        let entries: Vec<_> = stream.collect().await;
1222
1223        assert!(entries.is_empty());
1224    }
1225
1226    #[tokio::test]
1227    async fn test_wal_ops_verify_all_with_mixed_collections() {
1228        // Test verify_all_collections with multiple collections
1229        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1230
1231        // Create collections with varying data
1232        for i in 0 .. 3 {
1233            let collection = collection_with_config(&store, &format!("verify-multi-{}", i), None)
1234                .await
1235                .unwrap();
1236            collection
1237                .insert(&format!("doc-{}", i), serde_json::json!({"index": i}))
1238                .await
1239                .unwrap();
1240        }
1241
1242        // Verify all should succeed
1243        let result = store.verify_all_collections().await;
1244        assert!(result.is_ok());
1245
1246        let issues = result.unwrap();
1247        // All collections should have passed verification (no critical issues)
1248        for (name, collection_issues) in &issues {
1249            for issue in collection_issues {
1250                assert!(
1251                    !issue.is_critical || issue.description.is_empty(),
1252                    "Collection {} has critical issue: {}",
1253                    name,
1254                    issue.description
1255                );
1256            }
1257        }
1258    }
1259
1260    #[tokio::test]
1261    async fn test_wal_ops_checkpoint_all_with_empty_store() {
1262        // Test checkpoint_all_collections on empty store
1263        let temp_dir = tempdir().unwrap();
1264        let store = Store::new_with_config(
1265            temp_dir.path().to_path_buf(),
1266            None,
1267            StoreWalConfig::default(),
1268        )
1269        .await
1270        .unwrap();
1271
1272        // Checkpoint all should succeed on empty store
1273        let result = store.checkpoint_all_collections().await;
1274        assert!(result.is_ok());
1275    }
1276
1277    #[tokio::test]
1278    async fn test_wal_ops_recover_all_with_empty_store() {
1279        // Test recover_all_collections on empty store
1280        let temp_dir = tempdir().unwrap();
1281        let store = Store::new_with_config(
1282            temp_dir.path().to_path_buf(),
1283            None,
1284            StoreWalConfig::default(),
1285        )
1286        .await
1287        .unwrap();
1288
1289        // Recover all should succeed on empty store
1290        let result = store.recover_all_collections().await;
1291        assert!(result.is_ok());
1292
1293        let stats = result.unwrap();
1294        assert!(stats.is_empty());
1295    }
1296
1297    // ============ Additional "No WAL Manager" Branch Tests ============
1298
1299    #[tokio::test]
1300    async fn test_wal_ops_checkpoint_with_verification_options() {
1301        // Test checkpoint_wal with custom verification options
1302        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1303        let collection = collection_with_config(&store, &collection_name, None)
1304            .await
1305            .unwrap();
1306
1307        // Insert some data
1308        collection
1309            .insert("doc-1", serde_json::json!({"test": 1}))
1310            .await
1311            .unwrap();
1312
1313        // Checkpoint should work with verification enabled
1314        let result = collection.checkpoint_wal().await;
1315        assert!(result.is_ok());
1316    }
1317
1318    #[tokio::test]
1319    async fn test_wal_ops_stream_all_with_no_collections() {
1320        // Test stream_all_wal_entries when there are no collections
1321        let temp_dir = tempdir().unwrap();
1322        let store = Store::new_with_config(
1323            temp_dir.path().to_path_buf(),
1324            None,
1325            StoreWalConfig::default(),
1326        )
1327        .await
1328        .unwrap();
1329
1330        // Stream should return empty stream when no collections
1331        let stream = store.stream_all_wal_entries().await.unwrap();
1332        let entries: Vec<_> = stream.collect().await;
1333
1334        assert!(entries.is_empty());
1335    }
1336
1337    #[tokio::test]
1338    async fn test_wal_ops_verify_all_with_no_issues() {
1339        // Test verify_all_collections when all collections pass verification
1340        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1341
1342        // Create a collection and insert data
1343        let collection = collection_with_config(&store, "verify-pass", None)
1344            .await
1345            .unwrap();
1346        collection
1347            .insert("doc-1", serde_json::json!({"verify": true}))
1348            .await
1349            .unwrap();
1350
1351        // Verify all - should return empty map (no issues)
1352        let result = store.verify_all_collections().await;
1353        assert!(result.is_ok());
1354
1355        let issues = result.unwrap();
1356        // Collections with no issues should not be in the map
1357        assert!(issues.get("verify-pass").is_none() || issues.get("verify-pass").map_or(true, |v| v.is_empty()));
1358    }
1359
1360    #[tokio::test]
1361    async fn test_wal_ops_recover_all_with_partial_failures() {
1362        // Test recover_all_collections handles partial failures gracefully
1363        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1364
1365        // Create a collection with data
1366        let collection = collection_with_config(&store, "recover-test", None)
1367            .await
1368            .unwrap();
1369        collection
1370            .insert("doc-1", serde_json::json!({"recover": true}))
1371            .await
1372            .unwrap();
1373
1374        // Recovery should work even if some collections fail
1375        let result = store.recover_all_collections().await;
1376        assert!(result.is_ok());
1377
1378        let stats = result.unwrap();
1379        // Should have stats for our collection
1380        if let Some(_count) = stats.get("recover-test") {
1381            // Stats are present
1382        }
1383    }
1384
1385    #[tokio::test]
1386    async fn test_wal_ops_stream_entries_with_large_wal() {
1387        // Test streaming many WAL entries
1388        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1389        let collection = collection_with_config(&store, &collection_name, None)
1390            .await
1391            .unwrap();
1392
1393        // Insert many documents to create a larger WAL
1394        for i in 0 .. 50 {
1395            collection
1396                .insert(&format!("doc-{}", i), serde_json::json!({"index": i}))
1397                .await
1398                .unwrap();
1399        }
1400
1401        // Stream entries
1402        let stream = collection.stream_wal_entries().await.unwrap();
1403        let entries: Vec<_> = stream.collect().await;
1404
1405        // Should have streamed all entries
1406        assert_eq!(entries.len(), 50);
1407        for entry in entries {
1408            assert!(entry.is_ok());
1409        }
1410    }
1411
1412    #[tokio::test]
1413    async fn test_wal_ops_verify_with_empty_wal() {
1414        // Test verify_against_wal on collection with empty WAL
1415        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1416        let collection = collection_with_config(&store, &collection_name, None)
1417            .await
1418            .unwrap();
1419
1420        // Insert a document (creates WAL entry)
1421        collection
1422            .insert("doc-1", serde_json::json!({"test": 1}))
1423            .await
1424            .unwrap();
1425
1426        // Verify should pass for valid state
1427        let result = collection.verify_against_wal().await;
1428        assert!(result.is_ok());
1429    }
1430
1431    #[tokio::test]
1432    async fn test_wal_ops_wal_entries_count_after_rotation() {
1433        // Test wal_entries_count after potential WAL rotation
1434        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1435        let collection = collection_with_config(&store, &collection_name, None)
1436            .await
1437            .unwrap();
1438
1439        // Get initial count
1440        let initial_count = collection.wal_entries_count().await.unwrap();
1441
1442        // Insert some data
1443        for i in 0 .. 5 {
1444            collection
1445                .insert(&format!("doc-{}", i), serde_json::json!({"index": i}))
1446                .await
1447                .unwrap();
1448        }
1449
1450        // Get count after inserts
1451        let new_count = collection.wal_entries_count().await.unwrap();
1452        assert!(new_count >= initial_count + 5);
1453    }
1454
1455    #[tokio::test]
1456    async fn test_wal_ops_checkpoint_preserves_data() {
1457        // Test that checkpoint preserves data integrity
1458        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
1459        let collection = collection_with_config(&store, &collection_name, None)
1460            .await
1461            .unwrap();
1462
1463        // Insert data
1464        collection
1465            .insert("doc-1", serde_json::json!({"name": "Test", "value": 42}))
1466            .await
1467            .unwrap();
1468
1469        // Checkpoint
1470        collection.checkpoint_wal().await.unwrap();
1471
1472        // Data should still be accessible
1473        let doc = collection.get("doc-1").await.unwrap();
1474        assert!(doc.is_some());
1475        assert_eq!(doc.unwrap().data()["value"], 42);
1476    }
1477
1478    #[tokio::test]
1479    async fn test_wal_ops_stream_all_with_mixed_collections() {
1480        // Test stream_all_wal_entries with collections that have different WAL states
1481        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
1482
1483        // Create collections with different amounts of data
1484        for i in 0 .. 3 {
1485            let collection = collection_with_config(&store, &format!("stream-mixed-{}", i), None)
1486                .await
1487                .unwrap();
1488
1489            // Insert varying numbers of documents
1490            for j in 0 .. i + 1 {
1491                collection
1492                    .insert(
1493                        &format!("doc-{}", j),
1494                        serde_json::json!({"collection": i, "doc": j}),
1495                    )
1496                    .await
1497                    .unwrap();
1498            }
1499        }
1500
1501        // Stream all entries - should get entries from all collections
1502        let stream = store.stream_all_wal_entries().await.unwrap();
1503        let entries: Vec<_> = stream.collect().await;
1504
1505        // Should have entries from all 3 collections (1 + 2 + 3 = 6 entries)
1506        assert_eq!(entries.len(), 6);
1507
1508        // Verify entries come from different collections
1509        let collections: std::collections::HashSet<String> = entries
1510            .into_iter()
1511            .filter_map(|e| e.ok())
1512            .map(|(name, _)| name)
1513            .collect();
1514
1515        assert_eq!(collections.len(), 3);
1516    }
1517}