1use std::sync::Mutex;
2
3use serde::{Deserialize, Serialize};
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11pub struct ChangeEvent {
12 pub seq: u64,
14 pub entity: String,
16 pub row_id: String,
18 pub kind: ChangeKind,
20 #[serde(skip_serializing_if = "Option::is_none")]
22 pub data: Option<serde_json::Value>,
23 pub timestamp: String,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(rename_all = "lowercase")]
29pub enum ChangeKind {
30 Insert,
31 Update,
32 Delete,
33}
34
35#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41pub struct SyncCursor {
42 pub last_seq: u64,
44}
45
46impl SyncCursor {
47 pub fn beginning() -> Self {
48 Self { last_seq: 0 }
49 }
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct PullResponse {
58 pub changes: Vec<ChangeEvent>,
60 pub cursor: SyncCursor,
62 pub has_more: bool,
64}
65
66#[derive(Debug, Clone)]
68pub enum PullError {
69 ResyncRequired { oldest_seq: u64, cursor: SyncCursor },
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct PushRequest {
82 pub changes: Vec<ClientChange>,
84 #[serde(default, skip_serializing_if = "Option::is_none")]
91 pub client_id: Option<String>,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct ClientChange {
96 pub entity: String,
97 pub row_id: String,
98 pub kind: ChangeKind,
99 #[serde(skip_serializing_if = "Option::is_none")]
100 pub data: Option<serde_json::Value>,
101 #[serde(default, skip_serializing_if = "Option::is_none")]
106 pub op_id: Option<String>,
107}
108
109pub struct ChangeLog {
120 events: Mutex<std::collections::VecDeque<ChangeEvent>>,
121 seq: Mutex<u64>,
122 capacity: usize,
123 seen_op_ids: Mutex<std::collections::VecDeque<String>>,
126 seen_op_id_set: Mutex<std::collections::HashSet<String>>,
127 op_id_capacity: usize,
128}
129
130impl ChangeLog {
131 pub fn new() -> Self {
133 Self::with_capacity(10_000)
134 }
135
136 pub fn with_capacity(capacity: usize) -> Self {
138 Self {
139 events: Mutex::new(std::collections::VecDeque::with_capacity(
140 capacity.min(1024),
141 )),
142 seq: Mutex::new(0),
143 capacity,
144 seen_op_ids: Mutex::new(std::collections::VecDeque::with_capacity(1024)),
145 seen_op_id_set: Mutex::new(std::collections::HashSet::with_capacity(1024)),
146 op_id_capacity: 10_000,
147 }
148 }
149
150 pub fn has_seen_op_id(&self, op_id: &str) -> bool {
154 self.seen_op_id_set.lock().unwrap().contains(op_id)
155 }
156
157 pub fn remember_op_id(&self, op_id: &str) {
160 let mut set = self.seen_op_id_set.lock().unwrap();
161 if set.contains(op_id) {
162 return;
163 }
164 set.insert(op_id.to_string());
165 drop(set);
166 let mut q = self.seen_op_ids.lock().unwrap();
167 q.push_back(op_id.to_string());
168 while q.len() > self.op_id_capacity {
169 if let Some(evicted) = q.pop_front() {
170 self.seen_op_id_set.lock().unwrap().remove(&evicted);
171 }
172 }
173 }
174
175 pub fn append(
177 &self,
178 entity: &str,
179 row_id: &str,
180 kind: ChangeKind,
181 data: Option<serde_json::Value>,
182 ) -> u64 {
183 let mut seq = self.seq.lock().unwrap();
184 *seq += 1;
185 let event = ChangeEvent {
186 seq: *seq,
187 entity: entity.to_string(),
188 row_id: row_id.to_string(),
189 kind,
190 data,
191 timestamp: now_iso8601(),
192 };
193 let mut events = self.events.lock().unwrap();
194 events.push_back(event);
195 while events.len() > self.capacity {
196 events.pop_front();
197 }
198 *seq
199 }
200
201 pub fn pull(&self, cursor: &SyncCursor, limit: usize) -> Result<PullResponse, PullError> {
211 let events = self.events.lock().unwrap();
212 let current_seq = *self.seq.lock().unwrap();
213
214 if cursor.last_seq > current_seq {
222 return Err(PullError::ResyncRequired {
223 oldest_seq: current_seq.saturating_add(1),
224 cursor: cursor.clone(),
225 });
226 }
227
228 if cursor.last_seq > 0 {
239 if let Some(front) = events.front() {
240 if cursor.last_seq + 1 < front.seq {
241 return Err(PullError::ResyncRequired {
242 oldest_seq: front.seq,
243 cursor: cursor.clone(),
244 });
245 }
246 }
247 }
248
249 let changes: Vec<ChangeEvent> = events
250 .iter()
251 .filter(|e| e.seq > cursor.last_seq)
252 .take(limit)
253 .cloned()
254 .collect();
255
256 let last_seq = changes.last().map(|e| e.seq).unwrap_or(cursor.last_seq);
257 let has_more = events.iter().any(|e| e.seq > last_seq);
258
259 Ok(PullResponse {
260 changes,
261 cursor: SyncCursor { last_seq },
262 has_more,
263 })
264 }
265
266 pub fn len(&self) -> usize {
268 self.events.lock().unwrap().len()
269 }
270
271 pub fn is_empty(&self) -> bool {
272 self.events.lock().unwrap().is_empty()
273 }
274}
275
/// Returns the current UTC time as an ISO-8601 timestamp with second
/// precision, e.g. `2024-01-01T00:00:00Z`.
///
/// A system clock set before the Unix epoch is treated as the epoch itself.
fn now_iso8601() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let unix_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_iso8601_utc(unix_secs)
}

/// Formats whole seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ`
/// (UTC, proleptic Gregorian calendar).
fn format_iso8601_utc(unix_secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;

    let days = unix_secs / SECS_PER_DAY;
    let secs_of_day = unix_secs % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let minute = secs_of_day % 3_600 / 60;
    let second = secs_of_day % 60;

    // Day count -> civil date (Howard Hinnant's `civil_from_days`, restricted
    // to non-negative input). The epoch is shifted to 0000-03-01 so that the
    // leap day is the last day of the March-based year.
    let shifted = days + 719_468;
    let era = shifted / 146_097; // 400-year cycles
    let day_of_era = shifted % 146_097; // [0, 146_096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let march_month = (5 * day_of_year + 2) / 153; // 0 = March .. 11 = February
    let day = day_of_year - (153 * march_month + 2) / 5 + 1;
    let month = if march_month < 10 {
        march_month + 3
    } else {
        march_month - 9
    };
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hour, minute, second
    )
}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// A new log holds nothing.
    #[test]
    fn empty_log() {
        let log = ChangeLog::new();
        assert!(log.is_empty());
        assert_eq!(log.len(), 0);
    }

    /// Pulling from a log that never saw an append yields an empty page.
    #[test]
    fn pull_from_empty_log() {
        let log = ChangeLog::new();
        let resp = log.pull(&SyncCursor::beginning(), 10).unwrap();
        assert!(resp.changes.is_empty());
        assert_eq!(resp.cursor.last_seq, 0);
        assert!(!resp.has_more);
    }

    #[test]
    fn append_and_pull() {
        let log = ChangeLog::new();
        log.append(
            "User",
            "u1",
            ChangeKind::Insert,
            Some(serde_json::json!({"name": "Alice"})),
        );
        log.append(
            "User",
            "u2",
            ChangeKind::Insert,
            Some(serde_json::json!({"name": "Bob"})),
        );

        assert_eq!(log.len(), 2);

        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.cursor.last_seq, 2);
        assert!(!resp.has_more);
    }

    /// Only events strictly newer than the cursor are returned.
    #[test]
    fn pull_with_cursor() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        log.append("User", "u3", ChangeKind::Insert, None);

        let resp = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.changes[0].seq, 2);
        assert_eq!(resp.changes[1].seq, 3);
    }

    /// A limited pull reports `has_more` and its cursor resumes where it stopped.
    #[test]
    fn pull_with_limit() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        log.append("User", "u3", ChangeKind::Insert, None);

        let resp = log.pull(&SyncCursor::beginning(), 2).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert!(resp.has_more);
        assert_eq!(resp.cursor.last_seq, 2);

        let resp2 = log.pull(&resp.cursor, 2).unwrap();
        assert_eq!(resp2.changes.len(), 1);
        assert!(!resp2.has_more);
    }

    #[test]
    fn change_kinds() {
        let log = ChangeLog::new();
        log.append(
            "Todo",
            "t1",
            ChangeKind::Insert,
            Some(serde_json::json!({"title": "Test"})),
        );
        log.append(
            "Todo",
            "t1",
            ChangeKind::Update,
            Some(serde_json::json!({"title": "Updated"})),
        );
        log.append("Todo", "t1", ChangeKind::Delete, None);

        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert_eq!(resp.changes[0].kind, ChangeKind::Insert);
        assert_eq!(resp.changes[1].kind, ChangeKind::Update);
        assert_eq!(resp.changes[2].kind, ChangeKind::Delete);
        assert!(resp.changes[2].data.is_none());
    }

    #[test]
    fn sequence_numbers_are_monotonic() {
        let log = ChangeLog::new();
        let s1 = log.append("A", "1", ChangeKind::Insert, None);
        let s2 = log.append("B", "2", ChangeKind::Insert, None);
        let s3 = log.append("C", "3", ChangeKind::Insert, None);
        assert_eq!(s1, 1);
        assert_eq!(s2, 2);
        assert_eq!(s3, 3);
    }

    #[test]
    fn serialization_roundtrip() {
        let event = ChangeEvent {
            seq: 1,
            entity: "User".into(),
            row_id: "u1".into(),
            kind: ChangeKind::Insert,
            data: Some(serde_json::json!({"name": "Test"})),
            timestamp: "2024-01-01T00:00:00Z".into(),
        };
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ChangeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, parsed);
    }

    /// A cursor ahead of the log is rejected and echoed back.
    #[test]
    fn pull_from_future_cursor_requires_resync() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        let err = log
            .pull(&SyncCursor { last_seq: 999 }, 100)
            .expect_err("future cursors must signal resync");
        let PullError::ResyncRequired { cursor, .. } = err;
        assert_eq!(cursor.last_seq, 999);
    }

    #[test]
    fn pull_limit_zero_returns_empty() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor::beginning(), 0).unwrap();
        assert!(resp.changes.is_empty());
    }

    /// Capacity 2 with four appends retains seq 3 and 4; a cursor at 1 has a gap.
    #[test]
    fn pull_with_evicted_cursor_requires_resync() {
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        log.append("A", "4", ChangeKind::Insert, None);

        let err = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap_err();
        let PullError::ResyncRequired { oldest_seq, .. } = err;
        assert_eq!(oldest_seq, 3);
    }

    /// A cursor of 0 is always served, otherwise a resyncing client could
    /// never make progress.
    #[test]
    fn fresh_cursor_zero_never_resyncs() {
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        log.append("A", "4", ChangeKind::Insert, None);
        let resp = log
            .pull(&SyncCursor { last_seq: 0 }, 100)
            .expect("cursor=0 must never resync — no infinite loop");
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.changes[0].seq, 3);
    }

    /// Capacity 2 with three appends retains seq 2 and 3; a cursor at 1 has no gap.
    #[test]
    fn pull_with_cursor_at_eviction_boundary_is_ok() {
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
    }

    #[test]
    fn delete_event_has_no_data() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Delete, None);
        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert!(resp.changes[0].data.is_none());
    }

    /// Appends issued from several threads at once must each get a distinct
    /// sequence number, and together they must cover `1..=total` with no gaps.
    #[test]
    fn concurrent_appends_get_unique_seqs() {
        use std::collections::HashSet;
        use std::sync::Arc;
        use std::thread;

        const THREADS: usize = 4;
        const PER_THREAD: usize = 50;
        let total = THREADS * PER_THREAD;

        let log = Arc::new(ChangeLog::new());
        let workers: Vec<_> = (0..THREADS)
            .map(|worker| {
                let log = Arc::clone(&log);
                thread::spawn(move || {
                    (0..PER_THREAD)
                        .map(|i| {
                            let row_id = format!("{}-{}", worker, i);
                            log.append("A", &row_id, ChangeKind::Insert, None)
                        })
                        .collect::<Vec<u64>>()
                })
            })
            .collect();

        let mut seen = HashSet::new();
        for worker in workers {
            for seq in worker.join().expect("appender thread panicked") {
                assert!(seen.insert(seq), "sequence number {} was assigned twice", seq);
            }
        }

        assert_eq!(seen.len(), total);
        assert!((1..=total as u64).all(|seq| seen.contains(&seq)));
        assert_eq!(log.len(), total);
    }

    /// Remembered op ids are reported as seen; remembering twice is harmless.
    #[test]
    fn remembered_op_ids_are_seen() {
        let log = ChangeLog::new();
        assert!(!log.has_seen_op_id("op-1"));

        log.remember_op_id("op-1");
        log.remember_op_id("op-1");

        assert!(log.has_seen_op_id("op-1"));
        assert!(!log.has_seen_op_id("op-2"));
    }

    #[test]
    fn push_request_serialization() {
        let req = PushRequest {
            changes: vec![ClientChange {
                entity: "User".into(),
                row_id: "u1".into(),
                kind: ChangeKind::Insert,
                data: Some(serde_json::json!({"name": "Alice"})),
                op_id: None,
            }],
            client_id: Some("cl_123".into()),
        };
        let json = serde_json::to_string(&req).unwrap();
        let parsed: PushRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.changes.len(), 1);
        assert_eq!(parsed.changes[0].entity, "User");
        assert_eq!(parsed.client_id.as_deref(), Some("cl_123"));
    }

    /// `client_id` is optional on the wire.
    #[test]
    fn push_request_accepts_missing_client_id() {
        let json = r#"{"changes":[]}"#;
        let parsed: PushRequest = serde_json::from_str(json).unwrap();
        assert!(parsed.client_id.is_none());
    }
}