//! `oversync_core/model.rs`: data model types, snapshot diffing and hashing
//! helpers, and authentication configuration.

use std::collections::HashMap;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
6
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
8pub struct RawRow {
9	pub row_key: String,
10	pub row_data: serde_json::Value,
11}
12
13#[derive(Debug, Clone, Default)]
14pub struct DeltaResult {
15	pub created: Vec<DeltaEvent>,
16	pub updated: Vec<DeltaEvent>,
17	pub deleted: Vec<DeltaEvent>,
18}
19
20impl DeltaResult {
21	pub fn total(&self) -> usize {
22		self.created.len() + self.updated.len() + self.deleted.len()
23	}
24
25	pub fn is_empty(&self) -> bool {
26		self.total() == 0
27	}
28}
29
30/// Wire format for sink delivery. Each event is one message.
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
32pub struct EventEnvelope {
33	pub meta: EventMeta,
34	pub data: serde_json::Value,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
38pub struct EventMeta {
39	pub op: OpType,
40	pub origin_id: String,
41	pub query_id: String,
42	pub key: String,
43	pub hash: String,
44	pub cycle_id: u64,
45	pub timestamp: DateTime<Utc>,
46}
47
48impl From<&DeltaEvent> for EventEnvelope {
49	fn from(e: &DeltaEvent) -> Self {
50		Self {
51			meta: EventMeta {
52				op: e.op,
53				origin_id: e.origin_id.clone(),
54				query_id: e.query_id.clone(),
55				key: e.row_key.clone(),
56				hash: e.row_hash.clone(),
57				cycle_id: e.cycle_id,
58				timestamp: e.timestamp,
59			},
60			data: e.row_data.clone(),
61		}
62	}
63}
64
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
66#[serde(rename_all = "snake_case")]
67pub enum CycleStatus {
68	Running,
69	Success,
70	Failed,
71	Aborted,
72}
73
74impl std::fmt::Display for CycleStatus {
75	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76		match self {
77			Self::Running => write!(f, "running"),
78			Self::Success => write!(f, "success"),
79			Self::Failed => write!(f, "failed"),
80			Self::Aborted => write!(f, "aborted"),
81		}
82	}
83}
84
85pub fn compute_diff(
86	previous: &HashMap<String, String>,
87	current: &[RawRow],
88	origin_id: &str,
89	query_id: &str,
90	cycle_id: u64,
91) -> DeltaResult {
92	let now = Utc::now();
93	let mut result = DeltaResult::default();
94	let mut seen_keys = std::collections::HashSet::new();
95
96	for row in current {
97		seen_keys.insert(row.row_key.clone());
98		let row_hash = hash_row_data(&row.row_data);
99
100		match previous.get(&row.row_key) {
101			None => {
102				result.created.push(DeltaEvent {
103					op: OpType::Created,
104					origin_id: origin_id.into(),
105					query_id: query_id.into(),
106					row_key: row.row_key.clone(),
107					row_data: row.row_data.clone(),
108					row_hash,
109					cycle_id,
110					timestamp: now,
111				});
112			}
113			Some(prev_hash) if *prev_hash != row_hash => {
114				result.updated.push(DeltaEvent {
115					op: OpType::Updated,
116					origin_id: origin_id.into(),
117					query_id: query_id.into(),
118					row_key: row.row_key.clone(),
119					row_data: row.row_data.clone(),
120					row_hash,
121					cycle_id,
122					timestamp: now,
123				});
124			}
125			_ => {}
126		}
127	}
128
129	for (key, hash) in previous {
130		if !seen_keys.contains(key) {
131			result.deleted.push(DeltaEvent {
132				op: OpType::Deleted,
133				origin_id: origin_id.into(),
134				query_id: query_id.into(),
135				row_key: key.clone(),
136				row_data: serde_json::Value::Null,
137				row_hash: hash.clone(),
138				cycle_id,
139				timestamp: now,
140			});
141		}
142	}
143
144	result
145}
146
147#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
148#[serde(rename_all = "snake_case")]
149pub enum OpType {
150	Created,
151	Updated,
152	Deleted,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
156pub struct DeltaEvent {
157	pub op: OpType,
158	pub origin_id: String,
159	pub query_id: String,
160	pub row_key: String,
161	pub row_data: serde_json::Value,
162	pub row_hash: String,
163	pub cycle_id: u64,
164	pub timestamp: DateTime<Utc>,
165}
166
167pub fn hash_row_data(data: &serde_json::Value) -> String {
168	let serialized = serde_json::to_string(data).expect("serde_json::Value is always serializable");
169	let mut hasher = Sha256::new();
170	hasher.update(serialized.as_bytes());
171	hex::encode(hasher.finalize())
172}
173
174impl std::fmt::Display for OpType {
175	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176		match self {
177			OpType::Created => write!(f, "created"),
178			OpType::Updated => write!(f, "updated"),
179			OpType::Deleted => write!(f, "deleted"),
180		}
181	}
182}
183
184#[derive(Debug, Clone, Deserialize)]
185#[serde(tag = "type", rename_all = "snake_case")]
186pub enum AuthConfig {
187	Bearer { token: String },
188	Header { name: String, value: String },
189	Basic { username: String, password: String },
190}
191
192#[derive(Debug, Clone, Deserialize)]
193pub struct KafkaAuth {
194	#[serde(default = "default_security_protocol")]
195	pub security_protocol: String,
196	pub sasl_mechanism: Option<String>,
197	pub sasl_username: Option<String>,
198	pub sasl_password: Option<String>,
199	pub sasl_kerberos_keytab: Option<String>,
200	pub sasl_kerberos_principal: Option<String>,
201	pub ssl_ca_location: Option<String>,
202	pub ssl_certificate_location: Option<String>,
203	pub ssl_key_location: Option<String>,
204}
205
/// Security protocol used when a configuration does not specify one.
fn default_security_protocol() -> String {
	String::from("PLAINTEXT")
}
209
210impl Default for KafkaAuth {
211	fn default() -> Self {
212		Self {
213			security_protocol: default_security_protocol(),
214			sasl_mechanism: None,
215			sasl_username: None,
216			sasl_password: None,
217			sasl_kerberos_keytab: None,
218			sasl_kerberos_principal: None,
219			ssl_ca_location: None,
220			ssl_certificate_location: None,
221			ssl_key_location: None,
222		}
223	}
224}
225
226impl KafkaAuth {
227	pub fn is_plaintext(&self) -> bool {
228		self.security_protocol == "PLAINTEXT"
229	}
230}
231
#[cfg(test)]
mod tests {
	use super::*;

