//! `oversync_core/model.rs`

1use std::collections::HashMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use sha2::{Digest, Sha256};
6
/// A keyed row of JSON data, the input unit of [`compute_diff`].
///
/// `compute_diff` looks `row_key` up in the previous snapshot and hashes
/// `row_data` (through [`hash_rows`]) to decide whether the row changed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RawRow {
	/// Key identifying the row; matched against the previous snapshot's keys.
	pub row_key: String,
	/// Row contents as arbitrary JSON; this is the value that gets hashed.
	pub row_data: serde_json::Value,
}
12
/// Outcome of [`compute_diff`], with events bucketed by operation.
#[derive(Debug, Clone, Default)]
pub struct DeltaResult {
	/// Events for rows whose key is absent from the previous snapshot.
	pub created: Vec<DeltaEvent>,
	/// Events for rows whose key exists in the previous snapshot with a different hash.
	pub updated: Vec<DeltaEvent>,
	/// Events for previous-snapshot keys that do not appear in the current rows.
	pub deleted: Vec<DeltaEvent>,
}
19
20impl DeltaResult {
21	pub fn total(&self) -> usize {
22		self.created.len() + self.updated.len() + self.deleted.len()
23	}
24
25	pub fn is_empty(&self) -> bool {
26		self.total() == 0
27	}
28}
29
/// Wire format for sink delivery. Each event is one message.
///
/// Built from a [`DeltaEvent`] via `EventEnvelope::from(&event)`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EventEnvelope {
	/// Everything about the event except the row payload.
	pub meta: EventMeta,
	/// The row payload (`DeltaEvent::row_data`).
	pub data: serde_json::Value,
}
36
/// Metadata half of an [`EventEnvelope`]; mirrors the non-payload fields of
/// [`DeltaEvent`] with `row_key`/`row_hash` shortened to `key`/`hash`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EventMeta {
	/// Operation the event represents.
	pub op: OpType,
	/// Copied from `DeltaEvent::origin_id`.
	pub origin_id: String,
	/// Copied from `DeltaEvent::query_id`.
	pub query_id: String,
	/// Copied from `DeltaEvent::row_key`.
	pub key: String,
	/// Copied from `DeltaEvent::row_hash`.
	pub hash: String,
	/// Copied from `DeltaEvent::cycle_id`.
	pub cycle_id: u64,
	/// Copied from `DeltaEvent::timestamp`.
	pub timestamp: DateTime<Utc>,
}
47
48impl From<&DeltaEvent> for EventEnvelope {
49	fn from(e: &DeltaEvent) -> Self {
50		Self {
51			meta: EventMeta {
52				op: e.op,
53				origin_id: e.origin_id.clone(),
54				query_id: e.query_id.clone(),
55				key: e.row_key.clone(),
56				hash: e.row_hash.clone(),
57				cycle_id: e.cycle_id,
58				timestamp: e.timestamp,
59			},
60			data: e.row_data.clone(),
61		}
62	}
63}
64
/// Status label of a cycle.
///
/// Serde (via `rename_all = "snake_case"`) and `Display` both render the
/// variants in lowercase: `running`, `success`, `failed`, `aborted`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum CycleStatus {
	Running,
	Success,
	Failed,
	Aborted,
}
73
74impl std::fmt::Display for CycleStatus {
75	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76		match self {
77			Self::Running => write!(f, "running"),
78			Self::Success => write!(f, "success"),
79			Self::Failed => write!(f, "failed"),
80			Self::Aborted => write!(f, "aborted"),
81		}
82	}
83}
84
85pub fn compute_diff(
86	previous: &HashMap<String, String>,
87	current: &[RawRow],
88	origin_id: &str,
89	query_id: &str,
90	cycle_id: u64,
91) -> DeltaResult {
92	let now = Utc::now();
93	let mut result = DeltaResult::default();
94	let hashes = hash_rows(current);
95	let mut seen_keys = std::collections::HashSet::with_capacity(current.len());
96
97	for (row, row_hash) in current.iter().zip(hashes) {
98		seen_keys.insert(row.row_key.clone());
99
100		match previous.get(&row.row_key) {
101			None => {
102				result.created.push(DeltaEvent {
103					op: OpType::Created,
104					origin_id: origin_id.into(),
105					query_id: query_id.into(),
106					row_key: row.row_key.clone(),
107					row_data: row.row_data.clone(),
108					row_hash,
109					cycle_id,
110					timestamp: now,
111				});
112			}
113			Some(prev_hash) if *prev_hash != row_hash => {
114				result.updated.push(DeltaEvent {
115					op: OpType::Updated,
116					origin_id: origin_id.into(),
117					query_id: query_id.into(),
118					row_key: row.row_key.clone(),
119					row_data: row.row_data.clone(),
120					row_hash,
121					cycle_id,
122					timestamp: now,
123				});
124			}
125			_ => {}
126		}
127	}
128
129	for (key, hash) in previous {
130		if !seen_keys.contains(key) {
131			result.deleted.push(DeltaEvent {
132				op: OpType::Deleted,
133				origin_id: origin_id.into(),
134				query_id: query_id.into(),
135				row_key: key.clone(),
136				row_data: serde_json::Value::Null,
137				row_hash: hash.clone(),
138				cycle_id,
139				timestamp: now,
140			});
141		}
142	}
143
144	result
145}
146
/// Kind of change a [`DeltaEvent`] describes.
///
/// Serde (via `rename_all = "snake_case"`) and `Display` both render the
/// variants in lowercase: `created`, `updated`, `deleted`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OpType {
	Created,
	Updated,
	Deleted,
}
154
/// A single change produced by [`compute_diff`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DeltaEvent {
	/// Which kind of change this is.
	pub op: OpType,
	/// The `origin_id` argument passed to `compute_diff`.
	pub origin_id: String,
	/// The `query_id` argument passed to `compute_diff`.
	pub query_id: String,
	/// Key of the affected row.
	pub row_key: String,
	/// Current row payload; `Value::Null` for deleted events.
	pub row_data: serde_json::Value,
	/// Hash of the current payload, or the previous snapshot's hash for deleted events.
	pub row_hash: String,
	/// The `cycle_id` argument passed to `compute_diff`.
	pub cycle_id: u64,
	/// Time the diff was computed; shared by all events from one `compute_diff` call.
	pub timestamp: DateTime<Utc>,
}
166
167pub fn hash_row_data(data: &serde_json::Value) -> String {
168	let serialized = serde_json::to_string(data).expect("serde_json::Value is always serializable");
169	let mut hasher = Sha256::new();
170	hasher.update(serialized.as_bytes());
171	const_hex::encode(hasher.finalize())
172}
173
174/// Hash multiple rows, using rayon parallelism when the `parallel` feature is enabled.
175pub fn hash_rows(rows: &[RawRow]) -> Vec<String> {
176	#[cfg(feature = "parallel")]
177	{
178		use rayon::prelude::*;
179		rows.par_iter()
180			.map(|r| hash_row_data(&r.row_data))
181			.collect()
182	}
183	#[cfg(not(feature = "parallel"))]
184	{
185		rows.iter().map(|r| hash_row_data(&r.row_data)).collect()
186	}
187}
188
189impl std::fmt::Display for OpType {
190	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191		match self {
192			OpType::Created => write!(f, "created"),
193			OpType::Updated => write!(f, "updated"),
194			OpType::Deleted => write!(f, "deleted"),
195		}
196	}
197}
198
/// Authentication settings, deserialized from an internally tagged object:
/// the `type` field (`"bearer"`, `"header"` or `"basic"`) selects the variant.
///
/// NOTE(review): the derived `Debug` prints `token`, `value` and `password`
/// verbatim — be careful when logging values of this type.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AuthConfig {
	/// A single token. NOTE(review): presumably sent as a bearer token — confirm at the use site.
	Bearer { token: String },
	/// An arbitrary name/value pair. NOTE(review): presumably sent as a request header — confirm.
	Header { name: String, value: String },
	/// Username and password credentials.
	Basic { username: String, password: String },
}
206
/// Kafka security settings.
///
/// Every field is optional on input: `security_protocol` falls back to
/// `"PLAINTEXT"` and the `Option` fields to `None` when absent.
///
/// NOTE(review): field names look like Kafka client configuration properties
/// (`security.protocol`, `sasl.mechanism`, ...) — confirm how they are mapped
/// at the use site. The derived `Debug` prints `sasl_password` verbatim.
#[derive(Debug, Clone, Deserialize)]
pub struct KafkaAuth {
	/// Security protocol name; defaults to `"PLAINTEXT"` when not provided.
	#[serde(default = "default_security_protocol")]
	pub security_protocol: String,
	/// SASL mechanism name (tests use `"PLAIN"` and `"GSSAPI"`).
	pub sasl_mechanism: Option<String>,
	/// SASL username.
	pub sasl_username: Option<String>,
	/// SASL password.
	pub sasl_password: Option<String>,
	/// Kerberos keytab location.
	pub sasl_kerberos_keytab: Option<String>,
	/// Kerberos principal.
	pub sasl_kerberos_principal: Option<String>,
	/// CA certificate location.
	pub ssl_ca_location: Option<String>,
	/// Client certificate location.
	pub ssl_certificate_location: Option<String>,
	/// Client private key location.
	pub ssl_key_location: Option<String>,
}
220
/// Fallback for `KafkaAuth::security_protocol` when the field is not provided.
fn default_security_protocol() -> String {
	String::from("PLAINTEXT")
}
224
225impl Default for KafkaAuth {
226	fn default() -> Self {
227		Self {
228			security_protocol: default_security_protocol(),
229			sasl_mechanism: None,
230			sasl_username: None,
231			sasl_password: None,
232			sasl_kerberos_keytab: None,
233			sasl_kerberos_principal: None,
234			ssl_ca_location: None,
235			ssl_certificate_location: None,
236			ssl_key_location: None,
237		}
238	}
239}
240
241impl KafkaAuth {
242	pub fn is_plaintext(&self) -> bool {
243		self.security_protocol == "PLAINTEXT"
244	}
245}
246
// Example-based unit tests for hashing, serde representations, `compute_diff`,
// the envelope conversion and the auth configuration types.
#[cfg(test)]
mod tests {
	use super::*;

