#[cfg(feature = "portal")]
use chrono::{DateTime, Utc};
#[cfg(feature = "portal")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "portal")]
use sha2::{Digest, Sha256};
#[cfg(feature = "portal")]
use sqlx::{PgPool, Row};
#[cfg(feature = "portal")]
use std::collections::HashMap;
#[cfg(feature = "portal")]
use std::net::IpAddr;
#[cfg(feature = "portal")]
use uuid::Uuid;
#[cfg(feature = "portal")]
use crate::portal::sync::{ConflictResolution, SyncEvent};
#[cfg(feature = "portal")]
/// One row of the `device_sessions` table, as returned by [`DeviceSessionRepository`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbDeviceSession {
/// Value of the row's `id` column.
pub id: Uuid,
/// User the session belongs to; together with `device_id` it forms the upsert conflict target.
pub user_id: Uuid,
/// Device identifier; together with `user_id` it forms the upsert conflict target.
pub device_id: String,
/// Device name supplied on the most recent upsert.
pub device_name: String,
/// Platform string supplied on the most recent upsert.
pub platform: String,
/// User agent supplied on the most recent upsert, if any.
pub user_agent: Option<String>,
/// Address parsed from the column's text form; `None` when the column is NULL or the text does
/// not parse as an `IpAddr`.
pub ip_address: Option<IpAddr>,
/// Set to `true` by `upsert` and to `false` by `disconnect`.
pub connected: bool,
/// Set to `NOW()` by `upsert`.
pub last_connected_at: Option<DateTime<Utc>>,
/// Set to `NOW()` by `disconnect`.
pub last_disconnected_at: Option<DateTime<Utc>>,
/// Settings version last recorded for this device through `update_version`.
pub settings_version: i64,
/// Read from the `created_at` column; no query in this file writes it
/// (presumably a column default — confirm against the schema).
pub created_at: DateTime<Utc>,
/// Read from the `updated_at` column; no query in this file writes it
/// (presumably maintained by the database — confirm against the schema).
pub updated_at: DateTime<Utc>,
}
#[cfg(feature = "portal")]
/// One row of the `sync_state` table: the stored value of a single
/// `(user_id, resource_type, resource_key)` resource plus its versioning metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncState {
/// Value of the row's `id` column.
pub id: Uuid,
/// User the resource belongs to.
pub user_id: Uuid,
/// First half of the resource's key within a user.
pub resource_type: String,
/// Second half of the resource's key within a user.
pub resource_key: String,
/// Version counter: 1 when the row is first written, then incremented on every update.
pub version: i64,
/// Per-device update counters, keyed by device id.
pub version_vector: HashMap<String, i64>,
/// The stored JSON value.
pub value: serde_json::Value,
/// Lowercase hex SHA-256 of the JSON-serialized `value`, computed when the row was written.
pub checksum: String,
/// Device id supplied with the most recent write.
pub last_modified_by: String,
/// Set to `NOW()` whenever an existing row is updated.
pub last_modified_at: DateTime<Utc>,
/// Read from the `created_at` column; no query in this file writes it
/// (presumably a column default — confirm against the schema).
pub created_at: DateTime<Utc>,
}
#[cfg(feature = "portal")]
/// One row of the `sync_queue` table: an event waiting to be delivered to a device.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncQueueItem {
/// Value of the row's `id` column; used by `mark_delivered` and `mark_failed`.
pub id: Uuid,
/// User the event was enqueued for.
pub user_id: Uuid,
/// Device the event should be delivered to.
pub target_device_id: String,
/// Snake-case event name written by `enqueue` (e.g. `settings_changed`).
pub event_type: String,
/// JSON-serialized event written by `enqueue`.
pub payload: serde_json::Value,
/// Number of recorded failures; incremented by `mark_failed`.
pub retry_count: i32,
/// Failure budget compared against `retry_count` by `get_pending`. Not set by `enqueue`
/// (presumably a column default — confirm against the schema).
pub max_retries: i32,
/// Read from the `created_at` column; `get_pending` orders by it, oldest first.
pub created_at: DateTime<Utc>,
/// Items whose expiry has passed are skipped by `get_pending` and removed by
/// `cleanup_expired`. Not set by `enqueue` (presumably a column default — confirm).
pub expires_at: DateTime<Utc>,
/// Set to `NOW()` by `mark_delivered`.
pub delivered_at: Option<DateTime<Utc>>,
/// Set to `NOW()` by `mark_failed`.
pub failed_at: Option<DateTime<Utc>>,
/// Error text recorded by the most recent `mark_failed`.
pub error_message: Option<String>,
}
#[cfg(feature = "portal")]
/// One row of the `sync_conflicts` table: two competing values for the same resource, plus the
/// resolution once one has been recorded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncConflict {
/// Value of the row's `id` column; the key used by `resolve`.
pub id: Uuid,
/// User the conflicting resource belongs to.
pub user_id: Uuid,
/// Resource type of the conflicting resource.
pub resource_type: String,
/// Resource key of the conflicting resource.
pub resource_key: String,
/// Device id recorded for the "local" side.
pub local_device_id: String,
/// Device id recorded for the "remote" side.
pub remote_device_id: String,
/// Value recorded for the local side.
pub local_value: serde_json::Value,
/// Value recorded for the remote side.
pub remote_value: serde_json::Value,
/// Version recorded for the local side.
pub local_version: i64,
/// Version recorded for the remote side.
pub remote_version: i64,
/// Timestamp recorded for the local side.
pub local_timestamp: DateTime<Utc>,
/// Timestamp recorded for the remote side.
pub remote_timestamp: DateTime<Utc>,
/// Strategy name written by `resolve` (e.g. `use_local`); `None` until resolved.
pub resolution_strategy: Option<String>,
/// Value written by `resolve`; `None` until resolved.
pub resolved_value: Option<serde_json::Value>,
/// Set to `NOW()` by `resolve`; `get_unresolved` returns only rows where this is NULL.
pub resolved_at: Option<DateTime<Utc>>,
/// Identifier of whoever resolved the conflict, as passed to `resolve`.
pub resolved_by: Option<String>,
/// Read from the `created_at` column; `get_unresolved` orders by it, newest first.
pub created_at: DateTime<Utc>,
}
#[cfg(feature = "portal")]
/// Caller-supplied fields for [`DeviceSessionRepository::upsert`].
#[derive(Debug, Clone, Deserialize)]
pub struct CreateSessionInput {
/// Device identifier; with the user id it selects the session row to insert or refresh.
pub device_id: String,
/// Device name to store on the session.
pub device_name: String,
/// Platform string to store on the session.
pub platform: String,
/// User agent to store on the session; `None` is bound as NULL.
pub user_agent: Option<String>,
/// Client address; bound to the query in its string form, `None` is bound as NULL.
pub ip_address: Option<IpAddr>,
}
#[cfg(feature = "portal")]
/// Caller-supplied fields for [`SyncStateRepository::update`].
#[derive(Debug, Clone, Deserialize)]
pub struct UpdateSyncStateInput {
/// Resource type of the resource being written.
pub resource_type: String,
/// Resource key of the resource being written.
pub resource_key: String,
/// New JSON value to store; its checksum is computed from this.
pub value: serde_json::Value,
/// Writing device: stored as `last_modified_by` and used as the version-vector key whose
/// counter is incremented.
pub device_id: String,
}
#[cfg(feature = "portal")]
/// Query helper for the `device_sessions` table.
pub struct DeviceSessionRepository<'a> {
/// Borrowed connection pool on which every query is executed.
pool: &'a PgPool,
}
#[cfg(feature = "portal")]
impl<'a> DeviceSessionRepository<'a> {
    /// Creates a repository that runs its queries on `pool`.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Parses the text form of an `ip_address` column value.
    ///
    /// The queries below select `ip_address::TEXT`. If the column is `inet`, PostgreSQL renders
    /// that cast with the netmask length appended (`192.168.1.5/32`), which `IpAddr`'s parser
    /// rejects. Everything from the first `/` onwards is therefore dropped before parsing;
    /// text without a `/` is parsed unchanged.
    ///
    /// Returns `None` when the remaining text is not a valid IPv4 or IPv6 address.
    fn parse_ip_text(text: &str) -> Option<IpAddr> {
        let host = text
            .split_once('/')
            .map_or(text, |(host, _mask_len)| host);
        host.parse().ok()
    }

