use actix_web::{web, HttpResponse};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
/// A device registered with the sync service, together with its sync settings
/// and last-known connectivity state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncDevice {
/// Unique device identifier; `CrossDeviceSyncService::register_device` assigns a UUID v4 string.
pub id: String,
/// Identifier of the user that owns the device.
pub user_id: String,
/// Display name supplied at registration; can be changed via `update_device`.
pub name: String,
/// Form factor of the device.
pub device_type: DeviceType,
/// Operating system / application details reported by the device.
pub platform: DevicePlatform,
/// Time of the device's last push; `None` until `push_changes` has run for this device.
pub last_sync: Option<DateTime<Utc>>,
/// Set at registration and refreshed by `set_device_online`.
pub last_seen: DateTime<Utc>,
/// Time the device was registered.
pub registered_at: DateTime<Utc>,
/// Last connectivity state recorded by `set_device_online` (starts as `true`).
pub is_online: bool,
/// Per-device sync preferences.
pub sync_config: DeviceSyncConfig,
/// Feature flags and limits for the device.
pub capabilities: DeviceCapabilities,
/// Push-notification token, if one has been supplied via `update_device`.
pub push_token: Option<String>,
}
/// Form factor of a registered device.
///
/// Variant names are (de)serialized in `snake_case` (e.g. `"desktop"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DeviceType {
Desktop,
Laptop,
Phone,
Tablet,
Browser,
Server,
/// Any device that does not fit the other categories.
Other,
}
/// Software environment reported by a device at registration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevicePlatform {
/// Operating system name.
pub os: String,
/// Operating system version, when known.
pub os_version: Option<String>,
/// Version of the client application.
pub app_version: String,
/// Browser name, when applicable (presumably only for browser clients — confirm).
pub browser: Option<String>,
}
/// Per-device sync preferences.
///
/// NOTE(review): nothing in the visible code reads these flags; they are only
/// stored and returned, so enforcement presumably happens on the client — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceSyncConfig {
/// Master switch for syncing.
pub sync_enabled: bool,
/// Whether syncing over a cellular connection is allowed.
pub sync_on_cellular: bool,
/// Whether syncing is restricted to Wi-Fi.
/// NOTE(review): overlaps with `sync_on_cellular`; precedence between the two is not defined here.
pub wifi_only: bool,
/// Maximum cache size (presumably bytes; the default is `100 * 1024 * 1024` — confirm units).
pub max_cache_size: u64,
/// Interval between syncs; the default is `0`. Units and the meaning of `0` are not
/// specified in this file — TODO confirm.
pub sync_interval: u32,
/// Which resource categories are synced.
pub sync_scope: SyncScope,
}
impl Default for DeviceSyncConfig {
    /// Returns the configuration given to newly registered devices: syncing
    /// enabled, cellular allowed, not restricted to Wi-Fi, a cache limit of
    /// `100 * 1024 * 1024`, a sync interval of `0`, and the default [`SyncScope`].
    fn default() -> Self {
        let max_cache_size: u64 = 100 * 1024 * 1024;
        let sync_scope = SyncScope::default();

        Self {
            sync_scope,
            max_cache_size,
            sync_interval: 0,
            wifi_only: false,
            sync_on_cellular: true,
            sync_enabled: true,
        }
    }
}
/// Selects which resource categories a device wants synced.
/// Every flag defaults to `true`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncScope {
/// Sync sessions.
pub sessions: bool,
/// Sync workspaces.
pub workspaces: bool,
/// Sync settings.
pub settings: bool,
/// Sync favorites.
pub favorites: bool,
/// Sync tags.
pub tags: bool,
}
impl Default for SyncScope {
    /// Returns a scope with every resource category enabled.
    fn default() -> Self {
        let enabled = true;
        Self {
            sessions: enabled,
            workspaces: enabled,
            settings: enabled,
            favorites: enabled,
            tags: enabled,
        }
    }
}
/// Feature flags and limits describing what a device can do.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceCapabilities {
/// Whether the device can receive push notifications.
pub supports_push: bool,
/// Whether the device can sync in the background.
pub supports_background_sync: bool,
/// Whether the device supports encryption.
pub supports_encryption: bool,
/// Largest payload the device accepts (presumably bytes; the default is
/// `10 * 1024 * 1024` — confirm units).
pub max_payload_size: u64,
}
impl Default for DeviceCapabilities {
    /// Returns the capabilities assumed for newly registered devices: push,
    /// background sync and encryption all supported, with a maximum payload
    /// size of `10 * 1024 * 1024`.
    fn default() -> Self {
        let max_payload_size: u64 = 10 * 1024 * 1024;
        Self {
            max_payload_size,
            supports_encryption: true,
            supports_background_sync: true,
            supports_push: true,
        }
    }
}
/// A single change to a synced resource, as pushed by a device and stored in
/// the service's operation log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncOperation {
/// Operation identifier supplied by the pushing client; also used as the cursor value.
pub id: String,
/// Client-supplied time of the change; used by conflict detection.
pub timestamp: DateTime<Utc>,
/// Overwritten by `push_changes` with the service's logical clock value when the
/// operation is applied; `pull_changes` orders results by it.
pub logical_clock: u64,
/// Device that produced the operation.
pub device_id: String,
/// User the operation belongs to.
pub user_id: String,
/// Kind of change.
pub operation: OperationType,
/// Category of the affected resource.
pub resource_type: SyncResourceType,
/// Identifier of the affected resource.
pub resource_id: String,
/// Arbitrary JSON payload; this file never inspects it.
pub data: serde_json::Value,
/// Set to `true` by `push_changes` when the operation is applied.
pub acknowledged: bool,
}
/// Kind of change a [`SyncOperation`] represents.
///
/// Variant names are (de)serialized in `snake_case`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationType {
Create,
Update,
Delete,
Move,
Merge,
}
/// Category of resource that can be synced.
///
/// Derives `Eq` and `Hash` because it is used as the key of the cursor maps.
/// Variant names are (de)serialized in `snake_case`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum SyncResourceType {
Session,
Message,
Workspace,
Settings,
Tag,
Favorite,
}
/// Snapshot of a device's sync status as reported by `get_sync_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncState {
/// Device the snapshot describes.
pub device_id: String,
/// Per-resource-type cursors. `get_sync_state` currently always returns an empty map.
pub cursors: HashMap<SyncResourceType, String>,
/// The device's last sync time, or its registration time if it has never synced.
pub last_sync: DateTime<Utc>,
/// Number of unacknowledged operations from other devices.
pub pending_count: usize,
/// Coarse status of the device.
pub status: SyncStatus,
}
/// Coarse sync status of a device.
///
/// Only `Idle` and `Offline` are produced by the code visible in this file.
/// Variant names are (de)serialized in `snake_case`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SyncStatus {
Idle,
Syncing,
Paused,
Error,
Offline,
}
/// A conflict recorded when a pushed operation collides with an operation
/// already stored for the same resource.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncConflict {
/// Conflict identifier; `push_changes` assigns a UUID v4 string.
pub id: String,
/// Category of the contested resource.
pub resource_type: SyncResourceType,
/// Identifier of the contested resource.
pub resource_id: String,
/// The pushed operation that was rejected.
pub local_operation: SyncOperation,
/// The already-stored operation it collided with.
pub remote_operation: SyncOperation,
/// Chosen resolution; `None` until `resolve_conflict` is called.
pub resolution: Option<ConflictResolution>,
/// Time the conflict was recorded.
pub created_at: DateTime<Utc>,
/// `true` once `resolve_conflict` has been called for this conflict.
pub resolved: bool,
}
/// How a [`SyncConflict`] was resolved.
///
/// Variant names are (de)serialized in `snake_case`. The visible code only
/// records the chosen resolution; it does not apply it to the operation log.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConflictResolution {
/// Keep the rejected (pushed) operation's version.
KeepLocal,
/// Keep the already-stored operation's version.
KeepRemote,
/// Use a merged payload.
Merge { merged_data: serde_json::Value },
/// Use a payload chosen manually.
Manual { chosen_data: serde_json::Value },
}
/// Result of `pull_changes`: one page of operations for a device to apply.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncDelta {
/// Operations in this page, ordered by logical clock.
pub operations: Vec<SyncOperation>,
/// The request's cursors, advanced to the last returned operation of each resource type.
pub cursors: HashMap<SyncResourceType, String>,
/// `true` when more matching operations existed than the page limit allowed.
pub has_more: bool,
/// Server time at which the page was produced.
pub server_time: DateTime<Utc>,
}
/// Request body for `push_changes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushChangesRequest {
/// Device performing the push; its `last_sync` is updated if it is registered.
pub device_id: String,
/// Operations to apply, in order.
pub operations: Vec<SyncOperation>,
/// The client's current cursors; echoed back with applied operations folded in.
pub cursors: HashMap<SyncResourceType, String>,
}
/// Response of `push_changes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushChangesResponse {
/// Ids of the operations that were appended to the log.
pub applied: Vec<String>,
/// Ids of the operations rejected because a conflict was detected.
pub rejected: Vec<String>,
/// One conflict record per rejected operation.
pub conflicts: Vec<SyncConflict>,
/// Updated cursors, keyed by resource type.
pub cursors: HashMap<SyncResourceType, String>,
}
/// Request body for `pull_changes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PullChangesRequest {
/// Device performing the pull; its own operations are excluded from the result.
pub device_id: String,
/// Per-resource-type cursors: the id of the last operation the device has already seen.
pub cursors: HashMap<SyncResourceType, String>,
/// Maximum number of operations to return; defaults to 100 when absent.
pub limit: Option<usize>,
/// Resource types to include; an empty list means all types.
pub resource_types: Vec<SyncResourceType>,
}
/// Sync service that keeps all of its state in process memory behind
/// `std::sync` locks; nothing in this file persists it.
pub struct CrossDeviceSyncService {
// Registered devices, keyed by device id.
devices: std::sync::RwLock<HashMap<String, SyncDevice>>,
// Append-only log of applied operations, in arrival order.
operations: std::sync::RwLock<Vec<SyncOperation>>,
// Recorded conflicts, keyed by conflict id.
conflicts: std::sync::RwLock<HashMap<String, SyncConflict>>,
// Counter incremented once per `push_changes` call and stamped on applied operations.
logical_clock: std::sync::atomic::AtomicU64,
}
impl CrossDeviceSyncService {
/// Creates an empty service: no devices, no stored operations or conflicts,
/// and the logical clock starting at zero.
pub fn new() -> Self {
    use std::sync::atomic::AtomicU64;
    use std::sync::RwLock;

    let devices = RwLock::new(HashMap::new());
    let operations = RwLock::new(Vec::new());
    let conflicts = RwLock::new(HashMap::new());
    let logical_clock = AtomicU64::new(0);

    Self {
        devices,
        operations,
        conflicts,
        logical_clock,
    }
}
/// Registers a new device for `user_id` and returns the stored record.
///
/// The device gets a fresh UUID v4 id, starts online with no `last_sync` and
/// no push token, and receives the default sync configuration and capabilities.
///
/// # Errors
/// Returns the lock's error message if the device registry lock is poisoned.
pub async fn register_device(
    &self,
    user_id: &str,
    name: &str,
    device_type: DeviceType,
    platform: DevicePlatform,
) -> Result<SyncDevice, String> {
    // Take the timestamp once: two separate `Utc::now()` calls would leave
    // `last_seen` marginally earlier than `registered_at`.
    let now = Utc::now();
    let device = SyncDevice {
        id: Uuid::new_v4().to_string(),
        user_id: user_id.to_string(),
        name: name.to_string(),
        device_type,
        platform,
        last_sync: None,
        last_seen: now,
        registered_at: now,
        is_online: true,
        sync_config: DeviceSyncConfig::default(),
        capabilities: DeviceCapabilities::default(),
        push_token: None,
    };

    let mut devices = self.devices.write().map_err(|e| e.to_string())?;
    devices.insert(device.id.clone(), device.clone());
    Ok(device)
}
/// Returns a copy of the device registered under `device_id`.
///
/// # Errors
/// Returns `"Device not found: <id>"` when no such device exists, or the
/// lock's error message if the device registry lock is poisoned.
pub async fn get_device(&self, device_id: &str) -> Result<SyncDevice, String> {
    let registry = self.devices.read().map_err(|e| e.to_string())?;
    match registry.get(device_id) {
        Some(found) => Ok(found.clone()),
        None => Err(format!("Device not found: {}", device_id)),
    }
}
/// Returns copies of every device owned by `user_id` (possibly none).
/// The order follows the registry's hash-map iteration order.
///
/// # Errors
/// Returns the lock's error message if the device registry lock is poisoned.
pub async fn list_devices(&self, user_id: &str) -> Result<Vec<SyncDevice>, String> {
    let registry = self.devices.read().map_err(|e| e.to_string())?;
    let mut owned = Vec::new();
    for candidate in registry.values() {
        if candidate.user_id == user_id {
            owned.push(candidate.clone());
        }
    }
    Ok(owned)
}
/// Updates the mutable properties of a device and returns the updated record.
///
/// Each argument that is `Some` replaces the corresponding field; `None`
/// leaves it untouched, so an existing push token cannot be cleared here.
///
/// # Errors
/// Returns `"Device not found: <id>"` when no such device exists, or the
/// lock's error message if the device registry lock is poisoned.
pub async fn update_device(
    &self,
    device_id: &str,
    name: Option<String>,
    sync_config: Option<DeviceSyncConfig>,
    push_token: Option<String>,
) -> Result<SyncDevice, String> {
    let mut registry = self.devices.write().map_err(|e| e.to_string())?;
    let target = match registry.get_mut(device_id) {
        Some(found) => found,
        None => return Err(format!("Device not found: {}", device_id)),
    };

    if let Some(new_name) = name {
        target.name = new_name;
    }
    if let Some(new_config) = sync_config {
        target.sync_config = new_config;
    }
    if push_token.is_some() {
        target.push_token = push_token;
    }

    Ok(target.clone())
}
/// Removes the device registered under `device_id`.
///
/// # Errors
/// Returns `"Device not found: <id>"` when no such device exists, or the
/// lock's error message if the device registry lock is poisoned.
pub async fn remove_device(&self, device_id: &str) -> Result<(), String> {
    let mut registry = self.devices.write().map_err(|e| e.to_string())?;
    match registry.remove(device_id) {
        Some(_removed) => Ok(()),
        None => Err(format!("Device not found: {}", device_id)),
    }
}
/// Records the connectivity state of a device and refreshes its `last_seen`.
///
/// Unknown device ids are ignored: the call still returns `Ok(())`.
///
/// # Errors
/// Returns the lock's error message if the device registry lock is poisoned.
pub async fn set_device_online(&self, device_id: &str, online: bool) -> Result<(), String> {
    let mut registry = self.devices.write().map_err(|e| e.to_string())?;
    match registry.get_mut(device_id) {
        Some(target) => {
            target.is_online = online;
            target.last_seen = Utc::now();
        }
        // Deliberately not an error; see the doc comment.
        None => {}
    }
    Ok(())
}
pub async fn push_changes(
&self,
request: PushChangesRequest,
) -> Result<PushChangesResponse, String> {
let mut applied = Vec::new();
let mut rejected = Vec::new();
let mut conflicts = Vec::new();
let clock = self
.logical_clock
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let mut operations = self.operations.write().map_err(|e| e.to_string())?;
for mut op in request.operations {
if let Some(conflict) = self.detect_conflict(&op, &operations) {
rejected.push(op.id.clone());
let conflict_record = SyncConflict {
id: Uuid::new_v4().to_string(),
resource_type: op.resource_type,
resource_id: op.resource_id.clone(),
local_operation: op,
remote_operation: conflict,
resolution: None,
created_at: Utc::now(),
resolved: false,
};
conflicts.push(conflict_record.clone());
let mut conflict_store = self.conflicts.write().map_err(|e| e.to_string())?;
conflict_store.insert(conflict_record.id.clone(), conflict_record);
} else {
op.logical_clock = clock;
op.acknowledged = true;
applied.push(op.id.clone());
operations.push(op);
}
}
{
let mut devices = self.devices.write().map_err(|e| e.to_string())?;
if let Some(device) = devices.get_mut(&request.device_id) {
device.last_sync = Some(Utc::now());
}
}
let mut cursors = request.cursors;
for op_id in &applied {
if let Some(op) = operations.iter().find(|o| &o.id == op_id) {
cursors.insert(op.resource_type, op.id.clone());
}
}
Ok(PushChangesResponse {
applied,
rejected,
conflicts,
cursors,
})
}
pub async fn pull_changes(&self, request: PullChangesRequest) -> Result<SyncDelta, String> {
let operations = self.operations.read().map_err(|e| e.to_string())?;
let limit = request.limit.unwrap_or(100);
let mut pending_ops: Vec<_> = operations
.iter()
.filter(|op| {
if !request.resource_types.is_empty()
&& !request.resource_types.contains(&op.resource_type)
{
return false;
}
if let Some(cursor) = request.cursors.get(&op.resource_type) {
if let Some(cursor_pos) = operations.iter().position(|o| &o.id == cursor) {
if let Some(op_pos) = operations.iter().position(|o| o.id == op.id) {
return op_pos > cursor_pos;
}
}
}
true
})
.filter(|op| op.device_id != request.device_id) .cloned()
.collect();
pending_ops.sort_by_key(|op| op.logical_clock);
let has_more = pending_ops.len() > limit;
pending_ops.truncate(limit);
let mut cursors = request.cursors;
for op in &pending_ops {
cursors.insert(op.resource_type, op.id.clone());
}
Ok(SyncDelta {
operations: pending_ops,
cursors,
has_more,
server_time: Utc::now(),
})
}
/// Builds a sync-status snapshot for a registered device.
///
/// `last_sync` falls back to the registration time when the device has never
/// synced, `status` is `Idle` for online devices and `Offline` otherwise, and
/// `cursors` is always empty. `pending_count` counts unacknowledged operations
/// from other devices.
///
/// NOTE(review): `push_changes` marks every operation it stores as
/// acknowledged, so from the code visible here `pending_count` looks like it
/// is always zero — confirm whether that is intended.
///
/// # Errors
/// Returns `"Device not found: <id>"` for an unknown device, or the lock's
/// error message if an internal lock is poisoned.
pub async fn get_sync_state(&self, device_id: &str) -> Result<SyncState, String> {
    let device = self.get_device(device_id).await?;
    let log = self.operations.read().map_err(|e| e.to_string())?;

    let mut pending_count = 0usize;
    for op in log.iter() {
        if op.device_id != device_id && !op.acknowledged {
            pending_count += 1;
        }
    }

    let last_sync = match device.last_sync {
        Some(synced_at) => synced_at,
        None => device.registered_at,
    };
    let status = if device.is_online {
        SyncStatus::Idle
    } else {
        SyncStatus::Offline
    };

    Ok(SyncState {
        device_id: device_id.to_string(),
        cursors: HashMap::new(),
        last_sync,
        pending_count,
        status,
    })
}
/// Finds the stored operation, if any, that conflicts with `op`.
///
/// A stored operation conflicts when it targets the same resource (type and
/// id), comes from a different device, and its timestamp is later than five
/// seconds before `op.timestamp`. When several match, the one latest in
/// `existing` is returned.
fn detect_conflict(
    &self,
    op: &SyncOperation,
    existing: &[SyncOperation],
) -> Option<SyncOperation> {
    // `DateTime - Duration` panics on overflow, and this runs while the caller
    // holds the operation log's write lock, so a panic would poison it. If the
    // window start is not representable, every timestamp lies after it.
    let window_start = op
        .timestamp
        .checked_sub_signed(chrono::Duration::seconds(5));

    // Walk from the newest entry so the scan stops at the first match instead
    // of visiting the whole log.
    existing
        .iter()
        .rev()
        .find(|e| {
            e.resource_type == op.resource_type
                && e.resource_id == op.resource_id
                && e.device_id != op.device_id
                && window_start.map_or(true, |start| e.timestamp > start)
        })
        .cloned()
}
/// Returns copies of the unresolved conflicts whose rejected (local)
/// operation belongs to `user_id`. The order follows the conflict store's
/// hash-map iteration order.
///
/// # Errors
/// Returns the lock's error message if the conflict store lock is poisoned.
pub async fn list_conflicts(&self, user_id: &str) -> Result<Vec<SyncConflict>, String> {
    let store = self.conflicts.read().map_err(|e| e.to_string())?;
    let mut open_conflicts = Vec::new();
    for entry in store.values() {
        if entry.local_operation.user_id == user_id && !entry.resolved {
            open_conflicts.push(entry.clone());
        }
    }
    Ok(open_conflicts)
}
/// Records `resolution` on a conflict, marks it resolved and returns the
/// updated record. Calling it again for the same conflict overwrites the
/// earlier resolution. Only the conflict record is changed; the operation log
/// is not touched.
///
/// # Errors
/// Returns `"Conflict not found: <id>"` for an unknown conflict id, or the
/// lock's error message if the conflict store lock is poisoned.
pub async fn resolve_conflict(
    &self,
    conflict_id: &str,
    resolution: ConflictResolution,
) -> Result<SyncConflict, String> {
    let mut store = self.conflicts.write().map_err(|e| e.to_string())?;
    match store.get_mut(conflict_id) {
        None => Err(format!("Conflict not found: {}", conflict_id)),
        Some(entry) => {
            entry.resolution = Some(resolution);
            entry.resolved = true;
            Ok(entry.clone())
        }
    }
}
}
impl Default for CrossDeviceSyncService {
fn default() -> Self {
Self::new()
}
}
/// JSON body accepted by the device-registration handler.
#[derive(Debug, Deserialize)]
pub struct RegisterDeviceRequest {
/// User that will own the device.
pub user_id: String,
/// Display name for the device.
pub name: String,
/// Form factor of the device.
pub device_type: DeviceType,
/// Operating system / application details.
pub platform: DevicePlatform,
}
pub async fn register_device(
service: web::Data<CrossDeviceSyncService>,
body: web::Json<RegisterDeviceRequest>,
) -> HttpResponse {
let request = body.into_inner();
match service
.register_device(
&request.user_id,
&request.name,
request.device_type,
request.platform,
)
.await
{
Ok(device) => HttpResponse::Created().json(device),
Err(e) => HttpResponse::BadRequest().json(serde_json::json!({ "error": e })),
}
}
/// `GET /sync/devices/{id}` — responds `200 OK` with the device, or
/// `404 Not Found` with `{"error": ...}` when the service reports an error.
pub async fn get_device(
    service: web::Data<CrossDeviceSyncService>,
    path: web::Path<String>,
) -> HttpResponse {
    let device_id = path.into_inner();
    let lookup = service.get_device(&device_id).await;
    match lookup {
        Err(e) => HttpResponse::NotFound().json(serde_json::json!({ "error": e })),
        Ok(device) => HttpResponse::Ok().json(device),
    }
}
/// Query-string parameters accepted by the device-listing handler.
#[derive(Debug, Deserialize)]
pub struct ListDevicesQuery {
/// User whose devices should be listed.
pub user_id: String,
}
pub async fn list_devices(
service: web::Data<CrossDeviceSyncService>,
query: web::Query<ListDevicesQuery>,
) -> HttpResponse {
match service.list_devices(&query.user_id).await {
Ok(devices) => HttpResponse::Ok().json(devices),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e })),
}
}
/// JSON body accepted by the device-update handler. Fields left out (or
/// `null`) leave the corresponding device property unchanged.
#[derive(Debug, Deserialize)]
pub struct UpdateDeviceRequest {
/// New display name.
pub name: Option<String>,
/// Replacement sync configuration.
pub sync_config: Option<DeviceSyncConfig>,
/// New push-notification token.
pub push_token: Option<String>,
}
pub async fn update_device(
service: web::Data<CrossDeviceSyncService>,
path: web::Path<String>,
body: web::Json<UpdateDeviceRequest>,
) -> HttpResponse {
let request = body.into_inner();
match service
.update_device(
&path.into_inner(),
request.name,
request.sync_config,
request.push_token,
)
.await
{
Ok(device) => HttpResponse::Ok().json(device),
Err(e) => HttpResponse::BadRequest().json(serde_json::json!({ "error": e })),
}
}
/// `DELETE /sync/devices/{id}` — responds `204 No Content` once the device is
/// removed, or `404 Not Found` with `{"error": ...}` when the service reports
/// an error.
pub async fn remove_device(
    service: web::Data<CrossDeviceSyncService>,
    path: web::Path<String>,
) -> HttpResponse {
    let device_id = path.into_inner();
    let removal = service.remove_device(&device_id).await;
    match removal {
        Err(e) => HttpResponse::NotFound().json(serde_json::json!({ "error": e })),
        Ok(()) => HttpResponse::NoContent().finish(),
    }
}
pub async fn push_changes(
service: web::Data<CrossDeviceSyncService>,
body: web::Json<PushChangesRequest>,
) -> HttpResponse {
match service.push_changes(body.into_inner()).await {
Ok(response) => HttpResponse::Ok().json(response),
Err(e) => HttpResponse::BadRequest().json(serde_json::json!({ "error": e })),
}
}
pub async fn pull_changes(
service: web::Data<CrossDeviceSyncService>,
body: web::Json<PullChangesRequest>,
) -> HttpResponse {
match service.pull_changes(body.into_inner()).await {
Ok(delta) => HttpResponse::Ok().json(delta),
Err(e) => HttpResponse::BadRequest().json(serde_json::json!({ "error": e })),
}
}
/// `GET /sync/state/{device_id}` — responds `200 OK` with the device's sync
/// state, or `404 Not Found` with `{"error": ...}` when the service reports
/// an error.
pub async fn get_sync_state(
    service: web::Data<CrossDeviceSyncService>,
    path: web::Path<String>,
) -> HttpResponse {
    let device_id = path.into_inner();
    let snapshot = service.get_sync_state(&device_id).await;
    match snapshot {
        Err(e) => HttpResponse::NotFound().json(serde_json::json!({ "error": e })),
        Ok(state) => HttpResponse::Ok().json(state),
    }
}
/// Query-string parameters accepted by the conflict-listing handler.
#[derive(Debug, Deserialize)]
pub struct ListConflictsQuery {
/// User whose unresolved conflicts should be listed.
pub user_id: String,
}
pub async fn list_conflicts(
service: web::Data<CrossDeviceSyncService>,
query: web::Query<ListConflictsQuery>,
) -> HttpResponse {
match service.list_conflicts(&query.user_id).await {
Ok(conflicts) => HttpResponse::Ok().json(conflicts),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e })),
}
}
/// JSON body accepted by the conflict-resolution handler.
#[derive(Debug, Deserialize)]
pub struct ResolveConflictRequest {
/// Resolution to record on the conflict.
pub resolution: ConflictResolution,
}
pub async fn resolve_conflict(
service: web::Data<CrossDeviceSyncService>,
path: web::Path<String>,
body: web::Json<ResolveConflictRequest>,
) -> HttpResponse {
match service
.resolve_conflict(&path.into_inner(), body.into_inner().resolution)
.await
{
Ok(conflict) => HttpResponse::Ok().json(conflict),
Err(e) => HttpResponse::BadRequest().json(serde_json::json!({ "error": e })),
}
}
/// Mounts every sync endpoint under the `/sync` scope:
///
/// - `POST   /sync/devices`, `GET /sync/devices`
/// - `GET | PATCH | DELETE /sync/devices/{id}`
/// - `POST   /sync/push`, `POST /sync/pull`
/// - `GET    /sync/state/{device_id}`
/// - `GET    /sync/conflicts`, `POST /sync/conflicts/{id}/resolve`
///
/// The handlers extract `web::Data<CrossDeviceSyncService>`, which is not
/// registered here.
pub fn configure_device_sync_routes(cfg: &mut web::ServiceConfig) {
    let sync_scope = web::scope("/sync")
        // Device registry
        .route("/devices", web::post().to(register_device))
        .route("/devices", web::get().to(list_devices))
        .route("/devices/{id}", web::get().to(get_device))
        .route("/devices/{id}", web::patch().to(update_device))
        .route("/devices/{id}", web::delete().to(remove_device))
        // Change exchange
        .route("/push", web::post().to(push_changes))
        .route("/pull", web::post().to(pull_changes))
        .route("/state/{device_id}", web::get().to(get_sync_state))
        // Conflict handling
        .route("/conflicts", web::get().to(list_conflicts))
        .route("/conflicts/{id}/resolve", web::post().to(resolve_conflict));

    cfg.service(sync_scope);
}