	// ── Hashing and serde basics ───────────────────────────────────

	#[test]
	fn hash_is_deterministic() {
		let data = serde_json::json!({"name": "alice", "age": 30});
		assert_eq!(hash_row_data(&data), hash_row_data(&data));
	}

	#[test]
	fn hash_differs_for_different_data() {
		let a = serde_json::json!({"name": "alice"});
		let b = serde_json::json!({"name": "bob"});
		assert_ne!(hash_row_data(&a), hash_row_data(&b));
	}

	// A SHA-256 digest is 32 bytes, hence 64 hex characters.
	#[test]
	fn hash_is_64_char_hex() {
		let hash = hash_row_data(&serde_json::json!({}));
		assert_eq!(hash.len(), 64);
		assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
	}

	#[test]
	fn op_type_display() {
		assert_eq!(OpType::Created.to_string(), "created");
		assert_eq!(OpType::Updated.to_string(), "updated");
		assert_eq!(OpType::Deleted.to_string(), "deleted");
	}

	#[test]
	fn raw_row_roundtrip_json() {
		let row = RawRow {
			row_key: "pk-1".into(),
			row_data: serde_json::json!({"col": "val"}),
		};
		let json = serde_json::to_string(&row).unwrap();
		let back: RawRow = serde_json::from_str(&json).unwrap();
		assert_eq!(row, back);
	}

