use super::{
auth::{JwtManager, TokenPair},
ChangeLogImpl as ChangeLog,
delta_applicator::DeltaApplicator,
protocol::{ChangeEntry, ChangeOperation, SyncMessage, PROTOCOL_VERSION},
vector_clock::VectorClock,
Result, SyncError,
};
use crate::storage::StorageEngine;
use parking_lot::RwLock;
use reqwest::{Client as HttpClient, StatusCode};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
/// Tunable settings for [`SyncClient`]: sync cadence, batching, retry backoff and HTTP timeouts.
#[derive(Debug, Clone)]
pub struct SyncConfig {
/// Period, in seconds, of the background loop started by `SyncClient::start_background_sync`.
pub pull_interval_secs: u64,
/// NOTE(review): not read by any code visible here; presumably tells the embedding
/// code to push after each local commit — confirm against callers.
pub push_on_commit: bool,
/// Maximum number of entries requested per pull and read from the change log per push.
pub batch_size: usize,
/// Total number of attempts (the first try included) made by `send_message_with_retry`.
pub retry_attempts: u32,
/// Delay, in milliseconds, before the first retry; doubled after every failed attempt.
pub retry_initial_delay_ms: u64,
/// Upper bound, in milliseconds, applied to the doubled retry delay.
pub retry_max_delay_ms: u64,
/// Connect timeout, in seconds, configured on the HTTP client.
pub connection_timeout_secs: u64,
/// Whole-request timeout, in seconds, configured on the HTTP client.
pub request_timeout_secs: u64,
}
impl Default for SyncConfig {
fn default() -> Self {
Self {
pull_interval_secs: 30,
push_on_commit: true,
batch_size: 1000,
retry_attempts: 3,
retry_initial_delay_ms: 1000,
retry_max_delay_ms: 30000,
connection_timeout_secs: 30,
request_timeout_secs: 60,
}
}
}
/// Mutable bookkeeping shared (behind a lock) between a `SyncClient` and its background clone.
#[derive(Debug, Clone)]
struct ClientState {
/// Cursor sent as `since_lsn`; overwritten with the `server_lsn` of each pull response and push ack.
last_known_lsn: u64,
/// Time the last pull response was applied; `UNIX_EPOCH` until then.
last_pull_time: SystemTime,
/// Time the last push was acknowledged; `UNIX_EPOCH` until then.
last_push_time: SystemTime,
/// Set while `pull` is in flight; `pull` refuses to start while it is set.
is_syncing: bool,
/// Local vector clock, merged with the clock carried by server responses.
vector_clock: VectorClock,
/// Set by `start_background_sync` and cleared by `stop_background_sync`.
bg_task_running: bool,
}
impl Default for ClientState {
fn default() -> Self {
Self {
last_known_lsn: 0,
last_pull_time: SystemTime::UNIX_EPOCH,
last_push_time: SystemTime::UNIX_EPOCH,
is_syncing: false,
vector_clock: VectorClock::new(),
bg_task_running: false,
}
}
}
/// Outcome of one pull round.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PullResult {
/// Number of entries the delta applicator reported as applied.
pub applied: usize,
/// Number of entries the delta applicator reported as conflicting.
pub conflicts: usize,
/// `has_more` flag copied from the server's pull response.
pub has_more: bool,
/// `server_lsn` copied from the server's pull response.
pub server_lsn: u64,
}
/// Outcome of one push round.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushResult {
/// Number of LSNs the server listed as accepted (0 when there was nothing to push).
pub pushed: usize,
/// `Debug` rendering of the conflict type of every conflict the server reported.
pub conflicts: Vec<String>,
/// `server_lsn` from the push ack, or the local cursor when nothing was pushed.
pub server_lsn: u64,
}
/// Outcome of a full `sync` round: a pull followed by a push.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResult {
/// Result of the pull half.
pub pull: PullResult,
/// Result of the push half.
pub push: PushResult,
/// Elapsed time of the whole round, in milliseconds.
pub duration_ms: u64,
}
/// HTTP client that exchanges change batches with a sync server on behalf of one
/// local storage engine.
pub struct SyncClient {
/// Identifier sent to the server in every message.
client_id: String,
/// Base URL that endpoint paths are appended to.
server_url: String,
/// Storage engine the change log and delta applicator are built on.
local_storage: Arc<StorageEngine>,
/// Source of local changes for `push`.
change_log: Arc<ChangeLog>,
/// Applies batches received by `pull` to local storage.
delta_applicator: Arc<DeltaApplicator>,
/// Shared HTTP client configured with the timeouts from `config`.
http_client: HttpClient,
/// Tuning knobs captured at construction.
config: SyncConfig,
/// Sync bookkeeping, shared with the background clone.
state: Arc<RwLock<ClientState>>,
/// Used to derive a new access token from the refresh token.
jwt_manager: JwtManager,
/// Bearer token attached to requests when present.
access_token: Arc<RwLock<Option<String>>>,
/// Token handed to `jwt_manager` when the server answers 401.
refresh_token: Arc<RwLock<Option<String>>>,
/// Random UUID generated in `new`; stamped on entries converted for the delta applicator.
node_id: Uuid,
}
impl SyncClient {
/// Builds a client for `server_url` on top of `local_storage`.
///
/// Creates the change log (from `local_storage.db`), a delta applicator with a
/// default conflict detector, and an HTTP client using the timeouts in `config`.
/// A fresh random `node_id` is generated on every call and the JWT manager comes
/// from `JwtManager::from_env_or_default()`. No tokens are set initially.
///
/// # Errors
/// * `SyncError::Storage` if the change log or the delta applicator cannot be created.
/// * `SyncError::Network` if the HTTP client cannot be built.
pub fn new(
    client_id: String,
    server_url: String,
    local_storage: Arc<StorageEngine>,
    config: SyncConfig,
) -> Result<Self> {
    let change_log = ChangeLog::new(Arc::clone(&local_storage.db))
        .map(Arc::new)
        .map_err(|e| SyncError::Storage(e.to_string()))?;

    let conflict_detector = Arc::new(crate::sync::conflict::ConflictDetector::default());
    let delta_applicator = DeltaApplicator::new(Arc::clone(&local_storage), conflict_detector)
        .map(Arc::new)
        .map_err(|e| SyncError::Storage(e.to_string()))?;

    let connect_timeout = Duration::from_secs(config.connection_timeout_secs);
    let request_timeout = Duration::from_secs(config.request_timeout_secs);
    let http_client = HttpClient::builder()
        .connect_timeout(connect_timeout)
        .timeout(request_timeout)
        .build()
        .map_err(|e| SyncError::Network(e.to_string()))?;

    let node_id = Uuid::new_v4();
    info!(
        "Sync client initialized: client_id={}, server={}, node_id={}",
        client_id, server_url, node_id
    );

    let client = Self {
        client_id,
        server_url,
        local_storage,
        change_log,
        delta_applicator,
        http_client,
        config,
        state: Arc::new(RwLock::new(ClientState::default())),
        jwt_manager: JwtManager::from_env_or_default(),
        access_token: Arc::new(RwLock::new(None)),
        refresh_token: Arc::new(RwLock::new(None)),
        node_id,
    };
    Ok(client)
}
pub fn set_tokens(&self, token_pair: TokenPair) {
*self.access_token.write() = Some(token_pair.access_token);
*self.refresh_token.write() = Some(token_pair.refresh_token);
info!("Authentication tokens updated for client {}", self.client_id);
}
/// Announces this client to the server via `/api/v1/sync/register`.
///
/// The message carries the protocol version, the client id, the current LSN
/// cursor, a copy of the vector clock and an empty metadata map. The server's
/// reply is discarded; only success or failure is reported.
///
/// # Errors
/// Whatever `send_message_with_retry` returns once its attempts are exhausted.
pub async fn register(&self) -> Result<()> {
    info!("Registering client {} with server", self.client_id);

    // Snapshot the state inside a block so the lock guard is gone before the await.
    let message = {
        let snapshot = self.state.read();
        SyncMessage::RegisterClient {
            version: PROTOCOL_VERSION,
            client_id: self.client_id.clone(),
            last_known_lsn: snapshot.last_known_lsn,
            vector_clock: snapshot.vector_clock.clone(),
            metadata: std::collections::HashMap::new(),
        }
    };

    self.send_message_with_retry("/api/v1/sync/register", &message)
        .await?;

    info!("Client {} registered successfully", self.client_id);
    Ok(())
}
pub async fn pull(&self) -> Result<PullResult> {
debug!("Starting pull operation for client {}", self.client_id);
{
let mut state = self.state.write();
if state.is_syncing {
return Err(SyncError::InvalidMessage(
"Sync already in progress".to_string(),
));
}
state.is_syncing = true;
}
let result = self.pull_internal().await;
{
let mut state = self.state.write();
state.is_syncing = false;
}
result
}
async fn pull_internal(&self) -> Result<PullResult> {
let message_id = Uuid::new_v4();
let state = self.state.read();
let since_lsn = state.last_known_lsn;
drop(state);
let request = SyncMessage::PullRequest {
message_id,
client_id: self.client_id.clone(),
since_lsn,
max_entries: self.config.batch_size,
continuation_token: None,
};
let response = self
.send_message_with_retry("/api/v1/sync/pull", &request)
.await?;
match response {
SyncMessage::PullResponse {
changes,
server_lsn,
has_more,
vector_clock,
..
} => {
debug!(
"Received {} changes from server (LSN: {}, has_more: {})",
changes.len(),
server_lsn,
has_more
);
let apply_result = self
.delta_applicator
.apply_batch(
changes
.into_iter()
.map(|c| self.convert_protocol_change_to_delta(c))
.collect(),
)
.map_err(|e| SyncError::Storage(e.to_string()))?;
{
let mut state = self.state.write();
state.last_known_lsn = server_lsn;
state.last_pull_time = SystemTime::now();
state.vector_clock.merge(&vector_clock);
}
Ok(PullResult {
applied: apply_result.applied.len(),
conflicts: apply_result.conflicts.len(),
has_more,
server_lsn,
})
}
SyncMessage::SyncError { message, .. } => {
Err(SyncError::InvalidMessage(format!("Server error: {}", message)))
}
_ => Err(SyncError::InvalidMessage(
"Invalid response to PullRequest".to_string(),
)),
}
}
pub async fn push(&self) -> Result<PushResult> {
debug!("Starting push operation for client {}", self.client_id);
let state = self.state.read();
let since_lsn = state.last_known_lsn;
let vector_clock = state.vector_clock.clone();
drop(state);
let local_changes = self
.change_log
.query_since_lsn(since_lsn, Some(self.config.batch_size))
.map_err(|e| SyncError::Storage(e.to_string()))?;
if local_changes.is_empty() {
debug!("No local changes to push");
return Ok(PushResult {
pushed: 0,
conflicts: vec![],
server_lsn: since_lsn,
});
}
debug!("Pushing {} local changes to server", local_changes.len());
let protocol_changes: Vec<ChangeEntry> = local_changes
.into_iter()
.map(|c| self.convert_change_log_to_protocol(c))
.collect();
let message_id = Uuid::new_v4();
let request = SyncMessage::PushChanges {
message_id,
client_id: self.client_id.clone(),
changes: protocol_changes,
vector_clock,
};
let response = self
.send_message_with_retry("/api/v1/sync/push", &request)
.await?;
match response {
SyncMessage::PushAck {
accepted_lsns,
conflicts,
server_lsn,
vector_clock,
..
} => {
debug!(
"Push acknowledged: {} accepted, {} conflicts",
accepted_lsns.len(),
conflicts.len()
);
{
let mut state = self.state.write();
state.last_known_lsn = server_lsn;
state.last_push_time = SystemTime::now();
state.vector_clock.merge(&vector_clock);
}
Ok(PushResult {
pushed: accepted_lsns.len(),
conflicts: conflicts
.into_iter()
.map(|c| format!("{:?}", c.conflict_type))
.collect(),
server_lsn,
})
}
SyncMessage::SyncError { message, .. } => {
Err(SyncError::InvalidMessage(format!("Server error: {}", message)))
}
_ => Err(SyncError::InvalidMessage(
"Invalid response to PushChanges".to_string(),
)),
}
}
/// Runs one full sync round: `pull` first, then `push`.
///
/// The round is timed with the monotonic `Instant` clock, so a wall-clock
/// adjustment during the round cannot produce a bogus (or silently zero)
/// duration. The millisecond count saturates at `u64::MAX` instead of being
/// truncated.
///
/// # Errors
/// Returns the first error from `pull` or `push`; if `pull` fails, `push` is not attempted.
pub async fn sync(&self) -> Result<SyncResult> {
    let started = std::time::Instant::now();
    info!("Starting full sync for client {}", self.client_id);

    let pull = self.pull().await?;
    let push = self.push().await?;

    let duration_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
    info!(
        "Sync complete: pulled {} changes, pushed {} changes in {}ms",
        pull.applied, push.pushed, duration_ms
    );

    Ok(SyncResult {
        pull,
        push,
        duration_ms,
    })
}
/// Spawns a task that runs `sync` followed by a heartbeat every
/// `pull_interval_secs` seconds and returns its handle.
///
/// The loop checks `bg_task_running` after every tick and exits once the flag
/// has been cleared, so clearing the flag stops the task cooperatively: a round
/// already in progress finishes, and the task ends at the next tick. Aborting
/// the returned handle stops it immediately. Errors from a round or a heartbeat
/// are logged and do not end the loop.
///
/// NOTE(review): if the flag is cleared and set again before the old task
/// reaches its next tick, the old task keeps running next to the new one;
/// abort the old handle first when restarting quickly.
///
/// # Errors
/// * `SyncError::InvalidMessage` if `pull_interval_secs` is 0
///   (`tokio::time::interval` panics on a zero period).
/// * `SyncError::InvalidMessage` if background sync is already marked as running.
pub async fn start_background_sync(&self) -> Result<JoinHandle<()>> {
    // Validate before claiming the flag so a rejected start leaves no trace.
    if self.config.pull_interval_secs == 0 {
        return Err(SyncError::InvalidMessage(
            "Background sync interval must be greater than zero".to_string(),
        ));
    }

    {
        let mut state = self.state.write();
        if state.bg_task_running {
            return Err(SyncError::InvalidMessage(
                "Background sync already running".to_string(),
            ));
        }
        state.bg_task_running = true;
    }

    let client_clone = self.clone_for_background();
    let pull_interval = Duration::from_secs(self.config.pull_interval_secs);
    info!(
        "Starting background sync with interval of {}s",
        self.config.pull_interval_secs
    );

    let handle = tokio::spawn(async move {
        let mut interval = tokio::time::interval(pull_interval);
        loop {
            interval.tick().await;

            // Read the flag into a local so no lock guard lives across an await.
            let keep_running = client_clone.state.read().bg_task_running;
            if !keep_running {
                info!(
                    "Background sync loop exiting for client {}",
                    client_clone.client_id
                );
                break;
            }

            if let Err(e) = client_clone.sync().await {
                error!("Background sync failed: {}", e);
            }
            if let Err(e) = client_clone.send_heartbeat().await {
                error!("Heartbeat failed: {}", e);
            }
        }
    });
    Ok(handle)
}
/// Clears the `bg_task_running` flag so that background sync may be started again.
///
/// This method only updates the flag; it neither aborts nor awaits the task
/// handle returned by `start_background_sync`. Always returns `Ok(())`.
pub async fn stop_background_sync(&self) -> Result<()> {
    self.state.write().bg_task_running = false;
    info!("Background sync stopped for client {}", self.client_id);
    Ok(())
}
/// Sends a single heartbeat (no retries) carrying the client id, the current
/// wall-clock time in milliseconds since the Unix epoch (0 if the clock reads
/// earlier than the epoch) and the current LSN cursor.
///
/// # Errors
/// Whatever `send_message` returns; the server's reply body is discarded.
async fn send_heartbeat(&self) -> Result<()> {
    let current_lsn = self.state.read().last_known_lsn;

    let timestamp = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    };