	// ── hash_row_data / basic serde tests ──────────────────────────

	#[test]
	fn hash_is_deterministic() {
		let data = serde_json::json!({"name": "alice", "age": 30});
		assert_eq!(hash_row_data(&data), hash_row_data(&data));
	}

	#[test]
	fn hash_differs_for_different_data() {
		let a = serde_json::json!({"name": "alice"});
		let b = serde_json::json!({"name": "bob"});
		assert_ne!(hash_row_data(&a), hash_row_data(&b));
	}

	#[test]
	fn hash_is_64_char_hex() {
		let hash = hash_row_data(&serde_json::json!({}));
		assert_eq!(hash.len(), 64);
		assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
	}

	#[test]
	fn op_type_display() {
		assert_eq!(OpType::Created.to_string(), "created");
		assert_eq!(OpType::Updated.to_string(), "updated");
		assert_eq!(OpType::Deleted.to_string(), "deleted");
	}

	#[test]
	fn raw_row_roundtrip_json() {
		let row = RawRow {
			row_key: "pk-1".into(),
			row_data: serde_json::json!({"col": "val"}),
		};
		let json = serde_json::to_string(&row).unwrap();
		let back: RawRow = serde_json::from_str(&json).unwrap();
		assert_eq!(row, back);
	}

	#[test]
	fn delta_event_roundtrip_json() {
		let evt = DeltaEvent {
			op: OpType::Created,
			origin_id: "pg-prod".into(),
			query_id: "q1".into(),
			row_key: "pk-1".into(),
			row_data: serde_json::json!({"x": 1}),
			row_hash: hash_row_data(&serde_json::json!({"x": 1})),
			cycle_id: 42,
			timestamp: chrono::Utc::now(),
		};
		let json = serde_json::to_string(&evt).unwrap();
		let back: DeltaEvent = serde_json::from_str(&json).unwrap();
		// Compare the whole event so a field lost in (de)serialization is caught.
		assert_eq!(evt, back);
	}