	// Only a subset of the fields is compared after the round trip.
	#[test]
	fn delta_event_roundtrip_json() {
		let evt = DeltaEvent {
			op: OpType::Created,
			origin_id: "pg-prod".into(),
			query_id: "q1".into(),
			row_key: "pk-1".into(),
			row_data: serde_json::json!({"x": 1}),
			row_hash: hash_row_data(&serde_json::json!({"x": 1})),
			cycle_id: 42,
			timestamp: chrono::Utc::now(),
		};
		let json = serde_json::to_string(&evt).unwrap();
		let back: DeltaEvent = serde_json::from_str(&json).unwrap();
		assert_eq!(evt.op, back.op);
		assert_eq!(evt.row_key, back.row_key);
		assert_eq!(evt.cycle_id, back.cycle_id);
	}

	#[test]
	fn op_type_serde_snake_case() {
		let json = serde_json::to_string(&OpType::Created).unwrap();
		assert_eq!(json, "\"created\"");
		let back: OpType = serde_json::from_str("\"updated\"").unwrap();
		assert_eq!(back, OpType::Updated);
	}

	// ── DeltaResult tests ──────────────────────────────────────────

	#[test]
	fn delta_result_empty_by_default() {
		let r = DeltaResult::default();
		assert!(r.is_empty());
		assert_eq!(r.total(), 0);
	}

