use super::connection::IndexedDbConnection;
use super::errors::IndexedDbError;
use super::operations::IndexedDbOperations;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaRecord {
pub id: String,
pub collection_id: String,
pub delta: Vec<u8>,
pub timestamp: u64,
pub replica_id: String,
pub operation_type: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionMetadata {
pub id: String,
pub name: String,
pub crdt_type: String,
pub version: u32,
pub last_sync: u64,
pub replica_count: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub id: String,
pub last_seen: u64,
pub status: String,
pub version: u32,
}
pub struct CrdtStore {
operations: IndexedDbOperations,
}
impl CrdtStore {
pub fn new(connection: Arc<IndexedDbConnection>) -> Self {
let operations = IndexedDbOperations::new(connection);
Self { operations }
}
pub async fn store_delta(&self, collection_id: &str, delta: &[u8], replica_id: &str, operation_type: &str) -> Result<(), IndexedDbError> {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let delta_id = format!("{}#{}#{}", collection_id, timestamp, replica_id);
let record = DeltaRecord {
id: delta_id,
collection_id: collection_id.to_string(),
delta: delta.to_vec(),
timestamp,
replica_id: replica_id.to_string(),
operation_type: operation_type.to_string(),
};
self.operations.set("deltas", &record.id, &record).await
}
pub async fn get_deltas(&self, collection_id: &str, from_timestamp: Option<u64>, to_timestamp: Option<u64>) -> Result<Vec<DeltaRecord>, IndexedDbError> {
let all_keys = self.operations.keys("deltas").await?;
let mut deltas = Vec::new();
for key in all_keys {
if let Some(record) = self.operations.get::<DeltaRecord>("deltas", &key).await? {
if record.collection_id == collection_id {
let include = match (from_timestamp, to_timestamp) {
(Some(from), Some(to)) => record.timestamp >= from && record.timestamp <= to,
(Some(from), None) => record.timestamp >= from,
(None, Some(to)) => record.timestamp <= to,
(None, None) => true,
};
if include {
deltas.push(record);
}
}
}
}
deltas.sort_by_key(|d| d.timestamp);
Ok(deltas)
}
pub async fn get_latest_delta(&self, collection_id: &str) -> Result<Option<DeltaRecord>, IndexedDbError> {
let deltas = self.get_deltas(collection_id, None, None).await?;
Ok(deltas.into_iter().last())
}
pub async fn store_metadata(&self, metadata: &CollectionMetadata) -> Result<(), IndexedDbError> {
self.operations.set("metadata", &metadata.id, metadata).await
}
pub async fn get_metadata(&self, collection_id: &str) -> Result<Option<CollectionMetadata>, IndexedDbError> {
self.operations.get("metadata", collection_id).await
}
pub async fn list_collections(&self) -> Result<Vec<CollectionMetadata>, IndexedDbError> {
let keys = self.operations.keys("metadata").await?;
let mut collections = Vec::new();
for key in keys {
if let Some(metadata) = self.operations.get::<CollectionMetadata>("metadata", &key).await? {
collections.push(metadata);
}
}
Ok(collections)
}
pub async fn store_peer(&self, peer: &PeerInfo) -> Result<(), IndexedDbError> {
self.operations.set("peers", &peer.id, peer).await
}
pub async fn get_peer(&self, peer_id: &str) -> Result<Option<PeerInfo>, IndexedDbError> {
self.operations.get("peers", peer_id).await
}
pub async fn list_peers(&self) -> Result<Vec<PeerInfo>, IndexedDbError> {
let keys = self.operations.keys("peers").await?;
let mut peers = Vec::new();
for key in keys {
if let Some(peer) = self.operations.get::<PeerInfo>("peers", &key).await? {
peers.push(peer);
}
}
Ok(peers)
}
pub async fn update_peer_last_seen(&self, peer_id: &str) -> Result<(), IndexedDbError> {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
if let Some(mut peer) = self.get_peer(peer_id).await? {
peer.last_seen = timestamp;
self.store_peer(&peer).await?;
}
Ok(())
}
pub async fn cleanup_old_deltas(&self, collection_id: &str, keep_count: usize) -> Result<(), IndexedDbError> {
let mut deltas = self.get_deltas(collection_id, None, None).await?;
if deltas.len() > keep_count {
deltas.sort_by_key(|d| d.timestamp);
let to_remove = deltas.len() - keep_count;
for delta in deltas.into_iter().take(to_remove) {
self.operations.delete("deltas", &delta.id).await?;
}
}
Ok(())
}
pub async fn get_stats(&self) -> Result<StorageStats, IndexedDbError> {
let collections_count = self.operations.count("metadata").await?;
let deltas_count = self.operations.count("deltas").await?;
let peers_count = self.operations.count("peers").await?;
Ok(StorageStats {
collections_count,
deltas_count,
peers_count,
})
}
pub async fn clear_all(&self) -> Result<(), IndexedDbError> {
self.operations.clear("collections").await?;
self.operations.clear("metadata").await?;
self.operations.clear("deltas").await?;
self.operations.clear("peers").await?;
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageStats {
pub collections_count: usize,
pub deltas_count: usize,
pub peers_count: usize,
}
impl Clone for CrdtStore {
fn clone(&self) -> Self {
Self {
operations: self.operations.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_crdt_store_creation() {
#[cfg(not(target_arch = "wasm32"))]
{
}
}
#[test]
fn test_delta_record_serialization() {
let record = DeltaRecord {
id: "test#123#replica1".to_string(),
collection_id: "test_collection".to_string(),
delta: b"test delta data".to_vec(),
timestamp: 1234567890,
replica_id: "replica1".to_string(),
operation_type: "set".to_string(),
};
let serialized = serde_json::to_string(&record).unwrap();
let deserialized: DeltaRecord = serde_json::from_str(&serialized).unwrap();
assert_eq!(record.id, deserialized.id);
assert_eq!(record.collection_id, deserialized.collection_id);
assert_eq!(record.delta, deserialized.delta);
assert_eq!(record.timestamp, deserialized.timestamp);
assert_eq!(record.replica_id, deserialized.replica_id);
assert_eq!(record.operation_type, deserialized.operation_type);
}
#[test]
fn test_collection_metadata_serialization() {
let metadata = CollectionMetadata {
id: "test_collection".to_string(),
name: "Test Collection".to_string(),
crdt_type: "LwwMap".to_string(),
version: 1,
last_sync: 1234567890,
replica_count: 3,
};
let serialized = serde_json::to_string(&metadata).unwrap();
let deserialized: CollectionMetadata = serde_json::from_str(&serialized).unwrap();
assert_eq!(metadata.id, deserialized.id);
assert_eq!(metadata.name, deserialized.name);
assert_eq!(metadata.crdt_type, deserialized.crdt_type);
assert_eq!(metadata.version, deserialized.version);
assert_eq!(metadata.last_sync, deserialized.last_sync);
assert_eq!(metadata.replica_count, deserialized.replica_count);
}
#[test]
fn test_peer_info_serialization() {
let peer = PeerInfo {
id: "peer1".to_string(),
last_seen: 1234567890,
status: "connected".to_string(),
version: 1,
};
let serialized = serde_json::to_string(&peer).unwrap();
let deserialized: PeerInfo = serde_json::from_str(&serialized).unwrap();
assert_eq!(peer.id, deserialized.id);
assert_eq!(peer.last_seen, deserialized.last_seen);
assert_eq!(peer.status, deserialized.status);
assert_eq!(peer.version, deserialized.version);
}
}