	#[test]
	fn op_type_serde_snake_case() {
		let json = serde_json::to_string(&OpType::Created).unwrap();
		assert_eq!(json, "\"created\"");
		let back: OpType = serde_json::from_str("\"updated\"").unwrap();
		assert_eq!(back, OpType::Updated);
	}

	// ── DeltaResult tests ──────────────────────────────────────────

	#[test]
	fn delta_result_empty_by_default() {
		let r = DeltaResult::default();
		assert!(r.is_empty());
		assert_eq!(r.total(), 0);
	}

	#[test]
	fn delta_result_total_counts_all() {
		let evt = || DeltaEvent {
			op: OpType::Created,
			origin_id: "s".into(),
			query_id: "q".into(),
			row_key: "k".into(),
			row_data: serde_json::json!({}),
			row_hash: "h".into(),
			cycle_id: 1,
			timestamp: Utc::now(),
		};
		let r = DeltaResult {
			created: vec![evt()],
			updated: vec![evt(), evt()],
			deleted: vec![evt()],
		};
		assert_eq!(r.total(), 4);
		assert!(!r.is_empty());
	}

	// ── CycleStatus tests ──────────────────────────────────────────

	#[test]
	fn cycle_status_display() {
		assert_eq!(CycleStatus::Running.to_string(), "running");
		assert_eq!(CycleStatus::Success.to_string(), "success");
		assert_eq!(CycleStatus::Failed.to_string(), "failed");
		assert_eq!(CycleStatus::Aborted.to_string(), "aborted");
	}

	#[test]
	fn cycle_status_serde() {
		let json = serde_json::to_string(&CycleStatus::Aborted).unwrap();
		assert_eq!(json, "\"aborted\"");
		let back: CycleStatus = serde_json::from_str("\"success\"").unwrap();
		assert_eq!(back, CycleStatus::Success);
	}

	// ── compute_diff tests ─────────────────────────────────────────

	#[test]
	fn diff_all_created_when_no_previous() {
		let prev = HashMap::new();
		let rows = vec![
			RawRow {
				row_key: "a".into(),
				row_data: serde_json::json!({"v": 1}),
			},
			RawRow {
				row_key: "b".into(),
				row_data: serde_json::json!({"v": 2}),
			},
		];
		let r = compute_diff(&prev, &rows, "src", "q", 1);
		assert_eq!(r.created.len(), 2);
		assert!(r.updated.is_empty());
		assert!(r.deleted.is_empty());
		assert!(r.created.iter().all(|e| e.op == OpType::Created));
	}

	#[test]
	fn diff_all_deleted_when_no_current() {
		let prev = HashMap::from([
			("a".into(), hash_row_data(&serde_json::json!({"v": 1}))),
			("b".into(), hash_row_data(&serde_json::json!({"v": 2}))),
		]);
		let r = compute_diff(&prev, &[], "src", "q", 2);
		assert!(r.created.is_empty());
		assert!(r.updated.is_empty());
		assert_eq!(r.deleted.len(), 2);
		assert!(r.deleted.iter().all(|e| e.op == OpType::Deleted));
	}

	#[test]
	fn diff_detects_update() {
		let data_v1 = serde_json::json!({"name": "alice"});
		let data_v2 = serde_json::json!({"name": "alice_updated"});
		let prev = HashMap::from([("a".into(), hash_row_data(&data_v1))]);
		let rows = vec![RawRow {
			row_key: "a".into(),
			row_data: data_v2,
		}];
		let r = compute_diff(&prev, &rows, "src", "q", 2);
		assert!(r.created.is_empty());
		assert_eq!(r.updated.len(), 1);
		assert!(r.deleted.is_empty());
		assert_eq!(r.updated[0].row_key, "a");
	}

	#[test]
	fn diff_no_change_when_hash_matches() {
		let data = serde_json::json!({"name": "alice"});
		let prev = HashMap::from([("a".into(), hash_row_data(&data))]);
		let rows = vec![RawRow {
			row_key: "a".into(),
			row_data: data,
		}];
		let r = compute_diff(&prev, &rows, "src", "q", 2);
		assert!(r.is_empty());
	}

