use super::{
auth::{Authorizer, Claims, JwtManager},
conflicts::{Conflict, ConflictManager, ConflictType},
Acknowledgment, ConflictResolution, Operation, RowDelta, SyncError, SyncRequest, SyncResponse,
VectorClock,
};
use chrono::Utc;
use parking_lot::RwLock;
use std::collections::HashMap;
use uuid::Uuid;
/// A single change-log entry: a row delta paired with the version it was recorded at.
#[derive(Debug, Clone)]
struct VersionedChange {
/// Value of the server's `current_version` when `record_change` stored this entry.
version: u64,
/// The recorded row-level change.
delta: RowDelta,
}
/// In-memory sync server: authenticates clients, serves logged changes and reports conflicts.
pub struct SyncServer {
/// Current server version; incremented once per successful `handle_client_deltas` call.
current_version: u64,
/// Server vector clock, cloned into every sync response and acknowledgment.
vector_clock: VectorClock,
/// Decides whether two vector clocks conflict and supplies the resolution strategy.
conflict_manager: ConflictManager,
/// Issues and validates JWTs.
jwt_manager: JwtManager,
/// Validates token claims; tenants are added and removed through it.
authorizer: Authorizer,
/// Recorded changes in insertion order; trimmed by `compact_change_log`.
change_log: RwLock<Vec<VersionedChange>>,
/// `(version, vector clock)` of the most recently recorded change per row,
/// keyed by `(table, row_id)`.
row_versions: RwLock<HashMap<(String, Vec<u8>), (u64, VectorClock)>>,
}
impl SyncServer {
pub fn new() -> Self {
Self {
current_version: 0,
vector_clock: VectorClock::new(),
conflict_manager: ConflictManager::new(ConflictResolution::UseServer),
jwt_manager: JwtManager::from_env_or_default(),
authorizer: Authorizer::new(),
change_log: RwLock::new(Vec::new()),
row_versions: RwLock::new(HashMap::new()),
}
}
pub fn with_jwt_secret(secret: &[u8]) -> Self {
Self {
current_version: 0,
vector_clock: VectorClock::new(),
conflict_manager: ConflictManager::new(ConflictResolution::UseServer),
jwt_manager: JwtManager::new(secret),
authorizer: Authorizer::new(),
change_log: RwLock::new(Vec::new()),
row_versions: RwLock::new(HashMap::new()),
}
}
/// Creates a server at version 0 using the supplied JWT manager and authorizer.
///
/// The change log and row-version index start empty, the vector clock is a
/// fresh `VectorClock::new()`, and conflicts use `ConflictResolution::UseServer`.
pub fn with_auth(jwt_manager: JwtManager, authorizer: Authorizer) -> Self {
    let conflict_manager = ConflictManager::new(ConflictResolution::UseServer);
    let change_log = RwLock::new(Vec::new());
    let row_versions = RwLock::new(HashMap::new());
    Self {
        jwt_manager,
        authorizer,
        conflict_manager,
        change_log,
        row_versions,
        vector_clock: VectorClock::new(),
        current_version: 0,
    }
}
/// Serves a pull request: returns every logged change newer than the client's
/// last synced version, together with any conflicts detected for those changes.
///
/// # Errors
/// Returns `SyncError::Authentication` when the token is rejected or when the
/// token's `client_id` differs from `request.client_id`. Errors from change
/// retrieval and conflict detection are propagated unchanged.
pub async fn handle_sync_request(
    &mut self,
    request: SyncRequest,
    jwt_token: &str,
) -> Result<SyncResponse, SyncError> {
    let token_claims = self.authenticate(jwt_token).await?;

    // A valid token may only be used by the client it was issued to.
    if token_claims.client_id != request.client_id {
        tracing::warn!(
            "Client ID mismatch: JWT={}, Request={}",
            token_claims.client_id,
            request.client_id
        );
        return Err(SyncError::Authentication);
    }

    let pending = self.get_changes_since(request.last_sync_version).await?;
    let conflicts = self.detect_conflicts(&request.vector_clock, &pending).await?;

    let response = SyncResponse {
        server_version: self.current_version,
        delta: pending,
        conflicts,
        // This handler always returns the full set of changes in one response.
        continuation_token: None,
        vector_clock: self.vector_clock.clone(),
    };
    Ok(response)
}
/// Acknowledges a batch of deltas pushed by a client and bumps the server version.
///
/// The version is incremented exactly once per call, even for an empty batch.
/// `applied_count` is the batch length, saturated at `u32::MAX` instead of
/// being silently truncated.
///
/// NOTE(review): the deltas are only counted here; they are not written to the
/// change log or the row-version index. If they are meant to be persisted, that
/// has to happen through `record_change` — confirm the intended flow.
///
/// NOTE(review): authentication goes through `authenticate`, which requires the
/// `sync:read` scope. Confirm whether this write path should require a
/// different scope.
///
/// # Errors
/// Returns `SyncError::Authentication` when the token is rejected or when the
/// token's `client_id` differs from `client_id`.
pub async fn handle_client_deltas(
    &mut self,
    client_id: Uuid,
    deltas: Vec<RowDelta>,
    jwt_token: &str,
) -> Result<Acknowledgment, SyncError> {
    let claims = self.authenticate(jwt_token).await?;
    if claims.client_id != client_id {
        tracing::warn!(
            "Client ID mismatch in deltas: JWT={}, Request={}",
            claims.client_id,
            client_id
        );
        return Err(SyncError::Authentication);
    }

    // Saturate rather than wrap: `len() as u32` would report a wrong count for
    // batches longer than `u32::MAX`.
    let applied_count: u32 =
        std::convert::TryFrom::try_from(deltas.len()).unwrap_or(u32::MAX);

    self.current_version += 1;

    Ok(Acknowledgment {
        new_version: self.current_version,
        applied_count,
        failed: vec![],
        vector_clock: self.vector_clock.clone(),
    })
}
/// Validates `token` and returns its claims.
///
/// Three checks run in order: `JwtManager::validate_with_scope` with the
/// required scope `sync:read`, `Claims::is_expired`, and
/// `Authorizer::validate_claims`.
///
/// # Errors
/// Every failure is logged at `warn` level and reported as
/// `SyncError::Authentication`, so callers cannot tell which check failed.
async fn authenticate(&self, token: &str) -> Result<Claims, SyncError> {
    let claims = self
        .jwt_manager
        .validate_with_scope(token, "sync:read")
        .map_err(|e| {
            tracing::warn!("JWT validation failed: {:?}", e);
            SyncError::Authentication
        })?;

    if claims.is_expired() {
        tracing::warn!("Token expired for user: {}", claims.sub);
        return Err(SyncError::Authentication);
    }

    // The authorizer's error value is not logged; only the tenant is.
    self.authorizer.validate_claims(&claims).map_err(|_| {
        tracing::warn!("Authorization failed for tenant: {}", claims.tenant_id);
        SyncError::Authentication
    })?;

    tracing::debug!(
        "Successfully authenticated client: {} (tenant: {}, user: {})",
        claims.client_id,
        claims.tenant_id,
        claims.sub
    );
    Ok(claims)
}
/// Issues an access token for the given user, tenant and client.
///
/// Delegates to `JwtManager::generate_token`; its result, including any error,
/// is returned unchanged.
pub fn generate_token(
&self,
user_id: String,
tenant_id: String,
client_id: Uuid,
) -> Result<String, SyncError> {
self.jwt_manager
.generate_token(user_id, tenant_id, client_id)
}
/// Issues an access token and a refresh token for the same user, tenant and client.
///
/// The access token is generated first; if either generation fails, that
/// error is returned and no pair is produced.
pub fn generate_token_pair(
    &self,
    user_id: String,
    tenant_id: String,
    client_id: Uuid,
) -> Result<super::TokenPair, SyncError> {
    let issuer = &self.jwt_manager;
    let access_token = issuer.generate_token(user_id.clone(), tenant_id.clone(), client_id)?;
    let refresh_token = issuer.generate_refresh_token(user_id, tenant_id, client_id)?;
    // NOTE(review): 3600 is presumably the access-token lifetime in seconds —
    // confirm it matches what `JwtManager` actually issues.
    let pair = super::TokenPair::new(access_token, refresh_token, 3600);
    Ok(pair)
}
/// Exchanges a refresh token for a new access token.
///
/// Delegates to `JwtManager::refresh_access_token`; its result, including any
/// error, is returned unchanged.
pub fn refresh_token(&self, refresh_token: &str) -> Result<String, SyncError> {
self.jwt_manager.refresh_access_token(refresh_token)
}
/// Registers `tenant_id` with the authorizer by forwarding to `Authorizer::add_tenant`.
// NOTE(review): the tests in this file suggest registered tenants act as an
// allow-list once at least one is present — confirm against `Authorizer`.
pub fn add_tenant(&mut self, tenant_id: String) {
self.authorizer.add_tenant(tenant_id);
}
/// Removes `tenant_id` from the authorizer and returns the `bool` reported by
/// `Authorizer::remove_tenant`.
// NOTE(review): the tests in this file expect `true` for a previously added
// tenant and `false` for an unknown one.
pub fn remove_tenant(&mut self, tenant_id: &str) -> bool {
self.authorizer.remove_tenant(tenant_id)
}
/// Returns clones of every logged delta whose version is strictly greater
/// than `version`, in log order.
///
/// This implementation always returns `Ok`.
async fn get_changes_since(&self, version: u64) -> Result<Vec<RowDelta>, SyncError> {
    let log = self.change_log.read();
    let newer: Vec<RowDelta> = log
        .iter()
        .filter(|change| change.version > version)
        .map(|change| &change.delta)
        .cloned()
        .collect();

    tracing::debug!(
        "Retrieved {} changes since version {} (current: {})",
        newer.len(),
        version,
        self.current_version
    );
    Ok(newer)
}
/// Builds a `Conflict` for every delta whose row has a recorded server clock
/// that the conflict manager reports as conflicting with `client_clock`.
///
/// Deltas for rows with no entry in the row-version index are skipped. The
/// conflict type is derived from the delta's operation, and the server side of
/// the conflict is the payload of the row's latest change-log entry (empty if
/// there is none).
///
/// This implementation always returns `Ok`.
async fn detect_conflicts(
    &self,
    client_clock: &VectorClock,
    deltas: &[RowDelta],
) -> Result<Vec<Conflict>, SyncError> {
    let mut conflicts = Vec::new();
    let row_versions = self.row_versions.read();

    for delta in deltas {
        let key = (delta.table.clone(), delta.row_id.clone());
        // Only the stored clock is needed; the stored version number is unused.
        if let Some((_, server_clock)) = row_versions.get(&key) {
            if !self.conflict_manager.detect_conflict(client_clock, server_clock) {
                continue;
            }

            let conflict_type = match &delta.operation {
                Operation::Delete => ConflictType::DeleteUpdate,
                Operation::Insert => ConflictType::UniqueViolation,
                Operation::Update { .. } => ConflictType::ConcurrentUpdate,
            };
            let server_data = self.get_row_data(&delta.table, &delta.row_id);

            // Log before `conflict_type` is moved into the struct, so this does
            // not depend on `ConflictType` being `Copy`.
            tracing::warn!(
                "Conflict detected: table={}, row_id={:?}, type={:?}",
                delta.table,
                delta.row_id,
                conflict_type
            );

            conflicts.push(Conflict {
                id: Uuid::new_v4(),
                table: delta.table.clone(),
                row_id: delta.row_id.clone(),
                conflict_type,
                client_version: delta.data.clone(),
                server_version: server_data,
                resolution: self.conflict_manager.strategy().clone(),
            });
        }
    }

    tracing::debug!(
        "Conflict detection complete: {} conflicts found in {} deltas",
        conflicts.len(),
        deltas.len()
    );
    Ok(conflicts)
}
/// Records `delta` at the current server version.
///
/// Updates the row-version index for `(table, row_id)` with the delta's
/// vector clock, then appends the delta to the change log.
///
/// NOTE(review): the version is read but not incremented here, and
/// `get_changes_since` filters with a strict `>`. A change recorded at version
/// `v` is therefore not returned to a client whose last synced version is
/// already `v` — confirm callers bump the version first.
pub fn record_change(&mut self, delta: RowDelta) {
    let version = self.current_version;
    let row_key = (delta.table.clone(), delta.row_id.clone());
    let row_clock = delta.vector_clock.clone();

    // Each lock guard is a temporary released at the end of its statement.
    self.row_versions
        .write()
        .insert(row_key, (version, row_clock));
    self.change_log
        .write()
        .push(VersionedChange { version, delta });

    tracing::debug!("Recorded change at version {}", version);
}
/// Returns the payload of the most recently logged change for the given row,
/// or an empty vector when the change log has no entry for it.
fn get_row_data(&self, table: &str, row_id: &[u8]) -> Vec<u8> {
    self.change_log
        .read()
        .iter()
        // Newest entries are at the end of the log, so search backwards.
        .rev()
        .find(|change| change.delta.table == table && change.delta.row_id == row_id)
        .map(|change| change.delta.data.clone())
        .unwrap_or_default()
}
/// Drops every change-log entry whose version is below `older_than_version`
/// and returns how many entries were removed.
///
/// Entries at exactly `older_than_version` are kept. The row-version index is
/// not touched.
pub fn compact_change_log(&self, older_than_version: u64) -> usize {
    let mut entries = self.change_log.write();
    let len_before = entries.len();
    entries.retain(|change| older_than_version <= change.version);
    let removed = len_before - entries.len();

    tracing::info!(
        "Compacted change log: removed {} entries older than version {}",
        removed,
        older_than_version
    );
    removed
}
/// Returns the current server version.
pub fn version(&self) -> u64 {
self.current_version
}
/// Returns the number of entries currently held in the change log.
pub fn change_log_size(&self) -> usize {
self.change_log.read().len()
}
}
/// `Default` is equivalent to [`SyncServer::new`].
impl Default for SyncServer {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::sync::SyncMode;