	#[test]
	fn delta_result_total_counts_all() {
		// Factory closure: the event contents are irrelevant here, only the counts matter.
		let evt = || DeltaEvent {
			op: OpType::Created,
			origin_id: "s".into(),
			query_id: "q".into(),
			row_key: "k".into(),
			row_data: serde_json::json!({}),
			row_hash: "h".into(),
			cycle_id: 1,
			timestamp: Utc::now(),
		};
		let r = DeltaResult {
			created: vec![evt()],
			updated: vec![evt(), evt()],
			deleted: vec![evt()],
		};
		assert_eq!(r.total(), 4);
		assert!(!r.is_empty());
	}

	// ── CycleStatus tests ──────────────────────────────────────────

	#[test]
	fn cycle_status_display() {
		assert_eq!(CycleStatus::Running.to_string(), "running");
		assert_eq!(CycleStatus::Success.to_string(), "success");
		assert_eq!(CycleStatus::Failed.to_string(), "failed");
		assert_eq!(CycleStatus::Aborted.to_string(), "aborted");
	}

	#[test]
	fn cycle_status_serde() {
		let json = serde_json::to_string(&CycleStatus::Aborted).unwrap();
		assert_eq!(json, "\"aborted\"");
		let back: CycleStatus = serde_json::from_str("\"success\"").unwrap();
		assert_eq!(back, CycleStatus::Success);
	}

	// ── compute_diff tests ─────────────────────────────────────────

	#[test]
	fn diff_all_created_when_no_previous() {
		let prev = HashMap::new();
		let rows = vec![
			RawRow {
				row_key: "a".into(),
				row_data: serde_json::json!({"v": 1}),
			},
			RawRow {
				row_key: "b".into(),
				row_data: serde_json::json!({"v": 2}),
			},
		];
		let r = compute_diff(&prev, &rows, "src", "q", 1);
		assert_eq!(r.created.len(), 2);
		assert!(r.updated.is_empty());
		assert!(r.deleted.is_empty());
		assert!(r.created.iter().all(|e| e.op == OpType::Created));
	}

	#[test]
	fn diff_all_deleted_when_no_current() {
		let prev = HashMap::from([
			("a".into(), hash_row_data(&serde_json::json!({"v": 1}))),
			("b".into(), hash_row_data(&serde_json::json!({"v": 2}))),
		]);
		let r = compute_diff(&prev, &[], "src", "q", 2);
		assert!(r.created.is_empty());
		assert!(r.updated.is_empty());
		assert_eq!(r.deleted.len(), 2);
		assert!(r.deleted.iter().all(|e| e.op == OpType::Deleted));
	}

	#[test]
	fn diff_detects_update() {
		let data_v1 = serde_json::json!({"name": "alice"});
		let data_v2 = serde_json::json!({"name": "alice_updated"});
		let prev = HashMap::from([("a".into(), hash_row_data(&data_v1))]);
		let rows = vec![RawRow {
			row_key: "a".into(),
			row_data: data_v2,
		}];
		let r = compute_diff(&prev, &rows, "src", "q", 2);
		assert!(r.created.is_empty());
		assert_eq!(r.updated.len(), 1);
		assert!(r.deleted.is_empty());
		assert_eq!(r.updated[0].row_key, "a");
	}

	#[test]
	fn diff_no_change_when_hash_matches() {
		let data = serde_json::json!({"name": "alice"});
		let prev = HashMap::from([("a".into(), hash_row_data(&data))]);
		let rows = vec![RawRow {
			row_key: "a".into(),
			row_data: data,
		}];
		let r = compute_diff(&prev, &rows, "src", "q", 2);
		assert!(r.is_empty());
	}

	// "a" is unchanged, "b" is updated, "c" is deleted and "d" is created.
	#[test]
	fn diff_mixed_operations() {
		let data_a = serde_json::json!({"v": 1});
		let data_b = serde_json::json!({"v": 2});
		let data_b2 = serde_json::json!({"v": 3});
		let prev = HashMap::from([
			("a".into(), hash_row_data(&data_a)),
			("b".into(), hash_row_data(&data_b)),
			("c".into(), hash_row_data(&serde_json::json!({"v": 99}))),
		]);
		let rows = vec![
			RawRow {
				row_key: "a".into(),
				row_data: data_a,
			},
			RawRow {
				row_key: "b".into(),
				row_data: data_b2,
			},
			RawRow {
				row_key: "d".into(),
				row_data: serde_json::json!({"v": 4}),
			},
		];
		let r = compute_diff(&prev, &rows, "src", "q", 3);
		assert_eq!(r.created.len(), 1);
		assert_eq!(r.created[0].row_key, "d");
		assert_eq!(r.updated.len(), 1);
		assert_eq!(r.updated[0].row_key, "b");
		assert_eq!(r.deleted.len(), 1);
		assert_eq!(r.deleted[0].row_key, "c");
	}