	#[test]
	fn diff_mixed_operations() {
		let data_a = serde_json::json!({"v": 1});
		let data_b = serde_json::json!({"v": 2});
		let data_b2 = serde_json::json!({"v": 3});
		let prev = HashMap::from([
			("a".into(), hash_row_data(&data_a)),
			("b".into(), hash_row_data(&data_b)),
			("c".into(), hash_row_data(&serde_json::json!({"v": 99}))),
		]);
		let rows = vec![
			RawRow {
				row_key: "a".into(),
				row_data: data_a,
			},
			RawRow {
				row_key: "b".into(),
				row_data: data_b2,
			},
			RawRow {
				row_key: "d".into(),
				row_data: serde_json::json!({"v": 4}),
			},
		];
		let r = compute_diff(&prev, &rows, "src", "q", 3);
		assert_eq!(r.created.len(), 1);
		assert_eq!(r.created[0].row_key, "d");
		assert_eq!(r.updated.len(), 1);
		assert_eq!(r.updated[0].row_key, "b");
		assert_eq!(r.deleted.len(), 1);
		assert_eq!(r.deleted[0].row_key, "c");
	}

	#[test]
	fn diff_preserves_source_and_query_ids() {
		let rows = vec![RawRow {
			row_key: "x".into(),
			row_data: serde_json::json!({}),
		}];
		let r = compute_diff(&HashMap::new(), &rows, "my-src", "my-q", 7);
		assert_eq!(r.created[0].origin_id, "my-src");
		assert_eq!(r.created[0].query_id, "my-q");
		assert_eq!(r.created[0].cycle_id, 7);
	}

	// ── KafkaAuth tests ────────────────────────────────────────────

	#[test]
	fn kafka_auth_default_is_plaintext() {
		let auth = KafkaAuth::default();
		assert_eq!(auth.security_protocol, "PLAINTEXT");
		assert!(auth.is_plaintext());
		assert!(auth.sasl_mechanism.is_none());
	}

	#[test]
	fn kafka_auth_deserializes_sasl_plain() {
		let json = serde_json::json!({
			"security_protocol": "SASL_SSL",
			"sasl_mechanism": "PLAIN",
			"sasl_username": "user",
			"sasl_password": "pass",
			"ssl_ca_location": "/path/to/ca.pem"
		});
		let auth: KafkaAuth = serde_json::from_value(json).unwrap();
		assert_eq!(auth.security_protocol, "SASL_SSL");
		assert_eq!(auth.sasl_mechanism.as_deref(), Some("PLAIN"));
		assert_eq!(auth.sasl_username.as_deref(), Some("user"));
		assert_eq!(auth.sasl_password.as_deref(), Some("pass"));
		assert_eq!(auth.ssl_ca_location.as_deref(), Some("/path/to/ca.pem"));
		assert!(!auth.is_plaintext());
	}

	#[test]
	fn kafka_auth_deserializes_kerberos() {
		let json = serde_json::json!({
			"security_protocol": "SASL_PLAINTEXT",
			"sasl_mechanism": "GSSAPI",
			"sasl_kerberos_keytab": "/etc/krb5.keytab",
			"sasl_kerberos_principal": "kafka/broker@REALM"
		});
		let auth: KafkaAuth = serde_json::from_value(json).unwrap();
		assert_eq!(auth.sasl_mechanism.as_deref(), Some("GSSAPI"));
		assert_eq!(
			auth.sasl_kerberos_keytab.as_deref(),
			Some("/etc/krb5.keytab")
		);
		assert_eq!(
			auth.sasl_kerberos_principal.as_deref(),
			Some("kafka/broker@REALM")
		);
	}

	#[test]
	fn diff_deleted_events_carry_previous_hash() {
		let data = serde_json::json!({"k": 1});
		let h = hash_row_data(&data);
		let prev = HashMap::from([("gone".into(), h.clone())]);
		let r = compute_diff(&prev, &[], "s", "q", 5);
		assert_eq!(r.deleted[0].row_hash, h);
		assert_eq!(r.deleted[0].row_data, serde_json::Value::Null);
	}

	// ── Edge case tests ───────────────────────────────────────────

	#[test]
	fn diff_empty_previous_empty_current() {
		let r = compute_diff(&HashMap::new(), &[], "s", "q", 1);
		assert!(r.is_empty());
	}

