use crate::{
crdt::{Mergeable, ReplicaId},
storage::Storage,
transport::{SyncTransport, TransportError},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SyncEngineError {
#[error("Storage error: {0}")]
Storage(#[from] crate::storage::StorageError),
#[error("Transport error: {0}")]
Transport(#[from] TransportError),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("CRDT error: {0}")]
CrdtError(#[from] Box<dyn std::error::Error + Send + Sync>),
#[error("Sync operation failed: {0}")]
SyncFailed(String),
#[error("Conflict resolution failed: {0}")]
ConflictResolution(String),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SyncState {
NotSynced,
Syncing,
Synced,
Failed(String),
ResolvingConflicts,
Offline,
Connected,
Disconnected,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SyncMessage<T> {
Sync { key: String, data: T, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
Ack { key: String, replica_id: ReplicaId },
Presence { replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
Conflict { key: String, data: T, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
Heartbeat { replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
}
pub struct SyncEngine<Tr>
where
Tr: SyncTransport + Clone,
{
replica_id: ReplicaId,
state: Arc<RwLock<SyncState>>,
peers: Arc<RwLock<HashMap<ReplicaId, PeerInfo>>>,
storage: Storage,
transport: Tr,
sync_queue: Arc<RwLock<Vec<SyncMessage<Vec<u8>>>>>,
conflict_resolver: Arc<RwLock<Option<DefaultConflictResolver>>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub replica_id: ReplicaId,
pub last_seen: chrono::DateTime<chrono::Utc>,
pub is_online: bool,
pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
pub sync_status: PeerSyncStatus,
pub id: ReplicaId,
pub status: PeerSyncStatus,
pub version: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum PeerSyncStatus {
Never,
Success { timestamp: chrono::DateTime<chrono::Utc> },
Failed { timestamp: chrono::DateTime<chrono::Utc>, error: String },
Syncing { started: chrono::DateTime<chrono::Utc> },
Connected,
Disconnected,
}
pub struct DefaultConflictResolver;
impl DefaultConflictResolver {
pub fn resolve<T: Mergeable>(&self, local: &T, remote: &T) -> Result<T, Box<dyn std::error::Error + Send + Sync>> {
let mut result = local.clone();
result.merge(remote).map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
Ok(result)
}
}
impl<Tr> SyncEngine<Tr>
where
Tr: SyncTransport + Clone + 'static,
{
pub fn new(storage: Storage, transport: Tr) -> Self {
Self {
replica_id: ReplicaId::default(),
state: Arc::new(RwLock::new(SyncState::NotSynced)),
peers: Arc::new(RwLock::new(HashMap::new())),
storage,
transport,
sync_queue: Arc::new(RwLock::new(Vec::new())),
conflict_resolver: Arc::new(RwLock::new(Some(DefaultConflictResolver))),
}
}
pub fn with_replica_id(storage: Storage, transport: Tr, replica_id: ReplicaId) -> Self {
Self {
replica_id,
state: Arc::new(RwLock::new(SyncState::NotSynced)),
peers: Arc::new(RwLock::new(HashMap::new())),
storage,
transport,
sync_queue: Arc::new(RwLock::new(Vec::new())),
conflict_resolver: Arc::new(RwLock::new(Some(DefaultConflictResolver))),
}
}
pub async fn state(&self) -> SyncState {
self.state.read().await.clone()
}
pub fn replica_id(&self) -> ReplicaId {
self.replica_id
}
pub async fn is_online(&self) -> bool {
self.transport.is_connected()
}
pub async fn peer_count(&self) -> usize {
self.peers.read().await.len()
}
pub async fn start_sync(&mut self) -> Result<(), SyncEngineError> {
let mut state = self.state.write().await;
*state = SyncState::Syncing;
if !self.transport.is_connected() {
tracing::info!("Transport not connected, attempting to connect...");
}
self.announce_presence().await?;
self.start_background_sync().await;
Ok(())
}
pub async fn stop_sync(&mut self) -> Result<(), SyncEngineError> {
let mut state = self.state.write().await;
*state = SyncState::NotSynced;
if self.transport.is_connected() {
tracing::info!("Stopping sync, disconnecting from transport...");
}
Ok(())
}
pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncEngineError>
where
V: Mergeable + Serialize + Send + Sync + Clone,
{
let data = serde_json::to_vec(value)?;
let message = SyncMessage::Sync {
key: key.to_string(),
data,
replica_id: self.replica_id,
timestamp: chrono::Utc::now(),
};
{
let mut queue = self.sync_queue.write().await;
queue.push(SyncMessage::Sync {
key: key.to_string(),
data: serde_json::to_vec(value)?,
replica_id: self.replica_id,
timestamp: chrono::Utc::now(),
});
}
let message_bytes = serde_json::to_vec(&message)?;
self.transport.send(&message_bytes).await
.map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
Ok(())
}
pub async fn process_messages(&mut self) -> Result<(), SyncEngineError> {
let messages = self.transport.receive().await
.map_err(|e| SyncEngineError::Transport(TransportError::ReceiveFailed(e.to_string())))?;
for message_bytes in messages {
let message: SyncMessage<Vec<u8>> = serde_json::from_slice(&message_bytes)?;
match message {
SyncMessage::Sync { key, data, replica_id, timestamp } => {
self.handle_sync_message(key, data, replica_id, timestamp).await?;
}
SyncMessage::Ack { key, replica_id } => {
self.handle_ack_message(key, replica_id).await?;
}
SyncMessage::Presence { replica_id, timestamp } => {
self.handle_presence_message(replica_id, timestamp).await?;
}
SyncMessage::Conflict { key, data, replica_id, timestamp } => {
self.handle_conflict_message(key, data, replica_id, timestamp).await?;
}
SyncMessage::Heartbeat { replica_id, timestamp } => {
self.handle_heartbeat_message(replica_id, timestamp).await?;
}
}
}
Ok(())
}
async fn announce_presence(&self) -> Result<(), SyncEngineError> {
let message: SyncMessage<()> = SyncMessage::Presence {
replica_id: self.replica_id,
timestamp: chrono::Utc::now(),
};
let message_bytes = serde_json::to_vec(&message)?;
self.transport.send(&message_bytes).await
.map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
Ok(())
}
async fn send_heartbeat(&self) -> Result<(), SyncEngineError> {
let message: SyncMessage<()> = SyncMessage::Heartbeat {
replica_id: self.replica_id,
timestamp: chrono::Utc::now(),
};
let message_bytes = serde_json::to_vec(&message)?;
self.transport.send(&message_bytes).await
.map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
Ok(())
}
async fn start_background_sync(&self) {
let transport = self.transport.clone();
let replica_id = self.replica_id;
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
let message: SyncMessage<()> = SyncMessage::Heartbeat {
replica_id,
timestamp: chrono::Utc::now(),
};
if let Ok(message_bytes) = serde_json::to_vec(&message) {
let _ = transport.send(&message_bytes).await;
}
}
});
}
async fn handle_sync_message(&mut self, key: String, _data: Vec<u8>, replica_id: ReplicaId, _timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
tracing::debug!("Received sync message for key {} from replica {}", key, replica_id);
let ack: SyncMessage<()> = SyncMessage::Ack {
key,
replica_id,
};
let ack_bytes = serde_json::to_vec(&ack)?;
self.transport.send(&ack_bytes).await
.map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
Ok(())
}
async fn handle_ack_message(&mut self, _key: String, _replica_id: ReplicaId) -> Result<(), SyncEngineError> {
tracing::debug!("Received ack message");
Ok(())
}
async fn handle_presence_message(&mut self, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
let mut peers = self.peers.write().await;
let peer_info = PeerInfo {
replica_id: replica_id.clone(),
last_seen: timestamp,
is_online: true,
last_sync: None,
sync_status: PeerSyncStatus::Never,
id: replica_id,
status: PeerSyncStatus::Never,
version: 1,
};
peers.insert(replica_id, peer_info);
tracing::debug!("Updated peer info for replica {}", replica_id);
Ok(())
}
async fn handle_conflict_message(&mut self, _key: String, _data: Vec<u8>, _replica_id: ReplicaId, _timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
tracing::debug!("Received conflict message");
Ok(())
}
async fn handle_heartbeat_message(&mut self, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
let mut peers = self.peers.write().await;
if let Some(peer_info) = peers.get_mut(&replica_id) {
peer_info.last_seen = timestamp;
tracing::debug!("Updated heartbeat for replica {}", replica_id);
}
Ok(())
}
pub async fn peers(&self) -> impl Iterator<Item = (ReplicaId, PeerInfo)> + 'static {
let peers = self.peers.read().await;
peers.clone().into_iter()
}
fn has_conflict<V: Mergeable>(&self, _local: &V, _remote: &V) -> bool {
false
}
}