use std::{collections::HashSet, fs::OpenOptions, io::Write as _, path::Path, process::Command};
use allsource_core::embedded::{EmbeddedCore, IngestEvent, Query};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::domain::error::ChronError;
#[derive(Serialize, Deserialize)]
struct SyncEvent {
id: Uuid,
event_type: String,
entity_id: String,
tenant_id: String,
payload: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
metadata: Option<serde_json::Value>,
timestamp: chrono::DateTime<chrono::Utc>,
version: i64,
}
fn load_id_set(path: &Path) -> Result<HashSet<String>, ChronError> {
if !path.exists() {
return Ok(HashSet::new());
}
let content = std::fs::read_to_string(path)?;
serde_json::from_str(&content)
.map_err(|e| ChronError::Sync(format!("parse {}: {e}", path.display())))
}
fn save_id_set(path: &Path, ids: &HashSet<String>) -> Result<(), ChronError> {
let content =
serde_json::to_string(ids).map_err(|e| ChronError::Sync(format!("serialize: {e}")))?;
std::fs::write(path, content)?;
Ok(())
}
fn ensure_git_repo() -> Result<(), ChronError> {
let output = Command::new("git")
.args(["rev-parse", "--is-inside-work-tree"])
.output()
.map_err(|e| ChronError::Sync(format!("git not found: {e}")))?;
if !output.status.success() {
return Err(ChronError::Sync("not inside a git repository".into()));
}
Ok(())
}
fn git_pull() -> Result<(), ChronError> {
let output = Command::new("git").args(["pull", "--rebase"]).output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(ChronError::Sync(format!(
"git pull --rebase failed: {stderr}\nResolve manually, then re-run `cn sync --git`."
)));
}
Ok(())
}
fn git_add(path: &str) -> Result<(), ChronError> {
let output = Command::new("git").args(["add", path]).output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(ChronError::Sync(format!("git add failed: {stderr}")));
}
Ok(())
}
fn git_has_staged_changes(path: &str) -> Result<bool, ChronError> {
let output = Command::new("git")
.args(["diff", "--cached", "--quiet", path])
.output()?;
Ok(!output.status.success())
}
fn git_commit(message: &str) -> Result<(), ChronError> {
let output = Command::new("git")
.args(["commit", "-m", message])
.output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(ChronError::Sync(format!("git commit failed: {stderr}")));
}
Ok(())
}
fn git_push() {
match Command::new("git").args(["push"]).output() {
Ok(o) if !o.status.success() => {
let stderr = String::from_utf8_lossy(&o.stderr);
eprintln!("Warning: git push failed: {stderr}");
eprintln!("Changes are committed locally. Push manually when ready.");
}
Err(e) => {
eprintln!("Warning: git push failed: {e}");
eprintln!("Changes are committed locally. Push manually when ready.");
}
_ => println!("Pushed to remote."),
}
}
pub async fn sync_git(core: &EmbeddedCore, workspace_root: &Path) -> Result<(), ChronError> {
let sync_dir = workspace_root.join(".chronis/sync");
let events_path = sync_dir.join("events.jsonl");
let remote_ids_path = sync_dir.join(".remote_ids");
let local_ids_path = sync_dir.join(".local_ids");
std::fs::create_dir_all(&sync_dir)?;
ensure_git_repo()?;
println!("Pulling remote changes...");
git_pull()?;
let pre_import = core
.query(Query::new())
.await
.map_err(|e| ChronError::Sync(format!("query: {e}")))?;
let pre_import_ids: HashSet<String> = pre_import.iter().map(|e| e.id.to_string()).collect();
let mut remote_ids = load_id_set(&remote_ids_path)?;
let mut import_count = 0u64;
if events_path.exists() {
let content = std::fs::read_to_string(&events_path)?;
for line in content.lines() {
if line.trim().is_empty() {
continue;
}
let ev: SyncEvent = serde_json::from_str(line)
.map_err(|e| ChronError::Sync(format!("parse JSONL: {e}")))?;
let jsonl_id = ev.id.to_string();
if remote_ids.contains(&jsonl_id) {
continue;
}
core.ingest(IngestEvent {
entity_id: &ev.entity_id,
event_type: &ev.event_type,
payload: ev.payload,
metadata: ev.metadata,
tenant_id: Some(&ev.tenant_id),
})
.await
.map_err(|e| ChronError::Sync(format!("ingest: {e}")))?;
remote_ids.insert(jsonl_id);
import_count += 1;
}
if import_count > 0 {
println!("Imported {import_count} remote events.");
}
}
let mut local_ids = load_id_set(&local_ids_path)?;
if import_count > 0 {
let post_import = core
.query(Query::new())
.await
.map_err(|e| ChronError::Sync(format!("query: {e}")))?;
for ev in &post_import {
let id = ev.id.to_string();
if !pre_import_ids.contains(&id) {
local_ids.insert(id);
}
}
}
let all_events = if import_count > 0 {
let mut evs = core
.query(Query::new())
.await
.map_err(|e| ChronError::Sync(format!("query: {e}")))?;
evs.sort_by_key(|e| e.timestamp);
evs
} else {
let mut evs = pre_import;
evs.sort_by_key(|e| e.timestamp);
evs
};
let mut new_lines = String::new();
let mut export_count = 0u64;
for ev in &all_events {
let core_id = ev.id.to_string();
if local_ids.contains(&core_id) {
continue;
}
let sync_ev = SyncEvent {
id: ev.id,
event_type: ev.event_type.clone(),
entity_id: ev.entity_id.clone(),
tenant_id: ev.tenant_id.clone(),
payload: ev.payload.clone(),
metadata: ev.metadata.clone(),
timestamp: ev.timestamp,
version: ev.version,
};
let line = serde_json::to_string(&sync_ev)
.map_err(|e| ChronError::Sync(format!("serialize: {e}")))?;
new_lines.push_str(&line);
new_lines.push('\n');
remote_ids.insert(ev.id.to_string());
local_ids.insert(core_id);
export_count += 1;
}
if !new_lines.is_empty() {
let mut file = OpenOptions::new()
.append(true)
.create(true)
.open(&events_path)?;
file.write_all(new_lines.as_bytes())?;
}
save_id_set(&remote_ids_path, &remote_ids)?;
save_id_set(&local_ids_path, &local_ids)?;
git_add(".chronis/")?;
if git_has_staged_changes(".chronis/")? {
let msg = if import_count > 0 && export_count > 0 {
format!("chronis: sync +{export_count} local, +{import_count} remote")
} else if export_count > 0 {
format!("chronis: sync +{export_count} events")
} else {
"chronis: sync".to_string()
};
git_commit(&msg)?;
println!("Committed sync changes.");
git_push();
} else {
println!("Nothing to sync — already up to date.");
}
Ok(())
}