	#[test]
	fn diff_unicode_keys() {
		let rows = vec![
			RawRow {
				row_key: "日本語キー".into(),
				row_data: serde_json::json!({"emoji": "🎉"}),
			},
			RawRow {
				row_key: "clé-français".into(),
				row_data: serde_json::json!({"val": "été"}),
			},
		];
		let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
		assert_eq!(r.created.len(), 2);
		let keys: Vec<&str> = r.created.iter().map(|e| e.row_key.as_str()).collect();
		assert!(keys.contains(&"日本語キー"));
		assert!(keys.contains(&"clé-français"));
	}

	#[test]
	fn diff_null_row_data() {
		let rows = vec![RawRow {
			row_key: "k1".into(),
			row_data: serde_json::Value::Null,
		}];
		let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
		assert_eq!(r.created.len(), 1);
		assert_eq!(r.created[0].row_data, serde_json::Value::Null);
	}

	#[test]
	fn diff_empty_string_key() {
		let rows = vec![RawRow {
			row_key: "".into(),
			row_data: serde_json::json!({"v": 1}),
		}];
		let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
		assert_eq!(r.created.len(), 1);
		assert_eq!(r.created[0].row_key, "");
	}

	#[test]
	fn diff_duplicate_keys_emit_multiple_created_events() {
		// compute_diff does not deduplicate: each occurrence produces its own event.
		let prev = HashMap::new();
		let rows = vec![
			RawRow {
				row_key: "dup".into(),
				row_data: serde_json::json!({"v": 1}),
			},
			RawRow {
				row_key: "dup".into(),
				row_data: serde_json::json!({"v": 2}),
			},
		];
		let r = compute_diff(&prev, &rows, "s", "q", 1);
		assert_eq!(r.created.len(), 2);
	}

	#[test]
	fn diff_large_json_data() {
		let big_obj: serde_json::Value = (0..100)
			.map(|i| (format!("field_{i}"), serde_json::json!(i)))
			.collect::<serde_json::Map<String, serde_json::Value>>()
			.into();
		let rows = vec![RawRow {
			row_key: "big".into(),
			row_data: big_obj.clone(),
		}];
		let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
		assert_eq!(r.created.len(), 1);
		assert_eq!(r.created[0].row_hash, hash_row_data(&big_obj));
	}

	#[test]
	fn hash_null_value() {
		let h = hash_row_data(&serde_json::Value::Null);
		assert_eq!(h.len(), 64);
	}

	#[test]
	fn hash_nested_objects_differ() {
		let a = serde_json::json!({"nested": {"a": 1}});
		let b = serde_json::json!({"nested": {"a": 2}});
		assert_ne!(hash_row_data(&a), hash_row_data(&b));
	}

	#[test]
	fn hash_array_values() {
		let data = serde_json::json!({"items": [1, 2, 3]});
		let h = hash_row_data(&data);
		assert_eq!(h.len(), 64);
		assert_ne!(h, hash_row_data(&serde_json::json!({"items": [1, 2, 4]})));
	}

	// ── EventEnvelope conversion ──────────────────────────────────

	#[test]
	fn event_envelope_from_delta_event() {
		let evt = DeltaEvent {
			op: OpType::Updated,
			origin_id: "pg".into(),
			query_id: "q1".into(),
			row_key: "pk-42".into(),
			row_data: serde_json::json!({"name": "test"}),
			row_hash: "abc123".into(),
			cycle_id: 10,
			timestamp: Utc::now(),
		};
		let envelope = EventEnvelope::from(&evt);
		assert_eq!(envelope.meta.op, OpType::Updated);
		assert_eq!(envelope.meta.origin_id, "pg");
		assert_eq!(envelope.meta.query_id, "q1");
		assert_eq!(envelope.meta.key, "pk-42");
		assert_eq!(envelope.meta.hash, "abc123");
		assert_eq!(envelope.meta.cycle_id, 10);
		assert_eq!(envelope.data, serde_json::json!({"name": "test"}));
	}

	#[test]
	fn event_envelope_roundtrip_json() {
		let evt = DeltaEvent {
			op: OpType::Created,
			origin_id: "s".into(),
			query_id: "q".into(),
			row_key: "k".into(),
			row_data: serde_json::json!(null),
			row_hash: "h".into(),
			cycle_id: 1,
			timestamp: Utc::now(),
		};
		let envelope = EventEnvelope::from(&evt);
		let json = serde_json::to_string(&envelope).unwrap();
		let back: EventEnvelope = serde_json::from_str(&json).unwrap();
		assert_eq!(envelope, back);
	}

