use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use crate::RecordId;
use crate::blob_store::traits::{BatchBlobStore, BlobStore, BlobStoreStats, IterableBlobStore};
use crate::error::{Result, ZiporaError};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// In-memory blob store backed by a `HashMap`, with atomically generated
/// record IDs. Intended for testing and small data sets; all blobs live on
/// the heap and `get` returns owned copies.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct MemoryBlobStore {
    // Blob payloads keyed by their record ID.
    data: HashMap<RecordId, Vec<u8>>,
    // Next ID to hand out; atomic so ID allocation only needs `&self`.
    next_id: AtomicU32,
    // Running counters (puts, removes, sizes) updated on each mutation.
    stats: BlobStoreStats,
}
impl MemoryBlobStore {
    /// Create an empty store. Record IDs start at 1, so 0 is never issued.
    pub fn new() -> Self {
        Self {
            data: HashMap::new(),
            next_id: AtomicU32::new(1),
            stats: BlobStoreStats::new(),
        }
    }

    /// Create an empty store with room for at least `capacity` blobs.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            data: HashMap::with_capacity(capacity),
            next_id: AtomicU32::new(1),
            stats: BlobStoreStats::new(),
        }
    }

    /// Build a store from pre-existing blob data.
    ///
    /// The ID counter resumes one past the largest existing key (or 1 for an
    /// empty map), and statistics are reconstructed as if each blob had been
    /// inserted via `put`.
    pub fn from_data(data: HashMap<RecordId, Vec<u8>>) -> Self {
        // `saturating_add` avoids a debug-build overflow panic (and a
        // release-build wrap to 0) when the largest key is already
        // `RecordId::MAX`.
        let next_id = data
            .keys()
            .max()
            .map(|&id| id.saturating_add(1))
            .unwrap_or(1);
        let mut stats = BlobStoreStats::new();
        for blob in data.values() {
            stats.record_put(blob.len());
        }
        Self {
            data,
            next_id: AtomicU32::new(next_id),
            stats,
        }
    }

    /// Number of blobs the underlying map can hold without reallocating.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.data.capacity()
    }

    /// Reserve capacity for at least `additional` more blobs.
    pub fn reserve(&mut self, additional: usize) {
        self.data.reserve(additional);
    }

    /// Release unused capacity in the underlying map.
    pub fn shrink_to_fit(&mut self) {
        self.data.shrink_to_fit();
    }

    /// Remove all blobs, reset the ID counter to 1, and zero the statistics.
    pub fn clear(&mut self) {
        self.data.clear();
        self.next_id.store(1, Ordering::Relaxed);
        self.stats = BlobStoreStats::new();
    }

    // Test-only peek at the backing map.
    #[cfg(test)]
    #[allow(dead_code)]
    pub(crate) fn internal_data(&self) -> &HashMap<RecordId, Vec<u8>> {
        &self.data
    }

    /// Atomically allocate the next record ID. Relaxed ordering suffices:
    /// only uniqueness of the returned value matters, not its ordering
    /// relative to other memory operations.
    fn next_record_id(&self) -> RecordId {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}
impl Clone for MemoryBlobStore {
    /// Manual `Clone` impl: `AtomicU32` is not `Clone`, so the current
    /// counter value is snapshotted with a relaxed load and re-wrapped.
    fn clone(&self) -> Self {
        let counter_snapshot = self.next_id.load(Ordering::Relaxed);
        Self {
            data: self.data.clone(),
            next_id: AtomicU32::new(counter_snapshot),
            stats: self.stats.clone(),
        }
    }
}
impl Default for MemoryBlobStore {
fn default() -> Self {
Self::new()
}
}
impl BlobStore for MemoryBlobStore {
    /// Return an owned copy of the blob stored under `id`, or a not-found
    /// error if no such record exists.
    fn get(&self, id: RecordId) -> Result<Vec<u8>> {
        match self.data.get(&id) {
            Some(blob) => Ok(blob.clone()),
            None => Err(ZiporaError::not_found(format!(
                "Blob with ID {} not found",
                id
            ))),
        }
    }

    /// Copy `data` into the store under a freshly allocated ID.
    fn put(&mut self, data: &[u8]) -> Result<RecordId> {
        let id = self.next_record_id();
        self.stats.record_put(data.len());
        self.data.insert(id, data.to_vec());
        Ok(id)
    }

    /// Delete the blob under `id`, updating statistics; errors if absent.
    fn remove(&mut self, id: RecordId) -> Result<()> {
        if let Some(blob) = self.data.remove(&id) {
            self.stats.record_remove(blob.len());
            return Ok(());
        }
        Err(ZiporaError::not_found(format!(
            "Blob with ID {} not found",
            id
        )))
    }

    /// True if a blob is stored under `id`.
    fn contains(&self, id: RecordId) -> bool {
        self.data.get(&id).is_some()
    }