	#[test]
	fn diff_preserves_source_and_query_ids() {
		let rows = vec![RawRow {
			row_key: "x".into(),
			row_data: serde_json::json!({}),
		}];
		let r = compute_diff(&HashMap::new(), &rows, "my-src", "my-q", 7);
		assert_eq!(r.created[0].origin_id, "my-src");
		assert_eq!(r.created[0].query_id, "my-q");
		assert_eq!(r.created[0].cycle_id, 7);
	}

	// ── KafkaAuth tests ────────────────────────────────────────────

	#[test]
	fn kafka_auth_default_is_plaintext() {
		let auth = KafkaAuth::default();
		assert_eq!(auth.security_protocol, "PLAINTEXT");
		assert!(auth.is_plaintext());
		assert!(auth.sasl_mechanism.is_none());
	}

	#[test]
	fn kafka_auth_deserializes_sasl_plain() {
		let json = serde_json::json!({
			"security_protocol": "SASL_SSL",
			"sasl_mechanism": "PLAIN",
			"sasl_username": "user",
			"sasl_password": "pass",
			"ssl_ca_location": "/path/to/ca.pem"
		});
		let auth: KafkaAuth = serde_json::from_value(json).unwrap();
		assert_eq!(auth.security_protocol, "SASL_SSL");
		assert_eq!(auth.sasl_mechanism.as_deref(), Some("PLAIN"));
		assert_eq!(auth.sasl_username.as_deref(), Some("user"));
		assert_eq!(auth.sasl_password.as_deref(), Some("pass"));
		assert_eq!(auth.ssl_ca_location.as_deref(), Some("/path/to/ca.pem"));
		assert!(!auth.is_plaintext());
	}

	#[test]
	fn kafka_auth_deserializes_kerberos() {
		let json = serde_json::json!({
			"security_protocol": "SASL_PLAINTEXT",
			"sasl_mechanism": "GSSAPI",
			"sasl_kerberos_keytab": "/etc/krb5.keytab",
			"sasl_kerberos_principal": "kafka/broker@REALM"
		});
		let auth: KafkaAuth = serde_json::from_value(json).unwrap();
		assert_eq!(auth.sasl_mechanism.as_deref(), Some("GSSAPI"));
		assert_eq!(
			auth.sasl_kerberos_keytab.as_deref(),
			Some("/etc/krb5.keytab")
		);
		assert_eq!(
			auth.sasl_kerberos_principal.as_deref(),
			Some("kafka/broker@REALM")
		);
	}

	// A deleted event has no current payload, so it reuses the snapshot hash.
	#[test]
	fn diff_deleted_events_carry_previous_hash() {
		let data = serde_json::json!({"k": 1});
		let h = hash_row_data(&data);
		let prev = HashMap::from([("gone".into(), h.clone())]);
		let r = compute_diff(&prev, &[], "s", "q", 5);
		assert_eq!(r.deleted[0].row_hash, h);
		assert_eq!(r.deleted[0].row_data, serde_json::Value::Null);
	}

	// ── Edge case tests ───────────────────────────────────────────

	#[test]
	fn diff_empty_previous_empty_current() {
		let r = compute_diff(&HashMap::new(), &[], "s", "q", 1);
		assert!(r.is_empty());
	}

	#[test]
	fn diff_unicode_keys() {
		let rows = vec![
			RawRow {
				row_key: "日本語キー".into(),
				row_data: serde_json::json!({"emoji": "🎉"}),
			},
			RawRow {
				row_key: "clé-français".into(),
				row_data: serde_json::json!({"val": "été"}),
			},
		];
		let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
		assert_eq!(r.created.len(), 2);
		let keys: Vec<&str> = r.created.iter().map(|e| e.row_key.as_str()).collect();
		assert!(keys.contains(&"日本語キー"));
		assert!(keys.contains(&"clé-français"));
	}

	#[test]
	fn diff_null_row_data() {
		let rows = vec![RawRow {
			row_key: "k1".into(),
			row_data: serde_json::Value::Null,
		}];
		let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
		assert_eq!(r.created.len(), 1);
		assert_eq!(r.created[0].row_data, serde_json::Value::Null);
	}