    /// Decodes one `device_sessions` row (with `ip_address` selected as text) into a
    /// [`DbDeviceSession`].
    ///
    /// # Errors
    /// Returns the `sqlx::Error` of the first column that is missing or fails to decode. An
    /// unparsable `ip_address` is not an error; it decodes to `None`.
    fn session_from_row(row: &sqlx::postgres::PgRow) -> Result<DbDeviceSession, sqlx::Error> {
        Ok(DbDeviceSession {
            id: row.try_get("id")?,
            user_id: row.try_get("user_id")?,
            device_id: row.try_get("device_id")?,
            device_name: row.try_get("device_name")?,
            platform: row.try_get("platform")?,
            user_agent: row.try_get("user_agent")?,
            ip_address: row
                .try_get::<Option<String>, _>("ip_address")?
                .as_deref()
                .and_then(Self::parse_ip_text),
            connected: row.try_get("connected")?,
            last_connected_at: row.try_get("last_connected_at")?,
            last_disconnected_at: row.try_get("last_disconnected_at")?,
            settings_version: row.try_get("settings_version")?,
            created_at: row.try_get("created_at")?,
            updated_at: row.try_get("updated_at")?,
        })
    }

    /// Inserts the session for `(user_id, input.device_id)`, or refreshes it if it already
    /// exists, and returns the stored row.
    ///
    /// Either way the session ends up with `connected = true` and `last_connected_at = NOW()`;
    /// on refresh the name, platform, user agent and address are overwritten with `input`'s.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the statement or decoding the returned row.
    pub async fn upsert(
        &self,
        user_id: Uuid,
        input: &CreateSessionInput,
    ) -> Result<DbDeviceSession, sqlx::Error> {
        let row = sqlx::query(
            r#"
INSERT INTO device_sessions (user_id, device_id, device_name, platform, user_agent, ip_address, connected, last_connected_at)
VALUES ($1, $2, $3, $4, $5, $6, true, NOW())
ON CONFLICT (user_id, device_id) DO UPDATE SET
device_name = EXCLUDED.device_name,
platform = EXCLUDED.platform,
user_agent = EXCLUDED.user_agent,
ip_address = EXCLUDED.ip_address,
connected = true,
last_connected_at = NOW()
RETURNING id, user_id, device_id, device_name, platform, user_agent,
ip_address::TEXT, connected, last_connected_at, last_disconnected_at,
settings_version, created_at, updated_at
"#
        )
        .bind(user_id)
        .bind(&input.device_id)
        .bind(&input.device_name)
        .bind(&input.platform)
        .bind(&input.user_agent)
        // NOTE(review): the address is bound as text. If `ip_address` is an `inet` column this
        // depends on the server accepting a text parameter there — confirm against the schema.
        .bind(input.ip_address.map(|ip| ip.to_string()))
        .fetch_one(self.pool)
        .await?;

        Self::session_from_row(&row)
    }

