use std::sync::Mutex;
use serde::{Deserialize, Serialize};
/// One recorded row change, as stored in [`ChangeLog`] and returned by
/// [`ChangeLog::pull`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChangeEvent {
/// Sequence number assigned by [`ChangeLog::append`]: the first event gets 1
/// and each later append gets the next integer.
pub seq: u64,
/// Name of the entity the row belongs to (e.g. `"User"`, `"Todo"`).
pub entity: String,
/// Identifier of the changed row within `entity`.
pub row_id: String,
/// Whether the row was inserted, updated, or deleted.
pub kind: ChangeKind,
/// Row payload, if any. Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<serde_json::Value>,
/// Time the event was recorded; [`ChangeLog::append`] fills it from
/// `now_iso8601()`.
pub timestamp: String,
}
/// Kind of change recorded for a row.
///
/// Serialized as the lowercase variant name: `"insert"`, `"update"`, or
/// `"delete"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ChangeKind {
/// The row was created.
Insert,
/// An existing row was modified.
Update,
/// The row was removed.
Delete,
}
/// A client's position in the change log.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SyncCursor {
/// Highest sequence number the client has already received; 0 means it has
/// received nothing yet.
pub last_seq: u64,
}
impl SyncCursor {
pub fn beginning() -> Self {
Self { last_seq: 0 }
}
}
/// Successful result of [`ChangeLog::pull`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PullResponse {
/// Events after the request cursor in ascending `seq` order, at most `limit`
/// of them.
pub changes: Vec<ChangeEvent>,
/// Cursor for the next pull: the `seq` of the last returned event, or the
/// request cursor unchanged when `changes` is empty.
pub cursor: SyncCursor,
/// `true` if the log still retains events beyond `cursor`.
pub has_more: bool,
}
/// Reasons [`ChangeLog::pull`] cannot serve a cursor.
#[derive(Debug, Clone)]
pub enum PullError {
    /// The cursor cannot be continued and the client must resync.
    ///
    /// `oldest_seq` is the oldest sequence number the log could currently
    /// serve. For a cursor that is ahead of the log, it is one past the newest
    /// assigned number. `cursor` echoes the rejected cursor.
    ResyncRequired { oldest_seq: u64, cursor: SyncCursor },
}

impl std::fmt::Display for PullError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PullError::ResyncRequired { oldest_seq, cursor } => write!(
                f,
                "resync required: cursor at seq {} cannot be served (oldest available seq is {})",
                cursor.last_seq, oldest_seq
            ),
        }
    }
}

/// Lets `PullError` flow through `?` into `Box<dyn Error>` and similar error types.
impl std::error::Error for PullError {}
/// A batch of changes submitted by a client.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushRequest {
/// The submitted changes.
pub changes: Vec<ClientChange>,
/// Optional client identifier. It may be absent from the input
/// (`serde(default)`) and is omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub client_id: Option<String>,
}
/// One change submitted by a client inside a [`PushRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientChange {
/// Name of the entity the row belongs to.
pub entity: String,
/// Identifier of the row within `entity`.
pub row_id: String,
/// Whether the client inserted, updated, or deleted the row.
pub kind: ChangeKind,
/// Row payload, if any. Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<serde_json::Value>,
/// Optional client-chosen operation id; it may be absent from the input.
/// NOTE(review): presumably checked with `ChangeLog::has_seen_op_id` /
/// `remember_op_id` to drop retried pushes — confirm in the push handler.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub op_id: Option<String>,
}
/// In-memory log of row changes with a bounded retention window, plus a
/// bounded record of operation ids that have already been seen.
///
/// Every piece of mutable state is wrapped in a [`Mutex`], so all methods take
/// `&self`.
pub struct ChangeLog {
    /// Most recently assigned sequence number (0 before the first append).
    seq: Mutex<u64>,
    /// Retained change events, oldest at the front.
    events: Mutex<std::collections::VecDeque<ChangeEvent>>,
    /// Upper bound on the number of events kept in `events`.
    capacity: usize,
    /// Remembered op ids, for fast membership checks.
    seen_op_id_set: Mutex<std::collections::HashSet<String>>,
    /// The same op ids in the order they were remembered, so the oldest can be
    /// evicted first.
    seen_op_ids: Mutex<std::collections::VecDeque<String>>,
    /// Upper bound on the number of remembered op ids.
    op_id_capacity: usize,
}
impl ChangeLog {
    /// Creates a change log that retains the 10,000 most recent events.
    pub fn new() -> Self {
        Self::with_capacity(10_000)
    }

