use super::{LocalStorage, StorageError};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;
pub mod connection;
pub mod crdt_store;
pub mod errors;
pub mod operations;
use connection::IndexedDbConnection;
use crdt_store::CrdtStore;
use errors::IndexedDbError;
use operations::IndexedDbOperations;
pub struct IndexedDbStorage {
db_name: String,
store_name: String,
fallback: super::memory::MemoryStorage,
connection: Option<Arc<IndexedDbConnection>>,
operations: Option<IndexedDbOperations>,
crdt_store: Option<CrdtStore>,
initialized: bool,
}
impl IndexedDbStorage {
pub fn new(db_name: String, store_name: String) -> Self {
Self {
db_name,
store_name,
fallback: super::memory::MemoryStorage::new(),
connection: None,
operations: None,
crdt_store: None,
initialized: false,
}
}
pub async fn initialize(&mut self) -> Result<(), StorageError> {
if self.initialized {
return Ok(());
}
match IndexedDbConnection::open(&self.db_name, 1).await {
Ok(connection) => {
let connection = Arc::new(connection);
let operations = IndexedDbOperations::new(connection.clone());
let crdt_store = CrdtStore::new(connection.clone());
self.connection = Some(connection);
self.operations = Some(operations);
self.crdt_store = Some(crdt_store);
self.initialized = true;
Ok(())
}
Err(e) => {
tracing::warn!(
"Failed to initialize IndexedDB: {:?}, falling back to memory storage",
e
);
self.initialized = true; Ok(()) }
}
}
async fn ensure_initialized(&mut self) -> Result<(), StorageError> {
if !self.initialized {
self.initialize().await?;
}
Ok(())
}
fn get_operations(&self) -> Option<&IndexedDbOperations> {
self.operations.as_ref()
}
fn get_crdt_store(&self) -> Option<&CrdtStore> {
self.crdt_store.as_ref()
}
pub fn is_indexeddb_available(&self) -> bool {
self.connection.is_some()
}
pub async fn get_stats(&self) -> Result<crdt_store::StorageStats, StorageError> {
if let Some(crdt_store) = &self.crdt_store {
crdt_store.get_stats().await.map_err(Into::into)
} else {
Ok(crdt_store::StorageStats {
collections_count: 0,
deltas_count: 0,
peers_count: 0,
})
}
}
pub async fn store_delta(
&mut self,
collection_id: &str,
delta: &[u8],
replica_id: &str,
operation_type: &str,
) -> Result<(), StorageError> {
self.ensure_initialized().await?;
if let Some(crdt_store) = &self.crdt_store {
crdt_store
.store_delta(collection_id, delta, replica_id, operation_type)
.await
.map_err(Into::into)
} else {
let key = format!("delta:{}:{}:{}", collection_id, replica_id, operation_type);
self.fallback.set(&key, &delta.to_vec()).await
}
}
pub async fn get_deltas(
&mut self,
collection_id: &str,
from_timestamp: Option<u64>,
to_timestamp: Option<u64>,
) -> Result<Vec<crdt_store::DeltaRecord>, StorageError> {
self.ensure_initialized().await?;
if let Some(crdt_store) = &self.crdt_store {
crdt_store
.get_deltas(collection_id, from_timestamp, to_timestamp)
.await
.map_err(Into::into)
} else {
Ok(Vec::new())
}
}
pub async fn store_metadata(
&mut self,
metadata: &crdt_store::CollectionMetadata,
) -> Result<(), StorageError> {
self.ensure_initialized().await?;
if let Some(crdt_store) = &self.crdt_store {
crdt_store
.store_metadata(metadata)
.await
.map_err(Into::into)
} else {
let key = format!("metadata:{}", metadata.id);
self.fallback.set(&key, metadata).await
}
}
pub async fn get_metadata(
&mut self,
collection_id: &str,
) -> Result<Option<crdt_store::CollectionMetadata>, StorageError> {
self.ensure_initialized().await?;
if let Some(crdt_store) = &self.crdt_store {
crdt_store
.get_metadata(collection_id)
.await
.map_err(Into::into)
} else {
let key = format!("metadata:{}", collection_id);
self.fallback.get(&key).await
}
}
pub async fn list_collections(
&mut self,
) -> Result<Vec<crdt_store::CollectionMetadata>, StorageError> {
self.ensure_initialized().await?;
if let Some(crdt_store) = &self.crdt_store {
crdt_store.list_collections().await.map_err(Into::into)
} else {
Ok(Vec::new())
}
}
pub async fn store_peer(&mut self, peer: &crdt_store::PeerInfo) -> Result<(), StorageError> {
self.ensure_initialized().await?;
if let Some(crdt_store) = &self.crdt_store {
crdt_store.store_peer(peer).await.map_err(Into::into)
} else {
let key = format!("peer:{}", peer.id);
self.fallback.set(&key, peer).await
}
}
pub async fn get_peer(
&mut self,
peer_id: &str,
) -> Result<Option<crdt_store::PeerInfo>, StorageError> {
self.ensure_initialized().await?;
if let Some(crdt_store) = &self.crdt_store {
crdt_store.get_peer(peer_id).await.map_err(Into::into)
} else {
let key = format!("peer:{}", peer_id);
self.fallback.get(&key).await
}
}
pub async fn list_peers(&mut self) -> Result<Vec<crdt_store::PeerInfo>, StorageError> {
self.ensure_initialized().await?;
if let Some(crdt_store) = &self.crdt_store {
crdt_store.list_peers().await.map_err(Into::into)
} else {
Ok(Vec::new())
}
}
pub async fn cleanup_old_deltas(
&mut self,
collection_id: &str,
keep_count: usize,
) -> Result<(), StorageError> {
self.ensure_initialized().await?;
if let Some(crdt_store) = &self.crdt_store {
crdt_store
.cleanup_old_deltas(collection_id, keep_count)
.await
.map_err(Into::into)
} else {
Ok(())
}
}
pub async fn clear_all(&mut self) -> Result<(), StorageError> {
self.ensure_initialized().await?;
if let Some(crdt_store) = &self.crdt_store {
crdt_store
.clear_all()
.await
.map_err(|e| StorageError::OperationFailed(e.to_string()))?;
}
self.fallback.clear().await?;
Ok(())
}
}
impl Clone for IndexedDbStorage {
fn clone(&self) -> Self {
Self {
db_name: self.db_name.clone(),
store_name: self.store_name.clone(),
fallback: self.fallback.clone(),
connection: None, operations: None, crdt_store: None, initialized: false, }
}
}
#[async_trait]
impl LocalStorage for IndexedDbStorage {
async fn set<T: Serialize + Send + Sync>(
&self,
key: &str,
value: &T,
) -> Result<(), StorageError> {
if let Some(operations) = &self.operations {
operations
.set(&self.store_name, key, value)
.await
.map_err(Into::into)
} else {
self.fallback.set(key, value).await
}
}
async fn get<T: DeserializeOwned + Send + Sync>(
&self,
key: &str,
) -> Result<Option<T>, StorageError> {
if let Some(operations) = &self.operations {
operations
.get(&self.store_name, key)
.await
.map_err(Into::into)
} else {
self.fallback.get(key).await
}
}
async fn remove(&self, key: &str) -> Result<(), StorageError> {
if let Some(operations) = &self.operations {
operations
.delete(&self.store_name, key)
.await
.map_err(Into::into)
} else {
self.fallback.remove(key).await
}
}
async fn keys(&self) -> Result<Vec<String>, StorageError> {
if let Some(operations) = &self.operations {
operations.keys(&self.store_name).await.map_err(Into::into)
} else {
self.fallback.keys().await
}
}
async fn contains_key(&self, key: &str) -> Result<bool, StorageError> {
if let Some(operations) = &self.operations {
operations
.contains_key(&self.store_name, key)
.await
.map_err(Into::into)
} else {
self.fallback.contains_key(key).await
}
}
async fn len(&self) -> Result<usize, StorageError> {
if let Some(operations) = &self.operations {
operations.count(&self.store_name).await.map_err(Into::into)
} else {
self.fallback.len().await
}
}
async fn is_empty(&self) -> Result<bool, StorageError> {
let len = self.len().await?;
Ok(len == 0)
}
async fn clear(&self) -> Result<(), StorageError> {
if let Some(operations) = &self.operations {
operations.clear(&self.store_name).await.map_err(Into::into)
} else {
self.fallback.clear().await
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_indexeddb_storage_creation() {
let storage = IndexedDbStorage::new("test_db".to_string(), "test_store".to_string());
assert_eq!(storage.db_name, "test_db");
assert_eq!(storage.store_name, "test_store");
assert!(!storage.initialized);
assert!(!storage.is_indexeddb_available());
}
#[tokio::test]
async fn test_indexeddb_storage_fallback() {
let mut storage = IndexedDbStorage::new("test_db".to_string(), "test_store".to_string());
assert!(storage.set("key1", &"value1".to_string()).await.is_ok());
let value = storage.get::<String>("key1").await.unwrap();
assert!(storage.remove("key1").await.is_ok());
}
#[tokio::test]
async fn test_indexeddb_storage_clone() {
let storage = IndexedDbStorage::new("test_db".to_string(), "test_store".to_string());
let cloned_storage = storage.clone();
assert!(cloned_storage
.set("key1", &"value1".to_string())
.await
.is_ok());
}
#[tokio::test]
async fn test_indexeddb_storage_stats() {
let mut storage = IndexedDbStorage::new("test_db".to_string(), "test_store".to_string());
let stats = storage.get_stats().await.unwrap();
assert_eq!(stats.collections_count, 0);
assert_eq!(stats.deltas_count, 0);
assert_eq!(stats.peers_count, 0);
}
}