    /// Marks the session for `(user_id, device_id)` as disconnected and stamps
    /// `last_disconnected_at` with `NOW()`.
    ///
    /// The affected-row count is not inspected, so this also succeeds when no such session
    /// exists.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the statement.
    pub async fn disconnect(&self, user_id: Uuid, device_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
UPDATE device_sessions
SET connected = false, last_disconnected_at = NOW()
WHERE user_id = $1 AND device_id = $2
"#,
        )
        .bind(user_id)
        .bind(device_id)
        .execute(self.pool)
        .await?;
        Ok(())
    }

    /// Returns all of `user_id`'s sessions with `connected = true`, most recently connected
    /// first.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the query or decoding any row.
    pub async fn get_connected(&self, user_id: Uuid) -> Result<Vec<DbDeviceSession>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
SELECT id, user_id, device_id, device_name, platform, user_agent,
ip_address::TEXT, connected, last_connected_at, last_disconnected_at,
settings_version, created_at, updated_at
FROM device_sessions
WHERE user_id = $1 AND connected = true
ORDER BY last_connected_at DESC
"#,
        )
        .bind(user_id)
        .fetch_all(self.pool)
        .await?;

        rows.iter().map(Self::session_from_row).collect()
    }

    /// Overwrites `settings_version` on the session for `(user_id, device_id)` with `version`.
    ///
    /// The value is stored as given (no monotonicity check), and the affected-row count is not
    /// inspected, so this also succeeds when no such session exists.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the statement.
    pub async fn update_version(
        &self,
        user_id: Uuid,
        device_id: &str,
        version: i64,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
UPDATE device_sessions
SET settings_version = $3
WHERE user_id = $1 AND device_id = $2
"#,
        )
        .bind(user_id)
        .bind(device_id)
        .bind(version)
        .execute(self.pool)
        .await?;
        Ok(())
    }
}
#[cfg(feature = "portal")]
/// Query helper for the `sync_state` table.
pub struct SyncStateRepository<'a> {
/// Borrowed connection pool on which every query and transaction is started.
pool: &'a PgPool,
}
#[cfg(feature = "portal")]
impl<'a> SyncStateRepository<'a> {
    /// Creates a repository that runs its queries on `pool`.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Returns the lowercase hex SHA-256 of `value` serialized as compact JSON.
    ///
    /// If serialization fails, the hash of the empty string is returned instead.
    fn compute_checksum(value: &serde_json::Value) -> String {
        let json_str = serde_json::to_string(value).unwrap_or_default();
        let mut hasher = Sha256::new();
        hasher.update(json_str.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Interprets a stored `version_vector` JSON value as a device-id → counter map.
    ///
    /// Deliberately lenient: JSON that is not an object of integers yields an empty map rather
    /// than an error.
    fn parse_version_vector(raw: serde_json::Value) -> HashMap<String, i64> {
        serde_json::from_value(raw).unwrap_or_default()
    }

    /// Decodes one full `sync_state` row into a [`SyncState`].
    ///
    /// # Errors
    /// Returns the `sqlx::Error` of the first column that is missing or fails to decode.
    fn state_from_row(row: &sqlx::postgres::PgRow) -> Result<SyncState, sqlx::Error> {
        Ok(SyncState {
            id: row.try_get("id")?,
            user_id: row.try_get("user_id")?,
            resource_type: row.try_get("resource_type")?,
            resource_key: row.try_get("resource_key")?,
            version: row.try_get("version")?,
            version_vector: Self::parse_version_vector(row.try_get("version_vector")?),
            value: row.try_get("value")?,
            checksum: row.try_get("checksum")?,
            last_modified_by: row.try_get("last_modified_by")?,
            last_modified_at: row.try_get("last_modified_at")?,
            created_at: row.try_get("created_at")?,
        })
    }

    /// Fetches the state stored for `(user_id, resource_type, resource_key)`.
    ///
    /// Returns `Ok(None)` when no such row exists.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the query or decoding the row.
    pub async fn get(
        &self,
        user_id: Uuid,
        resource_type: &str,
        resource_key: &str,
    ) -> Result<Option<SyncState>, sqlx::Error> {
        let row = sqlx::query(
            r#"
SELECT id, user_id, resource_type, resource_key, version, version_vector,
value, checksum, last_modified_by, last_modified_at, created_at
FROM sync_state
WHERE user_id = $1 AND resource_type = $2 AND resource_key = $3
"#,
        )
        .bind(user_id)
        .bind(resource_type)
        .bind(resource_key)
        .fetch_optional(self.pool)
        .await?;

        row.as_ref().map(Self::state_from_row).transpose()
    }

    /// Writes `input.value` for `(user_id, input.resource_type, input.resource_key)` inside a
    /// transaction and returns the stored row.
    ///
    /// * If the row exists it is locked with `FOR UPDATE`. When `expected_version` is `Some` and
    ///   differs from the stored version, nothing is written and
    ///   [`SyncStateError::Conflict`] is returned with the stored version, value and last
    ///   writer. Otherwise the version is incremented and `input.device_id`'s counter in the
    ///   version vector is incremented.
    /// * If the row does not exist it is created at version 1 with a version vector of
    ///   `{device_id: 1}`; `expected_version` is not consulted in that case.
    ///
    /// # Errors
    /// * [`SyncStateError::Conflict`] as described above.
    /// * [`SyncStateError::Database`] when a statement fails or a column cannot be decoded.
    ///   Decode failures are reported instead of being replaced by placeholder values, and the
    ///   returned row is decoded before commit, so a failure leaves nothing persisted.
    pub async fn update(
        &self,
        user_id: Uuid,
        input: &UpdateSyncStateInput,
        expected_version: Option<i64>,
    ) -> Result<SyncState, SyncStateError> {
        let checksum = Self::compute_checksum(&input.value);
        let mut tx = self.pool.begin().await?;

        // Lock the existing row (if any) so the version check and the write are atomic.
        let current = sqlx::query(
            r#"
SELECT version, version_vector, value, last_modified_by, last_modified_at
FROM sync_state
WHERE user_id = $1 AND resource_type = $2 AND resource_key = $3
FOR UPDATE
"#,
        )
        .bind(user_id)
        .bind(&input.resource_type)
        .bind(&input.resource_key)
        .fetch_optional(&mut *tx)
        .await?;

        let (new_version, new_version_vector) = match current {
            Some(row) => {
                let current_version: i64 = row.try_get("version")?;
                if let Some(expected) = expected_version.filter(|&v| v != current_version) {
                    // Returning here drops `tx`, which rolls the transaction back.
                    return Err(SyncStateError::Conflict {
                        current_version,
                        expected_version: expected,
                        current_value: row.try_get("value")?,
                        last_modified_by: row.try_get("last_modified_by")?,
                        last_modified_at: row.try_get("last_modified_at")?,
                    });
                }
                let mut vector = Self::parse_version_vector(row.try_get("version_vector")?);
                *vector.entry(input.device_id.clone()).or_insert(0) += 1;
                (current_version + 1, vector)
            }
            None => (1, HashMap::from([(input.device_id.clone(), 1)])),
        };

        // Serializing a `HashMap<String, i64>` cannot fail; the fallback is never expected to run.
        let version_vector_json = serde_json::to_value(&new_version_vector).unwrap_or_default();

        // When the row was locked above, `sync_state.version + 1` equals the `$4` we computed.
        // When the row did not exist, nothing was locked, so a concurrent creator may have
        // inserted it first. Deriving the version from the stored row on the conflict branch
        // stops that second writer from overwriting the first at the same version number.
        // The second writer's version vector still replaces the first one's.
        let row = sqlx::query(
            r#"
INSERT INTO sync_state (user_id, resource_type, resource_key, version, version_vector, value, checksum, last_modified_by)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (user_id, resource_type, resource_key) DO UPDATE SET
version = sync_state.version + 1,
version_vector = EXCLUDED.version_vector,
value = EXCLUDED.value,
checksum = EXCLUDED.checksum,
last_modified_by = EXCLUDED.last_modified_by,
last_modified_at = NOW()
RETURNING id, user_id, resource_type, resource_key, version, version_vector,
value, checksum, last_modified_by, last_modified_at, created_at
"#
        )
        .bind(user_id)
        .bind(&input.resource_type)
        .bind(&input.resource_key)
        .bind(new_version)
        .bind(&version_vector_json)
        .bind(&input.value)
        .bind(&checksum)
        .bind(&input.device_id)
        .fetch_one(&mut *tx)
        .await?;

        // Decode before committing: if the row cannot be read back, the write is rolled back
        // rather than committed alongside an error.
        let state = Self::state_from_row(&row)?;
        tx.commit().await?;
        Ok(state)
    }

    /// Returns every state row belonging to `user_id`, ordered by resource type and then
    /// resource key.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the query or decoding any row.
    pub async fn get_all_for_user(&self, user_id: Uuid) -> Result<Vec<SyncState>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
SELECT id, user_id, resource_type, resource_key, version, version_vector,
value, checksum, last_modified_by, last_modified_at, created_at
FROM sync_state
WHERE user_id = $1
ORDER BY resource_type, resource_key
"#,
        )
        .bind(user_id)
        .fetch_all(self.pool)
        .await?;

        rows.iter().map(Self::state_from_row).collect()
    }
}
#[cfg(feature = "portal")]
/// Error type returned by [`SyncStateRepository::update`].
#[derive(Debug, thiserror::Error)]
pub enum SyncStateError {
/// A database operation failed; wraps the underlying `sqlx::Error` (convertible via `From`).
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
/// The caller's expected version did not match the stored version, so nothing was written.
#[error("Version conflict: expected {expected_version}, found {current_version}")]
Conflict {
/// Version read from the stored row.
current_version: i64,
/// Version the caller expected to find.
expected_version: i64,
/// Value read from the stored row alongside `current_version`.
current_value: serde_json::Value,
/// `last_modified_by` read from the stored row.
last_modified_by: String,
/// `last_modified_at` read from the stored row.
last_modified_at: DateTime<Utc>,
},
}
#[cfg(feature = "portal")]
/// Query helper for the `sync_queue` table.
pub struct SyncQueueRepository<'a> {
/// Borrowed connection pool on which every query is executed.
pool: &'a PgPool,
}
#[cfg(feature = "portal")]
impl<'a> SyncQueueRepository<'a> {
    /// Builds a queue repository on top of the borrowed `pool`.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Turns one `sync_queue` row into a [`SyncQueueItem`], failing on the first column that is
    /// missing or cannot be decoded.
    fn item_from_row(record: &sqlx::postgres::PgRow) -> Result<SyncQueueItem, sqlx::Error> {
        let item = SyncQueueItem {
            id: record.try_get("id")?,
            user_id: record.try_get("user_id")?,
            target_device_id: record.try_get("target_device_id")?,
            event_type: record.try_get("event_type")?,
            payload: record.try_get("payload")?,
            retry_count: record.try_get("retry_count")?,
            max_retries: record.try_get("max_retries")?,
            created_at: record.try_get("created_at")?,
            expires_at: record.try_get("expires_at")?,
            delivered_at: record.try_get("delivered_at")?,
            failed_at: record.try_get("failed_at")?,
            error_message: record.try_get("error_message")?,
        };
        Ok(item)
    }