	#[test]
	fn diff_empty_string_key() {
		let rows = vec![RawRow {
			row_key: "".into(),
			row_data: serde_json::json!({"v": 1}),
		}];
		let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
		assert_eq!(r.created.len(), 1);
		assert_eq!(r.created[0].row_key, "");
	}

	// Pins the current contract: compute_diff does not deduplicate its input,
	// so each occurrence of a new key produces its own created event.
	#[test]
	fn diff_duplicate_keys_emit_multiple_created_events() {
		let prev = HashMap::new();
		let rows = vec![
			RawRow {
				row_key: "dup".into(),
				row_data: serde_json::json!({"v": 1}),
			},
			RawRow {
				row_key: "dup".into(),
				row_data: serde_json::json!({"v": 2}),
			},
		];
		let r = compute_diff(&prev, &rows, "s", "q", 1);
		assert_eq!(r.created.len(), 2);
	}

	#[test]
	fn diff_large_json_data() {
		// Object with 100 integer fields: field_0 … field_99.
		let big_obj: serde_json::Value = (0..100)
			.map(|i| (format!("field_{i}"), serde_json::json!(i)))
			.collect::<serde_json::Map<String, serde_json::Value>>()
			.into();
		let rows = vec![RawRow {
			row_key: "big".into(),
			row_data: big_obj.clone(),
		}];
		let r = compute_diff(&HashMap::new(), &rows, "s", "q", 1);
		assert_eq!(r.created.len(), 1);
		assert_eq!(r.created[0].row_hash, hash_row_data(&big_obj));
	}

	#[test]
	fn hash_null_value() {
		let h = hash_row_data(&serde_json::Value::Null);
		assert_eq!(h.len(), 64);
	}

	#[test]
	fn hash_nested_objects_differ() {
		let a = serde_json::json!({"nested": {"a": 1}});
		let b = serde_json::json!({"nested": {"a": 2}});
		assert_ne!(hash_row_data(&a), hash_row_data(&b));
	}

	#[test]
	fn hash_array_values() {
		let data = serde_json::json!({"items": [1, 2, 3]});
		let h = hash_row_data(&data);
		assert_eq!(h.len(), 64);
		assert_ne!(h, hash_row_data(&serde_json::json!({"items": [1, 2, 4]})));
	}

	// ── EventEnvelope conversion ──────────────────────────────────

	#[test]
	fn event_envelope_from_delta_event() {
		let evt = DeltaEvent {
			op: OpType::Updated,
			origin_id: "pg".into(),
			query_id: "q1".into(),
			row_key: "pk-42".into(),
			row_data: serde_json::json!({"name": "test"}),
			row_hash: "abc123".into(),
			cycle_id: 10,
			timestamp: Utc::now(),
		};
		let envelope = EventEnvelope::from(&evt);
		assert_eq!(envelope.meta.op, OpType::Updated);
		assert_eq!(envelope.meta.origin_id, "pg");
		assert_eq!(envelope.meta.query_id, "q1");
		assert_eq!(envelope.meta.key, "pk-42");
		assert_eq!(envelope.meta.hash, "abc123");
		assert_eq!(envelope.meta.cycle_id, 10);
		assert_eq!(envelope.data, serde_json::json!({"name": "test"}));
	}

	// Full equality here includes the timestamp surviving the JSON round trip.
	#[test]
	fn event_envelope_roundtrip_json() {
		let evt = DeltaEvent {
			op: OpType::Created,
			origin_id: "s".into(),
			query_id: "q".into(),
			row_key: "k".into(),
			row_data: serde_json::json!(null),
			row_hash: "h".into(),
			cycle_id: 1,
			timestamp: Utc::now(),
		};
		let envelope = EventEnvelope::from(&evt);
		let json = serde_json::to_string(&envelope).unwrap();
		let back: EventEnvelope = serde_json::from_str(&json).unwrap();
		assert_eq!(envelope, back);
	}

	// ── AuthConfig tests ──────────────────────────────────────────

	#[test]
	fn auth_config_bearer() {
		let json = serde_json::json!({"type": "bearer", "token": "tok123"});
		let auth: AuthConfig = serde_json::from_value(json).unwrap();
		assert!(matches!(auth, AuthConfig::Bearer { token } if token == "tok123"));
	}

