use std::{collections::HashMap, pin::Pin};
use async_trait::async_trait;
use futures::{Stream, StreamExt as _};
use tracing::{debug, error, info, warn};
use sentinel_wal::{
recover_from_wal_safe,
verify_wal_consistency,
LogEntry,
WalRecoveryResult,
WalVerificationIssue,
WalVerificationResult,
};
use crate::{store::operations::collection_with_config, Collection, Store};
/// Store-level WAL operations that span every collection in a store.
///
/// The `Store` implementation in this file builds each method on top of the
/// per-collection [`CollectionWalOps`] methods.
#[async_trait]
pub trait StoreWalOps {
/// Checkpoints the WAL of every collection in the store.
///
/// # Errors
///
/// Returns an error if any step fails; the `Store` implementation stops at the
/// first failure.
async fn checkpoint_all_collections(&self) -> crate::Result<()>;
/// Returns a single stream of WAL entries from all collections.
///
/// Each item pairs the name of the originating collection with one of its
/// log entries, or carries the error hit while reading that entry.
async fn stream_all_wal_entries(
&self,
) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<(String, LogEntry)>> + Send + 'static>>>;
/// Verifies every collection against its WAL.
///
/// Returns a map from collection name to the verification issues found for
/// it. In the `Store` implementation, collections without issues are left
/// out of the map.
async fn verify_all_collections(&self) -> crate::Result<HashMap<String, Vec<WalVerificationIssue>>>;
/// Runs WAL recovery for every collection.
///
/// Returns a map from collection name to the number of operations recovered
/// for that collection.
async fn recover_all_collections(&self) -> crate::Result<HashMap<String, usize>>;
}
/// WAL operations scoped to a single collection.
///
/// In the `Collection` implementation in this file every method degrades to a
/// neutral result (empty stream, zero, or an all-clear report) when the
/// collection has no WAL manager configured.
#[async_trait]
pub trait CollectionWalOps {
/// Checkpoints this collection's WAL.
async fn checkpoint_wal(&self) -> crate::Result<()>;
/// Consumes the collection and returns a stream over its WAL entries.
///
/// Note that this method takes `self` by value, unlike the others.
async fn stream_wal_entries(self) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<LogEntry>> + Send>>>;
/// Checks the collection's current state against its WAL and reports any issues.
async fn verify_against_wal(&self) -> crate::Result<WalVerificationResult>;
/// Replays the WAL into the collection and reports what was recovered.
async fn recover_from_wal(&self) -> crate::Result<WalRecoveryResult>;
/// Returns the WAL size in bytes (the `Collection` implementation logs it as bytes).
async fn wal_size(&self) -> crate::Result<u64>;
/// Returns the number of entries currently in the WAL.
async fn wal_entries_count(&self) -> crate::Result<usize>;
}
/// Store-wide WAL operations.
///
/// Every method lists the store's collections, opens each one with
/// `collection_with_config(self, name, None)` and delegates to the matching
/// [`CollectionWalOps`] method.
#[async_trait]
impl StoreWalOps for Store {
    /// Checkpoints the WAL of every collection, one after another.
    ///
    /// # Errors
    ///
    /// Returns the first error from listing collections, opening a collection
    /// or checkpointing it. Collections after the failing one are not
    /// checkpointed.
    async fn checkpoint_all_collections(&self) -> crate::Result<()> {
        let collections = self.list_collections().await?;
        info!("Starting checkpoint for {} collections", collections.len());

        for collection_name in collections {
            debug!("Checkpointing collection: {}", collection_name);
            let collection = collection_with_config(self, &collection_name, None).await?;
            CollectionWalOps::checkpoint_wal(&collection).await?;
        }

        info!("Checkpoint completed for all collections");
        Ok(())
    }

