1use std::collections::HashMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use sha2::{Digest, Sha256};
6
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
8pub struct RawRow {
9 pub row_key: String,
10 pub row_data: serde_json::Value,
11}
12
13#[derive(Debug, Clone, Default)]
14pub struct DeltaResult {
15 pub created: Vec<DeltaEvent>,
16 pub updated: Vec<DeltaEvent>,
17 pub deleted: Vec<DeltaEvent>,
18}
19
20impl DeltaResult {
21 pub fn total(&self) -> usize {
22 self.created.len() + self.updated.len() + self.deleted.len()
23 }
24
25 pub fn is_empty(&self) -> bool {
26 self.total() == 0
27 }
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
32pub struct EventEnvelope {
33 pub meta: EventMeta,
34 pub data: serde_json::Value,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
38pub struct EventMeta {
39 pub op: OpType,
40 pub origin_id: String,
41 pub query_id: String,
42 pub key: String,
43 pub hash: String,
44 pub cycle_id: u64,
45 pub timestamp: DateTime<Utc>,
46}
47
48impl From<&DeltaEvent> for EventEnvelope {
49 fn from(e: &DeltaEvent) -> Self {
50 Self {
51 meta: EventMeta {
52 op: e.op,
53 origin_id: e.origin_id.clone(),
54 query_id: e.query_id.clone(),
55 key: e.row_key.clone(),
56 hash: e.row_hash.clone(),
57 cycle_id: e.cycle_id,
58 timestamp: e.timestamp,
59 },
60 data: e.row_data.clone(),
61 }
62 }
63}
64
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
66#[serde(rename_all = "snake_case")]
67pub enum CycleStatus {
68 Running,
69 Success,
70 Failed,
71 Aborted,
72}
73
74impl std::fmt::Display for CycleStatus {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 match self {
77 Self::Running => write!(f, "running"),
78 Self::Success => write!(f, "success"),
79 Self::Failed => write!(f, "failed"),
80 Self::Aborted => write!(f, "aborted"),
81 }
82 }
83}
84
85pub fn compute_diff(
86 previous: &HashMap<String, String>,
87 current: &[RawRow],
88 origin_id: &str,
89 query_id: &str,
90 cycle_id: u64,
91) -> DeltaResult {
92 let now = Utc::now();
93 let mut result = DeltaResult::default();
94 let mut seen_keys = std::collections::HashSet::new();
95
96 for row in current {
97 seen_keys.insert(row.row_key.clone());
98 let row_hash = hash_row_data(&row.row_data);
99
100 match previous.get(&row.row_key) {
101 None => {
102 result.created.push(DeltaEvent {
103 op: OpType::Created,
104 origin_id: origin_id.into(),
105 query_id: query_id.into(),
106 row_key: row.row_key.clone(),
107 row_data: row.row_data.clone(),
108 row_hash,
109 cycle_id,
110 timestamp: now,
111 });
112 }
113 Some(prev_hash) if *prev_hash != row_hash => {
114 result.updated.push(DeltaEvent {
115 op: OpType::Updated,
116 origin_id: origin_id.into(),
117 query_id: query_id.into(),
118 row_key: row.row_key.clone(),
119 row_data: row.row_data.clone(),
120 row_hash,
121 cycle_id,
122 timestamp: now,
123 });
124 }
125 _ => {}
126 }
127 }
128
129 for (key, hash) in previous {
130 if !seen_keys.contains(key) {
131 result.deleted.push(DeltaEvent {
132 op: OpType::Deleted,
133 origin_id: origin_id.into(),
134 query_id: query_id.into(),
135 row_key: key.clone(),
136 row_data: serde_json::Value::Null,
137 row_hash: hash.clone(),
138 cycle_id,
139 timestamp: now,
140 });
141 }
142 }
143
144 result
145}
146
147#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
148#[serde(rename_all = "snake_case")]
149pub enum OpType {
150 Created,
151 Updated,
152 Deleted,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
156pub struct DeltaEvent {
157 pub op: OpType,
158 pub origin_id: String,
159 pub query_id: String,
160 pub row_key: String,
161 pub row_data: serde_json::Value,
162 pub row_hash: String,
163 pub cycle_id: u64,
164 pub timestamp: DateTime<Utc>,
165}
166
167pub fn hash_row_data(data: &serde_json::Value) -> String {
168 let serialized = serde_json::to_string(data).unwrap_or_default();
169 let mut hasher = Sha256::new();
170 hasher.update(serialized.as_bytes());
171 hex::encode(hasher.finalize())
172}
173
174impl std::fmt::Display for OpType {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 match self {
177 OpType::Created => write!(f, "created"),
178 OpType::Updated => write!(f, "updated"),
179 OpType::Deleted => write!(f, "deleted"),
180 }
181 }
182}
183
184#[derive(Debug, Clone, Deserialize)]
185#[serde(tag = "type", rename_all = "snake_case")]
186pub enum AuthConfig {
187 Bearer { token: String },
188 Header { name: String, value: String },
189 Basic { username: String, password: String },
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn hash_is_deterministic() {
        let data = serde_json::json!({"name": "alice", "age": 30});
        assert_eq!(hash_row_data(&data), hash_row_data(&data));
    }

    #[test]
    fn hash_differs_for_different_data() {
        let a = serde_json::json!({"name": "alice"});
        let b = serde_json::json!({"name": "bob"});
        assert_ne!(hash_row_data(&a), hash_row_data(&b));
    }

    #[test]
    fn hash_is_64_char_hex() {
        let hash = hash_row_data(&serde_json::json!({}));
        assert_eq!(hash.len(), 64);
        assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn op_type_display() {
        assert_eq!(OpType::Created.to_string(), "created");
        assert_eq!(OpType::Updated.to_string(), "updated");
        assert_eq!(OpType::Deleted.to_string(), "deleted");
    }

    #[test]
    fn raw_row_roundtrip_json() {
        let row = RawRow {
            row_key: "pk-1".into(),
            row_data: serde_json::json!({"col": "val"}),
        };
        let json = serde_json::to_string(&row).unwrap();
        let back: RawRow = serde_json::from_str(&json).unwrap();
        assert_eq!(row, back);
    }

    #[test]
    fn delta_event_roundtrip_json() {
        let evt = DeltaEvent {
            op: OpType::Created,
            origin_id: "pg-prod".into(),
            query_id: "q1".into(),
            row_key: "pk-1".into(),
            row_data: serde_json::json!({"x": 1}),
            row_hash: hash_row_data(&serde_json::json!({"x": 1})),
            cycle_id: 42,
            timestamp: chrono::Utc::now(),
        };
        let json = serde_json::to_string(&evt).unwrap();
        let back: DeltaEvent = serde_json::from_str(&json).unwrap();
        // DeltaEvent derives PartialEq, so compare every field at once
        // (ids, payload, hash and timestamp included), not just a subset.
        assert_eq!(evt, back);
    }

    #[test]
    fn op_type_serde_snake_case() {
        let json = serde_json::to_string(&OpType::Created).unwrap();
        assert_eq!(json, "\"created\"");
        let back: OpType = serde_json::from_str("\"updated\"").unwrap();
        assert_eq!(back, OpType::Updated);
    }

    #[test]
    fn delta_result_empty_by_default() {
        let r = DeltaResult::default();
        assert!(r.is_empty());
        assert_eq!(r.total(), 0);
    }

    #[test]
    fn delta_result_total_counts_all() {
        let evt = || DeltaEvent {
            op: OpType::Created,
            origin_id: "s".into(),
            query_id: "q".into(),
            row_key: "k".into(),
            row_data: serde_json::json!({}),
            row_hash: "h".into(),
            cycle_id: 1,
            timestamp: Utc::now(),
        };
        let r = DeltaResult {
            created: vec![evt()],
            updated: vec![evt(), evt()],
            deleted: vec![evt()],
        };
        assert_eq!(r.total(), 4);
        assert!(!r.is_empty());
    }

    #[test]
    fn cycle_status_display() {
        assert_eq!(CycleStatus::Running.to_string(), "running");
        assert_eq!(CycleStatus::Success.to_string(), "success");
        assert_eq!(CycleStatus::Failed.to_string(), "failed");
        assert_eq!(CycleStatus::Aborted.to_string(), "aborted");
    }

    #[test]
    fn cycle_status_serde() {
        let json = serde_json::to_string(&CycleStatus::Aborted).unwrap();
        assert_eq!(json, "\"aborted\"");
        let back: CycleStatus = serde_json::from_str("\"success\"").unwrap();
        assert_eq!(back, CycleStatus::Success);
    }

    #[test]
    fn event_envelope_from_delta_event_maps_fields() {
        let evt = DeltaEvent {
            op: OpType::Updated,
            origin_id: "pg-prod".into(),
            query_id: "q1".into(),
            row_key: "pk-9".into(),
            row_data: serde_json::json!({"x": 2}),
            row_hash: "abc".into(),
            cycle_id: 11,
            timestamp: Utc::now(),
        };
        let env = EventEnvelope::from(&evt);
        assert_eq!(env.meta.op, OpType::Updated);
        assert_eq!(env.meta.origin_id, "pg-prod");
        assert_eq!(env.meta.query_id, "q1");
        assert_eq!(env.meta.key, "pk-9");
        assert_eq!(env.meta.hash, "abc");
        assert_eq!(env.meta.cycle_id, 11);
        assert_eq!(env.meta.timestamp, evt.timestamp);
        assert_eq!(env.data, evt.row_data);
    }

    #[test]
    fn auth_config_deserializes_tagged_variants() {
        let bearer: AuthConfig =
            serde_json::from_str(r#"{"type":"bearer","token":"t"}"#).unwrap();
        assert!(matches!(&bearer, AuthConfig::Bearer { token } if token == "t"));

        let header: AuthConfig =
            serde_json::from_str(r#"{"type":"header","name":"X-Key","value":"v"}"#).unwrap();
        assert!(matches!(
            &header,
            AuthConfig::Header { name, value } if name == "X-Key" && value == "v"
        ));

        let basic: AuthConfig =
            serde_json::from_str(r#"{"type":"basic","username":"u","password":"p"}"#).unwrap();
        assert!(matches!(
            &basic,
            AuthConfig::Basic { username, password } if username == "u" && password == "p"
        ));

        assert!(serde_json::from_str::<AuthConfig>(r#"{"type":"digest"}"#).is_err());
    }

    #[test]
    fn diff_all_created_when_no_previous() {
        let prev = HashMap::new();
        let rows = vec![
            RawRow {
                row_key: "a".into(),
                row_data: serde_json::json!({"v": 1}),
            },
            RawRow {
                row_key: "b".into(),
                row_data: serde_json::json!({"v": 2}),
            },
        ];
        let r = compute_diff(&prev, &rows, "src", "q", 1);
        assert_eq!(r.created.len(), 2);
        assert!(r.updated.is_empty());
        assert!(r.deleted.is_empty());
        assert!(r.created.iter().all(|e| e.op == OpType::Created));
    }

    #[test]
    fn diff_created_follows_current_order_and_hashes_data() {
        let rows = vec![
            RawRow {
                row_key: "b".into(),
                row_data: serde_json::json!({"v": 1}),
            },
            RawRow {
                row_key: "a".into(),
                row_data: serde_json::json!({"v": 2}),
            },
        ];
        let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
        let keys: Vec<&str> = r.created.iter().map(|e| e.row_key.as_str()).collect();
        assert_eq!(keys, ["b", "a"]);
        for (evt, row) in r.created.iter().zip(&rows) {
            assert_eq!(evt.row_hash, hash_row_data(&row.row_data));
            assert_eq!(evt.row_data, row.row_data);
        }
    }

    #[test]
    fn diff_all_deleted_when_no_current() {
        let prev = HashMap::from([
            ("a".into(), hash_row_data(&serde_json::json!({"v": 1}))),
            ("b".into(), hash_row_data(&serde_json::json!({"v": 2}))),
        ]);
        let r = compute_diff(&prev, &[], "src", "q", 2);
        assert!(r.created.is_empty());
        assert!(r.updated.is_empty());
        assert_eq!(r.deleted.len(), 2);
        assert!(r.deleted.iter().all(|e| e.op == OpType::Deleted));
    }

    #[test]
    fn diff_detects_update() {
        let data_v1 = serde_json::json!({"name": "alice"});
        let data_v2 = serde_json::json!({"name": "alice_updated"});
        let prev = HashMap::from([("a".into(), hash_row_data(&data_v1))]);
        let rows = vec![RawRow {
            row_key: "a".into(),
            row_data: data_v2,
        }];
        let r = compute_diff(&prev, &rows, "src", "q", 2);
        assert!(r.created.is_empty());
        assert_eq!(r.updated.len(), 1);
        assert!(r.deleted.is_empty());
        assert_eq!(r.updated[0].row_key, "a");
    }

    #[test]
    fn diff_no_change_when_hash_matches() {
        let data = serde_json::json!({"name": "alice"});
        let prev = HashMap::from([("a".into(), hash_row_data(&data))]);
        let rows = vec![RawRow {
            row_key: "a".into(),
            row_data: data,
        }];
        let r = compute_diff(&prev, &rows, "src", "q", 2);
        assert!(r.is_empty());
    }

    #[test]
    fn diff_mixed_operations() {
        let data_a = serde_json::json!({"v": 1});
        let data_b = serde_json::json!({"v": 2});
        let data_b2 = serde_json::json!({"v": 3});
        let prev = HashMap::from([
            ("a".into(), hash_row_data(&data_a)),
            ("b".into(), hash_row_data(&data_b)),
            ("c".into(), hash_row_data(&serde_json::json!({"v": 99}))),
        ]);
        let rows = vec![
            RawRow {
                row_key: "a".into(),
                row_data: data_a,
            },
            RawRow {
                row_key: "b".into(),
                row_data: data_b2,
            },
            RawRow {
                row_key: "d".into(),
                row_data: serde_json::json!({"v": 4}),
            },
        ];
        let r = compute_diff(&prev, &rows, "src", "q", 3);
        assert_eq!(r.created.len(), 1);
        assert_eq!(r.created[0].row_key, "d");
        assert_eq!(r.updated.len(), 1);
        assert_eq!(r.updated[0].row_key, "b");
        assert_eq!(r.deleted.len(), 1);
        assert_eq!(r.deleted[0].row_key, "c");
    }

    #[test]
    fn diff_events_share_one_timestamp() {
        let prev = HashMap::from([("old".into(), hash_row_data(&serde_json::json!({"v": 1})))]);
        let rows = vec![RawRow {
            row_key: "new".into(),
            row_data: serde_json::json!({"v": 2}),
        }];
        let r = compute_diff(&prev, &rows, "s", "q", 4);
        assert_eq!(r.created.len(), 1);
        assert_eq!(r.deleted.len(), 1);
        assert_eq!(r.created[0].timestamp, r.deleted[0].timestamp);
    }

    #[test]
    fn diff_preserves_source_and_query_ids() {
        let rows = vec![RawRow {
            row_key: "x".into(),
            row_data: serde_json::json!({}),
        }];
        let r = compute_diff(&HashMap::new(), &rows, "my-src", "my-q", 7);
        assert_eq!(r.created[0].origin_id, "my-src");
        assert_eq!(r.created[0].query_id, "my-q");
        assert_eq!(r.created[0].cycle_id, 7);
    }

    #[test]
    fn diff_deleted_events_carry_previous_hash() {
        let data = serde_json::json!({"k": 1});
        let h = hash_row_data(&data);
        let prev = HashMap::from([("gone".into(), h.clone())]);
        let r = compute_diff(&prev, &[], "s", "q", 5);
        assert_eq!(r.deleted[0].row_hash, h);
        assert_eq!(r.deleted[0].row_data, serde_json::Value::Null);
    }
}