1use std::collections::HashMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use sha2::{Digest, Sha256};
6
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
8pub struct RawRow {
9 pub row_key: String,
10 pub row_data: serde_json::Value,
11}
12
13#[derive(Debug, Clone, Default)]
14pub struct DeltaResult {
15 pub created: Vec<DeltaEvent>,
16 pub updated: Vec<DeltaEvent>,
17 pub deleted: Vec<DeltaEvent>,
18}
19
20impl DeltaResult {
21 pub fn total(&self) -> usize {
22 self.created.len() + self.updated.len() + self.deleted.len()
23 }
24
25 pub fn is_empty(&self) -> bool {
26 self.total() == 0
27 }
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
32pub struct EventEnvelope {
33 pub meta: EventMeta,
34 pub data: serde_json::Value,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
38pub struct EventMeta {
39 pub op: OpType,
40 pub origin_id: String,
41 pub query_id: String,
42 pub key: String,
43 pub hash: String,
44 pub cycle_id: u64,
45 pub timestamp: DateTime<Utc>,
46}
47
48impl From<&DeltaEvent> for EventEnvelope {
49 fn from(e: &DeltaEvent) -> Self {
50 Self {
51 meta: EventMeta {
52 op: e.op,
53 origin_id: e.origin_id.clone(),
54 query_id: e.query_id.clone(),
55 key: e.row_key.clone(),
56 hash: e.row_hash.clone(),
57 cycle_id: e.cycle_id,
58 timestamp: e.timestamp,
59 },
60 data: e.row_data.clone(),
61 }
62 }
63}
64
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
66#[serde(rename_all = "snake_case")]
67pub enum CycleStatus {
68 Running,
69 Success,
70 Failed,
71 Aborted,
72}
73
74impl std::fmt::Display for CycleStatus {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 match self {
77 Self::Running => write!(f, "running"),
78 Self::Success => write!(f, "success"),
79 Self::Failed => write!(f, "failed"),
80 Self::Aborted => write!(f, "aborted"),
81 }
82 }
83}
84
85pub fn compute_diff(
86 previous: &HashMap<String, String>,
87 current: &[RawRow],
88 origin_id: &str,
89 query_id: &str,
90 cycle_id: u64,
91) -> DeltaResult {
92 let now = Utc::now();
93 let mut result = DeltaResult::default();
94 let hashes = hash_rows(current);
95 let mut seen_keys = std::collections::HashSet::with_capacity(current.len());
96
97 for (row, row_hash) in current.iter().zip(hashes) {
98 seen_keys.insert(row.row_key.clone());
99
100 match previous.get(&row.row_key) {
101 None => {
102 result.created.push(DeltaEvent {
103 op: OpType::Created,
104 origin_id: origin_id.into(),
105 query_id: query_id.into(),
106 row_key: row.row_key.clone(),
107 row_data: row.row_data.clone(),
108 row_hash,
109 cycle_id,
110 timestamp: now,
111 });
112 }
113 Some(prev_hash) if *prev_hash != row_hash => {
114 result.updated.push(DeltaEvent {
115 op: OpType::Updated,
116 origin_id: origin_id.into(),
117 query_id: query_id.into(),
118 row_key: row.row_key.clone(),
119 row_data: row.row_data.clone(),
120 row_hash,
121 cycle_id,
122 timestamp: now,
123 });
124 }
125 _ => {}
126 }
127 }
128
129 for (key, hash) in previous {
130 if !seen_keys.contains(key) {
131 result.deleted.push(DeltaEvent {
132 op: OpType::Deleted,
133 origin_id: origin_id.into(),
134 query_id: query_id.into(),
135 row_key: key.clone(),
136 row_data: serde_json::Value::Null,
137 row_hash: hash.clone(),
138 cycle_id,
139 timestamp: now,
140 });
141 }
142 }
143
144 result
145}
146
147#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
148#[serde(rename_all = "snake_case")]
149pub enum OpType {
150 Created,
151 Updated,
152 Deleted,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
156pub struct DeltaEvent {
157 pub op: OpType,
158 pub origin_id: String,
159 pub query_id: String,
160 pub row_key: String,
161 pub row_data: serde_json::Value,
162 pub row_hash: String,
163 pub cycle_id: u64,
164 pub timestamp: DateTime<Utc>,
165}
166
167pub fn hash_row_data(data: &serde_json::Value) -> String {
168 let serialized = serde_json::to_string(data).expect("serde_json::Value is always serializable");
169 let mut hasher = Sha256::new();
170 hasher.update(serialized.as_bytes());
171 const_hex::encode(hasher.finalize())
172}
173
174pub fn hash_rows(rows: &[RawRow]) -> Vec<String> {
176 #[cfg(feature = "parallel")]
177 {
178 use rayon::prelude::*;
179 rows.par_iter()
180 .map(|r| hash_row_data(&r.row_data))
181 .collect()
182 }
183 #[cfg(not(feature = "parallel"))]
184 {
185 rows.iter().map(|r| hash_row_data(&r.row_data)).collect()
186 }
187}
188
189impl std::fmt::Display for OpType {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 match self {
192 OpType::Created => write!(f, "created"),
193 OpType::Updated => write!(f, "updated"),
194 OpType::Deleted => write!(f, "deleted"),
195 }
196 }
197}
198
199#[derive(Debug, Clone, Deserialize)]
200#[serde(tag = "type", rename_all = "snake_case")]
201pub enum AuthConfig {
202 Bearer { token: String },
203 Header { name: String, value: String },
204 Basic { username: String, password: String },
205}
206
207#[derive(Debug, Clone, Deserialize)]
208pub struct KafkaAuth {
209 #[serde(default = "default_security_protocol")]
210 pub security_protocol: String,
211 pub sasl_mechanism: Option<String>,
212 pub sasl_username: Option<String>,
213 pub sasl_password: Option<String>,
214 pub sasl_kerberos_keytab: Option<String>,
215 pub sasl_kerberos_principal: Option<String>,
216 pub ssl_ca_location: Option<String>,
217 pub ssl_certificate_location: Option<String>,
218 pub ssl_key_location: Option<String>,
219}
220
/// Protocol assumed when none is configured: `"PLAINTEXT"`.
fn default_security_protocol() -> String {
    String::from("PLAINTEXT")
}
224
225impl Default for KafkaAuth {
226 fn default() -> Self {
227 Self {
228 security_protocol: default_security_protocol(),
229 sasl_mechanism: None,
230 sasl_username: None,
231 sasl_password: None,
232 sasl_kerberos_keytab: None,
233 sasl_kerberos_principal: None,
234 ssl_ca_location: None,
235 ssl_certificate_location: None,
236 ssl_key_location: None,
237 }
238 }
239}
240
241impl KafkaAuth {
242 pub fn is_plaintext(&self) -> bool {
243 self.security_protocol == "PLAINTEXT"
244 }
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{json, Value};

    // ----- fixtures -------------------------------------------------------

    /// Builds a row from a key and a JSON payload.
    fn row(key: &str, data: Value) -> RawRow {
        RawRow {
            row_key: key.to_owned(),
            row_data: data,
        }
    }

    /// Builds a previous snapshot mapping each key to the hash of its payload.
    fn snapshot(entries: &[(&str, Value)]) -> HashMap<String, String> {
        entries
            .iter()
            .map(|(key, data)| ((*key).to_owned(), hash_row_data(data)))
            .collect()
    }

    /// Baseline event; individual tests override fields via struct-update syntax.
    fn sample_event() -> DeltaEvent {
        DeltaEvent {
            op: OpType::Created,
            origin_id: "s".into(),
            query_id: "q".into(),
            row_key: "k".into(),
            row_data: json!({}),
            row_hash: "h".into(),
            cycle_id: 1,
            timestamp: Utc::now(),
        }
    }

    /// Deserializes a `KafkaAuth` from a JSON value, panicking on failure.
    fn kafka_auth_from(config: Value) -> KafkaAuth {
        serde_json::from_value(config).unwrap()
    }

    // ----- tests ----------------------------------------------------------

    #[test]
    fn hash_is_deterministic() {
        let data = json!({"name": "alice", "age": 30});
        let first = hash_row_data(&data);
        let second = hash_row_data(&data);
        assert_eq!(first, second);
    }

    #[test]
    fn hash_differs_for_different_data() {
        let alice = hash_row_data(&json!({"name": "alice"}));
        let bob = hash_row_data(&json!({"name": "bob"}));
        assert_ne!(alice, bob);
    }

    #[test]
    fn hash_is_64_char_hex() {
        let digest = hash_row_data(&json!({}));
        assert_eq!(digest.len(), 64);
        assert!(digest.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn op_type_display() {
        let cases = [
            (OpType::Created, "created"),
            (OpType::Updated, "updated"),
            (OpType::Deleted, "deleted"),
        ];
        for (op, expected) in cases {
            assert_eq!(op.to_string(), expected);
        }
    }

    #[test]
    fn raw_row_roundtrip_json() {
        let original = row("pk-1", json!({"col": "val"}));
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: RawRow = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn delta_event_roundtrip_json() {
        let payload = json!({"x": 1});
        let evt = DeltaEvent {
            origin_id: "pg-prod".into(),
            query_id: "q1".into(),
            row_key: "pk-1".into(),
            row_hash: hash_row_data(&payload),
            row_data: payload,
            cycle_id: 42,
            ..sample_event()
        };
        let encoded = serde_json::to_string(&evt).unwrap();
        let back: DeltaEvent = serde_json::from_str(&encoded).unwrap();
        assert_eq!(evt.op, back.op);
        assert_eq!(evt.row_key, back.row_key);
        assert_eq!(evt.cycle_id, back.cycle_id);
    }

    #[test]
    fn op_type_serde_snake_case() {
        assert_eq!(
            serde_json::to_string(&OpType::Created).unwrap(),
            "\"created\""
        );
        assert_eq!(
            serde_json::from_str::<OpType>("\"updated\"").unwrap(),
            OpType::Updated
        );
    }

    #[test]
    fn delta_result_empty_by_default() {
        let r = DeltaResult::default();
        assert!(r.is_empty());
        assert_eq!(r.total(), 0);
    }

    #[test]
    fn delta_result_total_counts_all() {
        let r = DeltaResult {
            created: vec![sample_event()],
            updated: vec![sample_event(), sample_event()],
            deleted: vec![sample_event()],
        };
        assert_eq!(r.total(), 4);
        assert!(!r.is_empty());
    }

    #[test]
    fn cycle_status_display() {
        let cases = [
            (CycleStatus::Running, "running"),
            (CycleStatus::Success, "success"),
            (CycleStatus::Failed, "failed"),
            (CycleStatus::Aborted, "aborted"),
        ];
        for (status, expected) in cases {
            assert_eq!(status.to_string(), expected);
        }
    }

    #[test]
    fn cycle_status_serde() {
        assert_eq!(
            serde_json::to_string(&CycleStatus::Aborted).unwrap(),
            "\"aborted\""
        );
        assert_eq!(
            serde_json::from_str::<CycleStatus>("\"success\"").unwrap(),
            CycleStatus::Success
        );
    }

    #[test]
    fn diff_all_created_when_no_previous() {
        let rows = [row("a", json!({"v": 1})), row("b", json!({"v": 2}))];
        let r = compute_diff(&HashMap::new(), &rows, "src", "q", 1);
        assert_eq!(r.created.len(), 2);
        assert!(r.updated.is_empty());
        assert!(r.deleted.is_empty());
        assert!(r.created.iter().all(|e| e.op == OpType::Created));
    }

    #[test]
    fn diff_all_deleted_when_no_current() {
        let prev = snapshot(&[("a", json!({"v": 1})), ("b", json!({"v": 2}))]);
        let r = compute_diff(&prev, &[], "src", "q", 2);
        assert!(r.created.is_empty());
        assert!(r.updated.is_empty());
        assert_eq!(r.deleted.len(), 2);
        assert!(r.deleted.iter().all(|e| e.op == OpType::Deleted));
    }

    #[test]
    fn diff_detects_update() {
        let prev = snapshot(&[("a", json!({"name": "alice"}))]);
        let rows = [row("a", json!({"name": "alice_updated"}))];
        let r = compute_diff(&prev, &rows, "src", "q", 2);
        assert!(r.created.is_empty());
        assert_eq!(r.updated.len(), 1);
        assert!(r.deleted.is_empty());
        assert_eq!(r.updated[0].row_key, "a");
    }

    #[test]
    fn diff_no_change_when_hash_matches() {
        let data = json!({"name": "alice"});
        let prev = snapshot(&[("a", data.clone())]);
        let rows = [row("a", data)];
        let r = compute_diff(&prev, &rows, "src", "q", 2);
        assert!(r.is_empty());
    }

    #[test]
    fn diff_mixed_operations() {
        // a: unchanged, b: changed, c: gone, d: new.
        let prev = snapshot(&[
            ("a", json!({"v": 1})),
            ("b", json!({"v": 2})),
            ("c", json!({"v": 99})),
        ]);
        let rows = [
            row("a", json!({"v": 1})),
            row("b", json!({"v": 3})),
            row("d", json!({"v": 4})),
        ];
        let r = compute_diff(&prev, &rows, "src", "q", 3);
        assert_eq!(r.created.len(), 1);
        assert_eq!(r.created[0].row_key, "d");
        assert_eq!(r.updated.len(), 1);
        assert_eq!(r.updated[0].row_key, "b");
        assert_eq!(r.deleted.len(), 1);
        assert_eq!(r.deleted[0].row_key, "c");
    }

    #[test]
    fn diff_preserves_source_and_query_ids() {
        let rows = [row("x", json!({}))];
        let r = compute_diff(&HashMap::new(), &rows, "my-src", "my-q", 7);
        let created = &r.created[0];
        assert_eq!(created.origin_id, "my-src");
        assert_eq!(created.query_id, "my-q");
        assert_eq!(created.cycle_id, 7);
    }

    #[test]
    fn kafka_auth_default_is_plaintext() {
        let auth = KafkaAuth::default();
        assert_eq!(auth.security_protocol, "PLAINTEXT");
        assert!(auth.is_plaintext());
        assert!(auth.sasl_mechanism.is_none());
    }

    #[test]
    fn kafka_auth_deserializes_sasl_plain() {
        let auth = kafka_auth_from(json!({
            "security_protocol": "SASL_SSL",
            "sasl_mechanism": "PLAIN",
            "sasl_username": "user",
            "sasl_password": "pass",
            "ssl_ca_location": "/path/to/ca.pem"
        }));
        assert_eq!(auth.security_protocol, "SASL_SSL");
        assert_eq!(auth.sasl_mechanism.as_deref(), Some("PLAIN"));
        assert_eq!(auth.sasl_username.as_deref(), Some("user"));
        assert_eq!(auth.sasl_password.as_deref(), Some("pass"));
        assert_eq!(auth.ssl_ca_location.as_deref(), Some("/path/to/ca.pem"));
        assert!(!auth.is_plaintext());
    }

    #[test]
    fn kafka_auth_deserializes_kerberos() {
        let auth = kafka_auth_from(json!({
            "security_protocol": "SASL_PLAINTEXT",
            "sasl_mechanism": "GSSAPI",
            "sasl_kerberos_keytab": "/etc/krb5.keytab",
            "sasl_kerberos_principal": "kafka/broker@REALM"
        }));
        assert_eq!(auth.sasl_mechanism.as_deref(), Some("GSSAPI"));
        assert_eq!(
            auth.sasl_kerberos_keytab.as_deref(),
            Some("/etc/krb5.keytab")
        );
        assert_eq!(
            auth.sasl_kerberos_principal.as_deref(),
            Some("kafka/broker@REALM")
        );
    }

    #[test]
    fn diff_deleted_events_carry_previous_hash() {
        let stored_hash = hash_row_data(&json!({"k": 1}));
        let prev = HashMap::from([("gone".into(), stored_hash.clone())]);
        let r = compute_diff(&prev, &[], "s", "q", 5);
        assert_eq!(r.deleted[0].row_hash, stored_hash);
        assert_eq!(r.deleted[0].row_data, Value::Null);
    }

    #[test]
    fn diff_empty_previous_empty_current() {
        let r = compute_diff(&HashMap::new(), &[], "s", "q", 1);
        assert!(r.is_empty());
    }

    #[test]
    fn diff_unicode_keys() {
        let rows = [
            row("日本語キー", json!({"emoji": "🎉"})),
            row("clé-français", json!({"val": "été"})),
        ];
        let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
        assert_eq!(r.created.len(), 2);
        let keys: Vec<&str> = r.created.iter().map(|e| e.row_key.as_str()).collect();
        assert!(keys.contains(&"日本語キー"));
        assert!(keys.contains(&"clé-français"));
    }

    #[test]
    fn diff_null_row_data() {
        let rows = [row("k1", Value::Null)];
        let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
        assert_eq!(r.created.len(), 1);
        assert_eq!(r.created[0].row_data, Value::Null);
    }

    #[test]
    fn diff_empty_string_key() {
        let rows = [row("", json!({"v": 1}))];
        let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
        assert_eq!(r.created.len(), 1);
        assert_eq!(r.created[0].row_key, "");
    }

    #[test]
    fn diff_duplicate_keys_emit_multiple_created_events() {
        let rows = [row("dup", json!({"v": 1})), row("dup", json!({"v": 2}))];
        let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
        assert_eq!(r.created.len(), 2);
    }

    #[test]
    fn diff_large_json_data() {
        let big_obj: Value = (0..100)
            .map(|i| (format!("field_{i}"), json!(i)))
            .collect::<serde_json::Map<String, Value>>()
            .into();
        let rows = [row("big", big_obj.clone())];
        let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
        assert_eq!(r.created.len(), 1);
        assert_eq!(r.created[0].row_hash, hash_row_data(&big_obj));
    }

    #[test]
    fn hash_null_value() {
        assert_eq!(hash_row_data(&Value::Null).len(), 64);
    }

    #[test]
    fn hash_nested_objects_differ() {
        let one = hash_row_data(&json!({"nested": {"a": 1}}));
        let two = hash_row_data(&json!({"nested": {"a": 2}}));
        assert_ne!(one, two);
    }

    #[test]
    fn hash_array_values() {
        let digest = hash_row_data(&json!({"items": [1, 2, 3]}));
        assert_eq!(digest.len(), 64);
        assert_ne!(digest, hash_row_data(&json!({"items": [1, 2, 4]})));
    }

    #[test]
    fn event_envelope_from_delta_event() {
        let evt = DeltaEvent {
            op: OpType::Updated,
            origin_id: "pg".into(),
            query_id: "q1".into(),
            row_key: "pk-42".into(),
            row_data: json!({"name": "test"}),
            row_hash: "abc123".into(),
            cycle_id: 10,
            ..sample_event()
        };
        let EventEnvelope { meta, data } = EventEnvelope::from(&evt);
        assert_eq!(meta.op, OpType::Updated);
        assert_eq!(meta.origin_id, "pg");
        assert_eq!(meta.query_id, "q1");
        assert_eq!(meta.key, "pk-42");
        assert_eq!(meta.hash, "abc123");
        assert_eq!(meta.cycle_id, 10);
        assert_eq!(data, json!({"name": "test"}));
    }

    #[test]
    fn event_envelope_roundtrip_json() {
        let evt = DeltaEvent {
            row_data: Value::Null,
            ..sample_event()
        };
        let envelope = EventEnvelope::from(&evt);
        let encoded = serde_json::to_string(&envelope).unwrap();
        let back: EventEnvelope = serde_json::from_str(&encoded).unwrap();
        assert_eq!(envelope, back);
    }

    #[test]
    fn auth_config_bearer() {
        let parsed: AuthConfig =
            serde_json::from_value(json!({"type": "bearer", "token": "tok123"})).unwrap();
        match parsed {
            AuthConfig::Bearer { token } => assert_eq!(token, "tok123"),
            other => panic!("expected bearer auth, got {other:?}"),
        }
    }

    #[test]
    fn auth_config_header() {
        let parsed: AuthConfig = serde_json::from_value(
            json!({"type": "header", "name": "X-Api-Key", "value": "secret"}),
        )
        .unwrap();
        match parsed {
            AuthConfig::Header { name, value } => {
                assert_eq!(name, "X-Api-Key");
                assert_eq!(value, "secret");
            }
            other => panic!("expected header auth, got {other:?}"),
        }
    }

    #[test]
    fn auth_config_basic() {
        let parsed: AuthConfig = serde_json::from_value(
            json!({"type": "basic", "username": "user", "password": "pass"}),
        )
        .unwrap();
        match parsed {
            AuthConfig::Basic { username, password } => {
                assert_eq!(username, "user");
                assert_eq!(password, "pass");
            }
            other => panic!("expected basic auth, got {other:?}"),
        }
    }

    #[test]
    fn auth_config_unknown_type_fails() {
        let parsed = serde_json::from_value::<AuthConfig>(json!({"type": "oauth2", "token": "t"}));
        assert!(parsed.is_err());
    }

    #[test]
    fn kafka_auth_ssl_only() {
        let auth = kafka_auth_from(json!({
            "security_protocol": "SSL",
            "ssl_ca_location": "/ca.pem",
            "ssl_certificate_location": "/cert.pem",
            "ssl_key_location": "/key.pem"
        }));
        assert_eq!(auth.security_protocol, "SSL");
        assert!(!auth.is_plaintext());
        assert!(auth.sasl_mechanism.is_none());
        assert_eq!(auth.ssl_ca_location.as_deref(), Some("/ca.pem"));
        assert_eq!(auth.ssl_certificate_location.as_deref(), Some("/cert.pem"));
        assert_eq!(auth.ssl_key_location.as_deref(), Some("/key.pem"));
    }

    #[test]
    fn kafka_auth_minimal_deserialize() {
        let auth = kafka_auth_from(json!({}));
        assert_eq!(auth.security_protocol, "PLAINTEXT");
        assert!(auth.is_plaintext());
    }
}
730
/// Property-based checks for `compute_diff` and `hash_row_data`.
#[cfg(test)]
mod prop_tests {
    use super::*;
    use proptest::prelude::*;
    use std::collections::{HashMap, HashSet};

    /// Scalar JSON values: null, bool, integer, finite float or a short ASCII string.
    fn arb_json_value() -> impl Strategy<Value = serde_json::Value> {
        prop_oneof![
            Just(serde_json::Value::Null),
            any::<bool>().prop_map(serde_json::Value::Bool),
            any::<i64>().prop_map(|n| serde_json::json!(n)),
            any::<f64>()
                .prop_filter("must be finite", |f| f.is_finite())
                .prop_map(|n| serde_json::json!(n)),
            "[a-zA-Z0-9_ ]{0,50}".prop_map(serde_json::Value::String),
        ]
    }

    /// A row with a 1–20 character alphanumeric key and a scalar payload.
    fn arb_raw_row() -> impl Strategy<Value = RawRow> {
        ("[a-zA-Z0-9_]{1,20}", arb_json_value()).prop_map(|(key, data)| RawRow {
            row_key: key,
            row_data: data,
        })
    }

    /// Up to 49 rows; keys may repeat.
    fn arb_raw_rows() -> impl Strategy<Value = Vec<RawRow>> {
        prop::collection::vec(arb_raw_row(), 0..50)
    }

    /// A previous snapshot of key → 64-character lowercase hex hash.
    fn arb_previous_snapshot() -> impl Strategy<Value = HashMap<String, String>> {
        prop::collection::vec(("[a-zA-Z0-9_]{1,20}", "[a-f0-9]{64}"), 0..50)
            .prop_map(|pairs| pairs.into_iter().collect())
    }

    /// Distinct row keys mentioned by a list of events.
    fn key_set(events: &[DeltaEvent]) -> HashSet<&str> {
        events.iter().map(|e| e.row_key.as_str()).collect()
    }

    proptest! {
        #[test]
        fn created_and_deleted_are_disjoint(
            previous in arb_previous_snapshot(),
            current in arb_raw_rows(),
        ) {
            let result = compute_diff(&previous, &current, "o", "q", 1);
            let created_keys = key_set(&result.created);
            let deleted_keys = key_set(&result.deleted);
            prop_assert!(created_keys.is_disjoint(&deleted_keys));
        }

        #[test]
        fn created_and_updated_are_disjoint(
            previous in arb_previous_snapshot(),
            current in arb_raw_rows(),
        ) {
            let result = compute_diff(&previous, &current, "o", "q", 1);
            let created_keys = key_set(&result.created);
            let updated_keys = key_set(&result.updated);
            prop_assert!(created_keys.is_disjoint(&updated_keys));
        }

        #[test]
        fn deleted_keys_come_from_previous(
            previous in arb_previous_snapshot(),
            current in arb_raw_rows(),
        ) {
            let result = compute_diff(&previous, &current, "o", "q", 1);
            for event in &result.deleted {
                prop_assert!(previous.contains_key(&event.row_key));
            }
        }

        #[test]
        fn created_keys_not_in_previous(
            previous in arb_previous_snapshot(),
            current in arb_raw_rows(),
        ) {
            let result = compute_diff(&previous, &current, "o", "q", 1);
            for event in &result.created {
                prop_assert!(!previous.contains_key(&event.row_key));
            }
        }

        #[test]
        fn updated_keys_in_both(
            previous in arb_previous_snapshot(),
            current in arb_raw_rows(),
        ) {
            let current_keys: HashSet<&str> =
                current.iter().map(|r| r.row_key.as_str()).collect();
            let result = compute_diff(&previous, &current, "o", "q", 1);
            for event in &result.updated {
                prop_assert!(previous.contains_key(&event.row_key));
                prop_assert!(current_keys.contains(event.row_key.as_str()));
            }
        }

        #[test]
        fn deleted_events_have_null_data(
            previous in arb_previous_snapshot(),
            current in arb_raw_rows(),
        ) {
            let result = compute_diff(&previous, &current, "o", "q", 1);
            for event in &result.deleted {
                prop_assert_eq!(&event.row_data, &serde_json::Value::Null);
                prop_assert_eq!(event.op, OpType::Deleted);
            }
        }

        #[test]
        fn created_events_have_correct_op(
            previous in arb_previous_snapshot(),
            current in arb_raw_rows(),
        ) {
            let result = compute_diff(&previous, &current, "o", "q", 1);
            for event in &result.created {
                prop_assert_eq!(event.op, OpType::Created);
            }
            for event in &result.updated {
                prop_assert_eq!(event.op, OpType::Updated);
            }
        }

        #[test]
        fn unique_keys_conservation(
            previous in arb_previous_snapshot(),
            current in arb_raw_rows(),
        ) {
            // `current` may repeat keys, so the bound is stated over distinct keys.
            let unique_current_keys: HashSet<&str> =
                current.iter().map(|row| row.row_key.as_str()).collect();

            let result = compute_diff(&previous, &current, "o", "q", 1);

            let created_keys = key_set(&result.created);
            let updated_keys = key_set(&result.updated);
            prop_assert!(created_keys.len() + updated_keys.len() <= unique_current_keys.len());

            for deleted_key in key_set(&result.deleted) {
                prop_assert!(!unique_current_keys.contains(deleted_key));
            }
        }

        #[test]
        fn empty_previous_means_no_updates_or_deletes(
            current in arb_raw_rows(),
        ) {
            let previous = HashMap::new();
            let result = compute_diff(&previous, &current, "o", "q", 1);
            prop_assert!(result.deleted.is_empty());
            prop_assert!(result.updated.is_empty());
            // One created event per row, duplicates included.
            prop_assert_eq!(result.created.len(), current.len());
        }

        #[test]
        fn empty_current_means_all_deleted(
            previous in arb_previous_snapshot(),
        ) {
            let current: Vec<RawRow> = vec![];
            let result = compute_diff(&previous, &current, "o", "q", 1);
            prop_assert!(result.created.is_empty());
            prop_assert!(result.updated.is_empty());
            prop_assert_eq!(result.deleted.len(), previous.len());
        }

        #[test]
        fn hash_is_deterministic(data in arb_json_value()) {
            let h1 = hash_row_data(&data);
            let h2 = hash_row_data(&data);
            prop_assert_eq!(h1, h2);
        }

        #[test]
        fn hash_is_64_hex_chars(data in arb_json_value()) {
            let h = hash_row_data(&data);
            prop_assert_eq!(h.len(), 64);
            prop_assert!(h.chars().all(|c| c.is_ascii_hexdigit()));
        }
    }
}