	// ── AuthConfig tests ──────────────────────────────────────────

	#[test]
	fn auth_config_bearer() {
		let json = serde_json::json!({"type": "bearer", "token": "tok123"});
		let auth: AuthConfig = serde_json::from_value(json).unwrap();
		assert!(matches!(auth, AuthConfig::Bearer { token } if token == "tok123"));
	}

	#[test]
	fn auth_config_header() {
		let json = serde_json::json!({"type": "header", "name": "X-Api-Key", "value": "secret"});
		let auth: AuthConfig = serde_json::from_value(json).unwrap();
		assert!(
			matches!(auth, AuthConfig::Header { name, value } if name == "X-Api-Key" && value == "secret")
		);
	}

	#[test]
	fn auth_config_basic() {
		let json = serde_json::json!({"type": "basic", "username": "user", "password": "pass"});
		let auth: AuthConfig = serde_json::from_value(json).unwrap();
		assert!(
			matches!(auth, AuthConfig::Basic { username, password } if username == "user" && password == "pass")
		);
	}

	#[test]
	fn auth_config_unknown_type_fails() {
		let json = serde_json::json!({"type": "oauth2", "token": "t"});
		assert!(serde_json::from_value::<AuthConfig>(json).is_err());
	}

	#[test]
	fn kafka_auth_ssl_only() {
		let json = serde_json::json!({
			"security_protocol": "SSL",
			"ssl_ca_location": "/ca.pem",
			"ssl_certificate_location": "/cert.pem",
			"ssl_key_location": "/key.pem"
		});
		let auth: KafkaAuth = serde_json::from_value(json).unwrap();
		assert_eq!(auth.security_protocol, "SSL");
		assert!(!auth.is_plaintext());
		assert!(auth.sasl_mechanism.is_none());
		assert_eq!(auth.ssl_ca_location.as_deref(), Some("/ca.pem"));
		assert_eq!(auth.ssl_certificate_location.as_deref(), Some("/cert.pem"));
		assert_eq!(auth.ssl_key_location.as_deref(), Some("/key.pem"));
	}

	#[test]
	fn kafka_auth_minimal_deserialize() {
		let json = serde_json::json!({});
		let auth: KafkaAuth = serde_json::from_value(json).unwrap();
		assert_eq!(auth.security_protocol, "PLAINTEXT");
		assert!(auth.is_plaintext());
	}
}
715
#[cfg(test)]
mod prop_tests {
	use super::*;
	use proptest::prelude::*;
	use std::collections::{HashMap, HashSet};

	/// Generates a scalar JSON value (null, bool, integer, finite float or short string).
	fn arb_json_value() -> impl Strategy<Value = serde_json::Value> {
		prop_oneof![
			Just(serde_json::Value::Null),
			any::<bool>().prop_map(serde_json::Value::Bool),
			any::<i64>().prop_map(|n| serde_json::json!(n)),
			any::<f64>()
				.prop_filter("must be finite", |f| f.is_finite())
				.prop_map(|n| serde_json::json!(n)),
			"[a-zA-Z0-9_ ]{0,50}".prop_map(serde_json::Value::String),
		]
	}

	/// Generates one row with a 1-20 character key and scalar data.
	fn arb_raw_row() -> impl Strategy<Value = RawRow> {
		("[a-zA-Z0-9_]{1,20}", arb_json_value()).prop_map(|(key, data)| RawRow {
			row_key: key,
			row_data: data,
		})
	}

	/// Generates up to 49 rows; keys may repeat.
	fn arb_raw_rows() -> impl Strategy<Value = Vec<RawRow>> {
		prop::collection::vec(arb_raw_row(), 0..50)
	}

	/// Generates a previous snapshot of key → 64-hex-char hash pairs.
	fn arb_previous_snapshot() -> impl Strategy<Value = HashMap<String, String>> {
		prop::collection::vec(("[a-zA-Z0-9_]{1,20}", "[a-f0-9]{64}"), 0..50)
			.prop_map(|pairs| pairs.into_iter().collect())
	}