    /// Queues `event` for delivery to `target_device_id` on behalf of `user_id` and returns the
    /// id of the new queue row.
    ///
    /// The event is stored as its snake-case type name plus its JSON serialization; if the
    /// event cannot be serialized, a JSON `null` payload is stored instead.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from the insert or from reading back the `id` column.
    pub async fn enqueue(
        &self,
        user_id: Uuid,
        target_device_id: &str,
        event: &SyncEvent,
    ) -> Result<Uuid, sqlx::Error> {
        const INSERT_SQL: &str = r#"
INSERT INTO sync_queue (user_id, target_device_id, event_type, payload)
VALUES ($1, $2, $3, $4)
RETURNING id
"#;

        let kind = match event {
            SyncEvent::SettingsChanged { .. } => "settings_changed",
            SyncEvent::FullSyncRequested { .. } => "full_sync_requested",
            SyncEvent::ConflictDetected { .. } => "conflict_detected",
            SyncEvent::SessionInvalidated { .. } => "session_invalidated",
            SyncEvent::ApiKeyRotated { .. } => "api_key_rotated",
        };
        let body = serde_json::to_value(event).unwrap_or_default();

        let inserted = sqlx::query(INSERT_SQL)
            .bind(user_id)
            .bind(target_device_id)
            .bind(kind)
            .bind(&body)
            .fetch_one(self.pool)
            .await?;
        inserted.try_get("id")
    }