    /// Length of the blob under `id`, or `None` if absent (never errors).
    fn size(&self, id: RecordId) -> Result<Option<usize>> {
        let len = self.data.get(&id).map(Vec::len);
        Ok(len)
    }

    /// Number of stored blobs.
    fn len(&self) -> usize {
        self.data.len()
    }

    /// Snapshot of the running statistics.
    fn stats(&self) -> BlobStoreStats {
        self.stats.clone()
    }
}
impl IterableBlobStore for MemoryBlobStore {
    type IdIter = std::vec::IntoIter<RecordId>;

    /// Iterate over all record IDs in ascending order. The IDs are
    /// snapshotted into a sorted `Vec`, so the iterator is detached from
    /// the store.
    fn iter_ids(&self) -> Self::IdIter {
        let mut sorted_ids = Vec::with_capacity(self.data.len());
        sorted_ids.extend(self.data.keys().copied());
        sorted_ids.sort_unstable();
        sorted_ids.into_iter()
    }
}
impl BatchBlobStore for MemoryBlobStore {
    /// Store a batch of blobs, returning their IDs in input order.
    ///
    /// Each owned `Vec<u8>` is moved straight into the map instead of going
    /// through `put` (which would clone the already-owned buffer), and both
    /// the result vector and the map preallocate from the iterator's
    /// `size_hint`.
    fn put_batch<I>(&mut self, blobs: I) -> Result<Vec<RecordId>>
    where
        I: IntoIterator<Item = Vec<u8>>,
    {
        let iter = blobs.into_iter();
        let (lower, _) = iter.size_hint();
        let mut ids = Vec::with_capacity(lower);
        self.data.reserve(lower);
        for blob in iter {
            let id = self.next_record_id();
            self.stats.record_put(blob.len());
            self.data.insert(id, blob);
            ids.push(id);
        }
        Ok(ids)
    }

    /// Look up a batch of IDs; missing records yield `None` in place.
    fn get_batch<I>(&self, ids: I) -> Result<Vec<Option<Vec<u8>>>>
    where
        I: IntoIterator<Item = RecordId>,
    {
        let iter = ids.into_iter();
        let mut results = Vec::with_capacity(iter.size_hint().0);
        for id in iter {
            results.push(self.data.get(&id).cloned());
        }
        Ok(results)
    }

    /// Remove a batch of IDs, returning how many were actually present.
    /// Unknown IDs are skipped silently (unlike single `remove`).
    fn remove_batch<I>(&mut self, ids: I) -> Result<usize>
    where
        I: IntoIterator<Item = RecordId>,
    {
        let mut removed_count = 0;
        for id in ids {
            if let Some(data) = self.data.remove(&id) {
                self.stats.record_remove(data.len());
                removed_count += 1;
            }
        }
        Ok(removed_count)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blob_store::traits::IterableBlobStore;

    // put/get/remove round-trip plus size and contains checks.
    #[test]
    fn test_memory_blob_store_basic_operations() {
        let mut store = MemoryBlobStore::new();
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());
        let data1 = b"hello world";
        let id1 = store.put(data1).unwrap();
        assert_eq!(store.len(), 1);
        assert!(!store.is_empty());
        assert!(store.contains(id1));
        let retrieved = store.get(id1).unwrap();
        assert_eq!(data1, &retrieved[..]);
        let size = store.size(id1).unwrap();
        assert_eq!(size, Some(data1.len()));
        let data2 = b"goodbye world";
        let id2 = store.put(data2).unwrap();
        assert_eq!(store.len(), 2);
        // IDs must be unique across puts.
        assert_ne!(id1, id2);
        store.remove(id1).unwrap();
        assert_eq!(store.len(), 1);
        assert!(!store.contains(id1));
        assert!(store.contains(id2));
        // A removed ID is gone; other records are unaffected.
        assert!(store.get(id1).is_err());
        let retrieved2 = store.get(id2).unwrap();
        assert_eq!(data2, &retrieved2[..]);
    }

    // get/remove on unknown IDs error with "not found"; size returns None.
    #[test]
    fn test_memory_blob_store_errors() {
        let mut store = MemoryBlobStore::new();
        let result = store.get(999);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
        let result = store.remove(999);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
        // size() is non-erroring: absent ID maps to None.
        let size = store.size(999).unwrap();
        assert_eq!(size, None);
    }

    // iter_ids and iter_blobs cover every stored record exactly once.
    #[test]
    fn test_memory_blob_store_iteration() {
        let mut store = MemoryBlobStore::new();
        let data1 = b"blob1";
        let data2 = b"blob2";
        let data3 = b"blob3";
        let id1 = store.put(data1).unwrap();
        let id2 = store.put(data2).unwrap();
        let id3 = store.put(data3).unwrap();
        let ids: Vec<RecordId> = store.iter_ids().collect();
        assert_eq!(ids.len(), 3);
        assert!(ids.contains(&id1));
        assert!(ids.contains(&id2));
        assert!(ids.contains(&id3));
        // iter_blobs is a trait-provided combination of iter_ids + get.
        let blobs: Result<Vec<(RecordId, Vec<u8>)>> = store.iter_blobs().collect();
        let blobs = blobs.unwrap();
        assert_eq!(blobs.len(), 3);
        for (id, data) in blobs {
            match id {
                _ if id == id1 => assert_eq!(&data, data1),
                _ if id == id2 => assert_eq!(&data, data2),
                _ if id == id3 => assert_eq!(&data, data3),
                _ => panic!("Unexpected blob ID: {}", id),
            }
        }
    }