    /// Signing secret shared by every server built in these tests.
    const SECRET: &[u8] = b"test-secret";

    /// Builds an incremental sync request from version 0 with an empty vector clock.
    fn incremental_request(client_id: Uuid) -> SyncRequest {
        SyncRequest {
            client_id,
            last_sync_version: 0,
            changed_tables: vec![],
            pending_changes: 0,
            vector_clock: VectorClock::new(),
            sync_mode: SyncMode::Incremental,
        }
    }

    /// Issues an access token for `user123` in `tenant`, bound to `client_id`.
    fn issue_token(server: &SyncServer, tenant: &str, client_id: Uuid) -> String {
        server
            .generate_token("user123".to_string(), tenant.to_string(), client_id)
            .unwrap()
    }

    /// Builds a server whose authorizer has only `allowed-tenant` registered.
    fn server_with_allowed_tenant() -> SyncServer {
        let jwt_manager = super::super::JwtManager::new(SECRET);
        let mut authorizer = super::super::Authorizer::new();
        authorizer.add_tenant("allowed-tenant".to_string());
        SyncServer::with_auth(jwt_manager, authorizer)
    }

    #[tokio::test]
    async fn test_sync_server_creation() {
        let server = SyncServer::new();
        assert_eq!(server.current_version, 0);
    }

