Skip to main content

pylon_sync/
lib.rs

1use std::sync::Mutex;
2
3use serde::{Deserialize, Serialize};
4
5// ---------------------------------------------------------------------------
6// Change events — the append-only log entries
7// ---------------------------------------------------------------------------
8
9/// A change event in the sync log.
10#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11pub struct ChangeEvent {
12    /// Monotonically increasing sequence number.
13    pub seq: u64,
14    /// The entity that was changed.
15    pub entity: String,
16    /// The row ID that was changed.
17    pub row_id: String,
18    /// The type of change.
19    pub kind: ChangeKind,
20    /// The data after the change (None for deletes).
21    #[serde(skip_serializing_if = "Option::is_none")]
22    pub data: Option<serde_json::Value>,
23    /// Timestamp of the change.
24    pub timestamp: String,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(rename_all = "lowercase")]
29pub enum ChangeKind {
30    Insert,
31    Update,
32    Delete,
33}
34
35// ---------------------------------------------------------------------------
36// Sync cursor — tracks client position in the log
37// ---------------------------------------------------------------------------
38
39/// A sync cursor representing a client's position in the change log.
40#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41pub struct SyncCursor {
42    /// The last sequence number the client has seen.
43    pub last_seq: u64,
44}
45
46impl SyncCursor {
47    pub fn beginning() -> Self {
48        Self { last_seq: 0 }
49    }
50}
51
52// ---------------------------------------------------------------------------
53// Pull response — what the server sends to a pulling client
54// ---------------------------------------------------------------------------
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct PullResponse {
58    /// Changes since the client's cursor.
59    pub changes: Vec<ChangeEvent>,
60    /// The new cursor position after these changes.
61    pub cursor: SyncCursor,
62    /// Whether there are more changes to pull.
63    pub has_more: bool,
64}
65
66/// Error returned by [`ChangeLog::pull`].
67#[derive(Debug, Clone)]
68pub enum PullError {
69    /// The caller's cursor has fallen off the back of the retention window.
70    /// The client should do a full re-sync from entity-list state rather than
71    /// trusting the delta stream — events between `cursor.last_seq` and
72    /// `oldest_seq` were evicted and cannot be replayed.
73    ResyncRequired { oldest_seq: u64, cursor: SyncCursor },
74}
75
76// ---------------------------------------------------------------------------
77// Push request — what a client sends to push changes
78// ---------------------------------------------------------------------------
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct PushRequest {
82    /// The changes the client wants to push.
83    pub changes: Vec<ClientChange>,
84    /// Stable identifier for this client across reconnects. Lets the server
85    /// correlate retries (even without op_id) and attach per-client
86    /// diagnostics / rate limits. Clients that don't supply one get a
87    /// synthesized `"anon"` bucket for those features. Legacy clients
88    /// without this field keep working — the router ignores it when
89    /// absent.
90    #[serde(default, skip_serializing_if = "Option::is_none")]
91    pub client_id: Option<String>,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct ClientChange {
96    pub entity: String,
97    pub row_id: String,
98    pub kind: ChangeKind,
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub data: Option<serde_json::Value>,
101    /// Client-minted idempotency key. The server remembers recently-seen
102    /// op_ids and short-circuits replays with the previous result instead
103    /// of re-applying the change. When absent, no dedup is performed (legacy
104    /// clients stay functional but lose idempotency on retry).
105    #[serde(default, skip_serializing_if = "Option::is_none")]
106    pub op_id: Option<String>,
107}
108
109// ---------------------------------------------------------------------------
110// Change log — in-memory append-only log
111// ---------------------------------------------------------------------------
112
113/// An in-memory change log with bounded retention.
114///
115/// Older events are evicted when the log exceeds `capacity`. The sequence
116/// counter still increments monotonically; clients pulling with an old
117/// cursor will see only what remains in memory (or should issue a full
118/// re-sync if their cursor falls off the back).
119pub struct ChangeLog {
120    events: Mutex<std::collections::VecDeque<ChangeEvent>>,
121    seq: Mutex<u64>,
122    capacity: usize,
123    /// Recently-seen client op_ids, for push idempotency. Bounded by
124    /// `op_id_capacity`; oldest entries age out when the map grows past it.
125    seen_op_ids: Mutex<std::collections::VecDeque<String>>,
126    seen_op_id_set: Mutex<std::collections::HashSet<String>>,
127    op_id_capacity: usize,
128}
129
130impl ChangeLog {
131    /// Create a new change log with the default capacity of 10,000 events.
132    pub fn new() -> Self {
133        Self::with_capacity(10_000)
134    }
135
136    /// Create a new change log with a specific capacity.
137    pub fn with_capacity(capacity: usize) -> Self {
138        Self {
139            events: Mutex::new(std::collections::VecDeque::with_capacity(
140                capacity.min(1024),
141            )),
142            seq: Mutex::new(0),
143            capacity,
144            seen_op_ids: Mutex::new(std::collections::VecDeque::with_capacity(1024)),
145            seen_op_id_set: Mutex::new(std::collections::HashSet::with_capacity(1024)),
146            op_id_capacity: 10_000,
147        }
148    }
149
150    /// Returns true if this op_id was already applied. Used by the push
151    /// handler to short-circuit replays. Callers that observe `true` should
152    /// NOT re-apply the change and should return success to the client.
153    pub fn has_seen_op_id(&self, op_id: &str) -> bool {
154        self.seen_op_id_set.lock().unwrap().contains(op_id)
155    }
156
157    /// Mark an op_id as processed. Safe to call multiple times. Evicts the
158    /// oldest entry when the cache exceeds `op_id_capacity`.
159    pub fn remember_op_id(&self, op_id: &str) {
160        let mut set = self.seen_op_id_set.lock().unwrap();
161        if set.contains(op_id) {
162            return;
163        }
164        set.insert(op_id.to_string());
165        drop(set);
166        let mut q = self.seen_op_ids.lock().unwrap();
167        q.push_back(op_id.to_string());
168        while q.len() > self.op_id_capacity {
169            if let Some(evicted) = q.pop_front() {
170                self.seen_op_id_set.lock().unwrap().remove(&evicted);
171            }
172        }
173    }
174
175    /// Append a change event. Returns the assigned sequence number.
176    pub fn append(
177        &self,
178        entity: &str,
179        row_id: &str,
180        kind: ChangeKind,
181        data: Option<serde_json::Value>,
182    ) -> u64 {
183        let mut seq = self.seq.lock().unwrap();
184        *seq += 1;
185        let event = ChangeEvent {
186            seq: *seq,
187            entity: entity.to_string(),
188            row_id: row_id.to_string(),
189            kind,
190            data,
191            timestamp: now_iso8601(),
192        };
193        let mut events = self.events.lock().unwrap();
194        events.push_back(event);
195        while events.len() > self.capacity {
196            events.pop_front();
197        }
198        *seq
199    }
200
201    /// Pull changes since a cursor, up to a limit.
202    ///
203    /// Returns `Err(PullError::ResyncRequired)` when the caller's cursor has
204    /// fallen off the back of the retention window — i.e. the cursor's
205    /// `last_seq` is lower than the oldest seq we still remember. Previously
206    /// this case was silent: `pull` would return the surviving tail and
207    /// advance the cursor, so the client converged to a state that skipped
208    /// the evicted events entirely. That's a permanent correctness bug;
209    /// clients should instead do a full re-sync from entity list state.
210    pub fn pull(&self, cursor: &SyncCursor, limit: usize) -> Result<PullResponse, PullError> {
211        let events = self.events.lock().unwrap();
212        let current_seq = *self.seq.lock().unwrap();
213
214        // Detect "cursor from a previous server lifetime": the caller's
215        // cursor is ahead of the current seq counter. In-memory change logs
216        // reset on process restart, so a client that persisted cursor=15
217        // under the old server will silently tail-follow forever against
218        // the new server (which starts at 0 and will never produce seqs
219        // within (0, 15]). Force a resync so the client rehydrates from
220        // the entity list endpoints.
221        if cursor.last_seq > current_seq {
222            return Err(PullError::ResyncRequired {
223                oldest_seq: current_seq.saturating_add(1),
224                cursor: cursor.clone(),
225            });
226        }
227
228        // Detect "cursor too old": the caller's cursor is before the oldest
229        // retained event by more than one seq (i.e. there are evicted seqs
230        // between cursor.last_seq and front.seq). We do NOT carve out
231        // cursor=0 — a fresh client must use an entity-list endpoint for
232        // initial state rather than pull, because pull silently returning
233        // only the post-eviction tail hides that state is missing.
234        if let Some(front) = events.front() {
235            if cursor.last_seq + 1 < front.seq {
236                return Err(PullError::ResyncRequired {
237                    oldest_seq: front.seq,
238                    cursor: cursor.clone(),
239                });
240            }
241        }
242
243        let changes: Vec<ChangeEvent> = events
244            .iter()
245            .filter(|e| e.seq > cursor.last_seq)
246            .take(limit)
247            .cloned()
248            .collect();
249
250        let last_seq = changes.last().map(|e| e.seq).unwrap_or(cursor.last_seq);
251        let has_more = events.iter().any(|e| e.seq > last_seq);
252
253        Ok(PullResponse {
254            changes,
255            cursor: SyncCursor { last_seq },
256            has_more,
257        })
258    }
259
260    /// Get the total number of events in the log.
261    pub fn len(&self) -> usize {
262        self.events.lock().unwrap().len()
263    }
264
265    pub fn is_empty(&self) -> bool {
266        self.events.lock().unwrap().is_empty()
267    }
268}
269
/// Current wall-clock time as an ISO 8601 / RFC 3339 UTC timestamp with
/// second precision, e.g. `2024-01-01T00:00:00Z`.
///
/// A system clock set before the Unix epoch is clamped to the epoch.
fn now_iso8601() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_unix_secs_iso8601(secs)
}

