Skip to main content

oversync_core/
model.rs

1use std::collections::HashMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use sha2::{Digest, Sha256};
6
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
8pub struct RawRow {
9	pub row_key: String,
10	pub row_data: serde_json::Value,
11}
12
13#[derive(Debug, Clone, Default)]
14pub struct DeltaResult {
15	pub created: Vec<DeltaEvent>,
16	pub updated: Vec<DeltaEvent>,
17	pub deleted: Vec<DeltaEvent>,
18}
19
20impl DeltaResult {
21	pub fn total(&self) -> usize {
22		self.created.len() + self.updated.len() + self.deleted.len()
23	}
24
25	pub fn is_empty(&self) -> bool {
26		self.total() == 0
27	}
28}
29
30/// Wire format for sink delivery. Each event is one message.
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
32pub struct EventEnvelope {
33	pub meta: EventMeta,
34	pub data: serde_json::Value,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
38pub struct EventMeta {
39	pub op: OpType,
40	pub origin_id: String,
41	pub query_id: String,
42	pub key: String,
43	pub hash: String,
44	pub cycle_id: u64,
45	pub timestamp: DateTime<Utc>,
46}
47
48impl From<&DeltaEvent> for EventEnvelope {
49	fn from(e: &DeltaEvent) -> Self {
50		Self {
51			meta: EventMeta {
52				op: e.op,
53				origin_id: e.origin_id.clone(),
54				query_id: e.query_id.clone(),
55				key: e.row_key.clone(),
56				hash: e.row_hash.clone(),
57				cycle_id: e.cycle_id,
58				timestamp: e.timestamp,
59			},
60			data: e.row_data.clone(),
61		}
62	}
63}
64
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
66#[serde(rename_all = "snake_case")]
67pub enum CycleStatus {
68	Running,
69	Success,
70	Failed,
71	Aborted,
72}
73
74impl std::fmt::Display for CycleStatus {
75	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76		match self {
77			Self::Running => write!(f, "running"),
78			Self::Success => write!(f, "success"),
79			Self::Failed => write!(f, "failed"),
80			Self::Aborted => write!(f, "aborted"),
81		}
82	}
83}
84
85pub fn compute_diff(
86	previous: &HashMap<String, String>,
87	current: &[RawRow],
88	origin_id: &str,
89	query_id: &str,
90	cycle_id: u64,
91) -> DeltaResult {
92	let now = Utc::now();
93	let mut result = DeltaResult::default();
94	let mut seen_keys = std::collections::HashSet::new();
95
96	for row in current {
97		seen_keys.insert(row.row_key.clone());
98		let row_hash = hash_row_data(&row.row_data);
99
100		match previous.get(&row.row_key) {
101			None => {
102				result.created.push(DeltaEvent {
103					op: OpType::Created,
104					origin_id: origin_id.into(),
105					query_id: query_id.into(),
106					row_key: row.row_key.clone(),
107					row_data: row.row_data.clone(),
108					row_hash,
109					cycle_id,
110					timestamp: now,
111				});
112			}
113			Some(prev_hash) if *prev_hash != row_hash => {
114				result.updated.push(DeltaEvent {
115					op: OpType::Updated,
116					origin_id: origin_id.into(),
117					query_id: query_id.into(),
118					row_key: row.row_key.clone(),
119					row_data: row.row_data.clone(),
120					row_hash,
121					cycle_id,
122					timestamp: now,
123				});
124			}
125			_ => {}
126		}
127	}
128
129	for (key, hash) in previous {
130		if !seen_keys.contains(key) {
131			result.deleted.push(DeltaEvent {
132				op: OpType::Deleted,
133				origin_id: origin_id.into(),
134				query_id: query_id.into(),
135				row_key: key.clone(),
136				row_data: serde_json::Value::Null,
137				row_hash: hash.clone(),
138				cycle_id,
139				timestamp: now,
140			});
141		}
142	}
143
144	result
145}
146
147#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
148#[serde(rename_all = "snake_case")]
149pub enum OpType {
150	Created,
151	Updated,
152	Deleted,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
156pub struct DeltaEvent {
157	pub op: OpType,
158	pub origin_id: String,
159	pub query_id: String,
160	pub row_key: String,
161	pub row_data: serde_json::Value,
162	pub row_hash: String,
163	pub cycle_id: u64,
164	pub timestamp: DateTime<Utc>,
165}
166
167pub fn hash_row_data(data: &serde_json::Value) -> String {
168	let serialized = serde_json::to_string(data).unwrap_or_default();
169	let mut hasher = Sha256::new();
170	hasher.update(serialized.as_bytes());
171	hex::encode(hasher.finalize())
172}
173
174impl std::fmt::Display for OpType {
175	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176		match self {
177			OpType::Created => write!(f, "created"),
178			OpType::Updated => write!(f, "updated"),
179			OpType::Deleted => write!(f, "deleted"),
180		}
181	}
182}
183
184#[derive(Debug, Clone, Deserialize)]
185#[serde(tag = "type", rename_all = "snake_case")]
186pub enum AuthConfig {
187	Bearer { token: String },
188	Header { name: String, value: String },
189	Basic { username: String, password: String },
190}
191
#[cfg(test)]
mod tests {
	use super::*;
	use serde_json::json;