    #[tokio::test]
    async fn test_handle_sync_request_with_auth() {
        let mut server = SyncServer::with_jwt_secret(SECRET);
        let client_id = Uuid::new_v4();
        let token = issue_token(&server, "tenant456", client_id);

        let response = server
            .handle_sync_request(incremental_request(client_id), &token)
            .await
            .unwrap();

        assert_eq!(response.server_version, 0);
    }

    #[tokio::test]
    async fn test_handle_sync_request_invalid_token() {
        let mut server = SyncServer::with_jwt_secret(SECRET);
        let request = incremental_request(Uuid::new_v4());

        let result = server
            .handle_sync_request(request, "invalid.token.here")
            .await;

        assert!(matches!(result, Err(SyncError::Authentication)));
    }

    #[tokio::test]
    async fn test_handle_sync_request_mismatched_client_id() {
        let mut server = SyncServer::with_jwt_secret(SECRET);
        let client_id = Uuid::new_v4();
        let different_client_id = Uuid::new_v4();
        let token = issue_token(&server, "tenant456", client_id);

        // The token belongs to `client_id`, but the request claims another client.
        let result = server
            .handle_sync_request(incremental_request(different_client_id), &token)
            .await;

        assert!(matches!(result, Err(SyncError::Authentication)));
    }