    /// Streams the WAL entries of all collections as `(collection name, entry)` pairs.
    ///
    /// Collections are opened eagerly, their WAL streams lazily, one collection
    /// after another. This is best-effort: a collection that cannot be opened,
    /// or whose WAL stream cannot be created, is logged with `warn!` and
    /// skipped. Errors on individual entries are passed through as `Err` items.
    ///
    /// # Errors
    ///
    /// Only fails if the collections cannot be listed.
    async fn stream_all_wal_entries(
        &self,
    ) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<(String, LogEntry)>> + Send>>> {
        let collection_names = self.list_collections().await?;
        debug!(
            "Streaming WAL entries from {} collections",
            collection_names.len()
        );

        // The returned stream must not borrow `self`, so the collections are
        // opened up front and moved into the stream.
        let mut collections = Vec::new();
        for name in collection_names {
            match collection_with_config(self, &name, None).await {
                Ok(collection) => collections.push(collection),
                Err(e) => warn!("Failed to load collection {}: {}", name, e),
            }
        }

        let per_collection_streams = futures::stream::iter(collections).filter_map(|collection| {
            async move {
                let name = collection.name().to_owned();
                match CollectionWalOps::stream_wal_entries(collection).await {
                    Ok(stream) => Some(stream.map(move |entry| entry.map(|e| (name.clone(), e)))),
                    Err(e) => {
                        // Keep the best-effort behaviour, but leave a trace:
                        // a silently missing collection is hard to diagnose.
                        warn!(
                            "Failed to stream WAL entries for collection {}: {}",
                            name, e
                        );
                        None
                    },
                }
            }
        });

        Ok(Box::pin(per_collection_streams.flatten()))
    }

    /// Verifies every collection against its WAL.
    ///
    /// The returned map only contains collections that have issues. If
    /// verification of a collection fails with an error, that error is recorded
    /// as a single critical issue (with `"unknown"` transaction and document
    /// ids) and the loop continues.
    ///
    /// # Errors
    ///
    /// Fails if the collections cannot be listed or a collection cannot be
    /// opened. Unlike a verification error, an open failure aborts the run.
    async fn verify_all_collections(&self) -> crate::Result<HashMap<String, Vec<WalVerificationIssue>>> {
        let collections = self.list_collections().await?;
        info!(
            "Starting WAL verification for {} collections",
            collections.len()
        );

        let mut results = HashMap::new();
        let mut total_issues: usize = 0;

        for collection_name in collections {
            debug!("Verifying collection: {}", collection_name);
            let collection = collection_with_config(self, &collection_name, None).await?;

            let issues = match CollectionWalOps::verify_against_wal(&collection).await {
                Ok(verification_result) => {
                    if verification_result.issues.is_empty() {
                        debug!("Collection {} verification passed", collection_name);
                        continue;
                    }
                    warn!(
                        "Collection {} has {} verification issues",
                        collection_name,
                        verification_result.issues.len()
                    );
                    verification_result.issues
                },
                Err(e) => {
                    error!("Failed to verify collection {}: {}", collection_name, e);
                    vec![WalVerificationIssue {
                        transaction_id: "unknown".to_owned(),
                        document_id: "unknown".to_owned(),
                        description: format!("Verification failed: {}", e),
                        is_critical: true,
                    }]
                },
            };

            // Saturate rather than dropping the addend if the counter overflows.
            total_issues = total_issues.saturating_add(issues.len());
            results.insert(collection_name, issues);
        }

        if total_issues > 0 {
            warn!(
                "WAL verification completed with {} total issues across {} collections",
                total_issues,
                results.len()
            );
        }
        else {
            info!("WAL verification completed successfully - no issues found");
        }

        Ok(results)
    }