    // put_batch/get_batch/remove_batch, including a missing ID in the batch.
    #[test]
    fn test_memory_blob_store_batch_operations() {
        let mut store = MemoryBlobStore::new();
        let blobs = vec![b"blob1".to_vec(), b"blob2".to_vec(), b"blob3".to_vec()];
        let ids = store.put_batch(blobs.clone()).unwrap();
        assert_eq!(ids.len(), 3);
        assert_eq!(store.len(), 3);
        let retrieved = store.get_batch(ids.clone()).unwrap();
        assert_eq!(retrieved.len(), 3);
        // Batch results preserve input order.
        for (i, blob_opt) in retrieved.iter().enumerate() {
            assert!(blob_opt.is_some());
            assert_eq!(blob_opt.as_ref().unwrap(), &blobs[i]);
        }
        // An unknown ID in a get_batch yields None, not an error.
        let mut test_ids = ids.clone();
        test_ids.push(999); let retrieved = store.get_batch(test_ids).unwrap();
        assert_eq!(retrieved.len(), 4);
        assert!(retrieved[3].is_none());
        let removed_count = store.remove_batch(ids).unwrap();
        assert_eq!(removed_count, 3);
        assert_eq!(store.len(), 0);
    }

    // with_capacity/reserve/shrink_to_fit/clear behave like HashMap's.
    #[test]
    fn test_memory_blob_store_capacity_management() {
        let mut store = MemoryBlobStore::with_capacity(10);
        assert!(store.capacity() >= 10);
        store.reserve(100);
        assert!(store.capacity() >= 100);
        for i in 0..5 {
            store.put(format!("blob{}", i).as_bytes()).unwrap();
        }
        store.shrink_to_fit();
        assert!(store.capacity() >= store.len());
        store.clear();
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());
    }

    // from_data preserves records and resumes the ID counter past max key.
    #[test]
    fn test_memory_blob_store_from_data() {
        let mut data = HashMap::new();
        data.insert(5, b"blob1".to_vec());
        data.insert(10, b"blob2".to_vec());
        data.insert(15, b"blob3".to_vec());
        let store = MemoryBlobStore::from_data(data.clone());
        assert_eq!(store.len(), 3);
        // Counter must be one past the largest existing key (15 -> 16).
        let next_id = store.next_id.load(Ordering::Relaxed);
        assert_eq!(next_id, 16);
        for (id, expected_data) in data {
            let retrieved = store.get(id).unwrap();
            assert_eq!(retrieved, expected_data);
        }
    }

    // stats track blob count, total payload bytes, and put count.
    #[test]
    fn test_memory_blob_store_stats() {
        let mut store = MemoryBlobStore::new();
        let initial_stats = store.stats();
        assert_eq!(initial_stats.blob_count, 0);
        assert_eq!(initial_stats.total_size, 0);
        store.put(b"blob1").unwrap();
        store.put(b"blob22").unwrap();
        let stats = store.stats();
        assert_eq!(stats.blob_count, 2);
        // 5 + 6 bytes across the two blobs.
        assert_eq!(stats.total_size, 11); assert_eq!(stats.put_count, 2);
    }

    // Sequential ID generation, then a 10-thread x 100-ID race to verify
    // the atomic counter never hands out duplicates.
    #[test]
    fn test_record_id_generation() {
        let store = MemoryBlobStore::new();
        let id1 = store.next_record_id();
        let id2 = store.next_record_id();
        let id3 = store.next_record_id();
        assert_eq!(id1, 1);
        assert_eq!(id2, 2);
        assert_eq!(id3, 3);
        use std::sync::Arc;
        use std::thread;
        let store = Arc::new(MemoryBlobStore::new());
        let mut handles = vec![];
        for _ in 0..10 {
            let store_clone = Arc::clone(&store);
            let handle = thread::spawn(move || {
                (0..100)
                    .map(|_| store_clone.next_record_id())
                    .collect::<Vec<_>>()
            });
            handles.push(handle);
        }
        let mut all_ids = Vec::new();
        for handle in handles {
            all_ids.extend(handle.join().unwrap());
        }
        // After sorting, any duplicate would appear as adjacent equals.
        all_ids.sort_unstable();
        for window in all_ids.windows(2) {
            assert_ne!(window[0], window[1], "Found duplicate ID: {}", window[0]);
        }
    }
}