Skip to main content

pylon_sync/
lib.rs

1use std::sync::Mutex;
2
3use serde::{Deserialize, Serialize};
4
5// ---------------------------------------------------------------------------
6// Change events — the append-only log entries
7// ---------------------------------------------------------------------------
8
9/// A change event in the sync log.
10#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11pub struct ChangeEvent {
12    /// Monotonically increasing sequence number.
13    pub seq: u64,
14    /// The entity that was changed.
15    pub entity: String,
16    /// The row ID that was changed.
17    pub row_id: String,
18    /// The type of change.
19    pub kind: ChangeKind,
20    /// The data after the change (None for deletes).
21    #[serde(skip_serializing_if = "Option::is_none")]
22    pub data: Option<serde_json::Value>,
23    /// Timestamp of the change.
24    pub timestamp: String,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(rename_all = "lowercase")]
29pub enum ChangeKind {
30    Insert,
31    Update,
32    Delete,
33}
34
35// ---------------------------------------------------------------------------
36// Sync cursor — tracks client position in the log
37// ---------------------------------------------------------------------------
38
39/// A sync cursor representing a client's position in the change log.
40#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41pub struct SyncCursor {
42    /// The last sequence number the client has seen.
43    pub last_seq: u64,
44}
45
46impl SyncCursor {
47    pub fn beginning() -> Self {
48        Self { last_seq: 0 }
49    }
50}
51
52// ---------------------------------------------------------------------------
53// Pull response — what the server sends to a pulling client
54// ---------------------------------------------------------------------------
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct PullResponse {
58    /// Changes since the client's cursor.
59    pub changes: Vec<ChangeEvent>,
60    /// The new cursor position after these changes.
61    pub cursor: SyncCursor,
62    /// Whether there are more changes to pull.
63    pub has_more: bool,
64}
65
66/// Error returned by [`ChangeLog::pull`].
67#[derive(Debug, Clone)]
68pub enum PullError {
69    /// The caller's cursor has fallen off the back of the retention window.
70    /// The client should do a full re-sync from entity-list state rather than
71    /// trusting the delta stream — events between `cursor.last_seq` and
72    /// `oldest_seq` were evicted and cannot be replayed.
73    ResyncRequired { oldest_seq: u64, cursor: SyncCursor },
74}
75
76// ---------------------------------------------------------------------------
77// Push request — what a client sends to push changes
78// ---------------------------------------------------------------------------
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct PushRequest {
82    /// The changes the client wants to push.
83    pub changes: Vec<ClientChange>,
84    /// Stable identifier for this client across reconnects. Lets the server
85    /// correlate retries (even without op_id) and attach per-client
86    /// diagnostics / rate limits. Clients that don't supply one get a
87    /// synthesized `"anon"` bucket for those features. Legacy clients
88    /// without this field keep working — the router ignores it when
89    /// absent.
90    #[serde(default, skip_serializing_if = "Option::is_none")]
91    pub client_id: Option<String>,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct ClientChange {
96    pub entity: String,
97    pub row_id: String,
98    pub kind: ChangeKind,
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub data: Option<serde_json::Value>,
101    /// Client-minted idempotency key. The server remembers recently-seen
102    /// op_ids and short-circuits replays with the previous result instead
103    /// of re-applying the change. When absent, no dedup is performed (legacy
104    /// clients stay functional but lose idempotency on retry).
105    #[serde(default, skip_serializing_if = "Option::is_none")]
106    pub op_id: Option<String>,
107}
108
109// ---------------------------------------------------------------------------
110// Change log — in-memory append-only log
111// ---------------------------------------------------------------------------
112
113/// An in-memory change log with bounded retention.
114///
115/// Older events are evicted when the log exceeds `capacity`. The sequence
116/// counter still increments monotonically; clients pulling with an old
117/// cursor will see only what remains in memory (or should issue a full
118/// re-sync if their cursor falls off the back).
119pub struct ChangeLog {
120    events: Mutex<std::collections::VecDeque<ChangeEvent>>,
121    seq: Mutex<u64>,
122    capacity: usize,
123    /// Recently-seen client op_ids, for push idempotency. Bounded by
124    /// `op_id_capacity`; oldest entries age out when the map grows past it.
125    seen_op_ids: Mutex<std::collections::VecDeque<String>>,
126    seen_op_id_set: Mutex<std::collections::HashSet<String>>,
127    op_id_capacity: usize,
128}
129
130impl ChangeLog {
131    /// Create a new change log with the default capacity of 10,000 events.
132    pub fn new() -> Self {
133        Self::with_capacity(10_000)
134    }
135
136    /// Create a new change log with a specific capacity.
137    pub fn with_capacity(capacity: usize) -> Self {
138        Self {
139            events: Mutex::new(std::collections::VecDeque::with_capacity(
140                capacity.min(1024),
141            )),
142            seq: Mutex::new(0),
143            capacity,
144            seen_op_ids: Mutex::new(std::collections::VecDeque::with_capacity(1024)),
145            seen_op_id_set: Mutex::new(std::collections::HashSet::with_capacity(1024)),
146            op_id_capacity: 10_000,
147        }
148    }
149
150    /// Returns true if this op_id was already applied. Used by the push
151    /// handler to short-circuit replays. Callers that observe `true` should
152    /// NOT re-apply the change and should return success to the client.
153    pub fn has_seen_op_id(&self, op_id: &str) -> bool {
154        self.seen_op_id_set.lock().unwrap().contains(op_id)
155    }
156
157    /// Mark an op_id as processed. Safe to call multiple times. Evicts the
158    /// oldest entry when the cache exceeds `op_id_capacity`.
159    pub fn remember_op_id(&self, op_id: &str) {
160        let mut set = self.seen_op_id_set.lock().unwrap();
161        if set.contains(op_id) {
162            return;
163        }
164        set.insert(op_id.to_string());
165        drop(set);
166        let mut q = self.seen_op_ids.lock().unwrap();
167        q.push_back(op_id.to_string());
168        while q.len() > self.op_id_capacity {
169            if let Some(evicted) = q.pop_front() {
170                self.seen_op_id_set.lock().unwrap().remove(&evicted);
171            }
172        }
173    }
174
175    /// Append a change event. Returns the assigned sequence number.
176    pub fn append(
177        &self,
178        entity: &str,
179        row_id: &str,
180        kind: ChangeKind,
181        data: Option<serde_json::Value>,
182    ) -> u64 {
183        let mut seq = self.seq.lock().unwrap();
184        *seq += 1;
185        let event = ChangeEvent {
186            seq: *seq,
187            entity: entity.to_string(),
188            row_id: row_id.to_string(),
189            kind,
190            data,
191            timestamp: now_iso8601(),
192        };
193        let mut events = self.events.lock().unwrap();
194        events.push_back(event);
195        while events.len() > self.capacity {
196            events.pop_front();
197        }
198        *seq
199    }
200
201    /// Pull changes since a cursor, up to a limit.
202    ///
203    /// Returns `Err(PullError::ResyncRequired)` when the caller's cursor has
204    /// fallen off the back of the retention window — i.e. the cursor's
205    /// `last_seq` is lower than the oldest seq we still remember. Previously
206    /// this case was silent: `pull` would return the surviving tail and
207    /// advance the cursor, so the client converged to a state that skipped
208    /// the evicted events entirely. That's a permanent correctness bug;
209    /// clients should instead do a full re-sync from entity list state.
210    pub fn pull(&self, cursor: &SyncCursor, limit: usize) -> Result<PullResponse, PullError> {
211        let events = self.events.lock().unwrap();
212        let current_seq = *self.seq.lock().unwrap();
213
214        // Detect "cursor from a previous server lifetime": the caller's
215        // cursor is ahead of the current seq counter. In-memory change logs
216        // reset on process restart, so a client that persisted cursor=15
217        // under the old server will silently tail-follow forever against
218        // the new server (which starts at 0 and will never produce seqs
219        // within (0, 15]). Force a resync so the client rehydrates from
220        // the entity list endpoints.
221        if cursor.last_seq > current_seq {
222            return Err(PullError::ResyncRequired {
223                oldest_seq: current_seq.saturating_add(1),
224                cursor: cursor.clone(),
225            });
226        }
227
228        // Detect "cursor too old": the caller's cursor is before the oldest
229        // retained event by more than one seq. EXCEPT cursor=0 — a fresh
230        // client gets whatever the log currently holds. The previous
231        // policy 410'd cursor=0 whenever the seeded entity replay had
232        // been evicted, which the React client handled by resetting
233        // back to cursor=0 and re-pulling — an infinite loop. The
234        // partial-tail risk the old comment warned about is real but
235        // narrow: the runtime now also re-seeds entity rows on demand
236        // (see `Runtime::seed_change_log`), so cursor=0 always gets a
237        // current snapshot of state.
238        if cursor.last_seq > 0 {
239            if let Some(front) = events.front() {
240                if cursor.last_seq + 1 < front.seq {
241                    return Err(PullError::ResyncRequired {
242                        oldest_seq: front.seq,
243                        cursor: cursor.clone(),
244                    });
245                }
246            }
247        }
248
249        let changes: Vec<ChangeEvent> = events
250            .iter()
251            .filter(|e| e.seq > cursor.last_seq)
252            .take(limit)
253            .cloned()
254            .collect();
255
256        let last_seq = changes.last().map(|e| e.seq).unwrap_or(cursor.last_seq);
257        let has_more = events.iter().any(|e| e.seq > last_seq);
258
259        Ok(PullResponse {
260            changes,
261            cursor: SyncCursor { last_seq },
262            has_more,
263        })
264    }
265
266    /// Get the total number of events in the log.
267    pub fn len(&self) -> usize {
268        self.events.lock().unwrap().len()
269    }
270
271    pub fn is_empty(&self) -> bool {
272        self.events.lock().unwrap().is_empty()
273    }
274}
275
/// Current wall-clock time as an ISO-8601 UTC timestamp
/// (`YYYY-MM-DDTHH:MM:SSZ`, whole-second precision).
///
/// A system clock set before the Unix epoch is clamped to the epoch instead
/// of panicking.
fn now_iso8601() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_unix_secs_iso8601(secs)
}

