use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use tracing::info;
/// Persisted synchronization state: a schema version plus per-source progress.
///
/// The struct is (de)serialized with serde; `SyncState::load` and
/// `SyncState::save` read and write it as JSON.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncState {
/// Schema version of this state. `SyncState::default()` produces version 2.
pub version: u32,
/// Progress of every known source, keyed by source name.
pub sources: HashMap<String, SourceState>,
}
/// Progress recorded for a single source and all of its subsources.
///
/// Every field carries `#[serde(default)]`, so a serialized entry that omits a
/// field deserializes with that field's `Default` value.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SourceState {
/// Time of the most recent update for this source; set by
/// `SyncState::update_subsource` and `SyncState::complete_source_cycle`.
#[serde(default)]
pub last_sync_at: Option<DateTime<Utc>>,
/// Number of completed cycles; incremented by `SyncState::complete_source_cycle`.
#[serde(default)]
pub sync_count: u64,
/// Progress of each subsource, keyed by subsource key.
#[serde(default)]
pub subsources: HashMap<String, SubsourceState>,
}
/// Progress recorded for one subsource of a source.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SubsourceState {
/// Cursor passed to the most recent `SyncState::update_subsource` call, if any.
pub last_cursor: Option<DateTime<Utc>>,
/// Time at which this subsource was last updated, if ever.
pub last_sync_at: Option<DateTime<Utc>>,
/// Running total of the `docs_synced` values reported for this subsource.
#[serde(default)]
pub documents_synced: u64,
/// Most recent non-`None` sample id reported for this subsource.
/// NOTE(review): presumably the id of a representative synced document — confirm with callers.
#[serde(default)]
pub last_sample_id: Option<String>,
}
/// Legacy (version 1) on-disk layout, deserialized only so that it can be
/// converted to the current layout by `migrate_v1_to_v2`.
#[derive(Debug, Deserialize)]
struct V1State {
// The field is accepted during deserialization but never read afterwards in
// this file (`SyncState::load` inspects the version on the raw JSON value),
// hence the `dead_code` allowance.
#[serde(default)]
#[allow(dead_code)]
version: u32,
/// Per-source legacy state, keyed by source name; empty when absent.
#[serde(default)]
sources: HashMap<String, V1SourceState>,
}
/// Legacy (version 1) per-source state: one cursor and one document counter
/// per source, with no subsource breakdown.
#[derive(Debug, Deserialize)]
struct V1SourceState {
/// Cursor of the source; copied into every subsource by `migrate_v1_to_v2`.
last_cursor: Option<DateTime<Utc>>,
/// Time of the last sync of the source.
last_sync_at: Option<DateTime<Utc>>,
/// Document counter of the source; copied into every subsource by `migrate_v1_to_v2`.
#[serde(default)]
documents_synced: u64,
/// Cycle counter of the source; carried over unchanged by `migrate_v1_to_v2`.
#[serde(default)]
sync_count: u64,
}
impl Default for SyncState {
fn default() -> Self {
Self {
version: 2,
sources: HashMap::new(),
}
}
}
impl SyncState {
pub fn load(path: &Path, subsources_by_source: &[(String, Vec<String>)]) -> Result<Self> {
if !path.exists() {
return Ok(Self::default());
}
let data = std::fs::read_to_string(path).context("failed to read sync state file")?;
let peek: serde_json::Value =
serde_json::from_str(&data).context("failed to parse sync state file")?;
let version = peek.get("version").and_then(|v| v.as_u64()).unwrap_or(2);
if version == 1 {
let v1: V1State = serde_json::from_value(peek)?;
let migrated = migrate_v1_to_v2(v1, subsources_by_source);
info!(
path = %path.display(),
"Migrated sync state from v1 to v2",
);
Ok(migrated)
} else {
let v2: SyncState =
serde_json::from_str(&data).context("failed to parse sync state file (v2)")?;
Ok(v2)
}
}
pub fn save(&self, path: &Path) -> Result<()> {
let data = serde_json::to_string_pretty(self).context("failed to serialize sync state")?;
let tmp_path = path.with_extension("tmp");
std::fs::write(&tmp_path, &data).context("failed to write sync state temp file")?;
std::fs::rename(&tmp_path, path).context("failed to rename sync state file")?;
Ok(())
}
pub fn get_source(&self, name: &str) -> SourceState {
self.sources.get(name).cloned().unwrap_or_default()
}
pub fn update_subsource(
&mut self,
source: &str,
subsource: &str,
cursor: DateTime<Utc>,
docs_synced: u64,
last_sample_id: Option<String>,
) {
let src = self.sources.entry(source.to_string()).or_default();
src.last_sync_at = Some(Utc::now());
let sub = src.subsources.entry(subsource.to_string()).or_default();
sub.last_cursor = Some(cursor);
sub.last_sync_at = Some(Utc::now());
sub.documents_synced += docs_synced;
if last_sample_id.is_some() {
sub.last_sample_id = last_sample_id;
}
}
pub fn complete_source_cycle(&mut self, source: &str) {
let src = self.sources.entry(source.to_string()).or_default();
src.sync_count += 1;
src.last_sync_at = Some(Utc::now());
}
pub fn reset_source(&mut self, name: &str, subsource: Option<&str>) {
match subsource {
Some(key) => {
if let Some(src) = self.sources.get_mut(name) {
src.subsources.remove(key);
}
}
None => {
self.sources.remove(name);
}
}
}
pub fn reset_all(&mut self) {
self.sources.clear();
}
}
fn migrate_v1_to_v2(v1: V1State, subsources_by_source: &[(String, Vec<String>)]) -> SyncState {
let mut out = SyncState::default();
for (name, old) in v1.sources {
let mut src = SourceState {
last_sync_at: old.last_sync_at,
sync_count: old.sync_count,
subsources: HashMap::new(),
};
let subs = subsources_by_source
.iter()
.find_map(|(n, ss)| if n == &name { Some(ss.clone()) } else { None })
.unwrap_or_else(|| vec!["_".to_string()]);
for sub_key in subs {
src.subsources.insert(
sub_key,
SubsourceState {
last_cursor: old.last_cursor,
last_sync_at: old.last_sync_at,
documents_synced: old.documents_synced,
last_sample_id: None,
},
);
}
out.sources.insert(name, src);
}
out
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A default state is tagged as version 2 and tracks no sources.
    #[test]
    fn default_state_is_v2() {
        let fresh = SyncState::default();
        assert_eq!(fresh.version, 2);
        assert!(fresh.sources.is_empty());
    }

    /// Loading from a path with no file yields an empty state instead of an error.
    #[test]
    fn load_returns_default_if_file_missing() {
        let tmp = TempDir::new().unwrap();
        let missing = tmp.path().join("nonexistent.json");
        let loaded = SyncState::load(&missing, &[]).unwrap();
        assert!(loaded.sources.is_empty());
    }

    /// Repeated updates add up document counts and keep the latest sample id.
    #[test]
    fn update_subsource_accumulates() {
        let mut state = SyncState::default();
        let cursor = Utc::now();
        for (docs, sample) in [(5, "DO-1"), (7, "DO-9")] {
            state.update_subsource("my-jira", "DO", cursor, docs, Some(sample.into()));
        }
        let source = state.get_source("my-jira");
        let sub = &source.subsources["DO"];
        assert_eq!(sub.documents_synced, 12);
        assert_eq!(sub.last_sample_id.as_deref(), Some("DO-9"));
    }

    /// Only `complete_source_cycle` bumps `sync_count`, and by exactly one per call.
    #[test]
    fn complete_source_cycle_increments_sync_count_once() {
        let mut state = SyncState::default();
        for docs in [5, 7] {
            state.update_subsource("s", "A", Utc::now(), docs, None);
        }
        for expected in 1..=2 {
            state.complete_source_cycle("s");
            assert_eq!(state.get_source("s").sync_count, expected);
        }
    }

    /// A v1 file is upgraded on load and its cursor lands on every configured subsource.
    #[test]
    fn migrates_v1_to_v2_copies_cursor_to_all_subsources() {
        let tmp = TempDir::new().unwrap();
        let state_file = tmp.path().join("state.json");
        let v1_json = r#"{
"version": 1,
"sources": {
"my-jira": {
"last_cursor": "2026-01-15T10:00:00Z",
"last_sync_at": "2026-01-15T10:01:00Z",
"documents_synced": 42,
"sync_count": 3
}
}
}"#;
        std::fs::write(&state_file, v1_json).unwrap();

        let configured = vec![(
            String::from("my-jira"),
            vec![String::from("DO"), String::from("HR")],
        )];
        let state = SyncState::load(&state_file, &configured).unwrap();
        assert_eq!(state.version, 2);

        let source = state.get_source("my-jira");
        assert_eq!(source.sync_count, 3);

        let do_cursor = source.subsources["DO"].last_cursor;
        let hr_cursor = source.subsources["HR"].last_cursor;
        assert!(do_cursor.is_some());
        assert!(hr_cursor.is_some());
        assert_eq!(do_cursor, hr_cursor);
    }

    /// State written by `save` is read back unchanged by `load`.
    #[test]
    fn save_then_load_v2_roundtrip() {
        let tmp = TempDir::new().unwrap();
        let state_file = tmp.path().join("state.json");

        let mut original = SyncState::default();
        original.update_subsource("my-jira", "DO", Utc::now(), 3, Some("DO-5".into()));
        original.save(&state_file).unwrap();

        let reloaded = SyncState::load(&state_file, &[]).unwrap();
        assert_eq!(reloaded.version, 2);

        let source = reloaded.get_source("my-jira");
        let sub = source.subsources.get("DO").unwrap();
        assert_eq!(sub.documents_synced, 3);
        assert_eq!(sub.last_sample_id.as_deref(), Some("DO-5"));
    }

    /// Resetting a source without a subsource key drops all of its subsources.
    #[test]
    fn reset_source_clears_all_subsources() {
        let mut state = SyncState::default();
        for key in ["A", "B"] {
            state.update_subsource("s", key, Utc::now(), 1, None);
        }
        state.reset_source("s", None);
        assert!(state.get_source("s").subsources.is_empty());
    }

    /// Resetting a single subsource leaves its siblings untouched.
    #[test]
    fn reset_source_single_subsource() {
        let mut state = SyncState::default();
        for key in ["A", "B"] {
            state.update_subsource("s", key, Utc::now(), 1, None);
        }
        state.reset_source("s", Some("A"));
        let remaining = state.get_source("s").subsources;
        assert!(!remaining.contains_key("A"));
        assert!(remaining.contains_key("B"));
    }
}