Skip to main content

llm_sync/
session.rs

1// SPDX-License-Identifier: MIT
2//! Sync session management for agent-to-agent state exchange.
3//!
4//! ## Responsibility
5//! Encapsulate the full shared CRDT state for a sync session, including
6//! vector clocks, counters, sets, registers, and maps. Provide a single
7//! `merge` entry point that delegates to each CRDT's merge operation.
8//!
9//! ## Guarantees
10//! - All merges are commutative, associative, and idempotent.
11//! - JSON round-trip is lossless.
12
13use std::collections::HashMap;
14use serde::{Deserialize, Serialize};
15use uuid::Uuid;
16use crate::crdt::{GCounter, GSet, LWWRegister, ORMap};
17use crate::error::SyncError;
18use crate::vclock::VectorClock;
19
20/// Unique identifier for a sync session.
21#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
22pub struct SessionId(pub String);
23
24impl SessionId {
25    /// Generate a fresh random session id.
26    pub fn new() -> Self { Self(Uuid::new_v4().to_string()) }
27}
28
29impl Default for SessionId {
30    fn default() -> Self { Self::new() }
31}
32
33/// The full CRDT-backed shared state for a sync session.
34///
35/// Each field is a named map of CRDTs, allowing arbitrary named slots
36/// without requiring schema changes.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct AgentState {
39    /// Unique session identifier.
40    pub session_id: SessionId,
41    /// Vector clock tracking causal history across agents.
42    pub vector_clock: VectorClock,
43    /// Named grow-only counters.
44    pub counters: HashMap<String, GCounter>,
45    /// Named grow-only sets.
46    pub sets: HashMap<String, GSet>,
47    /// Named last-write-wins string registers.
48    pub registers: HashMap<String, LWWRegister<String>>,
49    /// Named observed-remove maps.
50    pub maps: HashMap<String, ORMap>,
51}
52
53impl Default for AgentState {
54    fn default() -> Self {
55        Self {
56            session_id: SessionId::new(),
57            vector_clock: VectorClock::new(),
58            counters: HashMap::new(),
59            sets: HashMap::new(),
60            registers: HashMap::new(),
61            maps: HashMap::new(),
62        }
63    }
64}
65
66impl AgentState {
67    /// Create a new AgentState with a fresh random session id.
68    pub fn new() -> Self { Self::default() }
69
70    /// Override the session id (builder pattern).
71    pub fn with_session(mut self, id: SessionId) -> Self {
72        self.session_id = id;
73        self
74    }
75
76    /// Merge another agent's state into this one.
77    ///
78    /// All merges are CRDT-compliant: commutative, associative, idempotent.
79    ///
80    /// # Returns
81    /// A new `AgentState` representing the merged view.
82    pub fn merge(&self, other: &AgentState) -> AgentState {
83        let mut result = self.clone();
84        result.vector_clock = self.vector_clock.merge(&other.vector_clock);
85
86        for (k, v) in &other.counters {
87            result.counters
88                .entry(k.clone())
89                .and_modify(|existing| *existing = existing.merge(v))
90                .or_insert_with(|| v.clone());
91        }
92        for (k, v) in &other.sets {
93            result.sets
94                .entry(k.clone())
95                .and_modify(|existing| *existing = existing.merge(v))
96                .or_insert_with(|| v.clone());
97        }
98        for (k, v) in &other.registers {
99            result.registers
100                .entry(k.clone())
101                .and_modify(|existing| *existing = existing.merge(v))
102                .or_insert_with(|| v.clone());
103        }
104        for (k, v) in &other.maps {
105            result.maps
106                .entry(k.clone())
107                .and_modify(|existing| *existing = existing.merge(v))
108                .or_insert_with(|| v.clone());
109        }
110        result
111    }
112
113    /// Serialize this state to JSON.
114    ///
115    /// # Returns
116    /// - `Ok(String)` — JSON representation.
117    /// - `Err(SyncError::Serialization)` — on failure.
118    pub fn to_json(&self) -> Result<String, SyncError> {
119        serde_json::to_string(self).map_err(SyncError::Serialization)
120    }
121
122    /// Deserialize state from JSON.
123    ///
124    /// # Returns
125    /// - `Ok(AgentState)` on success.
126    /// - `Err(SyncError::Serialization)` on parse failure.
127    pub fn from_json(s: &str) -> Result<Self, SyncError> {
128        serde_json::from_str(s).map_err(SyncError::Serialization)
129    }
130}
131
132#[cfg(test)]
133mod tests {
134    use super::*;
135
136    #[test]
137    fn test_agent_state_merge_counters_commutative() {
138        let mut s1 = AgentState::new();
139        let mut g1 = GCounter::new(); g1.increment("a1", 5);
140        s1.counters.insert("req_count".into(), g1);
141
142        let mut s2 = AgentState::new();
143        let mut g2 = GCounter::new(); g2.increment("a2", 3);
144        s2.counters.insert("req_count".into(), g2);
145
146        let m1 = s1.merge(&s2);
147        let m2 = s2.merge(&s1);
148        assert_eq!(m1.counters["req_count"].value(), m2.counters["req_count"].value());
149        assert_eq!(m1.counters["req_count"].value(), 8);
150    }
151
152    #[test]
153    fn test_agent_state_merge_is_idempotent() {
154        let mut s = AgentState::new();
155        let mut g = GCounter::new(); g.increment("x", 10);
156        s.counters.insert("c".into(), g);
157        let m = s.merge(&s.clone());
158        assert_eq!(m.counters["c"].value(), 10);
159    }
160
161    #[test]
162    fn test_agent_state_json_roundtrip_preserves_session_id() {
163        let s = AgentState::new();
164        let json = s.to_json().unwrap();
165        let decoded = AgentState::from_json(&json).unwrap();
166        assert_eq!(decoded.session_id, s.session_id);
167    }
168
169    #[test]
170    fn test_agent_state_from_json_invalid_returns_error() {
171        let err = AgentState::from_json("not json").unwrap_err();
172        assert!(matches!(err, SyncError::Serialization(_)));
173    }
174
175    #[test]
176    fn test_agent_state_merge_sets_union() {
177        let mut s1 = AgentState::new();
178        let mut set1 = GSet::new(); set1.insert("capability-a");
179        s1.sets.insert("capabilities".into(), set1);
180
181        let mut s2 = AgentState::new();
182        let mut set2 = GSet::new(); set2.insert("capability-b");
183        s2.sets.insert("capabilities".into(), set2);
184
185        let merged = s1.merge(&s2);
186        let caps = &merged.sets["capabilities"];
187        assert!(caps.contains("capability-a"));
188        assert!(caps.contains("capability-b"));
189    }
190
191    #[test]
192    fn test_agent_state_merge_registers_lww() {
193        let mut s1 = AgentState::new();
194        let mut r1: LWWRegister<String> = LWWRegister::new();
195        r1.write("model-a".into(), 1, "agent1");
196        s1.registers.insert("current_model".into(), r1);
197
198        let mut s2 = AgentState::new();
199        let mut r2: LWWRegister<String> = LWWRegister::new();
200        r2.write("model-b".into(), 10, "agent2");
201        s2.registers.insert("current_model".into(), r2);
202
203        let merged = s1.merge(&s2);
204        assert_eq!(merged.registers["current_model"].read().unwrap(), "model-b");
205    }
206
207    #[test]
208    fn test_agent_state_merge_propagates_new_keys_from_other() {
209        let s1 = AgentState::new();
210        let mut s2 = AgentState::new();
211        let mut g = GCounter::new(); g.increment("n", 7);
212        s2.counters.insert("new_key".into(), g);
213
214        let merged = s1.merge(&s2);
215        assert_eq!(merged.counters["new_key"].value(), 7);
216    }
217
218    #[test]
219    fn test_agent_state_with_session_overrides_id() {
220        let id = SessionId("custom".into());
221        let s = AgentState::new().with_session(id.clone());
222        assert_eq!(s.session_id, id);
223    }
224
225    #[test]
226    fn test_agent_state_merge_maps_lww() {
227        let mut s1 = AgentState::new();
228        let mut m1 = ORMap::new(); m1.set("config", "v1", 1, "a");
229        s1.maps.insert("cfg".into(), m1);
230
231        let mut s2 = AgentState::new();
232        let mut m2 = ORMap::new(); m2.set("config", "v2", 5, "b");
233        s2.maps.insert("cfg".into(), m2);
234
235        let merged = s1.merge(&s2);
236        assert_eq!(merged.maps["cfg"].get("config").unwrap(), "v2");
237    }
238
239    #[test]
240    fn test_session_id_default_is_unique() {
241        let a = SessionId::new();
242        let b = SessionId::new();
243        assert_ne!(a.0, b.0);
244    }
245}