    /// Lists up to `limit` deliverable items for `target_device_id`, oldest first.
    ///
    /// An item is deliverable when it has not been delivered, has not expired, and either has
    /// never failed or still has retries left (`retry_count < max_retries`). The filter is by
    /// device id only; no user id is involved.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the query or decoding any row.
    pub async fn get_pending(
        &self,
        target_device_id: &str,
        limit: i64,
    ) -> Result<Vec<SyncQueueItem>, sqlx::Error> {
        const PENDING_SQL: &str = r#"
SELECT id, user_id, target_device_id, event_type, payload, retry_count, max_retries,
created_at, expires_at, delivered_at, failed_at, error_message
FROM sync_queue
WHERE target_device_id = $1 AND delivered_at IS NULL AND (failed_at IS NULL OR retry_count < max_retries)
AND expires_at > NOW()
ORDER BY created_at ASC
LIMIT $2
"#;

        let records = sqlx::query(PENDING_SQL)
            .bind(target_device_id)
            .bind(limit)
            .fetch_all(self.pool)
            .await?;

        let mut pending = Vec::with_capacity(records.len());
        for record in &records {
            pending.push(Self::item_from_row(record)?);
        }
        Ok(pending)
    }

    /// Stamps the item `id` as delivered (`delivered_at = NOW()`).
    ///
    /// Succeeds even when no row has that id; the affected-row count is not inspected.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the statement.
    pub async fn mark_delivered(&self, id: Uuid) -> Result<(), sqlx::Error> {
        const DELIVERED_SQL: &str = r#"
UPDATE sync_queue
SET delivered_at = NOW()
WHERE id = $1
"#;

        sqlx::query(DELIVERED_SQL)
            .bind(id)
            .execute(self.pool)
            .await
            .map(drop)
    }