    let heartbeat = SyncMessage::Heartbeat {
        client_id: self.client_id.clone(),
        timestamp,
        current_lsn,
    };

    debug!("Sending heartbeat to server");
    self.send_message("/api/v1/sync/heartbeat", &heartbeat)
        .await
        .map(|_| ())
}
async fn send_message_with_retry(
&self,
endpoint: &str,
message: &SyncMessage,
) -> Result<SyncMessage> {
let mut retry_delay = Duration::from_millis(self.config.retry_initial_delay_ms);
let max_delay = Duration::from_millis(self.config.retry_max_delay_ms);
for attempt in 0..self.config.retry_attempts {
match self.send_message(endpoint, message).await {
Ok(response) => return Ok(response),
Err(e) => {
if attempt + 1 < self.config.retry_attempts {
warn!(
"Request failed (attempt {}/{}): {}. Retrying in {:?}",
attempt + 1,
self.config.retry_attempts,
e,
retry_delay
);
tokio::time::sleep(retry_delay).await;
retry_delay = std::cmp::min(retry_delay * 2, max_delay);
} else {
error!("Request failed after {} attempts: {}", self.config.retry_attempts, e);
return Err(e);
}
}
}
}
Err(SyncError::Network("All retry attempts failed".to_string()))
}
/// POSTs `message` as JSON to `server_url + endpoint` and decodes the JSON reply.
///
/// The current access token, if any, is attached as a bearer token. On HTTP 401
/// the token is refreshed once and the request is re-sent; a second 401, or a
/// failed refresh, yields `SyncError::Authentication`. The re-send is a loop
/// iteration rather than a recursive call: direct recursion in an `async fn`
/// needs boxing to compile and, without a limit, would cycle forever when the
/// refreshed token is still rejected.
///
/// A trailing `/` on `server_url` is ignored so the joined URL never contains `//`.
///
/// # Errors
/// * `SyncError::Network` for transport failures and non-success statuses other than 401.
/// * `SyncError::Authentication` as described above.
/// * `SyncError::Serialization` if the reply body is not a valid `SyncMessage`.
async fn send_message(&self, endpoint: &str, message: &SyncMessage) -> Result<SyncMessage> {
    let url = format!("{}{}", self.server_url.trim_end_matches('/'), endpoint);
    let mut token_refreshed = false;

    loop {
        // Re-read the token on every pass so a refresh is picked up.
        let token = self.access_token.read().clone();
        let mut request_builder = self.http_client.post(&url).json(message);
        if let Some(token) = token {
            request_builder =
                request_builder.header("Authorization", format!("Bearer {}", token));
        }

        let response = request_builder
            .send()
            .await
            .map_err(|e| SyncError::Network(e.to_string()))?;
        let status = response.status();

        if status.is_success() {
            return response
                .json()
                .await
                .map_err(|e| SyncError::Serialization(e.to_string()));
        }

        if status == StatusCode::UNAUTHORIZED {
            // Allow exactly one refresh-and-retry per call.
            if !token_refreshed && self.try_refresh_token().await.is_ok() {
                token_refreshed = true;
                continue;
            }
            return Err(SyncError::Authentication);
        }

        let error_text = response
            .text()
            .await
            .unwrap_or_else(|_| "Unknown error".to_string());
        return Err(SyncError::Network(format!(
            "HTTP {}: {}",
            status, error_text
        )));
    }
}
/// Derives a new access token from the stored refresh token via the JWT
/// manager and stores it.
///
/// The refresh goes through `jwt_manager.refresh_access_token`; this method
/// makes no HTTP call of its own and contains no await points. The refresh
/// token itself is left unchanged.
///
/// # Errors
/// `SyncError::Authentication` if no refresh token is stored or the JWT manager
/// rejects it.
async fn try_refresh_token(&self) -> Result<()> {
    // Copy the token out so the read lock is released right away.
    let stored = self.refresh_token.read().clone();
    let refresh_token_str = stored.ok_or(SyncError::Authentication)?;

    debug!("Attempting to refresh access token");
    let refreshed = match self.jwt_manager.refresh_access_token(&refresh_token_str) {
        Ok(token) => token,
        Err(_) => return Err(SyncError::Authentication),
    };

    *self.access_token.write() = Some(refreshed);
    info!("Access token refreshed successfully");
    Ok(())
}
fn convert_protocol_change_to_delta(
&self,
change: ChangeEntry,
) -> crate::sync::delta_applicator::ChangeEntry {
crate::sync::delta_applicator::ChangeEntry {
lsn: change.lsn,
table: change.table,
operation: match change.operation {
ChangeOperation::Insert => crate::sync::message::Operation::Insert,
ChangeOperation::Update => {
crate::sync::message::Operation::Update { columns: vec![] }
}
ChangeOperation::Delete => crate::sync::message::Operation::Delete,
},
row_id: change.key,
data: change.data,
vector_clock: change.vector_clock,
checksum: change.checksum,
node_id: self.node_id,
}
}
/// Converts a local change-log entry into a wire-protocol change entry.
///
/// * Insert / Update / Delete map to the matching operation; the row id is
///   encoded as big-endian bytes, an update carries its new data and a delete
///   carries no data. The checksum is computed from the finished entry.
/// * Any other change type becomes a placeholder: an `Insert` into table
///   `"unsupported"` with empty key and data and a checksum of 0.
/// * The timestamp is interpreted as milliseconds since the Unix epoch and
///   falls back to the `DateTime` default when out of range.
///
/// NOTE(review): placeholder entries are returned like any other and so end up
/// in the pushed batch — confirm the server ignores them.
fn convert_change_log_to_protocol(
    &self,
    change: crate::sync::change_log::ChangeEntry,
) -> ChangeEntry {
    use crate::sync::change_log::ChangeType;

    // `None` marks a change type with no protocol equivalent.
    let mapped = match change.change_type {
        ChangeType::Insert { table, row_id, data } => Some((
            ChangeOperation::Insert,
            table,
            row_id.to_be_bytes().to_vec(),
            data,
        )),
        ChangeType::Update {
            table,
            row_id,
            new_data,
            ..
        } => Some((
            ChangeOperation::Update,
            table,
            row_id.to_be_bytes().to_vec(),
            new_data,
        )),
        ChangeType::Delete { table, row_id, .. } => Some((
            ChangeOperation::Delete,
            table,
            row_id.to_be_bytes().to_vec(),
            vec![],
        )),
        _ => None,
    };

    let is_supported = mapped.is_some();
    let (operation, table, key, data) = mapped.unwrap_or_else(|| {
        (
            ChangeOperation::Insert,
            "unsupported".to_string(),
            vec![],
            vec![],
        )
    });

    let mut entry = ChangeEntry {
        lsn: change.lsn,
        table,
        operation,
        key,
        data,
        vector_clock: change.vector_clock,
        timestamp: chrono::DateTime::from_timestamp_millis(change.timestamp as i64)
            .unwrap_or_default(),
        checksum: 0,
        compressed: false,
    };

    // Placeholders keep the zero checksum; real entries get a computed one.
    if is_supported {
        entry.checksum = entry.calculate_checksum();
    }
    entry
}
/// Creates the copy of this client that is moved into the background task.
///
/// Storage, change log, delta applicator, state and both token slots are
/// shared through their `Arc`s, so the copy observes and updates the same
/// state; the remaining fields are cloned by value.
fn clone_for_background(&self) -> Self {
    // Exhaustive destructuring: adding a field to the struct fails to compile
    // here until it is handled.
    let Self {
        client_id,
        server_url,
        local_storage,
        change_log,
        delta_applicator,
        http_client,
        config,
        state,
        jwt_manager,
        access_token,
        refresh_token,
        node_id,
    } = self;

    Self {
        client_id: client_id.clone(),
        server_url: server_url.clone(),
        local_storage: Arc::clone(local_storage),
        change_log: Arc::clone(change_log),
        delta_applicator: Arc::clone(delta_applicator),
        http_client: http_client.clone(),
        config: config.clone(),
        state: Arc::clone(state),
        jwt_manager: jwt_manager.clone(),
        access_token: Arc::clone(access_token),
        refresh_token: Arc::clone(refresh_token),
        node_id: *node_id,
    }
}
/// Returns a point-in-time snapshot of the client's sync state.
///
/// All state fields are read under a single read lock, so they are mutually consistent.
pub fn status(&self) -> ClientStatus {
    let snapshot = self.state.read();
    let last_known_lsn = snapshot.last_known_lsn;
    let is_syncing = snapshot.is_syncing;
    let bg_task_running = snapshot.bg_task_running;
    let last_pull_time = snapshot.last_pull_time;
    let last_push_time = snapshot.last_push_time;

    ClientStatus {
        client_id: self.client_id.clone(),
        last_known_lsn,
        is_syncing,
        bg_task_running,
        last_pull_time,
        last_push_time,
    }
}
}
/// Snapshot of a client's sync state, as returned by `SyncClient::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientStatus {
/// Identifier of the client the snapshot belongs to.
pub client_id: String,
/// LSN cursor at the time of the snapshot.
pub last_known_lsn: u64,
/// Whether a pull was marked as in progress.
pub is_syncing: bool,
/// Whether background sync was marked as running.
pub bg_task_running: bool,
/// Time of the last applied pull; `UNIX_EPOCH` if none yet.
pub last_pull_time: SystemTime,
/// Time of the last acknowledged push; `UNIX_EPOCH` if none yet.
pub last_push_time: SystemTime,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Config;

    /// Builds a client over an in-memory storage engine; no request is sent to the URL.
    fn create_test_client() -> SyncClient {
        let storage = StorageEngine::open_in_memory(&Config::default()).unwrap();
        SyncClient::new(
            "test-client".to_string(),
            "http://localhost:8080".to_string(),
            Arc::new(storage),
            SyncConfig::default(),
        )
        .unwrap()
    }

    /// The constructor stores the id and URL it was given.
    #[test]
    fn test_client_creation() {
        let client = create_test_client();
        assert_eq!(client.client_id, "test-client");
        assert_eq!(client.server_url, "http://localhost:8080");
    }

    /// Spot-checks the documented defaults.
    #[test]
    fn test_config_defaults() {
        let SyncConfig {
            pull_interval_secs,
            batch_size,
            retry_attempts,
            push_on_commit,
            ..
        } = SyncConfig::default();
        assert_eq!(pull_interval_secs, 30);
        assert_eq!(batch_size, 1000);
        assert_eq!(retry_attempts, 3);
        assert!(push_on_commit);
    }

    /// A fresh client reports LSN 0 and nothing running.
    #[test]
    fn test_client_status() {
        let status = create_test_client().status();
        assert_eq!(status.client_id, "test-client");
        assert_eq!(status.last_known_lsn, 0);
        assert!(!status.is_syncing);
        assert!(!status.bg_task_running);
    }

    /// `set_tokens` makes the access token visible through the shared slot.
    #[test]
    fn test_set_tokens() {
        let client = create_test_client();
        client.set_tokens(TokenPair::new(
            "access-token".to_string(),
            "refresh-token".to_string(),
            3600,
        ));

        let access = client.access_token.read();
        assert_eq!(access.as_deref(), Some("access-token"));
    }

    /// A pull is rejected while another sync is marked as in progress.
    #[tokio::test]
    async fn test_concurrent_sync_prevention() {
        let client = Arc::new(create_test_client());
        client.state.write().is_syncing = true;

        match client.pull().await {
            Err(SyncError::InvalidMessage(msg)) => {
                assert!(msg.contains("already in progress"));
            }
            _ => panic!("Expected InvalidMessage error"),
        }
    }

    /// Starting background sync twice is an error.
    #[tokio::test]
    async fn test_background_sync_already_running() {
        let client = create_test_client();
        client.state.write().bg_task_running = true;

        assert!(client.start_background_sync().await.is_err());
    }
}