use std::path::Path;
use car_sync::{
check_dispatch, frontier_of, system_clock, FsRelay, InMemoryLeaseCoordinator, Intent,
IntentStatus, LeaseCoordinator, Relay, RelayConfig, Scope, Surface, SyncSession, Turn,
WallClock,
};
use serde_json::{json, Value};
/// One device's handle on the sync machinery: a local [`SyncSession`], the
/// relay it exchanges data with, and the lease coordinator used for
/// dispatch fencing.
///
/// All public methods return JSON values (or `String` errors) built from the
/// results of the underlying `car_sync` calls.
pub struct SyncSubsystem {
/// Identifier of this device; passed to the session at open time and to
/// every lease / fence call as the acting device.
device_id: String,
/// Local session opened over this device's journal and checkpoint directory.
session: SyncSession,
/// Relay handed to `pump`, `publish_checkpoint` and `rebase`, and queried
/// for the stable frontier and roster in `status`.
relay: Box<dyn Relay + Send>,
/// Lease coordinator backing the `lease_*` methods and `fence_check`.
coordinator: Box<dyn LeaseCoordinator + Send>,
}
/// Debug output shows only `device_id`; the remaining fields are elided and
/// the struct is rendered as non-exhaustive.
impl std::fmt::Debug for SyncSubsystem {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SyncSubsystem");
        out.field("device_id", &self.device_id);
        out.finish_non_exhaustive()
    }
}
impl SyncSubsystem {
pub fn open(root: &Path) -> Result<Self, String> {
std::fs::create_dir_all(root).map_err(|e| format!("sync: create {}: {e}", root.display()))?;
let device_id = load_or_mint_device_id(root)?;
let device_dir = root.join(&device_id);
let relay_dir = root.join("relay");
let coordinator = Box::new(InMemoryLeaseCoordinator::new(system_clock()));
Self::open_with(device_id, &device_dir, &relay_dir, coordinator, system_clock())
}
pub fn open_with(
device_id: String,
device_dir: &Path,
relay_dir: &Path,
coordinator: Box<dyn LeaseCoordinator + Send>,
wall: WallClock,
) -> Result<Self, String> {
std::fs::create_dir_all(device_dir)
.map_err(|e| format!("sync: create {}: {e}", device_dir.display()))?;
let journal_path = device_dir.join("oplog.jsonl");
let checkpoint_dir = device_dir.join("checkpoints");
let session = SyncSession::open(device_id.clone(), &journal_path, &checkpoint_dir, wall.clone())
.map_err(|e| format!("sync: open session: {e}"))?;
let relay = FsRelay::open(relay_dir, RelayConfig::default(), wall)
.map_err(|e| format!("sync: open relay {}: {e}", relay_dir.display()))?;
Ok(Self { device_id, session, relay: Box::new(relay), coordinator })
}
/// Returns this device's identifier.
pub fn device_id(&self) -> &str {
    self.device_id.as_str()
}
/// Builds a JSON status snapshot for this device.
///
/// The object contains `device_id`, the session's `state_hash`, the
/// `journal_frontier` computed by `frontier_of` over the session's ops, the
/// relay's `stable_frontier`, the hash of the session's base checkpoint
/// (`base_checkpoint`, `null` when there is none), and the relay `roster`.
///
/// # Errors
/// Returns a `sync: …` message if the relay cannot report its stable
/// frontier or roster.
pub fn status(&mut self) -> Result<Value, String> {
    // Relay queries come first; either failing aborts before any JSON is built.
    let stable_frontier = match self.relay.stable_frontier() {
        Ok(frontier) => frontier,
        Err(e) => return Err(format!("sync: stable_frontier: {e}")),
    };
    let relay_roster = match self.relay.roster() {
        Ok(roster) => roster,
        Err(e) => return Err(format!("sync: roster: {e}")),
    };

    let snapshot = json!({
        "device_id": self.device_id,
        "state_hash": self.session.state_hash(),
        "journal_frontier": frontier_of(self.session.ops()),
        "stable_frontier": stable_frontier,
        "base_checkpoint": self.session.base().map(|c| c.checkpoint_hash.clone()),
        "roster": relay_roster,
    });
    Ok(snapshot)
}
/// Appends one op to the local session and returns its identifiers.
///
/// On success the result is `{ "op_id", "seq", "hlc" }` taken from the op
/// returned by the session.
///
/// # Errors
/// Returns `sync: append: …` if the session rejects the append.
pub fn append(&mut self, scope: Scope, surface: Surface, payload: Value) -> Result<Value, String> {
    match self.session.append(scope, surface, payload) {
        Ok(op) => Ok(json!({ "op_id": op.op_id, "seq": op.seq, "hlc": op.hlc })),
        Err(e) => Err(format!("sync: append: {e}")),
    }
}
/// Records a conversation turn as an op on [`Surface::Conversation`].
///
/// The payload shape is chosen from `role`:
/// * `"assistant"` → `Turn::assistant_payload` (the only branch that uses
///   `tool_calls`);
/// * `"tool"` or `"tool_result"` → `Turn::tool_payload`, with a missing
///   `tool_use_id` passed as the empty string;
/// * any other role → `Turn::user_payload`.
///
/// # Errors
/// Propagates the error from [`SyncSubsystem::append`].
pub fn record_turn(
    &mut self,
    scope: Scope,
    conversation_id: &str,
    role: &str,
    content: &str,
    tool_calls: Vec<Value>,
    tool_use_id: Option<&str>,
    timestamp: u64,
) -> Result<Value, String> {
    let payload = if role == "assistant" {
        Turn::assistant_payload(conversation_id, content, tool_calls, timestamp)
    } else if role == "tool" || role == "tool_result" {
        let use_id = tool_use_id.unwrap_or_default();
        Turn::tool_payload(conversation_id, use_id, content, timestamp)
    } else {
        // Every unrecognised role is recorded as a user turn.
        Turn::user_payload(conversation_id, content, timestamp)
    };
    self.append(scope, Surface::Conversation, payload)
}
/// Records an intent through the session.
///
/// Returns `{ "recorded": true, "op_id": … }` when the session produced an
/// op, or `{ "recorded": false, "reason": … }` when it returned `None`.
///
/// # Errors
/// Returns `sync: record_intent: …` if the session call fails.
pub fn record_intent(&mut self, scope: Scope, intent: &Intent) -> Result<Value, String> {
    match self.session.record_intent(scope, intent) {
        Ok(Some(op)) => Ok(json!({ "recorded": true, "op_id": op.op_id })),
        Ok(None) => {
            Ok(json!({ "recorded": false, "reason": "run already committed (terminal guard)" }))
        }
        Err(e) => Err(format!("sync: record_intent: {e}")),
    }
}
/// Runs one session pump against the relay and reports the outcome.
///
/// The result carries the report's `pushed`, `push_deduped`, `folded` and
/// `acked` fields plus the session's `state_hash` after the pump.
///
/// # Errors
/// Returns `sync: pump: …` if the session pump fails.
pub fn pump(&mut self) -> Result<Value, String> {
    let relay = self.relay.as_mut();
    match self.session.pump(relay) {
        Ok(report) => Ok(json!({
            "pushed": report.pushed,
            "push_deduped": report.push_deduped,
            "folded": report.folded,
            "acked": report.acked,
            "state_hash": self.session.state_hash(),
        })),
        Err(e) => Err(format!("sync: pump: {e}")),
    }
}
/// Asks the session to publish a checkpoint to the relay.
///
/// Returns `{ "published": true, "checkpoint_hash": … }` when a checkpoint
/// was produced, or `{ "published": false }` when the session returned
/// `None`.
///
/// # Errors
/// Returns `sync: publish_checkpoint: …` if the session call fails.
pub fn checkpoint(&mut self) -> Result<Value, String> {
    let relay = self.relay.as_mut();
    match self.session.publish_checkpoint(relay) {
        Ok(Some(c)) => Ok(json!({ "published": true, "checkpoint_hash": c.checkpoint_hash })),
        Ok(None) => Ok(json!({ "published": false })),
        Err(e) => Err(format!("sync: publish_checkpoint: {e}")),
    }
}
/// Rebases the session against the relay.
///
/// Returns `{ "rebased": …, "base_checkpoint": … }`, where `rebased` is the
/// value returned by the session and `base_checkpoint` is the hash of the
/// session's base checkpoint afterwards (`null` when there is none).
///
/// # Errors
/// Returns `sync: rebase: …` if the session call fails.
pub fn rebase(&mut self) -> Result<Value, String> {
    let relay = self.relay.as_mut();
    match self.session.rebase(relay) {
        Ok(rebased) => Ok(json!({
            "rebased": rebased,
            "base_checkpoint": self.session.base().map(|c| c.checkpoint_hash.clone()),
        })),
        Err(e) => Err(format!("sync: rebase: {e}")),
    }
}
/// Returns the session state's transcript for `conversation_id` as JSON.
///
/// Serialization goes through `json!`, so this method has no error path.
pub fn transcript(&self, conversation_id: &str) -> Value {
    let state = self.session.state();
    let turns = state.transcript(conversation_id);
    json!(turns)
}
/// Returns the session state's resume messages for `conversation_id` as JSON.
///
/// # Errors
/// Returns `sync: serialize resume messages: …` if serialization fails.
pub fn resume(&self, conversation_id: &str) -> Result<Value, String> {
    let state = self.session.state();
    let messages = state.resume_messages(conversation_id);
    match serde_json::to_value(messages) {
        Ok(value) => Ok(value),
        Err(e) => Err(format!("sync: serialize resume messages: {e}")),
    }
}
/// Runs `check_dispatch` for `(agent_id, run_id, epoch)` on behalf of this
/// device, using the coordinator and the session's current state.
///
/// Returns `{ "decision": …, "may_dispatch": … }`, where `may_dispatch` is
/// the decision's own `may_dispatch()` verdict.
///
/// # Errors
/// Returns `sync: fence_check: …` if `check_dispatch` fails.
pub fn fence_check(&mut self, agent_id: &str, run_id: &str, epoch: u64) -> Result<Value, String> {
    let state = self.session.state();
    let outcome = check_dispatch(
        self.coordinator.as_mut(),
        &state,
        agent_id,
        run_id,
        &self.device_id,
        epoch,
    );
    match outcome {
        Ok(decision) => {
            let may_dispatch = decision.may_dispatch();
            Ok(json!({ "decision": decision, "may_dispatch": may_dispatch }))
        }
        Err(e) => Err(format!("sync: fence_check: {e}")),
    }
}
/// Acquires the lease for `agent_id` on behalf of this device with the given
/// time-to-live, returning the lease serialized as JSON.
///
/// # Errors
/// Returns `lease: acquire: …` if the coordinator refuses the lease, or
/// `lease: acquire: serialize lease: …` if the lease cannot be serialized.
pub fn lease_acquire(&mut self, agent_id: &str, ttl_ms: u64) -> Result<Value, String> {
    let lease = self
        .coordinator
        .acquire(agent_id, &self.device_id, ttl_ms)
        .map_err(|e| format!("lease: acquire: {e}"))?;
    // Prefix serialization failures like every other error in this file, so
    // the caller can tell which operation produced the message.
    serde_json::to_value(lease).map_err(|e| format!("lease: acquire: serialize lease: {e}"))
}
/// Renews the lease for `agent_id` held by this device at `epoch`, with a
/// new time-to-live, returning the lease serialized as JSON.
///
/// # Errors
/// Returns `lease: renew: …` if the coordinator refuses the renewal, or
/// `lease: renew: serialize lease: …` if the lease cannot be serialized.
pub fn lease_renew(&mut self, agent_id: &str, epoch: u64, ttl_ms: u64) -> Result<Value, String> {
    let lease = self
        .coordinator
        .renew(agent_id, &self.device_id, epoch, ttl_ms)
        .map_err(|e| format!("lease: renew: {e}"))?;
    // Prefix serialization failures like every other error in this file, so
    // the caller can tell which operation produced the message.
    serde_json::to_value(lease).map_err(|e| format!("lease: renew: serialize lease: {e}"))
}
/// Releases the lease for `agent_id` held by this device at `epoch`.
///
/// Returns `{ "released": true }` on success.
///
/// # Errors
/// Returns `lease: release: …` if the coordinator rejects the release.
pub fn lease_release(&mut self, agent_id: &str, epoch: u64) -> Result<Value, String> {
    match self.coordinator.release(agent_id, &self.device_id, epoch) {
        Ok(_) => Ok(json!({ "released": true })),
        Err(e) => Err(format!("lease: release: {e}")),
    }
}
/// Looks up the coordinator's current lease record for `agent_id`.
///
/// Returns `{ "lease": … }` wrapping whatever the coordinator reports.
///
/// # Errors
/// Returns `lease: status: …` if the coordinator lookup fails.
pub fn lease_status(&mut self, agent_id: &str) -> Result<Value, String> {
    match self.coordinator.current(agent_id) {
        Ok(current) => Ok(json!({ "lease": current })),
        Err(e) => Err(format!("lease: status: {e}")),
    }
}
}
/// Loads the device id stored in `<root>/device-id`, minting and persisting
/// a new `device-<uuid v4>` id when the file is missing or blank.
///
/// The file is read directly instead of being probed with `Path::exists`
/// first: that removes the check-then-read race and ensures a read failure
/// other than "not found" is reported rather than being treated as "no id
/// yet" and overwritten with a fresh identity.
///
/// # Errors
/// Returns `sync: read device-id: …` if the file exists but cannot be read,
/// or `sync: write device-id: …` if a newly minted id cannot be persisted.
fn load_or_mint_device_id(root: &Path) -> Result<String, String> {
    let path = root.join("device-id");
    match std::fs::read_to_string(&path) {
        Ok(contents) => {
            let id = contents.trim();
            if !id.is_empty() {
                return Ok(id.to_string());
            }
            // Blank file: fall through and mint a replacement id.
        }
        // No id has been stored yet: fall through and mint one.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(format!("sync: read device-id: {e}")),
    }
    let id = format!("device-{}", uuid::Uuid::new_v4());
    std::fs::write(&path, &id).map_err(|e| format!("sync: write device-id: {e}"))?;
    Ok(id)
}
/// Derives a [`Scope`] from request parameters.
///
/// Looks for an org at `params.scope.org` and, only when that key is absent,
/// at `params.org`. If the value found is a string the scope is
/// `Scope::Shared { org }`; otherwise (missing, or present but not a string)
/// the scope is `Scope::Personal`.
pub fn parse_scope(params: &Value) -> Scope {
    let nested = params.get("scope").and_then(|s| s.get("org"));
    let candidate = match nested {
        Some(value) => Some(value),
        None => params.get("org"),
    };
    match candidate.and_then(Value::as_str) {
        Some(org) => Scope::Shared { org: org.to_string() },
        None => Scope::Personal,
    }
}
/// Parses a surface name into a [`Surface`].
///
/// Accepts the fixed names `routing`, `declagent`, `conversation`,
/// `knowledge`, `skill`, `trajectory`, `run`, `intent`, and any string of the
/// form `registry:<kind>`, which maps to `Surface::Registry { kind }`.
///
/// # Errors
/// Returns `unknown sync surface '<s>'` for anything else.
pub fn parse_surface(s: &str) -> Result<Surface, String> {
    let surface = match s {
        "routing" => Surface::Routing,
        "declagent" => Surface::Declagent,
        "conversation" => Surface::Conversation,
        "knowledge" => Surface::Knowledge,
        "skill" => Surface::Skill,
        "trajectory" => Surface::Trajectory,
        "run" => Surface::Run,
        "intent" => Surface::Intent,
        _ => match s.strip_prefix("registry:") {
            Some(kind) => Surface::Registry { kind: kind.to_string() },
            None => return Err(format!("unknown sync surface '{s}'")),
        },
    };
    Ok(surface)
}
/// Parses `pending`, `committed` or `failed` into an [`IntentStatus`].
///
/// # Errors
/// Returns `unknown intent status '<s>'` for any other input.
pub fn parse_intent_status(s: &str) -> Result<IntentStatus, String> {
    match s {
        "pending" => Ok(IntentStatus::Pending),
        "committed" => Ok(IntentStatus::Committed),
        "failed" => Ok(IntentStatus::Failed),
        _ => Err(format!("unknown intent status '{s}'")),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
fn manual_clock() -> (Arc<AtomicU64>, WallClock) {
let t = Arc::new(AtomicU64::new(0));
let reader = t.clone();
(t, Arc::new(move || reader.load(Ordering::SeqCst)))
}
/// Opens two subsystems, `mac-a` and `mac-b`, that share one relay directory
/// and clones of the same in-memory lease coordinator. Each component gets
/// its own manual clock, none of which is ever advanced.
fn two_devices(
    relay_dir: &Path,
    a_dir: &Path,
    b_dir: &Path,
) -> (SyncSubsystem, SyncSubsystem) {
    let coord = InMemoryLeaseCoordinator::new(manual_clock().1);
    let wall_a = manual_clock().1;
    let wall_b = manual_clock().1;

    let coord_for_a = Box::new(coord.clone());
    let device_a =
        SyncSubsystem::open_with("mac-a".into(), a_dir, relay_dir, coord_for_a, wall_a).unwrap();

    let coord_for_b = Box::new(coord);
    let device_b =
        SyncSubsystem::open_with("mac-b".into(), b_dir, relay_dir, coord_for_b, wall_b).unwrap();

    (device_a, device_b)
}
// Two devices sharing one relay directory must end up with the same state
// hash and the same resume messages after exchanging ops via `pump`.
#[test]
fn two_devices_converge_a_conversation_through_the_shared_relay() {
let tmp = tempfile::tempdir().unwrap();
let relay = tmp.path().join("relay");
let (mut a, mut b) =
two_devices(&relay, &tmp.path().join("a"), &tmp.path().join("b"));
// Device A writes a user turn and a knowledge op, then pumps.
a.record_turn(Scope::Personal, "c1", "user", "hello", vec![], None, 1).unwrap();
a.append(Scope::Personal, Surface::Knowledge, json!({"id": "f1", "body": "sky is blue"}))
.unwrap();
a.pump().unwrap();
// Device B pumps, replies with an assistant turn, and pumps again.
b.pump().unwrap();
b.record_turn(Scope::Personal, "c1", "assistant", "hi there", vec![], None, 2).unwrap();
b.pump().unwrap();
// Device A pumps once more before the two sides are compared.
a.pump().unwrap();
let sa = a.status().unwrap();
let sb = b.status().unwrap();
assert_eq!(sa["state_hash"], sb["state_hash"], "two devices converge");
let resume_a = a.resume("c1").unwrap();
let resume_b = b.resume("c1").unwrap();
assert_eq!(resume_a, resume_b);
// Exactly the two recorded turns, user first and assistant second.
let msgs = resume_a.as_array().unwrap();
assert_eq!(msgs.len(), 2, "user + assistant");
assert_eq!(msgs[0]["role"], json!("user"));
assert_eq!(msgs[1]["role"], json!("assistant"));
let transcript = a.transcript("c1");
assert_eq!(transcript.as_array().unwrap().len(), 2);
}
// Exercises `fence_check` on two devices sharing a coordinator and relay:
// dispatch is allowed before the run is committed, refused as
// `already_committed` afterwards on both devices, and refused as
// `stale_epoch` for device B on a different run at epoch 1.
#[test]
fn dispatch_fence_refuses_stale_epoch_and_already_committed() {
let tmp = tempfile::tempdir().unwrap();
let relay = tmp.path().join("relay");
// All three manual clocks stay at their initial value; this test never
// advances them.
let (_tc, wc) = manual_clock();
let coord = InMemoryLeaseCoordinator::new(wc);
let (_ta, wa) = manual_clock();
let (_tb, wb) = manual_clock();
let mut a = SyncSubsystem::open_with(
"mac-a".into(),
&tmp.path().join("a"),
&relay,
Box::new(coord.clone()),
wa,
)
.unwrap();
let mut b = SyncSubsystem::open_with(
"mac-b".into(),
&tmp.path().join("b"),
&relay,
Box::new(coord),
wb,
)
.unwrap();
// Device A takes the lease (expected epoch 1) and may dispatch run-1.
let lease = a.lease_acquire("milo", 100).unwrap();
assert_eq!(lease["epoch"], json!(1));
let f = a.fence_check("milo", "run-1", 1).unwrap();
assert_eq!(f["may_dispatch"], json!(true));
// Record run-1 as committed on A, then pump A followed by B.
a.record_intent(Scope::Personal, &Intent::new("milo", "run-1", 1, IntentStatus::Committed))
.unwrap();
a.pump().unwrap();
b.pump().unwrap();
// After the commit, both devices refuse to dispatch run-1 again.
let f = a.fence_check("milo", "run-1", 1).unwrap();
assert_eq!(f["decision"]["decision"], json!("already_committed"));
assert_eq!(f["may_dispatch"], json!(false));
let fb = b.fence_check("milo", "run-1", 1).unwrap();
assert_eq!(fb["decision"]["decision"], json!("already_committed"));
// Device B asking about a fresh run at epoch 1 is fenced as stale.
// NOTE(review): presumably because the lease at epoch 1 is held by mac-a —
// confirm against `check_dispatch`.
let stale = b.fence_check("milo", "run-2", 1).unwrap();
assert_eq!(stale["decision"]["decision"], json!("stale_epoch"));
assert_eq!(stale["may_dispatch"], json!(false));
}
// A lease taken by one device is visible to, and blocks, the other device
// sharing the same coordinator; after release the other device can acquire
// it and is handed the next epoch.
#[test]
fn lease_is_visible_across_two_devices_sharing_the_register() {
let tmp = tempfile::tempdir().unwrap();
let relay = tmp.path().join("relay");
let (mut a, mut b) =
two_devices(&relay, &tmp.path().join("a"), &tmp.path().join("b"));
// Device A acquires; device B sees mac-a as holder and cannot acquire.
a.lease_acquire("milo", 1_000_000).unwrap();
let status = b.lease_status("milo").unwrap();
assert_eq!(status["lease"]["holder"], json!("mac-a"));
assert!(b.lease_acquire("milo", 100).is_err());
// Once A releases epoch 1, B's acquire succeeds with epoch 2.
a.lease_release("milo", 1).unwrap();
let lease = b.lease_acquire("milo", 100).unwrap();
assert_eq!((lease["epoch"].as_u64(), lease["holder"].as_str()), (Some(2), Some("mac-b")));
}
}