    /// Runs WAL recovery on every collection.
    ///
    /// The returned map has one entry per processed collection, including
    /// those with zero recovered operations.
    ///
    /// # Errors
    ///
    /// Returns the first error from listing, opening or recovering a
    /// collection. Results gathered for earlier collections are discarded.
    async fn recover_all_collections(&self) -> crate::Result<HashMap<String, usize>> {
        let collections = self.list_collections().await?;
        info!(
            "Starting WAL recovery for {} collections",
            collections.len()
        );

        let mut results = HashMap::new();
        let mut total_operations: usize = 0;

        for collection_name in collections {
            debug!("Recovering collection: {}", collection_name);
            let collection = collection_with_config(self, &collection_name, None).await?;

            let operations = match CollectionWalOps::recover_from_wal(&collection).await {
                Ok(recovery_result) => recovery_result.recovered_operations,
                Err(e) => {
                    error!("Failed to recover collection {}: {}", collection_name, e);
                    return Err(e);
                },
            };

            // Saturate rather than dropping the addend if the counter overflows.
            total_operations = total_operations.saturating_add(operations);
            if operations > 0 {
                info!(
                    "Recovered {} operations for collection {}",
                    operations, collection_name
                );
            }
            else {
                debug!("No recovery needed for collection {}", collection_name);
            }
            results.insert(collection_name, operations);
        }

        info!(
            "WAL recovery completed - {} total operations recovered across {} collections",
            total_operations,
            results.len()
        );
        Ok(results)
    }
}
#[async_trait]
impl CollectionWalOps for Collection {
async fn checkpoint_wal(&self) -> crate::Result<()> {
if let Some(wal) = self.wal_manager.as_ref() {
debug!("Starting WAL checkpoint for collection {}", self.name());
wal.checkpoint().await?;
info!("WAL checkpoint completed for collection {}", self.name());
}
else {
debug!("No WAL manager configured for collection {}", self.name());
}
Ok(())
}
async fn stream_wal_entries(self) -> crate::Result<Pin<Box<dyn Stream<Item = crate::Result<LogEntry>> + Send>>> {
self.wal_manager.as_ref().map_or_else(
|| {
debug!(
"No WAL manager configured for collection {}, returning empty stream",
self.name()
);
Ok(Box::pin(
futures::stream::empty::<std::result::Result<LogEntry, sentinel_wal::WalError>>()
.map(|r| r.map_err(Into::into)),
)
as Pin<
Box<dyn Stream<Item = crate::Result<LogEntry>> + Send>,
>)
},
|wal| {
let name = self.name().to_owned();
debug!("Streaming WAL entries for collection {}", name);
let stream = wal
.stream_entries()
.map(|result| result.map_err(crate::error::SentinelError::from));
let _wal = wal.clone();
let stream = Box::pin(stream) as Pin<Box<dyn Stream<Item = crate::Result<LogEntry>> + Send + 'static>>;
Ok(stream)
},
)
}
async fn verify_against_wal(&self) -> crate::Result<WalVerificationResult> {
if let Some(wal) = self.wal_manager.as_ref() {
debug!("Starting WAL verification for collection {}", self.name());
let result = verify_wal_consistency(wal, self).await?;
if result.passed {
info!(
"WAL verification passed for collection {} ({} entries processed)",
self.name(),
result.entries_processed
);
}
else {
warn!(
"WAL verification failed for collection {}: {} issues found",
self.name(),
result.issues.len()
);
for issue in &result.issues {
warn!(" Verification issue: {}", issue.description);
}
}
Ok(result)
}
else {
debug!(
"No WAL manager configured for collection {}, skipping verification",
self.name()
);
Ok(WalVerificationResult {
issues: vec![],
passed: true,
entries_processed: 0,
affected_documents: 0,
})
}
}
async fn recover_from_wal(&self) -> crate::Result<WalRecoveryResult> {
if let Some(wal) = self.wal_manager.as_ref() {
info!("Starting WAL recovery for collection {}", self.name());
let result = recover_from_wal_safe(wal, self).await?;
info!(
"WAL recovery completed for collection {}: {} operations recovered, {} skipped, {} failed",
self.name(),
result.recovered_operations,
result.skipped_operations,
result.failed_operations
);
if !result.failures.is_empty() {
warn!("Recovery failures for collection {}:", self.name());
for failure in &result.failures {
warn!(" - {:?}", failure);
}
}
Ok(result)
}
else {
debug!(
"No WAL manager configured for collection {}, skipping recovery",
self.name()
);
Ok(WalRecoveryResult {
recovered_operations: 0,
skipped_operations: 0,
failed_operations: 0,
failures: vec![],
})
}
}
async fn wal_size(&self) -> crate::Result<u64> {
if let Some(wal) = self.wal_manager.as_ref() {
let size = wal.size().await?;
debug!("WAL size for collection {}: {} bytes", self.name(), size);
Ok(size)
}
else {
debug!("No WAL manager configured for collection {}", self.name());
Ok(0)
}
}
async fn wal_entries_count(&self) -> crate::Result<usize> {
if let Some(wal) = self.wal_manager.as_ref() {
let count = wal.entries_count().await?;
debug!(
"WAL entries count for collection {}: {}",
self.name(),
count
);
Ok(count)
}
else {
debug!("No WAL manager configured for collection {}", self.name());
Ok(0)
}
}
}
/// Integration-style tests for [`StoreWalOps`] and [`CollectionWalOps`].
///
/// Every test works on a store in a fresh temporary directory created with
/// `StoreWalConfig::default()`.
///
/// NOTE(review): several tests are named `*_no_wal_manager` /
/// `*_without_wal_manager`, but they build their collections exactly like the
/// tests that expect WAL entries to exist. They appear to exercise a fresh,
/// empty WAL rather than a missing WAL manager. Confirm, and rename or
/// reconfigure them if so.
#[cfg(test)]
mod tests {
    use tempfile::tempdir;
    use sentinel_wal::StoreWalConfig;
    use super::*;
    use crate::Store;