    /// Creates a change log that retains at most `capacity` events.
    ///
    /// Once more than `capacity` events have been appended, the oldest are
    /// discarded. A client whose non-zero cursor falls behind the retained
    /// window gets [`PullError::ResyncRequired`] from [`ChangeLog::pull`]. The
    /// op-id window is fixed at 10,000 ids regardless of `capacity`.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            // Pre-allocate at most 1024 slots; larger logs grow on demand.
            events: Mutex::new(std::collections::VecDeque::with_capacity(
                capacity.min(1024),
            )),
            seq: Mutex::new(0),
            capacity,
            seen_op_ids: Mutex::new(std::collections::VecDeque::with_capacity(1024)),
            seen_op_id_set: Mutex::new(std::collections::HashSet::with_capacity(1024)),
            op_id_capacity: 10_000,
        }
    }

    /// Returns `true` if `op_id` is currently in the remembered op-id window.
    pub fn has_seen_op_id(&self, op_id: &str) -> bool {
        self.seen_op_id_set.lock().unwrap().contains(op_id)
    }

    /// Records `op_id` in the bounded op-id window.
    ///
    /// Remembering an id that is already present is a no-op. When the window
    /// exceeds its capacity, the oldest ids are forgotten first.
    pub fn remember_op_id(&self, op_id: &str) {
        // Hold the set lock for the whole update. This is the only place both
        // op-id locks are taken, always set then queue, so there is no deadlock.
        // The previous version released the set lock before evicting. That
        // allowed a concurrent remember of an id being evicted to see it as
        // "present", return early, and then lose it to the eviction.
        let mut set = self.seen_op_id_set.lock().unwrap();
        if set.contains(op_id) {
            return;
        }
        set.insert(op_id.to_string());
        let mut queue = self.seen_op_ids.lock().unwrap();
        queue.push_back(op_id.to_string());
        while queue.len() > self.op_id_capacity {
            if let Some(evicted) = queue.pop_front() {
                set.remove(&evicted);
            }
        }
    }

    /// Appends a change and returns the sequence number assigned to it.
    ///
    /// Sequence numbers start at 1 and increase by one per call. If the log
    /// then holds more than its capacity, the oldest events are discarded.
    pub fn append(
        &self,
        entity: &str,
        row_id: &str,
        kind: ChangeKind,
        data: Option<serde_json::Value>,
    ) -> u64 {
        // Lock order is `seq` then `events`; `pull` follows the same order.
        // `seq` stays locked until the event is pushed, so events enter the
        // deque in strictly ascending `seq` order.
        let mut seq = self.seq.lock().unwrap();
        *seq += 1;
        let event = ChangeEvent {
            seq: *seq,
            entity: entity.to_string(),
            row_id: row_id.to_string(),
            kind,
            data,
            timestamp: now_iso8601(),
        };
        let mut events = self.events.lock().unwrap();
        events.push_back(event);
        while events.len() > self.capacity {
            events.pop_front();
        }
        *seq
    }

    /// Returns up to `limit` events whose sequence number is greater than
    /// `cursor.last_seq`, in ascending order.
    ///
    /// The response cursor is the `seq` of the last returned event, or the
    /// request cursor unchanged when nothing is returned. `has_more` reports
    /// whether retained events remain beyond it. A cursor of 0 never triggers a
    /// resync; it receives the oldest retained events.
    ///
    /// # Errors
    ///
    /// Returns [`PullError::ResyncRequired`] when the cursor is ahead of the
    /// newest assigned sequence number. It is also returned when the cursor is
    /// non-zero and the events immediately after it are no longer retained.
    pub fn pull(&self, cursor: &SyncCursor, limit: usize) -> Result<PullResponse, PullError> {
        // Lock `seq` before `events`, matching `append`. The old order (events,
        // then seq) could deadlock against a concurrent `append`. While `seq` is
        // held no append is mid-push, so once `events` is locked, `current_seq`
        // and the deque form a consistent snapshot. After that, `seq` can be
        // released.
        let seq_guard = self.seq.lock().unwrap();
        let events = self.events.lock().unwrap();
        let current_seq = *seq_guard;
        drop(seq_guard);

        if cursor.last_seq > current_seq {
            return Err(PullError::ResyncRequired {
                oldest_seq: current_seq.saturating_add(1),
                cursor: cursor.clone(),
            });
        }

        // Oldest sequence number a client could still receive. When nothing is
        // retained (e.g. capacity 0), that is the next number to be assigned.
        // Without this fallback, a stale cursor would silently get an empty page.
        let oldest_available = events
            .front()
            .map_or_else(|| current_seq.saturating_add(1), |e| e.seq);
        if cursor.last_seq > 0 && cursor.last_seq.saturating_add(1) < oldest_available {
            return Err(PullError::ResyncRequired {
                oldest_seq: oldest_available,
                cursor: cursor.clone(),
            });
        }

        // Events are sorted by `seq` (see `append`), so binary-search for the
        // first one past the cursor instead of scanning the whole deque.
        let start = events.partition_point(|e| e.seq <= cursor.last_seq);
        let changes: Vec<ChangeEvent> = events.range(start..).take(limit).cloned().collect();
        let last_seq = changes.last().map_or(cursor.last_seq, |e| e.seq);
        // Everything after the returned slice has a larger seq than `last_seq`.
        let has_more = start + changes.len() < events.len();
        Ok(PullResponse {
            changes,
            cursor: SyncCursor { last_seq },
            has_more,
        })
    }

    /// Number of events currently retained.
    pub fn len(&self) -> usize {
        self.events.lock().unwrap().len()
    }

    /// Returns `true` if no events are currently retained.
    pub fn is_empty(&self) -> bool {
        self.events.lock().unwrap().is_empty()
    }
}