	proptest! {
		#[test]
		fn created_and_deleted_are_disjoint(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			let created_keys: HashSet<_> =
				result.created.iter().map(|e| &e.row_key).collect();
			let deleted_keys: HashSet<_> =
				result.deleted.iter().map(|e| &e.row_key).collect();
			prop_assert!(created_keys.is_disjoint(&deleted_keys));
		}

		#[test]
		fn created_and_updated_are_disjoint(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			let created_keys: HashSet<_> =
				result.created.iter().map(|e| &e.row_key).collect();
			let updated_keys: HashSet<_> =
				result.updated.iter().map(|e| &e.row_key).collect();
			prop_assert!(created_keys.is_disjoint(&updated_keys));
		}

		#[test]
		fn deleted_keys_come_from_previous(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			for event in &result.deleted {
				prop_assert!(previous.contains_key(&event.row_key));
			}
		}

		#[test]
		fn created_keys_not_in_previous(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			for event in &result.created {
				prop_assert!(!previous.contains_key(&event.row_key));
			}
		}

		#[test]
		fn updated_keys_in_both(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let current_keys: HashSet<_> =
				current.iter().map(|r| &r.row_key).collect();
			let result = compute_diff(&previous, &current, "o", "q", 1);
			for event in &result.updated {
				prop_assert!(previous.contains_key(&event.row_key));
				prop_assert!(current_keys.contains(&event.row_key));
			}
		}

		#[test]
		fn deleted_events_have_null_data(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			for event in &result.deleted {
				prop_assert_eq!(&event.row_data, &serde_json::Value::Null);
				prop_assert_eq!(event.op, OpType::Deleted);
			}
		}

		#[test]
		fn created_events_have_correct_op(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			for event in &result.created {
				prop_assert_eq!(event.op, OpType::Created);
			}
			for event in &result.updated {
				prop_assert_eq!(event.op, OpType::Updated);
			}
		}

		#[test]
		fn unique_keys_conservation(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			// compute_diff does NOT deduplicate: a repeated key produces one
			// event per occurrence. The invariants below are therefore stated
			// over *sets* of keys, not over event counts.
			let unique_current_keys: HashSet<&str> =
				current.iter().map(|r| r.row_key.as_str()).collect();

			let result = compute_diff(&previous, &current, "o", "q", 1);

			// Every unique current key is either created, updated, or unchanged
			let created_keys: HashSet<&str> =
				result.created.iter().map(|e| e.row_key.as_str()).collect();
			let updated_keys: HashSet<&str> =
				result.updated.iter().map(|e| e.row_key.as_str()).collect();

			// distinct created keys + distinct updated keys ≤ unique current keys
			prop_assert!(created_keys.len() + updated_keys.len() <= unique_current_keys.len());

			// Every deleted key is in previous but not in current
			let deleted_keys: HashSet<&str> =
				result.deleted.iter().map(|e| e.row_key.as_str()).collect();
			for dk in &deleted_keys {
				prop_assert!(!unique_current_keys.contains(dk));
			}
		}

		#[test]
		fn empty_previous_means_no_updates_or_deletes(
			current in arb_raw_rows(),
		) {
			let previous = HashMap::new();
			let result = compute_diff(&previous, &current, "o", "q", 1);
			prop_assert!(result.deleted.is_empty());
			prop_assert!(result.updated.is_empty());
			// Every row becomes a created event (including duplicate keys)
			prop_assert_eq!(result.created.len(), current.len());
		}

		#[test]
		fn empty_current_means_all_deleted(
			previous in arb_previous_snapshot(),
		) {
			let current: Vec<RawRow> = vec![];
			let result = compute_diff(&previous, &current, "o", "q", 1);
			prop_assert!(result.created.is_empty());
			prop_assert!(result.updated.is_empty());
			prop_assert_eq!(result.deleted.len(), previous.len());
		}

		#[test]
		fn hash_is_deterministic(data in arb_json_value()) {
			let h1 = hash_row_data(&data);
			let h2 = hash_row_data(&data);
			prop_assert_eq!(h1, h2);
		}

		#[test]
		fn hash_is_64_hex_chars(data in arb_json_value()) {
			let h = hash_row_data(&data);
			prop_assert_eq!(h.len(), 64);
			prop_assert!(h.chars().all(|c| c.is_ascii_hexdigit()));
		}
	}
}