	#[test]
	fn auth_config_header() {
		let json = serde_json::json!({"type": "header", "name": "X-Api-Key", "value": "secret"});
		let auth: AuthConfig = serde_json::from_value(json).unwrap();
		assert!(
			matches!(auth, AuthConfig::Header { name, value } if name == "X-Api-Key" && value == "secret")
		);
	}

	#[test]
	fn auth_config_basic() {
		let json = serde_json::json!({"type": "basic", "username": "user", "password": "pass"});
		let auth: AuthConfig = serde_json::from_value(json).unwrap();
		assert!(
			matches!(auth, AuthConfig::Basic { username, password } if username == "user" && password == "pass")
		);
	}

	// An unrecognized `type` tag must be rejected rather than mapped to a variant.
	#[test]
	fn auth_config_unknown_type_fails() {
		let json = serde_json::json!({"type": "oauth2", "token": "t"});
		assert!(serde_json::from_value::<AuthConfig>(json).is_err());
	}

	#[test]
	fn kafka_auth_ssl_only() {
		let json = serde_json::json!({
			"security_protocol": "SSL",
			"ssl_ca_location": "/ca.pem",
			"ssl_certificate_location": "/cert.pem",
			"ssl_key_location": "/key.pem"
		});
		let auth: KafkaAuth = serde_json::from_value(json).unwrap();
		assert_eq!(auth.security_protocol, "SSL");
		assert!(!auth.is_plaintext());
		assert!(auth.sasl_mechanism.is_none());
		assert_eq!(auth.ssl_ca_location.as_deref(), Some("/ca.pem"));
		assert_eq!(auth.ssl_certificate_location.as_deref(), Some("/cert.pem"));
		assert_eq!(auth.ssl_key_location.as_deref(), Some("/key.pem"));
	}

	// An empty object is valid input: the protocol falls back to its serde default.
	#[test]
	fn kafka_auth_minimal_deserialize() {
		let json = serde_json::json!({});
		let auth: KafkaAuth = serde_json::from_value(json).unwrap();
		assert_eq!(auth.security_protocol, "PLAINTEXT");
		assert!(auth.is_plaintext());
	}
}
730
// Property-based tests (proptest) checking invariants of `compute_diff` and
// `hash_row_data` over randomly generated snapshots and rows.
#[cfg(test)]
mod prop_tests {
	use super::*;
	use proptest::prelude::*;
	use std::collections::HashMap;

	// Scalar JSON values only (null, bool, i64, finite f64, short string);
	// no arrays or objects are generated.
	fn arb_json_value() -> impl Strategy<Value = serde_json::Value> {
		prop_oneof![
			Just(serde_json::Value::Null),
			any::<bool>().prop_map(serde_json::Value::Bool),
			any::<i64>().prop_map(|n| serde_json::json!(n)),
			any::<f64>()
				.prop_filter("must be finite", |f| f.is_finite())
				.prop_map(|n| serde_json::json!(n)),
			"[a-zA-Z0-9_ ]{0,50}".prop_map(serde_json::Value::String),
		]
	}

	// A row with a 1–20 character alphanumeric/underscore key and a scalar payload.
	fn arb_raw_row() -> impl Strategy<Value = RawRow> {
		("[a-zA-Z0-9_]{1,20}", arb_json_value()).prop_map(|(key, data)| RawRow {
			row_key: key,
			row_data: data,
		})
	}

	// Up to 49 rows; keys are drawn independently, so duplicates can occur.
	fn arb_raw_rows() -> impl Strategy<Value = Vec<RawRow>> {
		prop::collection::vec(arb_raw_row(), 0..50)
	}

	// Previous snapshot of key → 64-char lowercase hex string. Collecting into a
	// map collapses duplicate keys, so the map may be smaller than the vector.
	fn arb_previous_snapshot() -> impl Strategy<Value = HashMap<String, String>> {
		prop::collection::vec(("[a-zA-Z0-9_]{1,20}", "[a-f0-9]{64}"), 0..50)
			.prop_map(|pairs| pairs.into_iter().collect())
	}