    /// Records a failed delivery attempt for item `id`: sets `failed_at = NOW()`, increments
    /// `retry_count` and stores `error` as the error message.
    ///
    /// Succeeds even when no row has that id; the affected-row count is not inspected.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the statement.
    pub async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), sqlx::Error> {
        const FAILED_SQL: &str = r#"
UPDATE sync_queue
SET failed_at = NOW(), retry_count = retry_count + 1, error_message = $2
WHERE id = $1
"#;

        sqlx::query(FAILED_SQL)
            .bind(id)
            .bind(error)
            .execute(self.pool)
            .await
            .map(drop)
    }

    /// Deletes every item that has expired or has already been delivered, for all users and
    /// devices, and returns how many rows were removed.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the statement.
    pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
        const PURGE_SQL: &str = r#"
DELETE FROM sync_queue
WHERE expires_at < NOW() OR delivered_at IS NOT NULL
"#;

        let outcome = sqlx::query(PURGE_SQL).execute(self.pool).await?;
        Ok(outcome.rows_affected())
    }
}
#[cfg(feature = "portal")]
/// Query helper for the `sync_conflicts` table.
pub struct SyncConflictRepository<'a> {
/// Borrowed connection pool on which every query is executed.
pool: &'a PgPool,
}
#[cfg(feature = "portal")]
impl<'a> SyncConflictRepository<'a> {
    /// Builds a conflict repository on top of the borrowed `pool`.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Turns one `sync_conflicts` row into a [`SyncConflict`], failing on the first column that
    /// is missing or cannot be decoded.
    fn conflict_from_row(record: &sqlx::postgres::PgRow) -> Result<SyncConflict, sqlx::Error> {
        let conflict = SyncConflict {
            id: record.try_get("id")?,
            user_id: record.try_get("user_id")?,
            resource_type: record.try_get("resource_type")?,
            resource_key: record.try_get("resource_key")?,
            local_device_id: record.try_get("local_device_id")?,
            remote_device_id: record.try_get("remote_device_id")?,
            local_value: record.try_get("local_value")?,
            remote_value: record.try_get("remote_value")?,
            local_version: record.try_get("local_version")?,
            remote_version: record.try_get("remote_version")?,
            local_timestamp: record.try_get("local_timestamp")?,
            remote_timestamp: record.try_get("remote_timestamp")?,
            resolution_strategy: record.try_get("resolution_strategy")?,
            resolved_value: record.try_get("resolved_value")?,
            resolved_at: record.try_get("resolved_at")?,
            resolved_by: record.try_get("resolved_by")?,
            created_at: record.try_get("created_at")?,
        };
        Ok(conflict)
    }