/// Format seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ`
/// (UTC, proleptic Gregorian calendar).
fn format_unix_secs_iso8601(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    let days = secs / SECS_PER_DAY;
    let secs_of_day = secs % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let minute = (secs_of_day % 3_600) / 60;
    let second = secs_of_day % 60;

    // Days since epoch -> civil date, using Howard Hinnant's
    // `civil_from_days`, specialised to non-negative input. The epoch is
    // shifted to 0000-03-01 so the leap day falls at the end of each
    // March-based year.
    let z = days + 719_468;
    let era = z / 146_097;
    let doe = z - era * 146_097; // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // [0, 365], March = 0
    let mp = (5 * doy + 2) / 153; // month index, March = 0
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    // January and February belong to the next civil year.
    let year = yoe + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hour, minute, second
    )
}
284
285// ---------------------------------------------------------------------------
286// Tests
287// ---------------------------------------------------------------------------
288
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_log() {
        let log = ChangeLog::new();
        assert!(log.is_empty());
        assert_eq!(log.len(), 0);
    }

    #[test]
    fn append_and_pull() {
        let log = ChangeLog::new();
        log.append(
            "User",
            "u1",
            ChangeKind::Insert,
            Some(serde_json::json!({"name": "Alice"})),
        );
        log.append(
            "User",
            "u2",
            ChangeKind::Insert,
            Some(serde_json::json!({"name": "Bob"})),
        );

        assert_eq!(log.len(), 2);

        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.cursor.last_seq, 2);
        assert!(!resp.has_more);
    }

    #[test]
    fn pull_with_cursor() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        log.append("User", "u3", ChangeKind::Insert, None);

        // Pull from seq 1 — should get events 2 and 3.
        let resp = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.changes[0].seq, 2);
        assert_eq!(resp.changes[1].seq, 3);
    }

    #[test]
    fn pull_with_limit() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        log.append("User", "u3", ChangeKind::Insert, None);

        let resp = log.pull(&SyncCursor::beginning(), 2).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert!(resp.has_more);
        assert_eq!(resp.cursor.last_seq, 2);

        // Continue pulling.
        let resp2 = log.pull(&resp.cursor, 2).unwrap();
        assert_eq!(resp2.changes.len(), 1);
        assert!(!resp2.has_more);
    }

    #[test]
    fn change_kinds() {
        let log = ChangeLog::new();
        log.append(
            "Todo",
            "t1",
            ChangeKind::Insert,
            Some(serde_json::json!({"title": "Test"})),
        );
        log.append(
            "Todo",
            "t1",
            ChangeKind::Update,
            Some(serde_json::json!({"title": "Updated"})),
        );
        log.append("Todo", "t1", ChangeKind::Delete, None);

        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert_eq!(resp.changes[0].kind, ChangeKind::Insert);
        assert_eq!(resp.changes[1].kind, ChangeKind::Update);
        assert_eq!(resp.changes[2].kind, ChangeKind::Delete);
        assert!(resp.changes[2].data.is_none());
    }

    #[test]
    fn sequence_numbers_are_monotonic() {
        let log = ChangeLog::new();
        let s1 = log.append("A", "1", ChangeKind::Insert, None);
        let s2 = log.append("B", "2", ChangeKind::Insert, None);
        let s3 = log.append("C", "3", ChangeKind::Insert, None);
        assert_eq!(s1, 1);
        assert_eq!(s2, 2);
        assert_eq!(s3, 3);
    }

    #[test]
    fn serialization_roundtrip() {
        let event = ChangeEvent {
            seq: 1,
            entity: "User".into(),
            row_id: "u1".into(),
            kind: ChangeKind::Insert,
            data: Some(serde_json::json!({"name": "Test"})),
            timestamp: "2024-01-01T00:00:00Z".into(),
        };
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ChangeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, parsed);
    }

    // -- Edge cases --

    #[test]
    fn pull_from_future_cursor_requires_resync() {
        // A cursor whose last_seq is greater than the log's current seq
        // counter is from a previous server lifetime (the in-memory log
        // reset on restart). The server must force resync — silently
        // returning an empty tail here used to wedge clients forever.
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        let err = log
            .pull(&SyncCursor { last_seq: 999 }, 100)
            .expect_err("future cursors must signal resync");
        match err {
            PullError::ResyncRequired { cursor, .. } => {
                assert_eq!(cursor.last_seq, 999);
            }
        }
    }

    #[test]
    fn pull_limit_zero_returns_empty() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor::beginning(), 0).unwrap();
        assert!(resp.changes.is_empty());
    }

    #[test]
    fn pull_with_evicted_cursor_requires_resync() {
        // Capacity 2 — we keep only the most recent 2. After seq 1..4 are
        // appended the oldest retained is seq 3.
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        log.append("A", "4", ChangeKind::Insert, None);

        // Client knew up to seq 1 — seq 2 is unrecoverable, so RESYNC.
        let err = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap_err();
        match err {
            PullError::ResyncRequired { oldest_seq, .. } => {
                assert_eq!(oldest_seq, 3);
            }
        }
    }

    #[test]
    fn fresh_cursor_zero_never_resyncs() {
        // Regression: previously cursor=0 would 410 if the seeded entity
        // replay had been evicted. The React client handled that by
        // resetting to cursor=0 and re-pulling — an infinite loop. cursor=0
        // is "I just connected, give me what you have"; never 410.
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        log.append("A", "4", ChangeKind::Insert, None);
        // Front is now seq 3 (1+2 evicted). Old behavior: 410 because
        // 0+1 < 3. New: succeed and return what we have.
        let resp = log
            .pull(&SyncCursor { last_seq: 0 }, 100)
            .expect("cursor=0 must never resync — no infinite loop");
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.changes[0].seq, 3);
    }

    #[test]
    fn pull_with_cursor_at_eviction_boundary_is_ok() {
        // Capacity 2 retains seq 2..3 after appending 1..3.
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        // Client cursor=1, next event is seq 2 — exactly what we have.
        let resp = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
    }

    #[test]
    fn delete_event_has_no_data() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Delete, None);
        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert!(resp.changes[0].data.is_none());
    }

    #[test]
    fn concurrent_appends_get_unique_seqs() {
        // Several threads append at once. Every seq must be handed out
        // exactly once, and each thread must observe its own seqs
        // increasing.
        use std::sync::Arc;
        use std::thread;

        const THREADS: u64 = 4;
        const PER_THREAD: u64 = 50;

        let log = Arc::new(ChangeLog::new());
        let handles: Vec<_> = (0..THREADS)
            .map(|t| {
                let log = Arc::clone(&log);
                thread::spawn(move || {
                    (0..PER_THREAD)
                        .map(|i| {
                            let row = format!("{}-{}", t, i);
                            log.append("A", &row, ChangeKind::Insert, None)
                        })
                        .collect::<Vec<u64>>()
                })
            })
            .collect();

        let mut all = Vec::new();
        for handle in handles {
            let seqs = handle.join().expect("append thread panicked");
            assert!(
                seqs.windows(2).all(|w| w[0] < w[1]),
                "per-thread seqs must increase"
            );
            all.extend(seqs);
        }
        all.sort_unstable();
        assert_eq!(all, (1..=THREADS * PER_THREAD).collect::<Vec<u64>>());

        // The retained log itself must be in ascending seq order.
        let resp = log.pull(&SyncCursor::beginning(), usize::MAX).unwrap();
        assert!(resp.changes.windows(2).all(|w| w[0].seq < w[1].seq));
    }

    #[test]
    fn op_id_is_remembered_idempotently() {
        let log = ChangeLog::new();
        assert!(!log.has_seen_op_id("op-1"));
        log.remember_op_id("op-1");
        assert!(log.has_seen_op_id("op-1"));
        // Remembering the same id again is a no-op.
        log.remember_op_id("op-1");
        assert!(log.has_seen_op_id("op-1"));
        assert!(!log.has_seen_op_id("op-2"));
    }

    #[test]
    fn op_id_cache_evicts_oldest_past_capacity() {
        let log = ChangeLog::new();
        let cap = log.op_id_capacity;
        // One more than the cache can hold: only the very first id ages out.
        for i in 0..=cap {
            log.remember_op_id(&format!("op-{}", i));
        }
        assert!(!log.has_seen_op_id("op-0"));
        assert!(log.has_seen_op_id("op-1"));
        assert!(log.has_seen_op_id(&format!("op-{}", cap)));
    }

    #[test]
    fn push_request_serialization() {
        let req = PushRequest {
            changes: vec![ClientChange {
                entity: "User".into(),
                row_id: "u1".into(),
                kind: ChangeKind::Insert,
                data: Some(serde_json::json!({"name": "Alice"})),
                op_id: None,
            }],
            client_id: Some("cl_123".into()),
        };
        let json = serde_json::to_string(&req).unwrap();
        let parsed: PushRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.changes.len(), 1);
        assert_eq!(parsed.changes[0].entity, "User");
        assert_eq!(parsed.client_id.as_deref(), Some("cl_123"));
    }

    #[test]
    fn push_request_accepts_missing_client_id() {
        // Legacy clients that don't send client_id must still parse.
        let json = r#"{"changes":[]}"#;
        let parsed: PushRequest = serde_json::from_str(json).unwrap();
        assert!(parsed.client_id.is_none());
    }
}