use super::{SyncItem, SyncMetadata, SyncStrategy};
use crate::error::{EdgeError, Result};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[async_trait]
pub trait SyncProtocol: Send + Sync {
async fn push(&self, items: Vec<SyncItem>) -> Result<SyncMetadata>;
async fn pull(&self, since: Option<chrono::DateTime<chrono::Utc>>) -> Result<Vec<SyncItem>>;
async fn sync(&self, local_items: Vec<SyncItem>) -> Result<SyncResult>;
async fn is_connected(&self) -> bool;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResult {
pub pushed: Vec<String>,
pub pulled: Vec<SyncItem>,
pub metadata: SyncMetadata,
pub conflicts: Vec<Conflict>,
}
impl SyncResult {
pub fn new(metadata: SyncMetadata) -> Self {
Self {
pushed: Vec::new(),
pulled: Vec::new(),
metadata,
conflicts: Vec::new(),
}
}
pub fn is_successful(&self) -> bool {
self.metadata.status.is_complete()
}
pub fn total_items(&self) -> usize {
self.pushed.len() + self.pulled.len()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Conflict {
pub item_id: String,
pub local_version: u64,
pub remote_version: u64,
pub resolution: ConflictResolution,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictResolution {
UseLocal,
UseRemote,
Merge,
Manual,
}
pub struct MockSyncProtocol {
storage: parking_lot::RwLock<HashMap<String, SyncItem>>,
connected: parking_lot::RwLock<bool>,
}
impl Default for MockSyncProtocol {
fn default() -> Self {
Self::new()
}
}
impl MockSyncProtocol {
pub fn new() -> Self {
Self {
storage: parking_lot::RwLock::new(HashMap::new()),
connected: parking_lot::RwLock::new(true),
}
}
pub fn set_connected(&self, connected: bool) {
*self.connected.write() = connected;
}
}
#[async_trait]
impl SyncProtocol for MockSyncProtocol {
async fn push(&self, items: Vec<SyncItem>) -> Result<SyncMetadata> {
if !self.is_connected().await {
return Err(EdgeError::network("Not connected"));
}
let mut storage = self.storage.write();
let mut metadata = SyncMetadata::new(
format!("sync-{}", chrono::Utc::now().timestamp()),
SyncStrategy::Manual,
);
metadata.start();
for item in items {
storage.insert(item.id.clone(), item);
}
metadata.complete(storage.len(), 0);
Ok(metadata)
}
async fn pull(&self, _since: Option<chrono::DateTime<chrono::Utc>>) -> Result<Vec<SyncItem>> {
if !self.is_connected().await {
return Err(EdgeError::network("Not connected"));
}
let storage = self.storage.read();
Ok(storage.values().cloned().collect())
}
async fn sync(&self, local_items: Vec<SyncItem>) -> Result<SyncResult> {
if !self.is_connected().await {
return Err(EdgeError::network("Not connected"));
}
let mut metadata = SyncMetadata::new(
format!("sync-{}", chrono::Utc::now().timestamp()),
SyncStrategy::Manual,
);
metadata.start();
let mut storage = self.storage.write();
let mut pushed = Vec::new();
for item in &local_items {
storage.insert(item.id.clone(), item.clone());
pushed.push(item.id.clone());
}
let pulled: Vec<SyncItem> = storage
.values()
.filter(|item| !local_items.iter().any(|l| l.id == item.id))
.cloned()
.collect();
metadata.complete(pushed.len() + pulled.len(), 0);
Ok(SyncResult {
pushed,
pulled,
metadata,
conflicts: Vec::new(),
})
}
async fn is_connected(&self) -> bool {
*self.connected.read()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_mock_protocol_push() -> Result<()> {
let protocol = MockSyncProtocol::new();
let item = SyncItem::new("item-1".to_string(), "key-1".to_string(), vec![1, 2, 3], 1);
let metadata = protocol.push(vec![item]).await?;
assert!(metadata.status.is_complete());
Ok(())
}
#[tokio::test]
async fn test_mock_protocol_pull() -> Result<()> {
let protocol = MockSyncProtocol::new();
let item = SyncItem::new("item-1".to_string(), "key-1".to_string(), vec![1, 2, 3], 1);
protocol.push(vec![item.clone()]).await?;
let items = protocol.pull(None).await?;
assert_eq!(items.len(), 1);
assert_eq!(items[0].id, item.id);
Ok(())
}
#[tokio::test]
async fn test_mock_protocol_sync() -> Result<()> {
let protocol = MockSyncProtocol::new();
let item = SyncItem::new("item-1".to_string(), "key-1".to_string(), vec![1, 2, 3], 1);
let result = protocol.sync(vec![item]).await?;
assert!(result.is_successful());
assert_eq!(result.pushed.len(), 1);
Ok(())
}
#[tokio::test]
async fn test_mock_protocol_connectivity() -> Result<()> {
let protocol = MockSyncProtocol::new();
assert!(protocol.is_connected().await);
protocol.set_connected(false);
assert!(!protocol.is_connected().await);
Ok(())
}
#[tokio::test]
async fn test_mock_protocol_offline() {
let protocol = MockSyncProtocol::new();
protocol.set_connected(false);
let item = SyncItem::new("item-1".to_string(), "key-1".to_string(), vec![1, 2, 3], 1);
let result = protocol.push(vec![item]).await;
assert!(result.is_err());
}
}