1use std::collections::HashMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use sha2::{Digest, Sha256};
6
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
8pub struct RawRow {
9 pub row_key: String,
10 pub row_data: serde_json::Value,
11}
12
13#[derive(Debug, Clone, Default)]
14pub struct DeltaResult {
15 pub created: Vec<DeltaEvent>,
16 pub updated: Vec<DeltaEvent>,
17 pub deleted: Vec<DeltaEvent>,
18}
19
20impl DeltaResult {
21 pub fn total(&self) -> usize {
22 self.created.len() + self.updated.len() + self.deleted.len()
23 }
24
25 pub fn is_empty(&self) -> bool {
26 self.total() == 0
27 }
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
32pub struct EventEnvelope {
33 pub meta: EventMeta,
34 pub data: serde_json::Value,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
38pub struct EventMeta {
39 pub op: OpType,
40 pub origin_id: String,
41 pub query_id: String,
42 pub key: String,
43 pub hash: String,
44 pub cycle_id: u64,
45 pub timestamp: DateTime<Utc>,
46}
47
48impl From<&DeltaEvent> for EventEnvelope {
49 fn from(e: &DeltaEvent) -> Self {
50 Self {
51 meta: EventMeta {
52 op: e.op,
53 origin_id: e.origin_id.clone(),
54 query_id: e.query_id.clone(),
55 key: e.row_key.clone(),
56 hash: e.row_hash.clone(),
57 cycle_id: e.cycle_id,
58 timestamp: e.timestamp,
59 },
60 data: e.row_data.clone(),
61 }
62 }
63}
64
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
66#[serde(rename_all = "snake_case")]
67pub enum CycleStatus {
68 Running,
69 Success,
70 Failed,
71 Aborted,
72}
73
74impl std::fmt::Display for CycleStatus {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 match self {
77 Self::Running => write!(f, "running"),
78 Self::Success => write!(f, "success"),
79 Self::Failed => write!(f, "failed"),
80 Self::Aborted => write!(f, "aborted"),
81 }
82 }
83}
84
85pub fn compute_diff(
86 previous: &HashMap<String, String>,
87 current: &[RawRow],
88 origin_id: &str,
89 query_id: &str,
90 cycle_id: u64,
91) -> DeltaResult {
92 let now = Utc::now();
93 let mut result = DeltaResult::default();
94 let mut seen_keys = std::collections::HashSet::new();
95
96 for row in current {
97 seen_keys.insert(row.row_key.clone());
98 let row_hash = hash_row_data(&row.row_data);
99
100 match previous.get(&row.row_key) {
101 None => {
102 result.created.push(DeltaEvent {
103 op: OpType::Created,
104 origin_id: origin_id.into(),
105 query_id: query_id.into(),
106 row_key: row.row_key.clone(),
107 row_data: row.row_data.clone(),
108 row_hash,
109 cycle_id,
110 timestamp: now,
111 });
112 }
113 Some(prev_hash) if *prev_hash != row_hash => {
114 result.updated.push(DeltaEvent {
115 op: OpType::Updated,
116 origin_id: origin_id.into(),
117 query_id: query_id.into(),
118 row_key: row.row_key.clone(),
119 row_data: row.row_data.clone(),
120 row_hash,
121 cycle_id,
122 timestamp: now,
123 });
124 }
125 _ => {}
126 }
127 }
128
129 for (key, hash) in previous {
130 if !seen_keys.contains(key) {
131 result.deleted.push(DeltaEvent {
132 op: OpType::Deleted,
133 origin_id: origin_id.into(),
134 query_id: query_id.into(),
135 row_key: key.clone(),
136 row_data: serde_json::Value::Null,
137 row_hash: hash.clone(),
138 cycle_id,
139 timestamp: now,
140 });
141 }
142 }
143
144 result
145}
146
147#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
148#[serde(rename_all = "snake_case")]
149pub enum OpType {
150 Created,
151 Updated,
152 Deleted,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
156pub struct DeltaEvent {
157 pub op: OpType,
158 pub origin_id: String,
159 pub query_id: String,
160 pub row_key: String,
161 pub row_data: serde_json::Value,
162 pub row_hash: String,
163 pub cycle_id: u64,
164 pub timestamp: DateTime<Utc>,
165}
166
167pub fn hash_row_data(data: &serde_json::Value) -> String {
168 let serialized = serde_json::to_string(data).expect("serde_json::Value is always serializable");
169 let mut hasher = Sha256::new();
170 hasher.update(serialized.as_bytes());
171 hex::encode(hasher.finalize())
172}
173
174impl std::fmt::Display for OpType {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 match self {
177 OpType::Created => write!(f, "created"),
178 OpType::Updated => write!(f, "updated"),
179 OpType::Deleted => write!(f, "deleted"),
180 }
181 }
182}
183
184#[derive(Debug, Clone, Deserialize)]
185#[serde(tag = "type", rename_all = "snake_case")]
186pub enum AuthConfig {
187 Bearer { token: String },
188 Header { name: String, value: String },
189 Basic { username: String, password: String },
190}
191
192#[derive(Debug, Clone, Deserialize)]
193pub struct KafkaAuth {
194 #[serde(default = "default_security_protocol")]
195 pub security_protocol: String,
196 pub sasl_mechanism: Option<String>,
197 pub sasl_username: Option<String>,
198 pub sasl_password: Option<String>,
199 pub sasl_kerberos_keytab: Option<String>,
200 pub sasl_kerberos_principal: Option<String>,
201 pub ssl_ca_location: Option<String>,
202 pub ssl_certificate_location: Option<String>,
203 pub ssl_key_location: Option<String>,
204}
205
206fn default_security_protocol() -> String {
207 "PLAINTEXT".to_string()
208}
209
210impl Default for KafkaAuth {
211 fn default() -> Self {
212 Self {
213 security_protocol: default_security_protocol(),
214 sasl_mechanism: None,
215 sasl_username: None,
216 sasl_password: None,
217 sasl_kerberos_keytab: None,
218 sasl_kerberos_principal: None,
219 ssl_ca_location: None,
220 ssl_certificate_location: None,
221 ssl_key_location: None,
222 }
223 }
224}
225
226impl KafkaAuth {
227 pub fn is_plaintext(&self) -> bool {
228 self.security_protocol == "PLAINTEXT"
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235
236 #[test]
237 fn hash_is_deterministic() {
238 let data = serde_json::json!({"name": "alice", "age": 30});
239 assert_eq!(hash_row_data(&data), hash_row_data(&data));
240 }
241
242 #[test]
243 fn hash_differs_for_different_data() {
244 let a = serde_json::json!({"name": "alice"});
245 let b = serde_json::json!({"name": "bob"});
246 assert_ne!(hash_row_data(&a), hash_row_data(&b));
247 }
248
249 #[test]
250 fn hash_is_64_char_hex() {
251 let hash = hash_row_data(&serde_json::json!({}));
252 assert_eq!(hash.len(), 64);
253 assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
254 }
255
256 #[test]
257 fn op_type_display() {
258 assert_eq!(OpType::Created.to_string(), "created");
259 assert_eq!(OpType::Updated.to_string(), "updated");
260 assert_eq!(OpType::Deleted.to_string(), "deleted");
261 }
262
263 #[test]
264 fn raw_row_roundtrip_json() {
265 let row = RawRow {
266 row_key: "pk-1".into(),
267 row_data: serde_json::json!({"col": "val"}),
268 };
269 let json = serde_json::to_string(&row).unwrap();
270 let back: RawRow = serde_json::from_str(&json).unwrap();
271 assert_eq!(row, back);
272 }
273
274 #[test]
275 fn delta_event_roundtrip_json() {
276 let evt = DeltaEvent {
277 op: OpType::Created,
278 origin_id: "pg-prod".into(),
279 query_id: "q1".into(),
280 row_key: "pk-1".into(),
281 row_data: serde_json::json!({"x": 1}),
282 row_hash: hash_row_data(&serde_json::json!({"x": 1})),
283 cycle_id: 42,
284 timestamp: chrono::Utc::now(),
285 };
286 let json = serde_json::to_string(&evt).unwrap();
287 let back: DeltaEvent = serde_json::from_str(&json).unwrap();
288 assert_eq!(evt.op, back.op);
289 assert_eq!(evt.row_key, back.row_key);
290 assert_eq!(evt.cycle_id, back.cycle_id);
291 }
292
293 #[test]
294 fn op_type_serde_snake_case() {
295 let json = serde_json::to_string(&OpType::Created).unwrap();
296 assert_eq!(json, "\"created\"");
297 let back: OpType = serde_json::from_str("\"updated\"").unwrap();
298 assert_eq!(back, OpType::Updated);
299 }
300
301 #[test]
304 fn delta_result_empty_by_default() {
305 let r = DeltaResult::default();
306 assert!(r.is_empty());
307 assert_eq!(r.total(), 0);
308 }
309
310 #[test]
311 fn delta_result_total_counts_all() {
312 let evt = || DeltaEvent {
313 op: OpType::Created,
314 origin_id: "s".into(),
315 query_id: "q".into(),
316 row_key: "k".into(),
317 row_data: serde_json::json!({}),
318 row_hash: "h".into(),
319 cycle_id: 1,
320 timestamp: Utc::now(),
321 };
322 let r = DeltaResult {
323 created: vec![evt()],
324 updated: vec![evt(), evt()],
325 deleted: vec![evt()],
326 };
327 assert_eq!(r.total(), 4);
328 assert!(!r.is_empty());
329 }
330
331 #[test]
334 fn cycle_status_display() {
335 assert_eq!(CycleStatus::Running.to_string(), "running");
336 assert_eq!(CycleStatus::Success.to_string(), "success");
337 assert_eq!(CycleStatus::Failed.to_string(), "failed");
338 assert_eq!(CycleStatus::Aborted.to_string(), "aborted");
339 }
340
341 #[test]
342 fn cycle_status_serde() {
343 let json = serde_json::to_string(&CycleStatus::Aborted).unwrap();
344 assert_eq!(json, "\"aborted\"");
345 let back: CycleStatus = serde_json::from_str("\"success\"").unwrap();
346 assert_eq!(back, CycleStatus::Success);
347 }
348
349 #[test]
352 fn diff_all_created_when_no_previous() {
353 let prev = HashMap::new();
354 let rows = vec![
355 RawRow {
356 row_key: "a".into(),
357 row_data: serde_json::json!({"v": 1}),
358 },
359 RawRow {
360 row_key: "b".into(),
361 row_data: serde_json::json!({"v": 2}),
362 },
363 ];
364 let r = compute_diff(&prev, &rows, "src", "q", 1);
365 assert_eq!(r.created.len(), 2);
366 assert!(r.updated.is_empty());
367 assert!(r.deleted.is_empty());
368 assert!(r.created.iter().all(|e| e.op == OpType::Created));
369 }
370
371 #[test]
372 fn diff_all_deleted_when_no_current() {
373 let prev = HashMap::from([
374 ("a".into(), hash_row_data(&serde_json::json!({"v": 1}))),
375 ("b".into(), hash_row_data(&serde_json::json!({"v": 2}))),
376 ]);
377 let r = compute_diff(&prev, &[], "src", "q", 2);
378 assert!(r.created.is_empty());
379 assert!(r.updated.is_empty());
380 assert_eq!(r.deleted.len(), 2);
381 assert!(r.deleted.iter().all(|e| e.op == OpType::Deleted));
382 }
383
384 #[test]
385 fn diff_detects_update() {
386 let data_v1 = serde_json::json!({"name": "alice"});
387 let data_v2 = serde_json::json!({"name": "alice_updated"});
388 let prev = HashMap::from([("a".into(), hash_row_data(&data_v1))]);
389 let rows = vec![RawRow {
390 row_key: "a".into(),
391 row_data: data_v2,
392 }];
393 let r = compute_diff(&prev, &rows, "src", "q", 2);
394 assert!(r.created.is_empty());
395 assert_eq!(r.updated.len(), 1);
396 assert!(r.deleted.is_empty());
397 assert_eq!(r.updated[0].row_key, "a");
398 }
399
400 #[test]
401 fn diff_no_change_when_hash_matches() {
402 let data = serde_json::json!({"name": "alice"});
403 let prev = HashMap::from([("a".into(), hash_row_data(&data))]);
404 let rows = vec![RawRow {
405 row_key: "a".into(),
406 row_data: data,
407 }];
408 let r = compute_diff(&prev, &rows, "src", "q", 2);
409 assert!(r.is_empty());
410 }
411
412 #[test]
413 fn diff_mixed_operations() {
414 let data_a = serde_json::json!({"v": 1});
415 let data_b = serde_json::json!({"v": 2});
416 let data_b2 = serde_json::json!({"v": 3});
417 let prev = HashMap::from([
418 ("a".into(), hash_row_data(&data_a)),
419 ("b".into(), hash_row_data(&data_b)),
420 ("c".into(), hash_row_data(&serde_json::json!({"v": 99}))),
421 ]);
422 let rows = vec![
423 RawRow {
424 row_key: "a".into(),
425 row_data: data_a,
426 },
427 RawRow {
428 row_key: "b".into(),
429 row_data: data_b2,
430 },
431 RawRow {
432 row_key: "d".into(),
433 row_data: serde_json::json!({"v": 4}),
434 },
435 ];
436 let r = compute_diff(&prev, &rows, "src", "q", 3);
437 assert_eq!(r.created.len(), 1);
438 assert_eq!(r.created[0].row_key, "d");
439 assert_eq!(r.updated.len(), 1);
440 assert_eq!(r.updated[0].row_key, "b");
441 assert_eq!(r.deleted.len(), 1);
442 assert_eq!(r.deleted[0].row_key, "c");
443 }
444
445 #[test]
446 fn diff_preserves_source_and_query_ids() {
447 let rows = vec![RawRow {
448 row_key: "x".into(),
449 row_data: serde_json::json!({}),
450 }];
451 let r = compute_diff(&HashMap::new(), &rows, "my-src", "my-q", 7);
452 assert_eq!(r.created[0].origin_id, "my-src");
453 assert_eq!(r.created[0].query_id, "my-q");
454 assert_eq!(r.created[0].cycle_id, 7);
455 }
456
457 #[test]
458 fn kafka_auth_default_is_plaintext() {
459 let auth = KafkaAuth::default();
460 assert_eq!(auth.security_protocol, "PLAINTEXT");
461 assert!(auth.is_plaintext());
462 assert!(auth.sasl_mechanism.is_none());
463 }
464
465 #[test]
466 fn kafka_auth_deserializes_sasl_plain() {
467 let json = serde_json::json!({
468 "security_protocol": "SASL_SSL",
469 "sasl_mechanism": "PLAIN",
470 "sasl_username": "user",
471 "sasl_password": "pass",
472 "ssl_ca_location": "/path/to/ca.pem"
473 });
474 let auth: KafkaAuth = serde_json::from_value(json).unwrap();
475 assert_eq!(auth.security_protocol, "SASL_SSL");
476 assert_eq!(auth.sasl_mechanism.as_deref(), Some("PLAIN"));
477 assert_eq!(auth.sasl_username.as_deref(), Some("user"));
478 assert_eq!(auth.sasl_password.as_deref(), Some("pass"));
479 assert_eq!(auth.ssl_ca_location.as_deref(), Some("/path/to/ca.pem"));
480 assert!(!auth.is_plaintext());
481 }
482
483 #[test]
484 fn kafka_auth_deserializes_kerberos() {
485 let json = serde_json::json!({
486 "security_protocol": "SASL_PLAINTEXT",
487 "sasl_mechanism": "GSSAPI",
488 "sasl_kerberos_keytab": "/etc/krb5.keytab",
489 "sasl_kerberos_principal": "kafka/broker@REALM"
490 });
491 let auth: KafkaAuth = serde_json::from_value(json).unwrap();
492 assert_eq!(auth.sasl_mechanism.as_deref(), Some("GSSAPI"));
493 assert_eq!(
494 auth.sasl_kerberos_keytab.as_deref(),
495 Some("/etc/krb5.keytab")
496 );
497 assert_eq!(
498 auth.sasl_kerberos_principal.as_deref(),
499 Some("kafka/broker@REALM")
500 );
501 }
502
503 #[test]
504 fn diff_deleted_events_carry_previous_hash() {
505 let data = serde_json::json!({"k": 1});
506 let h = hash_row_data(&data);
507 let prev = HashMap::from([("gone".into(), h.clone())]);
508 let r = compute_diff(&prev, &[], "s", "q", 5);
509 assert_eq!(r.deleted[0].row_hash, h);
510 assert_eq!(r.deleted[0].row_data, serde_json::Value::Null);
511 }
512
513 #[test]
516 fn diff_empty_previous_empty_current() {
517 let r = compute_diff(&HashMap::new(), &[], "s", "q", 1);
518 assert!(r.is_empty());
519 }
520
521 #[test]
522 fn diff_unicode_keys() {
523 let rows = vec![
524 RawRow {
525 row_key: "日本語キー".into(),
526 row_data: serde_json::json!({"emoji": "🎉"}),
527 },
528 RawRow {
529 row_key: "clé-français".into(),
530 row_data: serde_json::json!({"val": "été"}),
531 },
532 ];
533 let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
534 assert_eq!(r.created.len(), 2);
535 let keys: Vec<&str> = r.created.iter().map(|e| e.row_key.as_str()).collect();
536 assert!(keys.contains(&"日本語キー"));
537 assert!(keys.contains(&"clé-français"));
538 }
539
540 #[test]
541 fn diff_null_row_data() {
542 let rows = vec![RawRow {
543 row_key: "k1".into(),
544 row_data: serde_json::Value::Null,
545 }];
546 let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
547 assert_eq!(r.created.len(), 1);
548 assert_eq!(r.created[0].row_data, serde_json::Value::Null);
549 }
550
551 #[test]
552 fn diff_empty_string_key() {
553 let rows = vec![RawRow {
554 row_key: "".into(),
555 row_data: serde_json::json!({"v": 1}),
556 }];
557 let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
558 assert_eq!(r.created.len(), 1);
559 assert_eq!(r.created[0].row_key, "");
560 }
561
562 #[test]
563 fn diff_duplicate_keys_emit_multiple_created_events() {
564 let prev = HashMap::new();
565 let rows = vec![
566 RawRow {
567 row_key: "dup".into(),
568 row_data: serde_json::json!({"v": 1}),
569 },
570 RawRow {
571 row_key: "dup".into(),
572 row_data: serde_json::json!({"v": 2}),
573 },
574 ];
575 let r = compute_diff(&prev, &rows, "s", "q", 1);
576 assert_eq!(r.created.len(), 2);
577 }
578
579 #[test]
580 fn diff_large_json_data() {
581 let big_obj: serde_json::Value = (0..100)
582 .map(|i| (format!("field_{i}"), serde_json::json!(i)))
583 .collect::<serde_json::Map<String, serde_json::Value>>()
584 .into();
585 let rows = vec![RawRow {
586 row_key: "big".into(),
587 row_data: big_obj.clone(),
588 }];
589 let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
590 assert_eq!(r.created.len(), 1);
591 assert_eq!(r.created[0].row_hash, hash_row_data(&big_obj));
592 }
593
594 #[test]
595 fn hash_null_value() {
596 let h = hash_row_data(&serde_json::Value::Null);
597 assert_eq!(h.len(), 64);
598 }
599
600 #[test]
601 fn hash_nested_objects_differ() {
602 let a = serde_json::json!({"nested": {"a": 1}});
603 let b = serde_json::json!({"nested": {"a": 2}});
604 assert_ne!(hash_row_data(&a), hash_row_data(&b));
605 }
606
607 #[test]
608 fn hash_array_values() {
609 let data = serde_json::json!({"items": [1, 2, 3]});
610 let h = hash_row_data(&data);
611 assert_eq!(h.len(), 64);
612 assert_ne!(h, hash_row_data(&serde_json::json!({"items": [1, 2, 4]})));
613 }
614
615 #[test]
618 fn event_envelope_from_delta_event() {
619 let evt = DeltaEvent {
620 op: OpType::Updated,
621 origin_id: "pg".into(),
622 query_id: "q1".into(),
623 row_key: "pk-42".into(),
624 row_data: serde_json::json!({"name": "test"}),
625 row_hash: "abc123".into(),
626 cycle_id: 10,
627 timestamp: Utc::now(),
628 };
629 let envelope = EventEnvelope::from(&evt);
630 assert_eq!(envelope.meta.op, OpType::Updated);
631 assert_eq!(envelope.meta.origin_id, "pg");
632 assert_eq!(envelope.meta.query_id, "q1");
633 assert_eq!(envelope.meta.key, "pk-42");
634 assert_eq!(envelope.meta.hash, "abc123");
635 assert_eq!(envelope.meta.cycle_id, 10);
636 assert_eq!(envelope.data, serde_json::json!({"name": "test"}));
637 }
638
639 #[test]
640 fn event_envelope_roundtrip_json() {
641 let evt = DeltaEvent {
642 op: OpType::Created,
643 origin_id: "s".into(),
644 query_id: "q".into(),
645 row_key: "k".into(),
646 row_data: serde_json::json!(null),
647 row_hash: "h".into(),
648 cycle_id: 1,
649 timestamp: Utc::now(),
650 };
651 let envelope = EventEnvelope::from(&evt);
652 let json = serde_json::to_string(&envelope).unwrap();
653 let back: EventEnvelope = serde_json::from_str(&json).unwrap();
654 assert_eq!(envelope, back);
655 }
656
657 #[test]
660 fn auth_config_bearer() {
661 let json = serde_json::json!({"type": "bearer", "token": "tok123"});
662 let auth: AuthConfig = serde_json::from_value(json).unwrap();
663 assert!(matches!(auth, AuthConfig::Bearer { token } if token == "tok123"));
664 }
665
666 #[test]
667 fn auth_config_header() {
668 let json = serde_json::json!({"type": "header", "name": "X-Api-Key", "value": "secret"});
669 let auth: AuthConfig = serde_json::from_value(json).unwrap();
670 assert!(
671 matches!(auth, AuthConfig::Header { name, value } if name == "X-Api-Key" && value == "secret")
672 );
673 }
674
675 #[test]
676 fn auth_config_basic() {
677 let json = serde_json::json!({"type": "basic", "username": "user", "password": "pass"});
678 let auth: AuthConfig = serde_json::from_value(json).unwrap();
679 assert!(
680 matches!(auth, AuthConfig::Basic { username, password } if username == "user" && password == "pass")
681 );
682 }
683
684 #[test]
685 fn auth_config_unknown_type_fails() {
686 let json = serde_json::json!({"type": "oauth2", "token": "t"});
687 assert!(serde_json::from_value::<AuthConfig>(json).is_err());
688 }
689
690 #[test]
691 fn kafka_auth_ssl_only() {
692 let json = serde_json::json!({
693 "security_protocol": "SSL",
694 "ssl_ca_location": "/ca.pem",
695 "ssl_certificate_location": "/cert.pem",
696 "ssl_key_location": "/key.pem"
697 });
698 let auth: KafkaAuth = serde_json::from_value(json).unwrap();
699 assert_eq!(auth.security_protocol, "SSL");
700 assert!(!auth.is_plaintext());
701 assert!(auth.sasl_mechanism.is_none());
702 assert_eq!(auth.ssl_ca_location.as_deref(), Some("/ca.pem"));
703 assert_eq!(auth.ssl_certificate_location.as_deref(), Some("/cert.pem"));
704 assert_eq!(auth.ssl_key_location.as_deref(), Some("/key.pem"));
705 }
706
707 #[test]
708 fn kafka_auth_minimal_deserialize() {
709 let json = serde_json::json!({});
710 let auth: KafkaAuth = serde_json::from_value(json).unwrap();
711 assert_eq!(auth.security_protocol, "PLAINTEXT");
712 assert!(auth.is_plaintext());
713 }
714}
715
716#[cfg(test)]
717mod prop_tests {
718 use super::*;
719 use proptest::prelude::*;
720 use std::collections::HashMap;
721
722 fn arb_json_value() -> impl Strategy<Value = serde_json::Value> {
723 prop_oneof![
724 Just(serde_json::Value::Null),
725 any::<bool>().prop_map(serde_json::Value::Bool),
726 any::<i64>().prop_map(|n| serde_json::json!(n)),
727 any::<f64>()
728 .prop_filter("must be finite", |f| f.is_finite())
729 .prop_map(|n| serde_json::json!(n)),
730 "[a-zA-Z0-9_ ]{0,50}".prop_map(serde_json::Value::String),
731 ]
732 }
733
734 fn arb_raw_row() -> impl Strategy<Value = RawRow> {
735 ("[a-zA-Z0-9_]{1,20}", arb_json_value()).prop_map(|(key, data)| RawRow {
736 row_key: key,
737 row_data: data,
738 })
739 }
740
741 fn arb_raw_rows() -> impl Strategy<Value = Vec<RawRow>> {
742 prop::collection::vec(arb_raw_row(), 0..50)
743 }
744
745 fn arb_previous_snapshot() -> impl Strategy<Value = HashMap<String, String>> {
746 prop::collection::vec(("[a-zA-Z0-9_]{1,20}", "[a-f0-9]{64}"), 0..50)
747 .prop_map(|pairs| pairs.into_iter().collect())
748 }
749
750 proptest! {
751 #[test]
752 fn created_and_deleted_are_disjoint(
753 previous in arb_previous_snapshot(),
754 current in arb_raw_rows(),
755 ) {
756 let result = compute_diff(&previous, ¤t, "o", "q", 1);
757 let created_keys: std::collections::HashSet<_> =
758 result.created.iter().map(|e| &e.row_key).collect();
759 let deleted_keys: std::collections::HashSet<_> =
760 result.deleted.iter().map(|e| &e.row_key).collect();
761 prop_assert!(created_keys.is_disjoint(&deleted_keys));
762 }
763
764 #[test]
765 fn created_and_updated_are_disjoint(
766 previous in arb_previous_snapshot(),
767 current in arb_raw_rows(),
768 ) {
769 let result = compute_diff(&previous, ¤t, "o", "q", 1);
770 let created_keys: std::collections::HashSet<_> =
771 result.created.iter().map(|e| &e.row_key).collect();
772 let updated_keys: std::collections::HashSet<_> =
773 result.updated.iter().map(|e| &e.row_key).collect();
774 prop_assert!(created_keys.is_disjoint(&updated_keys));
775 }
776
777 #[test]
778 fn deleted_keys_come_from_previous(
779 previous in arb_previous_snapshot(),
780 current in arb_raw_rows(),
781 ) {
782 let result = compute_diff(&previous, ¤t, "o", "q", 1);
783 for event in &result.deleted {
784 prop_assert!(previous.contains_key(&event.row_key));
785 }
786 }
787
788 #[test]
789 fn created_keys_not_in_previous(
790 previous in arb_previous_snapshot(),
791 current in arb_raw_rows(),
792 ) {
793 let result = compute_diff(&previous, ¤t, "o", "q", 1);
794 for event in &result.created {
795 prop_assert!(!previous.contains_key(&event.row_key));
796 }
797 }
798
799 #[test]
800 fn updated_keys_in_both(
801 previous in arb_previous_snapshot(),
802 current in arb_raw_rows(),
803 ) {
804 let current_keys: std::collections::HashSet<_> =
805 current.iter().map(|r| &r.row_key).collect();
806 let result = compute_diff(&previous, ¤t, "o", "q", 1);
807 for event in &result.updated {
808 prop_assert!(previous.contains_key(&event.row_key));
809 prop_assert!(current_keys.contains(&event.row_key));
810 }
811 }
812
813 #[test]
814 fn deleted_events_have_null_data(
815 previous in arb_previous_snapshot(),
816 current in arb_raw_rows(),
817 ) {
818 let result = compute_diff(&previous, ¤t, "o", "q", 1);
819 for event in &result.deleted {
820 prop_assert_eq!(&event.row_data, &serde_json::Value::Null);
821 prop_assert_eq!(event.op, OpType::Deleted);
822 }
823 }
824
825 #[test]
826 fn created_events_have_correct_op(
827 previous in arb_previous_snapshot(),
828 current in arb_raw_rows(),
829 ) {
830 let result = compute_diff(&previous, ¤t, "o", "q", 1);
831 for event in &result.created {
832 prop_assert_eq!(event.op, OpType::Created);
833 }
834 for event in &result.updated {
835 prop_assert_eq!(event.op, OpType::Updated);
836 }
837 }
838
839 #[test]
840 fn unique_keys_conservation(
841 previous in arb_previous_snapshot(),
842 current in arb_raw_rows(),
843 ) {
844 let mut deduped: HashMap<&str, &RawRow> = HashMap::new();
846 for row in ¤t {
847 deduped.insert(&row.row_key, row);
848 }
849 let unique_current_keys: std::collections::HashSet<&str> =
850 deduped.keys().copied().collect();
851
852 let result = compute_diff(&previous, ¤t, "o", "q", 1);
853
854 let created_keys: std::collections::HashSet<&str> =
856 result.created.iter().map(|e| e.row_key.as_str()).collect();
857 let updated_keys: std::collections::HashSet<&str> =
858 result.updated.iter().map(|e| e.row_key.as_str()).collect();
859
860 prop_assert!(created_keys.len() + updated_keys.len() <= unique_current_keys.len());
862
863 let deleted_keys: std::collections::HashSet<&str> =
865 result.deleted.iter().map(|e| e.row_key.as_str()).collect();
866 for dk in &deleted_keys {
867 prop_assert!(!unique_current_keys.contains(dk));
868 }
869 }
870
871 #[test]
872 fn empty_previous_means_no_updates_or_deletes(
873 current in arb_raw_rows(),
874 ) {
875 let previous = HashMap::new();
876 let result = compute_diff(&previous, ¤t, "o", "q", 1);
877 prop_assert!(result.deleted.is_empty());
878 prop_assert!(result.updated.is_empty());
879 prop_assert_eq!(result.created.len(), current.len());
881 }
882
883 #[test]
884 fn empty_current_means_all_deleted(
885 previous in arb_previous_snapshot(),
886 ) {
887 let current: Vec<RawRow> = vec![];
888 let result = compute_diff(&previous, ¤t, "o", "q", 1);
889 prop_assert!(result.created.is_empty());
890 prop_assert!(result.updated.is_empty());
891 prop_assert_eq!(result.deleted.len(), previous.len());
892 }
893
894 #[test]
895 fn hash_is_deterministic(data in arb_json_value()) {
896 let h1 = hash_row_data(&data);
897 let h2 = hash_row_data(&data);
898 prop_assert_eq!(h1, h2);
899 }
900
901 #[test]
902 fn hash_is_64_hex_chars(data in arb_json_value()) {
903 let h = hash_row_data(&data);
904 prop_assert_eq!(h.len(), 64);
905 prop_assert!(h.chars().all(|c| c.is_ascii_hexdigit()));
906 }
907 }
908}