    /// Creates a store with the default WAL configuration in a new temporary directory.
    ///
    /// The `TempDir` guard is returned so the directory outlives the store.
    async fn create_empty_store() -> (tempfile::TempDir, Store) {
        let temp_dir = tempdir().unwrap();
        let store = Store::new_with_config(
            temp_dir.path().to_path_buf(),
            None,
            StoreWalConfig::default(),
        )
        .await
        .unwrap();
        (temp_dir, store)
    }

    /// Creates a store that already contains one (empty) collection and returns its name.
    async fn create_test_store_with_collection() -> (tempfile::TempDir, Store, String) {
        let (temp_dir, store) = create_empty_store().await;
        let collection_name = "test_wal_collection".to_string();
        let _ = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        (temp_dir, store, collection_name)
    }

    /// Checkpointing succeeds after documents were written.
    #[tokio::test]
    async fn test_checkpoint_wal_with_wal_manager() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        collection
            .insert("doc-1", serde_json::json!({"test": 1}))
            .await
            .unwrap();
        collection
            .insert("doc-2", serde_json::json!({"test": 2}))
            .await
            .unwrap();
        collection
            .checkpoint_wal()
            .await
            .expect("checkpoint should succeed");
    }

    /// Checkpointing succeeds on a collection nothing was written to.
    #[tokio::test]
    async fn test_checkpoint_wal_without_wal_manager() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        collection
            .checkpoint_wal()
            .await
            .expect("checkpoint should succeed");
    }

    /// Streaming after inserts yields at least one entry, all of them `Ok`.
    #[tokio::test]
    async fn test_stream_wal_entries_with_data() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        collection
            .insert("doc-1", serde_json::json!({"name": "Test1"}))
            .await
            .unwrap();
        collection
            .insert("doc-2", serde_json::json!({"name": "Test2"}))
            .await
            .unwrap();
        let stream = collection.stream_wal_entries().await.unwrap();
        let entries: Vec<_> = stream.collect().await;
        assert!(!entries.is_empty());
        assert!(entries.iter().all(|entry| entry.is_ok()));
    }

    /// Streaming a fresh collection yields no failed entries.
    #[tokio::test]
    async fn test_stream_wal_entries_empty() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        let stream = collection.stream_wal_entries().await.unwrap();
        let entries: Vec<_> = stream.collect().await;
        // `all` is vacuously true for an empty stream, so no separate emptiness check.
        assert!(entries.iter().all(|entry| entry.is_ok()));
    }

    /// Verification of a collection with one document reports success or no issues.
    #[tokio::test]
    async fn test_verify_against_wal() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        collection
            .insert("doc-1", serde_json::json!({"verify": true}))
            .await
            .unwrap();
        let verification = collection
            .verify_against_wal()
            .await
            .expect("verification should run");
        assert!(verification.passed || verification.issues.is_empty());
    }

    /// Recovery of a collection with one document completes without error.
    #[tokio::test]
    async fn test_recover_from_wal() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        collection
            .insert("doc-to-recover", serde_json::json!({"data": "test"}))
            .await
            .unwrap();
        collection
            .recover_from_wal()
            .await
            .expect("recovery should succeed");
    }

    /// The WAL size never shrinks when a document is inserted.
    #[tokio::test]
    async fn test_wal_size() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        let initial_size = collection.wal_size().await.unwrap();
        collection
            .insert("doc-for-size", serde_json::json!({"size": "test data"}))
            .await
            .unwrap();
        let new_size = collection.wal_size().await.unwrap();
        assert!(new_size >= initial_size);
    }

    /// The WAL entry count never shrinks when documents are inserted.
    #[tokio::test]
    async fn test_wal_entries_count() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        let initial_count = collection.wal_entries_count().await.unwrap();
        collection
            .insert("doc-1", serde_json::json!({"count": 1}))
            .await
            .unwrap();
        collection
            .insert("doc-2", serde_json::json!({"count": 2}))
            .await
            .unwrap();
        let new_count = collection.wal_entries_count().await.unwrap();
        assert!(new_count >= initial_count);
    }

    /// Store-wide checkpoint succeeds with several populated collections.
    #[tokio::test]
    async fn test_checkpoint_all_collections() {
        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, "test1", None).await.unwrap();
        collection
            .insert("doc-1", serde_json::json!({"test": 1}))
            .await
            .unwrap();
        let collection2 = collection_with_config(&store, "test2", None).await.unwrap();
        collection2
            .insert("doc-2", serde_json::json!({"test": 2}))
            .await
            .unwrap();
        store
            .checkpoint_all_collections()
            .await
            .expect("checkpoint of all collections should succeed");
    }

    /// Every entry of the store-wide stream is `Ok` and carries a document id.
    #[tokio::test]
    async fn test_stream_all_wal_entries() {
        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
        let collection1 = collection_with_config(&store, "stream-collection-1", None)
            .await
            .unwrap();
        collection1
            .insert("doc-1", serde_json::json!({"stream": 1}))
            .await
            .unwrap();
        let collection2 = collection_with_config(&store, "stream-collection-2", None)
            .await
            .unwrap();
        collection2
            .insert("doc-2", serde_json::json!({"stream": 2}))
            .await
            .unwrap();
        let stream = store.stream_all_wal_entries().await.unwrap();
        let entries: Vec<_> = stream.collect().await;
        for entry in entries {
            let (_name, log_entry) = entry.expect("every streamed entry should be Ok");
            assert!(!log_entry.document_id.is_empty());
        }
    }

    /// Store-wide verification reports no critical issue with a description.
    #[tokio::test]
    async fn test_verify_all_collections() {
        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
        let collection1 = collection_with_config(&store, "verify-1", None)
            .await
            .unwrap();
        collection1
            .insert("doc-1", serde_json::json!({"verify": 1}))
            .await
            .unwrap();
        let collection2 = collection_with_config(&store, "verify-2", None)
            .await
            .unwrap();
        collection2
            .insert("doc-2", serde_json::json!({"verify": 2}))
            .await
            .unwrap();
        let issues = store
            .verify_all_collections()
            .await
            .expect("verification of all collections should run");
        for (name, collection_issues) in &issues {
            for issue in collection_issues {
                // NOTE(review): a critical issue with an empty description passes
                // this check; `!issue.is_critical` alone may have been intended.
                assert!(!issue.is_critical || issue.description.is_empty());
            }
            if !collection_issues.is_empty() {
                eprintln!("Collection {} has {} issues", name, collection_issues.len());
            }
        }
    }

    /// Store-wide recovery returns stats for the collections in the store.
    #[tokio::test]
    async fn test_recover_all_collections() {
        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
        let collection1 = collection_with_config(&store, "recover-1", None)
            .await
            .unwrap();
        collection1
            .insert("doc-1", serde_json::json!({"recover": 1}))
            .await
            .unwrap();
        let collection2 = collection_with_config(&store, "recover-2", None)
            .await
            .unwrap();
        collection2
            .insert("doc-2", serde_json::json!({"recover": 2}))
            .await
            .unwrap();
        let recovery_stats = store
            .recover_all_collections()
            .await
            .expect("recovery of all collections should succeed");
        assert!(
            recovery_stats.len() >= 2,
            "Expected at least 2 collections, got {}",
            recovery_stats.len()
        );
        for (name, operations) in &recovery_stats {
            eprintln!("Collection {} recovered {} operations", name, operations);
        }
    }

    /// Verify, recover and stream all return empty results on a store without collections.
    #[tokio::test]
    async fn test_wal_operations_on_empty_store() {
        let (_temp_dir, store) = create_empty_store().await;
        let issues = store
            .verify_all_collections()
            .await
            .expect("verification should run on an empty store");
        assert!(issues.is_empty());
        let stats = store
            .recover_all_collections()
            .await
            .expect("recovery should run on an empty store");
        assert!(stats.is_empty());
        let stream = store.stream_all_wal_entries().await.unwrap();
        let entries: Vec<_> = stream.collect().await;
        assert!(entries.is_empty());
    }

    /// Checkpointing an empty collection succeeds.
    #[tokio::test]
    async fn test_checkpoint_empty_collection() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        collection
            .checkpoint_wal()
            .await
            .expect("checkpoint should succeed");
    }

    /// The store-wide stream is non-empty after an insert; `Ok` entries carry a document id.
    #[tokio::test]
    async fn test_wal_ops_stream_entries_with_verify_all() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        collection
            .insert("verify-doc", serde_json::json!({"verify": "test"}))
            .await
            .unwrap();
        let stream = store.stream_all_wal_entries().await.unwrap();
        let entries: Vec<_> = stream.collect().await;
        assert!(!entries.is_empty());
        // Failed entries are deliberately ignored here; only `Ok` ones are inspected.
        for (_name, log_entry) in entries.into_iter().flatten() {
            assert!(!log_entry.document_id.is_empty());
        }
    }

    /// Verifying a fresh collection passes with zero processed entries.
    #[tokio::test]
    async fn test_wal_ops_verify_collection_with_no_wal_manager() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        let verification = collection
            .verify_against_wal()
            .await
            .expect("verification should run");
        assert!(verification.passed);
        assert_eq!(verification.entries_processed, 0);
    }

    /// Recovering a fresh collection reports all-zero counters.
    #[tokio::test]
    async fn test_wal_ops_recover_from_wal_with_no_wal_manager() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        let recovery = collection
            .recover_from_wal()
            .await
            .expect("recovery should succeed");
        assert_eq!(recovery.recovered_operations, 0);
        assert_eq!(recovery.skipped_operations, 0);
        assert_eq!(recovery.failed_operations, 0);
    }

    /// The WAL size of a fresh collection is zero.
    #[tokio::test]
    async fn test_wal_ops_wal_size_with_no_wal_manager() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        let size = collection.wal_size().await.unwrap();
        assert_eq!(size, 0);
    }

    /// The WAL entry count of a fresh collection is zero.
    #[tokio::test]
    async fn test_wal_ops_wal_entries_count_with_no_wal_manager() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        let count = collection.wal_entries_count().await.unwrap();
        assert_eq!(count, 0);
    }

    /// Streaming a fresh collection yields nothing.
    #[tokio::test]
    async fn test_wal_ops_stream_wal_entries_with_no_wal_manager() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        let stream = collection.stream_wal_entries().await.unwrap();
        let entries: Vec<_> = stream.collect().await;
        assert!(entries.is_empty());
    }

    /// Store-wide verification over several collections reports no critical issue with a description.
    #[tokio::test]
    async fn test_wal_ops_verify_all_with_mixed_collections() {
        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
        for i in 0 .. 3 {
            let collection = collection_with_config(&store, &format!("verify-multi-{}", i), None)
                .await
                .unwrap();
            collection
                .insert(&format!("doc-{}", i), serde_json::json!({"index": i}))
                .await
                .unwrap();
        }
        let issues = store
            .verify_all_collections()
            .await
            .expect("verification of all collections should run");
        for (name, collection_issues) in &issues {
            for issue in collection_issues {
                // NOTE(review): see `test_verify_all_collections` - the
                // empty-description carve-out looks unintended.
                assert!(
                    !issue.is_critical || issue.description.is_empty(),
                    "Collection {} has critical issue: {}",
                    name,
                    issue.description
                );
            }
        }
    }

    /// Store-wide checkpoint succeeds on a store without collections.
    #[tokio::test]
    async fn test_wal_ops_checkpoint_all_with_empty_store() {
        let (_temp_dir, store) = create_empty_store().await;
        store
            .checkpoint_all_collections()
            .await
            .expect("checkpoint should succeed on an empty store");
    }

    /// Store-wide recovery returns no stats on a store without collections.
    #[tokio::test]
    async fn test_wal_ops_recover_all_with_empty_store() {
        let (_temp_dir, store) = create_empty_store().await;
        let stats = store
            .recover_all_collections()
            .await
            .expect("recovery should succeed on an empty store");
        assert!(stats.is_empty());
    }

    /// Checkpointing succeeds after a single insert.
    #[tokio::test]
    async fn test_wal_ops_checkpoint_with_verification_options() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        collection
            .insert("doc-1", serde_json::json!({"test": 1}))
            .await
            .unwrap();
        collection
            .checkpoint_wal()
            .await
            .expect("checkpoint should succeed");
    }

    /// The store-wide stream is empty on a store without collections.
    #[tokio::test]
    async fn test_wal_ops_stream_all_with_no_collections() {
        let (_temp_dir, store) = create_empty_store().await;
        let stream = store.stream_all_wal_entries().await.unwrap();
        let entries: Vec<_> = stream.collect().await;
        assert!(entries.is_empty());
    }

    /// A healthy collection is either absent from the issue map or mapped to no issues.
    #[tokio::test]
    async fn test_wal_ops_verify_all_with_no_issues() {
        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, "verify-pass", None)
            .await
            .unwrap();
        collection
            .insert("doc-1", serde_json::json!({"verify": true}))
            .await
            .unwrap();
        let issues = store
            .verify_all_collections()
            .await
            .expect("verification of all collections should run");
        assert!(issues.get("verify-pass").map_or(true, |v| v.is_empty()));
    }

    /// A successful store-wide recovery reports stats for the collection that was written to.
    #[tokio::test]
    async fn test_wal_ops_recover_all_with_partial_failures() {
        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, "recover-test", None)
            .await
            .unwrap();
        collection
            .insert("doc-1", serde_json::json!({"recover": true}))
            .await
            .unwrap();
        let stats = store
            .recover_all_collections()
            .await
            .expect("recovery of all collections should succeed");
        // `recover_all_collections` records an entry for every collection it processes.
        assert!(
            stats.contains_key("recover-test"),
            "Expected recovery stats for collection recover-test"
        );
    }

    /// Fifty inserts produce exactly fifty readable WAL entries.
    #[tokio::test]
    async fn test_wal_ops_stream_entries_with_large_wal() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        for i in 0 .. 50 {
            collection
                .insert(&format!("doc-{}", i), serde_json::json!({"index": i}))
                .await
                .unwrap();
        }
        let stream = collection.stream_wal_entries().await.unwrap();
        let entries: Vec<_> = stream.collect().await;
        assert_eq!(entries.len(), 50);
        assert!(entries.iter().all(|entry| entry.is_ok()));
    }

    /// Verification runs without error after a single insert.
    #[tokio::test]
    async fn test_wal_ops_verify_with_empty_wal() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        collection
            .insert("doc-1", serde_json::json!({"test": 1}))
            .await
            .unwrap();
        collection
            .verify_against_wal()
            .await
            .expect("verification should run");
    }

    /// Five inserts grow the WAL entry count by at least five.
    #[tokio::test]
    async fn test_wal_ops_wal_entries_count_after_rotation() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        let initial_count = collection.wal_entries_count().await.unwrap();
        for i in 0 .. 5 {
            collection
                .insert(&format!("doc-{}", i), serde_json::json!({"index": i}))
                .await
                .unwrap();
        }
        let new_count = collection.wal_entries_count().await.unwrap();
        assert!(new_count >= initial_count + 5);
    }

    /// A document is still readable, with its data intact, after a checkpoint.
    #[tokio::test]
    async fn test_wal_ops_checkpoint_preserves_data() {
        let (_temp_dir, store, collection_name) = create_test_store_with_collection().await;
        let collection = collection_with_config(&store, &collection_name, None)
            .await
            .unwrap();
        collection
            .insert("doc-1", serde_json::json!({"name": "Test", "value": 42}))
            .await
            .unwrap();
        collection.checkpoint_wal().await.unwrap();
        let doc = collection.get("doc-1").await.unwrap();
        assert!(doc.is_some());
        assert_eq!(doc.unwrap().data()["value"], 42);
    }

    /// The store-wide stream carries every entry of every collection, tagged with its collection name.
    #[tokio::test]
    async fn test_wal_ops_stream_all_with_mixed_collections() {
        let (_temp_dir, store, _collection_name) = create_test_store_with_collection().await;
        // Collection `i` receives `i + 1` documents: 1 + 2 + 3 = 6 entries in total.
        for i in 0 .. 3 {
            let collection = collection_with_config(&store, &format!("stream-mixed-{}", i), None)
                .await
                .unwrap();
            for j in 0 ..= i {
                collection
                    .insert(
                        &format!("doc-{}", j),
                        serde_json::json!({"collection": i, "doc": j}),
                    )
                    .await
                    .unwrap();
            }
        }
        let stream = store.stream_all_wal_entries().await.unwrap();
        let entries: Vec<_> = stream.collect().await;
        assert_eq!(entries.len(), 6);
        let collections: std::collections::HashSet<String> = entries
            .into_iter()
            .filter_map(Result::ok)
            .map(|(name, _)| name)
            .collect();
        assert_eq!(collections.len(), 3);
    }
}