	/// Builds a `RawRow` from a key and a JSON payload.
	fn row(key: &str, data: serde_json::Value) -> RawRow {
		RawRow {
			row_key: key.into(),
			row_data: data,
		}
	}

	/// Builds a previous-snapshot map: each key maps to the hash of its payload.
	fn snapshot(entries: &[(&str, serde_json::Value)]) -> HashMap<String, String> {
		entries
			.iter()
			.map(|(key, data)| ((*key).to_string(), hash_row_data(data)))
			.collect()
	}

	// ── hashing / basic serde tests ────────────────────────────────

	#[test]
	fn hash_is_deterministic() {
		let payload = json!({"name": "alice", "age": 30});
		let first = hash_row_data(&payload);
		let second = hash_row_data(&payload);
		assert_eq!(first, second);
	}

	#[test]
	fn hash_differs_for_different_data() {
		let alice = hash_row_data(&json!({"name": "alice"}));
		let bob = hash_row_data(&json!({"name": "bob"}));
		assert_ne!(alice, bob);
	}

	#[test]
	fn hash_is_64_char_hex() {
		let digest = hash_row_data(&json!({}));
		assert_eq!(digest.len(), 64);
		assert!(digest.chars().all(|c| c.is_ascii_hexdigit()));
	}

	#[test]
	fn op_type_display() {
		let cases = [
			(OpType::Created, "created"),
			(OpType::Updated, "updated"),
			(OpType::Deleted, "deleted"),
		];
		for (op, expected) in cases {
			assert_eq!(op.to_string(), expected);
		}
	}

	#[test]
	fn raw_row_roundtrip_json() {
		let original = row("pk-1", json!({"col": "val"}));
		let encoded = serde_json::to_string(&original).unwrap();
		let decoded: RawRow = serde_json::from_str(&encoded).unwrap();
		assert_eq!(original, decoded);
	}

	#[test]
	fn delta_event_roundtrip_json() {
		let payload = json!({"x": 1});
		let original = DeltaEvent {
			op: OpType::Created,
			origin_id: "pg-prod".into(),
			query_id: "q1".into(),
			row_key: "pk-1".into(),
			row_hash: hash_row_data(&payload),
			row_data: payload,
			cycle_id: 42,
			timestamp: chrono::Utc::now(),
		};
		let encoded = serde_json::to_string(&original).unwrap();
		let decoded: DeltaEvent = serde_json::from_str(&encoded).unwrap();
		assert_eq!(original.op, decoded.op);
		assert_eq!(original.row_key, decoded.row_key);
		assert_eq!(original.cycle_id, decoded.cycle_id);
	}

	#[test]
	fn op_type_serde_snake_case() {
		let encoded = serde_json::to_string(&OpType::Created).unwrap();
		assert_eq!(encoded, "\"created\"");
		let decoded: OpType = serde_json::from_str("\"updated\"").unwrap();
		assert_eq!(decoded, OpType::Updated);
	}

	// ── DeltaResult tests ──────────────────────────────────────────

	#[test]
	fn delta_result_empty_by_default() {
		let result = DeltaResult::default();
		assert!(result.is_empty());
		assert_eq!(result.total(), 0);
	}

	#[test]
	fn delta_result_total_counts_all() {
		fn sample_event() -> DeltaEvent {
			DeltaEvent {
				op: OpType::Created,
				origin_id: "s".into(),
				query_id: "q".into(),
				row_key: "k".into(),
				row_data: json!({}),
				row_hash: "h".into(),
				cycle_id: 1,
				timestamp: Utc::now(),
			}
		}

		let result = DeltaResult {
			created: vec![sample_event()],
			updated: vec![sample_event(), sample_event()],
			deleted: vec![sample_event()],
		};
		assert_eq!(result.total(), 4);
		assert!(!result.is_empty());
	}