/// Equivalent to [`ChangeLog::new`].
impl Default for ChangeLog {
    fn default() -> Self {
        Self::new()
    }
}
/// Returns the current UTC time as an ISO 8601 / RFC 3339 timestamp with
/// whole-second precision, e.g. `2024-01-01T00:00:00Z`.
///
/// A system clock set before the Unix epoch is reported as the epoch itself.
fn now_iso8601() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_unix_secs_iso8601(secs)
}

/// Formats `secs` seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
///
/// The date uses Howard Hinnant's `civil_from_days` algorithm on the proleptic
/// Gregorian calendar, so no date-time crate is needed.
fn format_unix_secs_iso8601(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    let days = secs / SECS_PER_DAY;
    let secs_of_day = secs % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let minute = secs_of_day / 60 % 60;
    let second = secs_of_day % 60;

    // Shift the epoch to 0000-03-01 so each leap day falls at the end of a
    // "year". All intermediates stay non-negative because `days` is.
    let z = days + 719_468;
    let era = z / 146_097;
    let doe = z % 146_097; // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era, [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year, [0, 365]
    let mp = (5 * doy + 2) / 153; // month counted from March, [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1; // [1, 31]
    let month = if mp < 10 { mp + 3 } else { mp - 9 }; // [1, 12]
    // January and February belong to the following civil year.
    let year = yoe + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hour, minute, second
    )
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_log() {
        let log = ChangeLog::new();
        assert!(log.is_empty());
        assert_eq!(log.len(), 0);
    }

    #[test]
    fn append_and_pull() {
        let log = ChangeLog::new();
        log.append(
            "User",
            "u1",
            ChangeKind::Insert,
            Some(serde_json::json!({"name": "Alice"})),
        );
        log.append(
            "User",
            "u2",
            ChangeKind::Insert,
            Some(serde_json::json!({"name": "Bob"})),
        );
        assert_eq!(log.len(), 2);
        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.cursor.last_seq, 2);
        assert!(!resp.has_more);
    }

    #[test]
    fn pull_with_cursor() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        log.append("User", "u3", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.changes[0].seq, 2);
        assert_eq!(resp.changes[1].seq, 3);
    }

    #[test]
    fn pull_with_limit() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        log.append("User", "u2", ChangeKind::Insert, None);
        log.append("User", "u3", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor::beginning(), 2).unwrap();
        assert_eq!(resp.changes.len(), 2);
        assert!(resp.has_more);
        assert_eq!(resp.cursor.last_seq, 2);
        let resp2 = log.pull(&resp.cursor, 2).unwrap();
        assert_eq!(resp2.changes.len(), 1);
        assert!(!resp2.has_more);
    }

    #[test]
    fn change_kinds() {
        let log = ChangeLog::new();
        log.append(
            "Todo",
            "t1",
            ChangeKind::Insert,
            Some(serde_json::json!({"title": "Test"})),
        );
        log.append(
            "Todo",
            "t1",
            ChangeKind::Update,
            Some(serde_json::json!({"title": "Updated"})),
        );
        log.append("Todo", "t1", ChangeKind::Delete, None);
        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert_eq!(resp.changes[0].kind, ChangeKind::Insert);
        assert_eq!(resp.changes[1].kind, ChangeKind::Update);
        assert_eq!(resp.changes[2].kind, ChangeKind::Delete);
        assert!(resp.changes[2].data.is_none());
    }

    #[test]
    fn sequence_numbers_are_monotonic() {
        let log = ChangeLog::new();
        let s1 = log.append("A", "1", ChangeKind::Insert, None);
        let s2 = log.append("B", "2", ChangeKind::Insert, None);
        let s3 = log.append("C", "3", ChangeKind::Insert, None);
        assert_eq!(s1, 1);
        assert_eq!(s2, 2);
        assert_eq!(s3, 3);
    }

    #[test]
    fn serialization_roundtrip() {
        let event = ChangeEvent {
            seq: 1,
            entity: "User".into(),
            row_id: "u1".into(),
            kind: ChangeKind::Insert,
            data: Some(serde_json::json!({"name": "Test"})),
            timestamp: "2024-01-01T00:00:00Z".into(),
        };
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ChangeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, parsed);
    }

    #[test]
    fn pull_from_future_cursor_requires_resync() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        let err = log
            .pull(&SyncCursor { last_seq: 999 }, 100)
            .expect_err("future cursors must signal resync");
        match err {
            PullError::ResyncRequired { cursor, .. } => {
                assert_eq!(cursor.last_seq, 999);
            }
        }
    }

    #[test]
    fn pull_limit_zero_returns_empty() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor::beginning(), 0).unwrap();
        assert!(resp.changes.is_empty());
    }

    #[test]
    fn pull_with_evicted_cursor_requires_resync() {
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        log.append("A", "4", ChangeKind::Insert, None);
        let err = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap_err();
        match err {
            PullError::ResyncRequired { oldest_seq, .. } => {
                assert_eq!(oldest_seq, 3);
            }
        }
    }

    #[test]
    fn fresh_cursor_zero_never_resyncs() {
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        log.append("A", "4", ChangeKind::Insert, None);
        let resp = log
            .pull(&SyncCursor { last_seq: 0 }, 100)
            .expect("cursor=0 must never resync — no infinite loop");
        assert_eq!(resp.changes.len(), 2);
        assert_eq!(resp.changes[0].seq, 3);
    }

    #[test]
    fn pull_with_cursor_at_eviction_boundary_is_ok() {
        let log = ChangeLog::with_capacity(2);
        log.append("A", "1", ChangeKind::Insert, None);
        log.append("A", "2", ChangeKind::Insert, None);
        log.append("A", "3", ChangeKind::Insert, None);
        let resp = log.pull(&SyncCursor { last_seq: 1 }, 100).unwrap();
        assert_eq!(resp.changes.len(), 2);
    }

    #[test]
    fn delete_event_has_no_data() {
        let log = ChangeLog::new();
        log.append("User", "u1", ChangeKind::Delete, None);
        let resp = log.pull(&SyncCursor::beginning(), 100).unwrap();
        assert!(resp.changes[0].data.is_none());
    }

    #[test]
    fn repeated_changes_to_one_row_get_increasing_seqs() {
        let log = ChangeLog::new();
        let s1 = log.append("A", "1", ChangeKind::Insert, None);
        let s2 = log.append("A", "1", ChangeKind::Update, None);
        let s3 = log.append("A", "1", ChangeKind::Delete, None);
        assert!(s1 < s2);
        assert!(s2 < s3);
    }

    // Appends from several threads at once must still hand out every sequence
    // number exactly once.
    #[test]
    fn concurrent_appends_get_unique_seqs() {
        const THREADS: usize = 4;
        const PER_THREAD: usize = 50;
        let log = std::sync::Arc::new(ChangeLog::new());
        let handles: Vec<_> = (0..THREADS)
            .map(|t| {
                let log = std::sync::Arc::clone(&log);
                std::thread::spawn(move || {
                    (0..PER_THREAD)
                        .map(|i| {
                            log.append("A", &format!("{}-{}", t, i), ChangeKind::Insert, None)
                        })
                        .collect::<Vec<u64>>()
                })
            })
            .collect();
        let mut seqs: Vec<u64> = handles
            .into_iter()
            .flat_map(|h| h.join().expect("append thread panicked"))
            .collect();
        seqs.sort_unstable();
        let expected: Vec<u64> = (1..=(THREADS * PER_THREAD) as u64).collect();
        assert_eq!(seqs, expected);
        assert_eq!(log.len(), THREADS * PER_THREAD);
    }

    #[test]
    fn remembered_op_ids_are_seen() {
        let log = ChangeLog::new();
        assert!(!log.has_seen_op_id("op-1"));
        log.remember_op_id("op-1");
        log.remember_op_id("op-1");
        assert!(log.has_seen_op_id("op-1"));
        assert!(!log.has_seen_op_id("op-2"));
    }

    // `ChangeLog::new` remembers at most 10,000 op ids, so the 10,001st
    // evicts the first.
    #[test]
    fn op_id_window_evicts_oldest_first() {
        let log = ChangeLog::new();
        for i in 0..=10_000 {
            log.remember_op_id(&format!("op-{}", i));
        }
        assert!(!log.has_seen_op_id("op-0"));
        assert!(log.has_seen_op_id("op-1"));
        assert!(log.has_seen_op_id("op-10000"));
    }

    #[test]
    fn push_request_serialization() {
        let req = PushRequest {
            changes: vec![ClientChange {
                entity: "User".into(),
                row_id: "u1".into(),
                kind: ChangeKind::Insert,
                data: Some(serde_json::json!({"name": "Alice"})),
                op_id: None,
            }],
            client_id: Some("cl_123".into()),
        };
        let json = serde_json::to_string(&req).unwrap();
        let parsed: PushRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.changes.len(), 1);
        assert_eq!(parsed.changes[0].entity, "User");
        assert_eq!(parsed.client_id.as_deref(), Some("cl_123"));
    }

    #[test]
    fn push_request_accepts_missing_client_id() {
        let json = r#"{"changes":[]}"#;
        let parsed: PushRequest = serde_json::from_str(json).unwrap();
        assert!(parsed.client_id.is_none());
    }
}