	proptest! {
		#[test]
		fn created_and_deleted_are_disjoint(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			let created_keys: std::collections::HashSet<_> =
				result.created.iter().map(|e| &e.row_key).collect();
			let deleted_keys: std::collections::HashSet<_> =
				result.deleted.iter().map(|e| &e.row_key).collect();
			prop_assert!(created_keys.is_disjoint(&deleted_keys));
		}

		#[test]
		fn created_and_updated_are_disjoint(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			let created_keys: std::collections::HashSet<_> =
				result.created.iter().map(|e| &e.row_key).collect();
			let updated_keys: std::collections::HashSet<_> =
				result.updated.iter().map(|e| &e.row_key).collect();
			prop_assert!(created_keys.is_disjoint(&updated_keys));
		}

		#[test]
		fn deleted_keys_come_from_previous(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			for event in &result.deleted {
				prop_assert!(previous.contains_key(&event.row_key));
			}
		}

		#[test]
		fn created_keys_not_in_previous(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			for event in &result.created {
				prop_assert!(!previous.contains_key(&event.row_key));
			}
		}

		#[test]
		fn updated_keys_in_both(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let current_keys: std::collections::HashSet<_> =
				current.iter().map(|r| &r.row_key).collect();
			let result = compute_diff(&previous, &current, "o", "q", 1);
			for event in &result.updated {
				prop_assert!(previous.contains_key(&event.row_key));
				prop_assert!(current_keys.contains(&event.row_key));
			}
		}

		#[test]
		fn deleted_events_have_null_data(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			for event in &result.deleted {
				prop_assert_eq!(&event.row_data, &serde_json::Value::Null);
				prop_assert_eq!(event.op, OpType::Deleted);
			}
		}

		// Despite the name, this also checks the op of every updated event.
		#[test]
		fn created_events_have_correct_op(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			let result = compute_diff(&previous, &current, "o", "q", 1);
			for event in &result.created {
				prop_assert_eq!(event.op, OpType::Created);
			}
			for event in &result.updated {
				prop_assert_eq!(event.op, OpType::Updated);
			}
		}

		#[test]
		fn unique_keys_conservation(
			previous in arb_previous_snapshot(),
			current in arb_raw_rows(),
		) {
			// Collapse current rows to their distinct keys (later rows overwrite
			// earlier ones in this map). Note that compute_diff itself does NOT
			// deduplicate: it emits one event per occurrence, which is why the
			// assertions below compare sets of keys rather than event counts.
			let mut deduped: HashMap<&str, &RawRow> = HashMap::new();
			for row in &current {
				deduped.insert(&row.row_key, row);
			}
			let unique_current_keys: std::collections::HashSet<&str> =
				deduped.keys().copied().collect();

			let result = compute_diff(&previous, &current, "o", "q", 1);

			// Every unique current key is either created, updated, or unchanged
			let created_keys: std::collections::HashSet<&str> =
				result.created.iter().map(|e| e.row_key.as_str()).collect();
			let updated_keys: std::collections::HashSet<&str> =
				result.updated.iter().map(|e| e.row_key.as_str()).collect();

			// created + updated ≤ unique current keys
			prop_assert!(created_keys.len() + updated_keys.len() <= unique_current_keys.len());

			// Every deleted key is in previous but not in current
			let deleted_keys: std::collections::HashSet<&str> =
				result.deleted.iter().map(|e| e.row_key.as_str()).collect();
			for dk in &deleted_keys {
				prop_assert!(!unique_current_keys.contains(dk));
			}
		}

		#[test]
		fn empty_previous_means_no_updates_or_deletes(
			current in arb_raw_rows(),
		) {
			let previous = HashMap::new();
			let result = compute_diff(&previous, &current, "o", "q", 1);
			prop_assert!(result.deleted.is_empty());
			prop_assert!(result.updated.is_empty());
			// Every row becomes a created event (including duplicate keys)
			prop_assert_eq!(result.created.len(), current.len());
		}

		#[test]
		fn empty_current_means_all_deleted(
			previous in arb_previous_snapshot(),
		) {
			let current: Vec<RawRow> = vec![];
			let result = compute_diff(&previous, &current, "o", "q", 1);
			prop_assert!(result.created.is_empty());
			prop_assert!(result.updated.is_empty());
			prop_assert_eq!(result.deleted.len(), previous.len());
		}

		#[test]
		fn hash_is_deterministic(data in arb_json_value()) {
			let h1 = hash_row_data(&data);
			let h2 = hash_row_data(&data);
			prop_assert_eq!(h1, h2);
		}

		#[test]
		fn hash_is_64_hex_chars(data in arb_json_value()) {
			let h = hash_row_data(&data);
			prop_assert_eq!(h.len(), 64);
			prop_assert!(h.chars().all(|c| c.is_ascii_hexdigit()));
		}
	}
}