pub mod manager;
pub mod protocol;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub use manager::SyncManager;
pub use protocol::SyncProtocol;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SyncStrategy {
Manual,
Periodic,
Incremental,
Batch,
Realtime,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SyncStatus {
NotSynced,
Syncing,
Synced,
Failed(String),
Pending,
}
impl SyncStatus {
pub fn is_complete(&self) -> bool {
matches!(self, Self::Synced)
}
pub fn is_syncing(&self) -> bool {
matches!(self, Self::Syncing)
}
pub fn is_failed(&self) -> bool {
matches!(self, Self::Failed(_))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncMetadata {
pub sync_id: String,
pub strategy: SyncStrategy,
pub status: SyncStatus,
pub started_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
pub items_synced: usize,
pub bytes_transferred: usize,
pub error: Option<String>,
}
impl SyncMetadata {
pub fn new(sync_id: String, strategy: SyncStrategy) -> Self {
Self {
sync_id,
strategy,
status: SyncStatus::Pending,
started_at: Utc::now(),
completed_at: None,
items_synced: 0,
bytes_transferred: 0,
error: None,
}
}
pub fn start(&mut self) {
self.status = SyncStatus::Syncing;
self.started_at = Utc::now();
}
pub fn complete(&mut self, items: usize, bytes: usize) {
self.status = SyncStatus::Synced;
self.completed_at = Some(Utc::now());
self.items_synced = items;
self.bytes_transferred = bytes;
}
pub fn fail(&mut self, error: String) {
self.status = SyncStatus::Failed(error.clone());
self.completed_at = Some(Utc::now());
self.error = Some(error);
}
pub fn duration(&self) -> Option<chrono::Duration> {
self.completed_at.map(|end| end - self.started_at)
}
pub fn throughput(&self) -> Option<f64> {
self.duration().map(|d| {
let secs = d.num_milliseconds() as f64 / 1000.0;
if secs > 0.0 {
self.bytes_transferred as f64 / secs
} else {
0.0
}
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncItem {
pub id: String,
pub key: String,
pub data: Vec<u8>,
pub version: u64,
pub modified_at: DateTime<Utc>,
pub checksum: String,
}
impl SyncItem {
pub fn new(id: String, key: String, data: Vec<u8>, version: u64) -> Self {
let checksum = Self::calculate_checksum(&data);
Self {
id,
key,
data,
version,
modified_at: Utc::now(),
checksum,
}
}
fn calculate_checksum(data: &[u8]) -> String {
let hash = blake3::hash(data);
hash.to_hex().to_string()
}
pub fn verify_checksum(&self) -> bool {
Self::calculate_checksum(&self.data) == self.checksum
}
pub fn size(&self) -> usize {
self.data.len()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncBatch {
pub batch_id: String,
pub items: Vec<SyncItem>,
pub created_at: DateTime<Utc>,
pub compressed: bool,
}
impl SyncBatch {
pub fn new(batch_id: String) -> Self {
Self {
batch_id,
items: Vec::new(),
created_at: Utc::now(),
compressed: false,
}
}
pub fn add_item(&mut self, item: SyncItem) {
self.items.push(item);
}
pub fn size(&self) -> usize {
self.items.iter().map(|item| item.size()).sum()
}
pub fn len(&self) -> usize {
self.items.len()
}
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncState {
pub last_sync: Option<DateTime<Utc>>,
pub pending_items: HashMap<String, SyncItem>,
pub current_sync: Option<SyncMetadata>,
pub history: Vec<SyncMetadata>,
}
impl Default for SyncState {
fn default() -> Self {
Self::new()
}
}
impl SyncState {
pub fn new() -> Self {
Self {
last_sync: None,
pending_items: HashMap::new(),
current_sync: None,
history: Vec::new(),
}
}
pub fn add_pending(&mut self, item: SyncItem) {
self.pending_items.insert(item.id.clone(), item);
}
pub fn remove_pending(&mut self, item_id: &str) -> Option<SyncItem> {
self.pending_items.remove(item_id)
}
pub fn pending_count(&self) -> usize {
self.pending_items.len()
}
pub fn start_sync(&mut self, metadata: SyncMetadata) {
self.current_sync = Some(metadata);
}
pub fn complete_sync(&mut self) {
if let Some(mut sync) = self.current_sync.take() {
sync.complete(0, 0);
self.last_sync = Some(Utc::now());
self.history.push(sync);
if self.history.len() > 100 {
self.history.remove(0);
}
}
}
pub fn fail_sync(&mut self, error: String) {
if let Some(mut sync) = self.current_sync.take() {
sync.fail(error);
self.history.push(sync);
if self.history.len() > 100 {
self.history.remove(0);
}
}
}
pub fn statistics(&self) -> SyncStatistics {
let total_syncs = self.history.len();
let successful = self
.history
.iter()
.filter(|s| s.status.is_complete())
.count();
let failed = self.history.iter().filter(|s| s.status.is_failed()).count();
let avg_throughput = if successful > 0 {
let sum: f64 = self.history.iter().filter_map(|s| s.throughput()).sum();
sum / successful as f64
} else {
0.0
};
SyncStatistics {
total_syncs,
successful_syncs: successful,
failed_syncs: failed,
pending_items: self.pending_count(),
last_sync: self.last_sync,
avg_throughput_bps: avg_throughput,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncStatistics {
pub total_syncs: usize,
pub successful_syncs: usize,
pub failed_syncs: usize,
pub pending_items: usize,
pub last_sync: Option<DateTime<Utc>>,
pub avg_throughput_bps: f64,
}
impl SyncStatistics {
pub fn success_rate(&self) -> f64 {
if self.total_syncs == 0 {
return 100.0;
}
(self.successful_syncs as f64 / self.total_syncs as f64) * 100.0
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sync_metadata() {
let mut metadata = SyncMetadata::new("sync-1".to_string(), SyncStrategy::Incremental);
assert_eq!(metadata.status, SyncStatus::Pending);
metadata.start();
assert_eq!(metadata.status, SyncStatus::Syncing);
metadata.complete(10, 1024);
assert_eq!(metadata.status, SyncStatus::Synced);
assert_eq!(metadata.items_synced, 10);
assert_eq!(metadata.bytes_transferred, 1024);
}
#[test]
fn test_sync_item() {
let item = SyncItem::new(
"item-1".to_string(),
"key-1".to_string(),
vec![1, 2, 3, 4, 5],
1,
);
assert_eq!(item.size(), 5);
assert!(item.verify_checksum());
}
#[test]
fn test_sync_batch() {
let mut batch = SyncBatch::new("batch-1".to_string());
assert!(batch.is_empty());
let item = SyncItem::new("item-1".to_string(), "key-1".to_string(), vec![1, 2, 3], 1);
batch.add_item(item);
assert_eq!(batch.len(), 1);
assert_eq!(batch.size(), 3);
}
#[test]
fn test_sync_state() {
let mut state = SyncState::new();
assert_eq!(state.pending_count(), 0);
let item = SyncItem::new("item-1".to_string(), "key-1".to_string(), vec![1, 2, 3], 1);
state.add_pending(item);
assert_eq!(state.pending_count(), 1);
let removed = state.remove_pending("item-1");
assert!(removed.is_some());
assert_eq!(state.pending_count(), 0);
}
#[test]
fn test_sync_statistics() {
let mut state = SyncState::new();
for i in 0..5 {
let mut metadata = SyncMetadata::new(format!("sync-{}", i), SyncStrategy::Incremental);
metadata.start();
metadata.complete(10, 1024);
state.history.push(metadata);
}
let stats = state.statistics();
assert_eq!(stats.total_syncs, 5);
assert_eq!(stats.successful_syncs, 5);
assert_eq!(stats.success_rate(), 100.0);
}
}