use std::fs;
use std::io::Write;
use std::path::PathBuf;
use rusqlite::Connection;
use crate::auth::credentials::LoadedAuth;
use crate::paths;
use crate::remote::http::Client;
use crate::remote::machine::StoredMachine;
use crate::remote::sync_client::{SyncEvent, SyncRequest};
use crate::tracking;
const LOCK_STALE_SECS: u64 = 300;
fn lock_path() -> Option<PathBuf> {
paths::user_data_dir().map(|d| d.join("sync.lock"))
}
pub(crate) struct SyncLock {
path: PathBuf,
}
impl SyncLock {
pub(crate) fn acquire() -> Option<Self> {
let path = lock_path()?;
if let Some(parent) = path.parent() {
let _ = fs::create_dir_all(parent);
}
if let Ok(mut f) = fs::File::create_new(&path) {
let _ = write!(f, "{}", std::process::id());
return Some(Self { path });
}
let stale = fs::metadata(&path)
.and_then(|m| m.modified())
.ok()
.and_then(|mtime| mtime.elapsed().ok())
.is_some_and(|age| age.as_secs() > LOCK_STALE_SECS);
if !stale {
return None; }
let _ = fs::remove_file(&path);
let mut f = fs::File::create_new(&path).ok()?;
let _ = write!(f, "{}", std::process::id());
Some(Self { path })
}
}
impl Drop for SyncLock {
fn drop(&mut self) {
let _ = fs::remove_file(&self.path);
}
}
#[allow(clippy::cast_possible_wrap, clippy::cast_sign_loss)]
pub fn utc_now_iso8601() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let days = secs / 86400;
let time_of_day = secs % 86400;
let hours = time_of_day / 3600;
let minutes = (time_of_day % 3600) / 60;
let seconds = time_of_day % 60;
let z = days as i64 + 719_468;
let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
let doe = (z - era * 146_097) as u64;
let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
let y = yoe as i64 + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = doy - (153 * mp + 2) / 5 + 1;
let m = if mp < 10 { mp + 3 } else { mp - 9 };
let y = if m <= 2 { y + 1 } else { y };
format!("{y:04}-{m:02}-{d:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
}
pub struct SyncResult {
pub synced_count: usize,
pub cursor: i64,
}
fn to_sync_event(e: &tracking::SyncableEvent) -> SyncEvent {
SyncEvent {
id: e.id,
filter_name: e.filter_name.clone(),
filter_hash: e.filter_hash.clone(),
input_tokens: e.input_tokens_est,
output_tokens: e.output_tokens_est,
raw_tokens: Some(e.raw_tokens_est),
command_count: 1,
recorded_at: e.timestamp.clone(),
}
}
pub fn perform_sync(
auth: &LoadedAuth,
machine: &StoredMachine,
conn: &Connection,
) -> anyhow::Result<SyncResult> {
let _lock =
SyncLock::acquire().ok_or_else(|| anyhow::anyhow!("another sync is already running"))?;
let http_client = Client::new(&auth.server_url, Some(&auth.token))?;
let mut total_synced = 0usize;
let mut cursor = tracking::get_last_synced_id(conn)?;
loop {
let events = tracking::get_events_since(conn, cursor)?;
if events.is_empty() {
break;
}
let sync_events: Vec<SyncEvent> = events.iter().map(to_sync_event).collect();
let req = SyncRequest {
machine_id: machine.machine_id.clone(),
last_event_id: cursor,
events: sync_events,
};
let response = crate::remote::retry::with_retry("sync", || {
crate::remote::sync_client::sync_events(&http_client, &req)
})?;
total_synced += response.accepted;
let new_cursor = response.cursor;
if new_cursor <= cursor {
anyhow::bail!(
"sync stalled: server returned cursor {new_cursor} (was {cursor}). \
This may indicate a server issue — try again later."
);
}
cursor = new_cursor;
let tx = conn.unchecked_transaction()?;
tracking::set_last_synced_id(&tx, cursor)?;
tracking::set_last_synced_at(&tx, &utc_now_iso8601())?;
tx.commit()?;
if events.len() < 500 {
break;
}
}
Ok(SyncResult {
synced_count: total_synced,
cursor,
})
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use serial_test::serial;
use super::*;
use crate::tracking::SyncableEvent;
#[test]
#[serial]
fn sync_lock_acquire_and_release() {
let dir = tempfile::TempDir::new().unwrap();
let _guard = paths::HomeGuard::set(dir.path());
let lock = SyncLock::acquire();
assert!(lock.is_some(), "should acquire lock on fresh dir");
let lock_file = dir.path().join("sync.lock");
assert!(lock_file.exists(), "lock file should exist while held");
drop(lock);
assert!(!lock_file.exists(), "lock file should be removed on drop");
}
#[test]
#[serial]
fn sync_lock_prevents_double_acquire() {
let dir = tempfile::TempDir::new().unwrap();
let _guard = paths::HomeGuard::set(dir.path());
let lock1 = SyncLock::acquire();
assert!(lock1.is_some());
let lock2 = SyncLock::acquire();
assert!(
lock2.is_none(),
"second acquire should fail while first is held"
);
drop(lock1);
let lock3 = SyncLock::acquire();
assert!(
lock3.is_some(),
"should succeed after first lock is released"
);
}
#[test]
#[serial]
fn sync_lock_reclaims_stale_lock() {
use std::time::{Duration, SystemTime};
let dir = tempfile::TempDir::new().unwrap();
let _guard = paths::HomeGuard::set(dir.path());
let lock_file = dir.path().join("sync.lock");
fs::write(&lock_file, "99999999").unwrap();
let old_time = SystemTime::now() - Duration::from_secs(LOCK_STALE_SECS + 60);
filetime::set_file_mtime(&lock_file, filetime::FileTime::from_system_time(old_time))
.unwrap();
let lock = SyncLock::acquire();
assert!(lock.is_some(), "should reclaim stale lock");
}
#[test]
fn to_sync_event_maps_fields() {
let se = SyncableEvent {
id: 42,
filter_name: Some("git/push".to_string()),
filter_hash: Some("abc".to_string()),
input_tokens_est: 1000,
output_tokens_est: 200,
raw_tokens_est: 1000,
timestamp: "2026-01-01T00:00:00Z".to_string(),
};
let result = to_sync_event(&se);
assert_eq!(result.id, 42);
assert_eq!(result.filter_name.as_deref(), Some("git/push"));
assert_eq!(result.filter_hash.as_deref(), Some("abc"));
assert_eq!(result.input_tokens, 1000);
assert_eq!(result.output_tokens, 200);
assert_eq!(result.raw_tokens, Some(1000));
assert_eq!(result.command_count, 1);
assert_eq!(result.recorded_at, "2026-01-01T00:00:00Z");
}
#[test]
fn to_sync_event_handles_none_fields() {
let se = SyncableEvent {
id: 1,
filter_name: None,
filter_hash: None,
input_tokens_est: 500,
output_tokens_est: 500,
raw_tokens_est: 500,
timestamp: "2026-02-01T12:00:00Z".to_string(),
};
let result = to_sync_event(&se);
assert!(result.filter_name.is_none());
assert!(result.filter_hash.is_none());
}
#[test]
fn utc_now_iso8601_format() {
let ts = utc_now_iso8601();
assert!(
regex::Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$")
.unwrap()
.is_match(&ts),
"timestamp should be ISO 8601 UTC format, got: {ts}"
);
let year: u32 = ts[..4].parse().unwrap();
assert!(year >= 2020, "year should be >= 2020, got {year}");
}
#[test]
fn utc_now_iso8601_month_and_day_in_range() {
let ts = utc_now_iso8601();
let month: u32 = ts[5..7].parse().unwrap();
let day: u32 = ts[8..10].parse().unwrap();
assert!((1..=12).contains(&month), "month out of range: {month}");
assert!((1..=31).contains(&day), "day out of range: {day}");
}
}