/// Format seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC,
/// proleptic Gregorian calendar).
///
/// Uses Howard Hinnant's `civil_from_days` algorithm. The input is unsigned,
/// so every intermediate value is non-negative and plain `u64` division is
/// exact.
fn format_unix_secs_iso8601(secs: u64) -> String {
    let days = secs / 86_400;
    let secs_of_day = secs % 86_400;
    let hour = secs_of_day / 3_600;
    let minute = secs_of_day % 3_600 / 60;
    let second = secs_of_day % 60;

    // Shift the day count so it starts at 0000-03-01. Leap days then fall at
    // the end of each shifted year. Then split into 400-year eras of
    // 146,097 days each.
    let z = days + 719_468;
    let era = z / 146_097;
    let doe = z % 146_097; // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era, [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year, [0, 365]
    let mp = (5 * doy + 2) / 153; // March-based month, [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1; // [1, 31]
    let month = if mp < 10 { mp + 3 } else { mp - 9 }; // [1, 12]
    // January and February belong to the following civil year.
    let year = era * 400 + yoe + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hour, minute, second
    )
}
278
279// ---------------------------------------------------------------------------
280// Tests
281// ---------------------------------------------------------------------------
282
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_log() {
        let log = ChangeLog::new();
        assert!(log.is_empty());
        assert_eq!(log.len(), 0);
    }

    #[test]
    fn append_and_pull() {
        let log = ChangeLog::new();
        log.append(
            "User",
            "u1",
            ChangeKind::Insert,
            Some(serde_json::json!({"name": "Alice"})),
        );
        log.append(
            "User",
            "u2",
            ChangeKind::Insert,
            Some(serde_json::json!({"name": "Bob"})),
        );

        assert_eq!(log.len(), 2);

        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.cursor.last_seq, 2);
        assert!(!resp.has_more);
    }

    #[test]
    fn pull_with_cursor() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        log.append("User", "u3", ChangeKind::Insert, None);

        // Pull from seq 1 — should get events 2 and 3.
        let resp = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.changes[0].seq, 2);
        assert_eq!(resp.changes[1].seq, 3);
    }

    #[test]
    fn pull_with_limit() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        log.append("User", "u3", ChangeKind::Insert, None);

        let resp = log.pull(&SyncCursor::beginning(), 2).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert!(resp.has_more);
        assert_eq!(resp.cursor.last_seq, 2);

        // Continue pulling.
        let resp2 = log.pull(&resp.cursor, 2).unwrap();
        assert_eq!(resp2.changes.len(), 1);
        assert!(!resp2.has_more);
    }

    #[test]
    fn change_kinds() {
        let log = ChangeLog::new();
        log.append(
            "Todo",
            "t1",
            ChangeKind::Insert,
            Some(serde_json::json!({"title": "Test"})),
        );
        log.append(
            "Todo",
            "t1",
            ChangeKind::Update,
            Some(serde_json::json!({"title": "Updated"})),
        );
        log.append("Todo", "t1", ChangeKind::Delete, None);

        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert_eq!(resp.changes[0].kind, ChangeKind::Insert);
        assert_eq!(resp.changes[1].kind, ChangeKind::Update);
        assert_eq!(resp.changes[2].kind, ChangeKind::Delete);
        assert!(resp.changes[2].data.is_none());
    }

    #[test]
    fn sequence_numbers_are_monotonic() {
        let log = ChangeLog::new();
        let s1 = log.append("A", "1", ChangeKind::Insert, None);
        let s2 = log.append("B", "2", ChangeKind::Insert, None);
        let s3 = log.append("C", "3", ChangeKind::Insert, None);
        assert_eq!(s1, 1);
        assert_eq!(s2, 2);
        assert_eq!(s3, 3);
    }

    #[test]
    fn serialization_roundtrip() {
        let event = ChangeEvent {
            seq: 1,
            entity: "User".into(),
            row_id: "u1".into(),
            kind: ChangeKind::Insert,
            data: Some(serde_json::json!({"name": "Test"})),
            timestamp: "2024-01-01T00:00:00Z".into(),
        };
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ChangeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, parsed);
    }

    // -- Edge cases --

    #[test]
    fn pull_from_future_cursor_requires_resync() {
        // A cursor whose last_seq is greater than the log's current seq
        // counter is from a previous server lifetime (the in-memory log
        // reset on restart). The server must force resync — silently
        // returning an empty tail here used to wedge clients forever.
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        let err = log
            .pull(&SyncCursor { last_seq: 999 }, 100)
            .expect_err("future cursors must signal resync");
        match err {
            PullError::ResyncRequired { cursor, .. } => {
                assert_eq!(cursor.last_seq, 999);
            }
        }
    }

    #[test]
    fn pull_limit_zero_returns_empty() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor::beginning(), 0).unwrap();
        assert!(resp.changes.is_empty());
        // Nothing was delivered, so the cursor does not move and the
        // pending event is still reported.
        assert_eq!(resp.cursor.last_seq, 0);
        assert!(resp.has_more);
    }

    #[test]
    fn pull_from_latest_cursor_is_empty() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor { last_seq: 2 }, 100).unwrap();
        assert!(resp.changes.is_empty());
        assert_eq!(resp.cursor.last_seq, 2);
        assert!(!resp.has_more);
    }

    #[test]
    fn pull_on_empty_log_is_ok() {
        let log = ChangeLog::new();
        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert!(resp.changes.is_empty());
        assert_eq!(resp.cursor.last_seq, 0);
        assert!(!resp.has_more);
    }

    #[test]
    fn pull_with_evicted_cursor_requires_resync() {
        // Capacity 2 — we keep only the most recent 2. After seq 1..4 are
        // appended the oldest retained is seq 3.
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        log.append("A", "4", ChangeKind::Insert, None);

        // Client knew up to seq 1 — seq 2 is unrecoverable, so RESYNC.
        let err = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap_err();
        match err {
            PullError::ResyncRequired { oldest_seq, .. } => {
                assert_eq!(oldest_seq, 3);
            }
        }
    }

    #[test]
    fn pull_with_cursor_at_eviction_boundary_is_ok() {
        // Capacity 2 retains seq 2..3 after appending 1..3.
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        // Client cursor=1, next event is seq 2 — exactly what we have.
        let resp = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
    }

    #[test]
    fn delete_event_has_no_data() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Delete, None);
        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert!(resp.changes[0].data.is_none());
    }

    #[test]
    fn appends_to_same_row_get_increasing_seqs() {
        let log = ChangeLog::new();
        let s1 = log.append("A", "1", ChangeKind::Insert, None);
        let s2 = log.append("A", "1", ChangeKind::Update, None);
        let s3 = log.append("A", "1", ChangeKind::Delete, None);
        assert!(s1 < s2);
        assert!(s2 < s3);
    }

    #[test]
    fn concurrent_appends_get_unique_seqs() {
        use std::collections::HashSet;
        use std::sync::Arc;
        use std::thread;

        const THREADS: u64 = 4;
        const PER_THREAD: u64 = 50;

        let log = Arc::new(ChangeLog::new());
        let handles: Vec<_> = (0..THREADS)
            .map(|t| {
                let log = Arc::clone(&log);
                thread::spawn(move || {
                    (0..PER_THREAD)
                        .map(|i| {
                            let row = format!("{}-{}", t, i);
                            log.append("A", &row, ChangeKind::Insert, None)
                        })
                        .collect::<Vec<u64>>()
                })
            })
            .collect();

        let mut all = HashSet::new();
        for handle in handles {
            let seqs = handle.join().expect("appender thread panicked");
            // Each thread sees its own appends in increasing seq order.
            assert!(seqs.windows(2).all(|w| w[0] < w[1]));
            for seq in seqs {
                assert!(all.insert(seq), "seq {} assigned twice", seq);
            }
        }
        let total = THREADS * PER_THREAD;
        assert_eq!(all.len() as u64, total);
        assert_eq!(all.iter().copied().max(), Some(total));

        // The log stores the events in seq order with no gaps.
        let resp = log.pull(&SyncCursor::beginning(), usize::MAX).unwrap();
        assert_eq!(resp.changes.len() as u64, total);
        assert!(resp.changes.windows(2).all(|w| w[0].seq + 1 == w[1].seq));
    }

    #[test]
    fn op_id_is_remembered_idempotently() {
        let log = ChangeLog::new();
        assert!(!log.has_seen_op_id("op-1"));
        log.remember_op_id("op-1");
        log.remember_op_id("op-1");
        assert!(log.has_seen_op_id("op-1"));
        assert!(!log.has_seen_op_id("op-2"));
        // The repeated call must not enqueue a duplicate.
        assert_eq!(log.seen_op_ids.lock().unwrap().len(), 1);
    }

    #[test]
    fn op_id_cache_evicts_oldest_entry() {
        let log = ChangeLog::new();
        let cap = log.op_id_capacity;
        for i in 0..=cap {
            log.remember_op_id(&format!("op-{}", i));
        }
        // One past capacity: only the very first id has aged out.
        assert!(!log.has_seen_op_id("op-0"));
        assert!(log.has_seen_op_id("op-1"));
        assert!(log.has_seen_op_id(&format!("op-{}", cap)));
        assert_eq!(log.seen_op_id_set.lock().unwrap().len(), cap);
    }

    #[test]
    fn push_request_serialization() {
        let req = PushRequest {
            changes: vec![ClientChange {
                entity: "User".into(),
                row_id: "u1".into(),
                kind: ChangeKind::Insert,
                data: Some(serde_json::json!({"name": "Alice"})),
                op_id: None,
            }],
            client_id: Some("cl_123".into()),
        };
        let json = serde_json::to_string(&req).unwrap();
        let parsed: PushRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.changes.len(), 1);
        assert_eq!(parsed.changes[0].entity, "User");
        assert_eq!(parsed.client_id.as_deref(), Some("cl_123"));
    }

    #[test]
    fn push_request_accepts_missing_client_id() {
        // Legacy clients that don't send client_id must still parse.
        let json = r#"{"changes":[]}"#;
        let parsed: PushRequest = serde_json::from_str(json).unwrap();
        assert!(parsed.client_id.is_none());
    }
}