#![cfg(feature = "parallel")]
use rayon::prelude::*;
use simd_r_drive::{DataStore, traits::DataStoreWriter};
use std::collections::HashSet;
use tempfile::tempdir;
/// Opens a fresh `DataStore` backed by a file inside a brand-new temporary
/// directory. The `TempDir` handle is returned alongside the store so the
/// directory (and its backing file) lives as long as the caller keeps it.
fn create_temp_storage() -> (tempfile::TempDir, DataStore) {
    let tmp = tempdir().expect("Failed to create temp dir");
    let storage_path = tmp.path().join("test_storage.bin");
    let store = DataStore::open(&storage_path).expect("Failed to open storage");
    (tmp, store)
}
#[test]
fn test_par_iter_produces_correct_entries() {
    // The parallel iterator must yield exactly the same set of payloads as
    // the sequential one; ordering is irrelevant, hence `HashSet`.
    let (_dir, storage) = create_temp_storage();

    let records: Vec<(&[u8], &[u8])> = vec![
        (b"key1", b"payload1"),
        (b"key2", b"payload2"),
        (b"key3", b"payload3"),
    ];
    storage.batch_write(&records).expect("Batch write failed");

    let sequential: HashSet<Vec<u8>> = storage
        .iter_entries()
        .map(|entry| entry.as_slice().to_vec())
        .collect();
    let parallel: HashSet<Vec<u8>> = storage
        .par_iter_entries()
        .map(|entry| entry.as_slice().to_vec())
        .collect();

    assert_eq!(
        sequential, parallel,
        "Parallel iterator should produce the same set of entries as the sequential one"
    );
    assert_eq!(parallel.len(), 3);
}
#[test]
fn test_par_iter_skips_deleted_entries() {
    // A deleted key must be invisible to the parallel iterator. Besides the
    // count and the absence of the tombstoned payload, also assert that the
    // two survivors are exactly the expected payloads — a count of 2 alone
    // would not catch an iterator that duplicated one live entry.
    let (_dir, storage) = create_temp_storage();
    let entries = vec![
        (b"key1".as_slice(), b"payload1".as_slice()),
        (b"key_to_delete".as_slice(), b"payload_to_delete".as_slice()),
        (b"key3".as_slice(), b"payload3".as_slice()),
    ];
    storage.batch_write(&entries).expect("Batch write failed");
    storage.delete(b"key_to_delete").expect("Delete failed");

    let found_payloads: Vec<Vec<u8>> = storage
        .par_iter_entries()
        .map(|e| e.as_slice().to_vec())
        .collect();

    assert_eq!(
        found_payloads.len(),
        2,
        "Parallel iterator should not include deleted entries"
    );
    let deleted_payload = b"payload_to_delete".to_vec();
    assert!(
        !found_payloads.contains(&deleted_payload),
        "Deleted payload should not be found in parallel iteration results"
    );
    // Positive check: the surviving payloads are exactly the live entries.
    assert!(
        found_payloads.contains(&b"payload1".to_vec()),
        "Live payload1 should survive the delete"
    );
    assert!(
        found_payloads.contains(&b"payload3".to_vec()),
        "Live payload3 should survive the delete"
    );
}
#[test]
fn test_par_iter_on_empty_store() {
    // A store with no writes must yield an empty parallel iterator.
    let (_dir, storage) = create_temp_storage();
    let total = storage.par_iter_entries().count();
    assert_eq!(
        total, 0,
        "Parallel iterator should produce zero items for an empty store"
    );
}
#[test]
fn test_par_iter_yields_only_latest_version_of_updated_entry() {
    // After an in-place update, only the newest payload for a key should be
    // visible; the superseded version must not leak through.
    let (_dir, storage) = create_temp_storage();

    storage
        .write(b"updated_key", b"version1")
        .expect("Write failed");
    storage
        .write(b"stable_key", b"stable_version")
        .expect("Write failed");
    storage
        .write(b"updated_key", b"version2_final")
        .expect("Update failed");

    let observed: HashSet<Vec<u8>> = storage
        .par_iter_entries()
        .map(|entry| entry.as_slice().to_vec())
        .collect();

    assert_eq!(observed.len(), 2);
    assert!(observed.contains(b"version2_final".as_slice()));
    assert!(observed.contains(b"stable_version".as_slice()));
    // The stale first version must have been superseded.
    assert!(!observed.contains(b"version1".as_slice()));
}
#[test]
fn test_par_iter_excludes_entries_that_were_updated_then_deleted() {
    // A key that is written twice and then deleted must vanish entirely:
    // neither its first nor its second payload may appear.
    let (_dir, storage) = create_temp_storage();

    storage
        .write(b"deleted_key", b"version1")
        .expect("Write failed");
    storage
        .write(b"deleted_key", b"version2")
        .expect("Update failed");
    storage
        .write(b"stable_key", b"stable_version")
        .expect("Write failed");
    storage.delete(b"deleted_key").expect("Delete failed");

    let observed: HashSet<Vec<u8>> = storage
        .par_iter_entries()
        .map(|entry| entry.as_slice().to_vec())
        .collect();

    assert_eq!(observed.len(), 1);
    assert!(observed.contains(b"stable_version".as_slice()));
    // Both historical versions of the deleted key must be absent.
    assert!(!observed.contains(b"version1".as_slice()));
    assert!(!observed.contains(b"version2".as_slice()));
}