1use std::sync::Mutex;
2
3use serde::{Deserialize, Serialize};
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11pub struct ChangeEvent {
12 pub seq: u64,
14 pub entity: String,
16 pub row_id: String,
18 pub kind: ChangeKind,
20 #[serde(skip_serializing_if = "Option::is_none")]
22 pub data: Option<serde_json::Value>,
23 pub timestamp: String,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(rename_all = "lowercase")]
29pub enum ChangeKind {
30 Insert,
31 Update,
32 Delete,
33}
34
35#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41pub struct SyncCursor {
42 pub last_seq: u64,
44}
45
46impl SyncCursor {
47 pub fn beginning() -> Self {
48 Self { last_seq: 0 }
49 }
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct PullResponse {
58 pub changes: Vec<ChangeEvent>,
60 pub cursor: SyncCursor,
62 pub has_more: bool,
64}
65
66#[derive(Debug, Clone)]
68pub enum PullError {
69 ResyncRequired { oldest_seq: u64, cursor: SyncCursor },
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct PushRequest {
82 pub changes: Vec<ClientChange>,
84 #[serde(default, skip_serializing_if = "Option::is_none")]
91 pub client_id: Option<String>,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct ClientChange {
96 pub entity: String,
97 pub row_id: String,
98 pub kind: ChangeKind,
99 #[serde(skip_serializing_if = "Option::is_none")]
100 pub data: Option<serde_json::Value>,
101 #[serde(default, skip_serializing_if = "Option::is_none")]
106 pub op_id: Option<String>,
107}
108
109pub struct ChangeLog {
120 events: Mutex<std::collections::VecDeque<ChangeEvent>>,
121 seq: Mutex<u64>,
122 capacity: usize,
123 seen_op_ids: Mutex<std::collections::VecDeque<String>>,
126 seen_op_id_set: Mutex<std::collections::HashSet<String>>,
127 op_id_capacity: usize,
128}
129
130impl ChangeLog {
131 pub fn new() -> Self {
133 Self::with_capacity(10_000)
134 }
135
136 pub fn with_capacity(capacity: usize) -> Self {
138 Self {
139 events: Mutex::new(std::collections::VecDeque::with_capacity(
140 capacity.min(1024),
141 )),
142 seq: Mutex::new(0),
143 capacity,
144 seen_op_ids: Mutex::new(std::collections::VecDeque::with_capacity(1024)),
145 seen_op_id_set: Mutex::new(std::collections::HashSet::with_capacity(1024)),
146 op_id_capacity: 10_000,
147 }
148 }
149
150 pub fn has_seen_op_id(&self, op_id: &str) -> bool {
154 self.seen_op_id_set.lock().unwrap().contains(op_id)
155 }
156
157 pub fn remember_op_id(&self, op_id: &str) {
160 let mut set = self.seen_op_id_set.lock().unwrap();
161 if set.contains(op_id) {
162 return;
163 }
164 set.insert(op_id.to_string());
165 drop(set);
166 let mut q = self.seen_op_ids.lock().unwrap();
167 q.push_back(op_id.to_string());
168 while q.len() > self.op_id_capacity {
169 if let Some(evicted) = q.pop_front() {
170 self.seen_op_id_set.lock().unwrap().remove(&evicted);
171 }
172 }
173 }
174
175 pub fn append(
177 &self,
178 entity: &str,
179 row_id: &str,
180 kind: ChangeKind,
181 data: Option<serde_json::Value>,
182 ) -> u64 {
183 let mut seq = self.seq.lock().unwrap();
184 *seq += 1;
185 let event = ChangeEvent {
186 seq: *seq,
187 entity: entity.to_string(),
188 row_id: row_id.to_string(),
189 kind,
190 data,
191 timestamp: now_iso8601(),
192 };
193 let mut events = self.events.lock().unwrap();
194 events.push_back(event);
195 while events.len() > self.capacity {
196 events.pop_front();
197 }
198 *seq
199 }
200
201 pub fn pull(&self, cursor: &SyncCursor, limit: usize) -> Result<PullResponse, PullError> {
211 let events = self.events.lock().unwrap();
212 let current_seq = *self.seq.lock().unwrap();
213
214 if cursor.last_seq > current_seq {
222 return Err(PullError::ResyncRequired {
223 oldest_seq: current_seq.saturating_add(1),
224 cursor: cursor.clone(),
225 });
226 }
227
228 if let Some(front) = events.front() {
235 if cursor.last_seq + 1 < front.seq {
236 return Err(PullError::ResyncRequired {
237 oldest_seq: front.seq,
238 cursor: cursor.clone(),
239 });
240 }
241 }
242
243 let changes: Vec<ChangeEvent> = events
244 .iter()
245 .filter(|e| e.seq > cursor.last_seq)
246 .take(limit)
247 .cloned()
248 .collect();
249
250 let last_seq = changes.last().map(|e| e.seq).unwrap_or(cursor.last_seq);
251 let has_more = events.iter().any(|e| e.seq > last_seq);
252
253 Ok(PullResponse {
254 changes,
255 cursor: SyncCursor { last_seq },
256 has_more,
257 })
258 }
259
260 pub fn len(&self) -> usize {
262 self.events.lock().unwrap().len()
263 }
264
265 pub fn is_empty(&self) -> bool {
266 self.events.lock().unwrap().is_empty()
267 }
268}
269
/// Returns the current UTC time as an ISO 8601 / RFC 3339 timestamp with
/// second precision, e.g. `2024-01-01T00:00:00Z`.
///
/// A system clock set before the Unix epoch is treated as the epoch itself.
fn now_iso8601() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_unix_secs_utc(secs)
}

/// Formats seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
fn format_unix_secs_utc(secs: u64) -> String {
    let days = secs / 86_400;
    let secs_of_day = secs % 86_400;
    let hour = secs_of_day / 3_600;
    let minute = secs_of_day % 3_600 / 60;
    let second = secs_of_day % 60;

    // Days-to-civil-date conversion (Howard Hinnant's `civil_from_days`).
    // Shifting the epoch to 0000-03-01 puts the leap day at the end of the
    // computational year, which keeps the arithmetic branch-free.
    let shifted = days + 719_468;
    let era = shifted / 146_097; // 400-year cycles
    let day_of_era = shifted % 146_097; // [0, 146_096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let shifted_month = (5 * day_of_year + 2) / 153; // 0 = March .. 11 = February
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
    let month = if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    };
    // January and February belong to the next civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hour, minute, second
    )
}
278
/// Unit tests for the change log and its wire types.
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created log holds no events.
    #[test]
    fn empty_log() {
        let log = ChangeLog::new();
        assert!(log.is_empty());
        assert_eq!(log.len(), 0);
    }

    /// Appended events come back from a pull that starts at the beginning.
    #[test]
    fn append_and_pull() {
        let log = ChangeLog::new();
        log.append(
            "User",
            "u1",
            ChangeKind::Insert,
            Some(serde_json::json!({"name": "Alice"})),
        );
        log.append(
            "User",
            "u2",
            ChangeKind::Insert,
            Some(serde_json::json!({"name": "Bob"})),
        );

        assert_eq!(log.len(), 2);

        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.cursor.last_seq, 2);
        assert!(!resp.has_more);
    }

    /// Only events after the cursor are returned.
    #[test]
    fn pull_with_cursor() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        log.append("User", "u3", ChangeKind::Insert, None);

        let resp = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.changes[0].seq, 2);
        assert_eq!(resp.changes[1].seq, 3);
    }

    /// `limit` caps the page size and `has_more` signals a following page.
    #[test]
    fn pull_with_limit() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        log.append("User", "u3", ChangeKind::Insert, None);

        let resp = log.pull(&SyncCursor::beginning(), 2).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert!(resp.has_more);
        assert_eq!(resp.cursor.last_seq, 2);

        let resp2 = log.pull(&resp.cursor, 2).unwrap();
        assert_eq!(resp2.changes.len(), 1);
        assert!(!resp2.has_more);
    }

    /// Every change kind is stored and returned unchanged.
    #[test]
    fn change_kinds() {
        let log = ChangeLog::new();
        log.append(
            "Todo",
            "t1",
            ChangeKind::Insert,
            Some(serde_json::json!({"title": "Test"})),
        );
        log.append(
            "Todo",
            "t1",
            ChangeKind::Update,
            Some(serde_json::json!({"title": "Updated"})),
        );
        log.append("Todo", "t1", ChangeKind::Delete, None);

        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert_eq!(resp.changes[0].kind, ChangeKind::Insert);
        assert_eq!(resp.changes[1].kind, ChangeKind::Update);
        assert_eq!(resp.changes[2].kind, ChangeKind::Delete);
        assert!(resp.changes[2].data.is_none());
    }

    /// Sequence numbers start at 1 and increase by one per append.
    #[test]
    fn sequence_numbers_are_monotonic() {
        let log = ChangeLog::new();
        let s1 = log.append("A", "1", ChangeKind::Insert, None);
        let s2 = log.append("B", "2", ChangeKind::Insert, None);
        let s3 = log.append("C", "3", ChangeKind::Insert, None);
        assert_eq!(s1, 1);
        assert_eq!(s2, 2);
        assert_eq!(s3, 3);
    }

    /// A `ChangeEvent` survives a JSON round trip unchanged.
    #[test]
    fn serialization_roundtrip() {
        let event = ChangeEvent {
            seq: 1,
            entity: "User".into(),
            row_id: "u1".into(),
            kind: ChangeKind::Insert,
            data: Some(serde_json::json!({"name": "Test"})),
            timestamp: "2024-01-01T00:00:00Z".into(),
        };
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ChangeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, parsed);
    }

    /// A cursor ahead of the log is rejected and echoed back in the error.
    #[test]
    fn pull_from_future_cursor_requires_resync() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        let err = log
            .pull(&SyncCursor { last_seq: 999 }, 100)
            .expect_err("future cursors must signal resync");
        match err {
            PullError::ResyncRequired { cursor, .. } => {
                assert_eq!(cursor.last_seq, 999);
            }
        }
    }

    /// A limit of zero yields an empty page rather than an error.
    #[test]
    fn pull_limit_zero_returns_empty() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor::beginning(), 0).unwrap();
        assert!(resp.changes.is_empty());
    }

    /// A cursor that points before evicted events must resync.
    #[test]
    fn pull_with_evicted_cursor_requires_resync() {
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        log.append("A", "4", ChangeKind::Insert, None);

        // Events 1 and 2 were evicted, so seq 2 can no longer be delivered.
        let err = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap_err();
        match err {
            PullError::ResyncRequired { oldest_seq, .. } => {
                assert_eq!(oldest_seq, 3);
            }
        }
    }

    /// A cursor directly before the oldest retained event is still valid.
    #[test]
    fn pull_with_cursor_at_eviction_boundary_is_ok() {
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
    }

    /// A delete appended without a payload is returned without one.
    #[test]
    fn delete_event_has_no_data() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Delete, None);
        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert!(resp.changes[0].data.is_none());
    }

    /// Appends from several threads receive distinct, gap-free sequence
    /// numbers, and each thread sees its own numbers strictly increasing.
    #[test]
    fn concurrent_appends_get_unique_seqs() {
        use std::collections::HashSet;
        use std::sync::Arc;
        use std::thread;

        const THREADS: usize = 4;
        const PER_THREAD: usize = 50;

        let log = Arc::new(ChangeLog::new());
        let handles: Vec<_> = (0..THREADS)
            .map(|t| {
                let log = Arc::clone(&log);
                thread::spawn(move || {
                    (0..PER_THREAD)
                        .map(|i| {
                            log.append("A", &format!("{}-{}", t, i), ChangeKind::Insert, None)
                        })
                        .collect::<Vec<u64>>()
                })
            })
            .collect();

        let mut seen = HashSet::new();
        for handle in handles {
            let seqs = handle.join().expect("appender thread panicked");
            assert!(seqs.windows(2).all(|w| w[0] < w[1]));
            for seq in seqs {
                assert!(seen.insert(seq), "duplicate seq {}", seq);
            }
        }

        let total = THREADS * PER_THREAD;
        assert_eq!(seen.len(), total);
        assert_eq!(log.len(), total);
        assert_eq!(seen.iter().min().copied(), Some(1));
        assert_eq!(seen.iter().max().copied(), Some(total as u64));
    }

    /// Remembered operation ids are reported as seen; others are not.
    #[test]
    fn op_ids_are_remembered() {
        let log = ChangeLog::new();
        assert!(!log.has_seen_op_id("op-1"));

        log.remember_op_id("op-1");
        // Remembering the same id again must be harmless.
        log.remember_op_id("op-1");

        assert!(log.has_seen_op_id("op-1"));
        assert!(!log.has_seen_op_id("op-2"));
    }

    /// A `PushRequest` survives a JSON round trip.
    #[test]
    fn push_request_serialization() {
        let req = PushRequest {
            changes: vec![ClientChange {
                entity: "User".into(),
                row_id: "u1".into(),
                kind: ChangeKind::Insert,
                data: Some(serde_json::json!({"name": "Alice"})),
                op_id: None,
            }],
            client_id: Some("cl_123".into()),
        };
        let json = serde_json::to_string(&req).unwrap();
        let parsed: PushRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.changes.len(), 1);
        assert_eq!(parsed.changes[0].entity, "User");
        assert_eq!(parsed.client_id.as_deref(), Some("cl_123"));
    }

    /// `client_id` is optional on the wire.
    #[test]
    fn push_request_accepts_missing_client_id() {
        let json = r#"{"changes":[]}"#;
        let parsed: PushRequest = serde_json::from_str(json).unwrap();
        assert!(parsed.client_id.is_none());
    }
}