    /// Records a new conflict between a local and a remote write to the same resource and
    /// returns the stored row.
    ///
    /// Only the two sides of the conflict are written; the resolution columns come back with
    /// whatever the table assigns on insert.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from the insert or from decoding the returned row.
    // The public signature mirrors the table's columns and is relied on by callers, so the
    // argument count is kept as-is.
    #[allow(clippy::too_many_arguments)]
    pub async fn create(
        &self,
        user_id: Uuid,
        resource_type: &str,
        resource_key: &str,
        local_device_id: &str,
        remote_device_id: &str,
        local_value: &serde_json::Value,
        remote_value: &serde_json::Value,
        local_version: i64,
        remote_version: i64,
        local_timestamp: DateTime<Utc>,
        remote_timestamp: DateTime<Utc>,
    ) -> Result<SyncConflict, sqlx::Error> {
        const INSERT_SQL: &str = r#"
INSERT INTO sync_conflicts (
user_id, resource_type, resource_key, local_device_id, remote_device_id,
local_value, remote_value, local_version, remote_version,
local_timestamp, remote_timestamp
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
RETURNING *
"#;

        let inserted = sqlx::query(INSERT_SQL)
            .bind(user_id)
            .bind(resource_type)
            .bind(resource_key)
            .bind(local_device_id)
            .bind(remote_device_id)
            .bind(local_value)
            .bind(remote_value)
            .bind(local_version)
            .bind(remote_version)
            .bind(local_timestamp)
            .bind(remote_timestamp)
            .fetch_one(self.pool)
            .await?;

        Self::conflict_from_row(&inserted)
    }

    /// Lists `user_id`'s conflicts that have no `resolved_at` yet, newest first.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the query or decoding any row.
    pub async fn get_unresolved(&self, user_id: Uuid) -> Result<Vec<SyncConflict>, sqlx::Error> {
        const UNRESOLVED_SQL: &str = r#"
SELECT * FROM sync_conflicts
WHERE user_id = $1 AND resolved_at IS NULL
ORDER BY created_at DESC
"#;

        let records = sqlx::query(UNRESOLVED_SQL)
            .bind(user_id)
            .fetch_all(self.pool)
            .await?;

        let mut open_conflicts = Vec::with_capacity(records.len());
        for record in &records {
            open_conflicts.push(Self::conflict_from_row(record)?);
        }
        Ok(open_conflicts)
    }

    /// Stores a resolution on conflict `conflict_id`: the strategy's snake-case name, the
    /// resolved value, who resolved it, and `resolved_at = NOW()`.
    ///
    /// The update matches on `id` alone and its affected-row count is not inspected, so it
    /// succeeds when no such conflict exists and overwrites any earlier resolution.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from executing the statement.
    pub async fn resolve(
        &self,
        conflict_id: Uuid,
        strategy: ConflictResolution,
        resolved_value: &serde_json::Value,
        resolved_by: &str,
    ) -> Result<(), sqlx::Error> {
        const RESOLVE_SQL: &str = r#"
UPDATE sync_conflicts
SET resolution_strategy = $2, resolved_value = $3, resolved_at = NOW(), resolved_by = $4
WHERE id = $1
"#;

        let strategy_name = match strategy {
            ConflictResolution::UseLocal => "use_local",
            ConflictResolution::UseRemote => "use_remote",
            ConflictResolution::UseNewest => "use_newest",
            ConflictResolution::Merge => "merge",
            ConflictResolution::Manual => "manual",
        };

        sqlx::query(RESOLVE_SQL)
            .bind(conflict_id)
            .bind(strategy_name)
            .bind(resolved_value)
            .bind(resolved_by)
            .execute(self.pool)
            .await
            .map(drop)
    }
}