    #[tokio::test]
    async fn test_handle_client_deltas_with_auth() {
        let mut server = SyncServer::with_jwt_secret(SECRET);
        let client_id = Uuid::new_v4();
        let token = issue_token(&server, "tenant456", client_id);

        let ack = server
            .handle_client_deltas(client_id, Vec::new(), &token)
            .await
            .unwrap();

        assert_eq!(ack.applied_count, 0);
        assert_eq!(ack.new_version, 1);
    }

    #[tokio::test]
    async fn test_tenant_authorization() {
        let mut server = server_with_allowed_tenant();
        let client_id = Uuid::new_v4();
        let token = issue_token(&server, "allowed-tenant", client_id);

        let result = server
            .handle_sync_request(incremental_request(client_id), &token)
            .await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_tenant_authorization_denied() {
        let mut server = server_with_allowed_tenant();
        let client_id = Uuid::new_v4();
        let token = issue_token(&server, "forbidden-tenant", client_id);

        let result = server
            .handle_sync_request(incremental_request(client_id), &token)
            .await;

        assert!(matches!(result, Err(SyncError::Authentication)));
    }

    #[tokio::test]
    async fn test_token_generation() {
        let server = SyncServer::with_jwt_secret(SECRET);
        let client_id = Uuid::new_v4();
        let token = issue_token(&server, "tenant456", client_id);
        assert!(!token.is_empty());

        let claims = server.authenticate(&token).await.unwrap();
        assert_eq!(claims.sub, "user123");
        assert_eq!(claims.tenant_id, "tenant456");
        assert_eq!(claims.client_id, client_id);
    }

    #[tokio::test]
    async fn test_token_pair_generation() {
        let server = SyncServer::with_jwt_secret(SECRET);
        let client_id = Uuid::new_v4();
        let token_pair = server
            .generate_token_pair("user123".to_string(), "tenant456".to_string(), client_id)
            .unwrap();

        assert!(!token_pair.access_token.is_empty());
        assert!(!token_pair.refresh_token.is_empty());
        assert_eq!(token_pair.token_type, "Bearer");

        let claims = server.authenticate(&token_pair.access_token).await.unwrap();
        assert_eq!(claims.sub, "user123");
    }

    #[tokio::test]
    async fn test_refresh_token_flow() {
        let server = SyncServer::with_jwt_secret(SECRET);
        let client_id = Uuid::new_v4();
        let token_pair = server
            .generate_token_pair("user123".to_string(), "tenant456".to_string(), client_id)
            .unwrap();

        let new_access_token = server.refresh_token(&token_pair.refresh_token).unwrap();

        let claims = server.authenticate(&new_access_token).await.unwrap();
        assert_eq!(claims.sub, "user123");
        assert_eq!(claims.tenant_id, "tenant456");
    }

    #[tokio::test]
    async fn test_tenant_management() {
        let mut server = SyncServer::with_jwt_secret(SECRET);
        server.add_tenant("tenant1".to_string());
        server.add_tenant("tenant2".to_string());

        assert!(server.remove_tenant("tenant1"));
        assert!(!server.remove_tenant("nonexistent"));
    }
}