	// ── CycleStatus tests ──────────────────────────────────────────

	#[test]
	fn cycle_status_display() {
		let cases = [
			(CycleStatus::Running, "running"),
			(CycleStatus::Success, "success"),
			(CycleStatus::Failed, "failed"),
			(CycleStatus::Aborted, "aborted"),
		];
		for (status, expected) in cases {
			assert_eq!(status.to_string(), expected);
		}
	}

	#[test]
	fn cycle_status_serde() {
		let encoded = serde_json::to_string(&CycleStatus::Aborted).unwrap();
		assert_eq!(encoded, "\"aborted\"");
		let decoded: CycleStatus = serde_json::from_str("\"success\"").unwrap();
		assert_eq!(decoded, CycleStatus::Success);
	}

	// ── compute_diff tests ─────────────────────────────────────────

	#[test]
	fn diff_all_created_when_no_previous() {
		let rows = [row("a", json!({"v": 1})), row("b", json!({"v": 2}))];
		let diff = compute_diff(&HashMap::new(), &rows, "src", "q", 1);
		assert_eq!(diff.created.len(), 2);
		assert!(diff.updated.is_empty());
		assert!(diff.deleted.is_empty());
		assert!(diff.created.iter().all(|e| e.op == OpType::Created));
	}

	#[test]
	fn diff_all_deleted_when_no_current() {
		let prev = snapshot(&[("a", json!({"v": 1})), ("b", json!({"v": 2}))]);
		let diff = compute_diff(&prev, &[], "src", "q", 2);
		assert!(diff.created.is_empty());
		assert!(diff.updated.is_empty());
		assert_eq!(diff.deleted.len(), 2);
		assert!(diff.deleted.iter().all(|e| e.op == OpType::Deleted));
	}

	#[test]
	fn diff_detects_update() {
		let prev = snapshot(&[("a", json!({"name": "alice"}))]);
		let rows = [row("a", json!({"name": "alice_updated"}))];
		let diff = compute_diff(&prev, &rows, "src", "q", 2);
		assert!(diff.created.is_empty());
		assert_eq!(diff.updated.len(), 1);
		assert!(diff.deleted.is_empty());
		assert_eq!(diff.updated[0].row_key, "a");
	}

	#[test]
	fn diff_no_change_when_hash_matches() {
		let payload = json!({"name": "alice"});
		let prev = snapshot(&[("a", payload.clone())]);
		let rows = [row("a", payload)];
		let diff = compute_diff(&prev, &rows, "src", "q", 2);
		assert!(diff.is_empty());
	}

	#[test]
	fn diff_mixed_operations() {
		let unchanged = json!({"v": 1});
		let before = json!({"v": 2});
		let after = json!({"v": 3});
		let prev = snapshot(&[
			("a", unchanged.clone()),
			("b", before),
			("c", json!({"v": 99})),
		]);
		let rows = [
			row("a", unchanged),
			row("b", after),
			row("d", json!({"v": 4})),
		];
		let diff = compute_diff(&prev, &rows, "src", "q", 3);
		assert_eq!(diff.created.len(), 1);
		assert_eq!(diff.created[0].row_key, "d");
		assert_eq!(diff.updated.len(), 1);
		assert_eq!(diff.updated[0].row_key, "b");
		assert_eq!(diff.deleted.len(), 1);
		assert_eq!(diff.deleted[0].row_key, "c");
	}

	#[test]
	fn diff_preserves_source_and_query_ids() {
		let rows = [row("x", json!({}))];
		let diff = compute_diff(&HashMap::new(), &rows, "my-src", "my-q", 7);
		let created = &diff.created[0];
		assert_eq!(created.origin_id, "my-src");
		assert_eq!(created.query_id, "my-q");
		assert_eq!(created.cycle_id, 7);
	}

	#[test]
	fn diff_deleted_events_carry_previous_hash() {
		let payload = json!({"k": 1});
		let expected_hash = hash_row_data(&payload);
		let prev = snapshot(&[("gone", payload)]);
		let diff = compute_diff(&prev, &[], "s", "q", 5);
		assert_eq!(diff.deleted[0].row_hash, expected_hash);
		assert_eq!(diff.deleted[0].row_data, serde_json::Value::Null);
	}
}