1use std::path::{Path, PathBuf};
4
5use clap::Subcommand;
6
7use super::basic::load_dataset;
8use crate::{
9 drift::{DriftDetector, DriftSeverity, DriftTest},
10 sketch::{DataSketch, DistributedDriftDetector, SketchType},
11 ArrowDataset, Dataset,
12};
13
14#[derive(Subcommand)]
16pub enum DriftCommands {
17 Detect {
19 #[arg(short, long)]
21 reference: PathBuf,
22 #[arg(short, long)]
24 current: PathBuf,
25 #[arg(short, long, default_value = "ks,psi")]
27 tests: String,
28 #[arg(short, long, default_value = "0.05")]
30 alpha: f64,
31 #[arg(short, long, default_value = "text")]
33 format: String,
34 },
35 Report {
37 #[arg(short, long)]
39 reference: PathBuf,
40 #[arg(short, long)]
42 current: PathBuf,
43 #[arg(short, long)]
45 output: Option<PathBuf>,
46 },
47 Sketch {
49 input: PathBuf,
51 #[arg(short, long)]
53 output: PathBuf,
54 #[arg(short = 't', long, default_value = "tdigest")]
56 sketch_type: String,
57 #[arg(short, long)]
59 source: Option<String>,
60 #[arg(short, long, default_value = "json")]
62 format: String,
63 },
64 Merge {
66 #[arg(required = true)]
68 sketches: Vec<PathBuf>,
69 #[arg(short, long)]
71 output: PathBuf,
72 #[arg(short, long, default_value = "json")]
74 format: String,
75 },
76 Compare {
78 #[arg(short, long)]
80 reference: PathBuf,
81 #[arg(short, long)]
83 current: PathBuf,
84 #[arg(short = 't', long, default_value = "0.1")]
86 threshold: f64,
87 #[arg(short, long, default_value = "text")]
89 format: String,
90 },
91}
92
93pub(crate) fn parse_drift_tests(tests_str: &str) -> Vec<DriftTest> {
95 tests_str
96 .split(',')
97 .filter_map(|t| match t.trim().to_lowercase().as_str() {
98 "ks" => Some(DriftTest::KolmogorovSmirnov),
99 "chi2" | "chisquared" => Some(DriftTest::ChiSquared),
100 "psi" => Some(DriftTest::PSI),
101 "js" | "jensenshannon" => Some(DriftTest::JensenShannon),
102 _ => None,
103 })
104 .collect()
105}
106
107pub(crate) fn severity_symbol(severity: DriftSeverity) -> &'static str {
109 match severity {
110 DriftSeverity::None => "\u{2713}", DriftSeverity::Low => "\u{25CB}", DriftSeverity::Medium => "\u{25CF}", DriftSeverity::High => "\u{25B2}", DriftSeverity::Critical => "\u{2716}", }
116}
117
118pub(crate) fn cmd_drift_detect(
120 reference: &Path,
121 current: &Path,
122 tests_str: &str,
123 alpha: f64,
124 format: &str,
125) -> crate::Result<()> {
126 let ref_dataset = load_dataset(reference)?;
127 let cur_dataset = load_dataset(current)?;
128
129 let tests = parse_drift_tests(tests_str);
130 if tests.is_empty() {
131 return Err(crate::Error::invalid_config(
132 "No valid tests specified. Use: ks, chi2, psi, js",
133 ));
134 }
135
136 let report = run_drift_detection(ref_dataset, &cur_dataset, tests, alpha)?;
137
138 if format == "json" {
139 print_drift_report_json(&report)
140 } else {
141 print_drift_report_text(&report, reference, current, alpha);
142 Ok(())
143 }
144}
145
146fn run_drift_detection(
147 ref_dataset: ArrowDataset,
148 cur_dataset: &ArrowDataset,
149 tests: Vec<DriftTest>,
150 alpha: f64,
151) -> crate::Result<crate::drift::DriftReport> {
152 let mut detector = DriftDetector::new(ref_dataset).with_alpha(alpha);
153 for test in tests {
154 detector = detector.with_test(test);
155 }
156 detector.detect(cur_dataset)
157}
158
159fn print_drift_report_json(report: &crate::drift::DriftReport) -> crate::Result<()> {
160 let json = serde_json::json!({
161 "drift_detected": report.drift_detected,
162 "columns": report.column_scores.values().map(|d| {
163 serde_json::json!({
164 "column": d.column,
165 "test": format!("{:?}", d.test),
166 "statistic": d.statistic,
167 "p_value": d.p_value,
168 "drift_detected": d.drift_detected,
169 "severity": format!("{:?}", d.severity),
170 })
171 }).collect::<Vec<_>>()
172 });
173 println!(
174 "{}",
175 serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?
176 );
177 Ok(())
178}
179
180fn print_drift_report_text(
181 report: &crate::drift::DriftReport,
182 reference: &Path,
183 current: &Path,
184 alpha: f64,
185) {
186 println!("Drift Detection Report");
187 println!("======================");
188 println!("Reference: {}", reference.display());
189 println!("Current: {}", current.display());
190 println!("Alpha: {}", alpha);
191 println!();
192
193 if report.drift_detected {
194 println!("\u{26A0}\u{FE0F} DRIFT DETECTED\n");
195 } else {
196 println!("\u{2713} No significant drift detected\n");
197 }
198
199 print_drift_table(report);
200 print_drifted_columns(report);
201}
202
203fn print_drift_table(report: &crate::drift::DriftReport) {
204 println!(
205 "{:<20} {:<15} {:<12} {:<12} {:<10} DRIFT",
206 "COLUMN", "TEST", "STATISTIC", "P-VALUE", "SEVERITY"
207 );
208 println!("{}", "-".repeat(80));
209
210 for drift in report.column_scores.values() {
211 let drift_str = if drift.drift_detected { "YES" } else { "no" };
212 let p_value_str = drift
213 .p_value
214 .map_or_else(|| "N/A".to_string(), |p| format!("{:.4}", p));
215 println!(
216 "{:<20} {:<15} {:<12.4} {:<12} {:<10} {} {}",
217 drift.column,
218 format!("{:?}", drift.test),
219 drift.statistic,
220 p_value_str,
221 format!("{:?}", drift.severity),
222 severity_symbol(drift.severity),
223 drift_str
224 );
225 }
226}
227
228fn print_drifted_columns(report: &crate::drift::DriftReport) {
229 println!();
230 let drifted: Vec<_> = report
231 .column_scores
232 .values()
233 .filter(|d| d.drift_detected)
234 .map(|d| d.column.clone())
235 .collect();
236 if !drifted.is_empty() {
237 println!("Columns with drift: {}", drifted.join(", "));
238 }
239}
240
241pub(crate) fn cmd_drift_report(
243 reference: &Path,
244 current: &Path,
245 output: Option<&PathBuf>,
246) -> crate::Result<()> {
247 let ref_dataset = load_dataset(reference)?;
248 let cur_dataset = load_dataset(current)?;
249
250 let detector = DriftDetector::new(ref_dataset)
251 .with_test(DriftTest::KolmogorovSmirnov)
252 .with_test(DriftTest::PSI)
253 .with_test(DriftTest::JensenShannon);
254
255 let report = detector.detect(&cur_dataset)?;
256
257 let drifted_count = report
258 .column_scores
259 .values()
260 .filter(|d| d.drift_detected)
261 .count();
262 let json = serde_json::json!({
263 "reference": reference.display().to_string(),
264 "current": current.display().to_string(),
265 "drift_detected": report.drift_detected,
266 "summary": {
267 "total_columns": report.column_scores.len(),
268 "drifted_columns": drifted_count,
269 },
270 "columns": report.column_scores.values().map(|d| {
271 serde_json::json!({
272 "column": d.column,
273 "test": format!("{:?}", d.test),
274 "statistic": d.statistic,
275 "p_value": d.p_value,
276 "drift_detected": d.drift_detected,
277 "severity": format!("{:?}", d.severity),
278 })
279 }).collect::<Vec<_>>()
280 });
281
282 let json_str =
283 serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?;
284
285 if let Some(output_path) = output {
286 std::fs::write(output_path, &json_str).map_err(|e| crate::Error::io(e, output_path))?;
287 println!("Drift report written to: {}", output_path.display());
288 } else {
289 println!("{}", json_str);
290 }
291
292 Ok(())
293}
294
295pub(crate) fn parse_sketch_type(s: &str) -> Option<SketchType> {
297 match s.to_lowercase().as_str() {
298 "tdigest" | "t-digest" => Some(SketchType::TDigest),
299 "ddsketch" | "dd-sketch" => Some(SketchType::DDSketch),
300 _ => None,
301 }
302}
303
304pub(crate) fn cmd_drift_sketch(
306 input: &Path,
307 output: &Path,
308 sketch_type: &str,
309 source: Option<&str>,
310 format: &str,
311) -> crate::Result<()> {
312 let sketch_type = parse_sketch_type(sketch_type).ok_or_else(|| {
313 crate::Error::invalid_config(format!(
314 "Unknown sketch type: {}. Use 'tdigest' or 'ddsketch'",
315 sketch_type
316 ))
317 })?;
318
319 let dataset = load_dataset(input)?;
320 let mut sketch = DataSketch::from_dataset(&dataset, sketch_type)?;
321
322 if let Some(src) = source {
323 sketch = sketch.with_source(src);
324 }
325
326 match format {
327 "binary" | "bin" => {
328 let bytes = sketch.to_bytes()?;
329 std::fs::write(output, bytes).map_err(|e| crate::Error::io(e, output))?;
330 }
331 _ => {
332 let json = serde_json::to_string_pretty(&sketch)
334 .map_err(|e| crate::Error::Format(e.to_string()))?;
335 std::fs::write(output, json).map_err(|e| crate::Error::io(e, output))?;
336 }
337 }
338
339 println!(
340 "Created {} sketch from {} ({} rows) -> {}",
341 sketch_type.name(),
342 input.display(),
343 dataset.len(),
344 output.display()
345 );
346
347 Ok(())
348}
349
350pub(crate) fn load_sketch(path: &Path) -> crate::Result<DataSketch> {
352 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
353
354 match ext {
355 "bin" | "binary" => {
356 let bytes = std::fs::read(path).map_err(|e| crate::Error::io(e, path))?;
357 DataSketch::from_bytes(&bytes)
358 }
359 _ => {
360 let json = std::fs::read_to_string(path).map_err(|e| crate::Error::io(e, path))?;
362 serde_json::from_str(&json)
363 .map_err(|e| crate::Error::Format(format!("Invalid sketch JSON: {}", e)))
364 }
365 }
366}
367
368pub(crate) fn cmd_drift_merge(
370 sketches: &[PathBuf],
371 output: &Path,
372 format: &str,
373) -> crate::Result<()> {
374 if sketches.is_empty() {
375 return Err(crate::Error::invalid_config(
376 "No sketches provided to merge",
377 ));
378 }
379
380 let loaded: Vec<DataSketch> = sketches
381 .iter()
382 .map(|p| load_sketch(p))
383 .collect::<Result<Vec<_>, _>>()?;
384
385 let merged = DataSketch::merge(&loaded)?;
386
387 match format {
388 "binary" | "bin" => {
389 let bytes = merged.to_bytes()?;
390 std::fs::write(output, bytes).map_err(|e| crate::Error::io(e, output))?;
391 }
392 _ => {
393 let json = serde_json::to_string_pretty(&merged)
394 .map_err(|e| crate::Error::Format(e.to_string()))?;
395 std::fs::write(output, json).map_err(|e| crate::Error::io(e, output))?;
396 }
397 }
398
399 println!(
400 "Merged {} sketches ({} total rows) -> {}",
401 sketches.len(),
402 merged.row_count,
403 output.display()
404 );
405
406 Ok(())
407}
408
409pub(crate) fn cmd_drift_compare(
411 reference: &Path,
412 current: &Path,
413 threshold: f64,
414 format: &str,
415) -> crate::Result<()> {
416 let ref_sketch = load_sketch(reference)?;
417 let cur_sketch = load_sketch(current)?;
418
419 let detector = DistributedDriftDetector::new()
420 .with_sketch_type(ref_sketch.sketch_type)
421 .with_threshold(threshold);
422
423 let results = detector.compare(&ref_sketch, &cur_sketch)?;
424
425 let drift_detected = results.iter().any(|r| r.severity.is_drift());
426
427 if format == "json" {
428 let json = serde_json::json!({
429 "reference": reference.display().to_string(),
430 "current": current.display().to_string(),
431 "drift_detected": drift_detected,
432 "threshold": threshold,
433 "columns": results.iter().map(|r| {
434 serde_json::json!({
435 "column": r.column,
436 "statistic": r.statistic,
437 "severity": format!("{:?}", r.severity),
438 "drift_detected": r.severity.is_drift(),
439 })
440 }).collect::<Vec<_>>()
441 });
442
443 let json_str =
444 serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?;
445 println!("{}", json_str);
446 } else {
447 println!("Sketch Drift Comparison");
449 println!("=======================");
450 println!("Reference: {}", reference.display());
451 println!("Current: {}", current.display());
452 println!("Threshold: {}", threshold);
453 println!();
454
455 if results.is_empty() {
456 println!("No numeric columns to compare.");
457 } else {
458 println!(
459 "{:<20} {:>10} {:>10} DRIFT",
460 "COLUMN", "STATISTIC", "SEVERITY"
461 );
462 println!("{}", "-".repeat(55));
463
464 for result in &results {
465 let drift_symbol = if result.severity.is_drift() {
466 severity_symbol(result.severity)
467 } else {
468 "\u{2713}"
469 };
470 println!(
471 "{:<20} {:>10.4} {:>10} {}",
472 result.column,
473 result.statistic,
474 format!("{:?}", result.severity),
475 drift_symbol
476 );
477 }
478
479 println!();
480 if drift_detected {
481 println!("\u{26A0} Drift detected in one or more columns");
482 } else {
483 println!("\u{2713} No significant drift detected");
484 }
485 }
486 }
487
488 Ok(())
489}
490
#[cfg(test)]
// Test code favours brevity over pedantic lint hygiene: casts on tiny fixture
// sizes, `expect`/`unwrap` during fixture setup, and similar names are fine here.
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_possible_wrap,
    clippy::cast_precision_loss,
    clippy::uninlined_format_args,
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::redundant_clone,
    clippy::cast_lossless,
    clippy::redundant_closure_for_method_calls,
    clippy::too_many_lines,
    clippy::float_cmp,
    clippy::similar_names,
    clippy::needless_late_init,
    clippy::redundant_pattern_matching
)]
mod tests {
    use std::sync::Arc;

    use arrow::{
        array::{ArrayRef, Float64Array, Int32Array, RecordBatch, StringArray},
        datatypes::{DataType, Field, Schema},
    };

    use super::*;
    use crate::ArrowDataset;

    // ---- fixtures -------------------------------------------------------
    //
    // Setup steps use `expect` so a failing fixture reports the underlying
    // error instead of a bare panic message.

    /// Creates a temporary directory that lives as long as the returned guard.
    fn scratch_dir() -> tempfile::TempDir {
        tempfile::tempdir().expect("Should create temp dir")
    }

    /// Writes a single-batch parquet file with the given fields and columns.
    fn write_parquet(path: &Path, fields: Vec<Field>, columns: Vec<ArrayRef>) {
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
            .expect("Should create batch");
        let dataset = ArrowDataset::from_batch(batch).expect("Should create dataset");
        dataset.to_parquet(path).expect("Should write parquet");
    }

    /// Writes an `id: Int32` / `name: Utf8` file where names are `<prefix>_<id>`.
    fn write_id_name_parquet(path: &Path, ids: std::ops::Range<i32>, prefix: &str) {
        let ids: Vec<i32> = ids.collect();
        let names: Vec<String> = ids.iter().map(|i| format!("{}_{}", prefix, i)).collect();
        write_parquet(
            path,
            vec![
                Field::new("id", DataType::Int32, false),
                Field::new("name", DataType::Utf8, false),
            ],
            vec![
                Arc::new(Int32Array::from(ids)) as ArrayRef,
                Arc::new(StringArray::from(names)),
            ],
        );
    }

    /// Writes a file with a single `value: Float64` column.
    fn write_value_parquet(path: &Path, values: Vec<f64>) {
        write_parquet(
            path,
            vec![Field::new("value", DataType::Float64, false)],
            vec![Arc::new(Float64Array::from(values)) as ArrayRef],
        );
    }

    /// Writes `rows` rows of `id` (0..rows) and `name` (`item_<id>`).
    fn create_test_parquet(path: &Path, rows: usize) {
        write_id_name_parquet(path, 0..rows as i32, "item");
    }

    /// Writes `rows` rows of `id` (0..rows) and `value` (`id * 1.5`).
    fn create_test_float_parquet(path: &Path, rows: usize) {
        let ids: Vec<i32> = (0..rows as i32).collect();
        let values: Vec<f64> = ids.iter().map(|i| f64::from(*i) * 1.5).collect();
        write_parquet(
            path,
            vec![
                Field::new("id", DataType::Int32, false),
                Field::new("value", DataType::Float64, false),
            ],
            vec![
                Arc::new(Int32Array::from(ids)) as ArrayRef,
                Arc::new(Float64Array::from(values)),
            ],
        );
    }

    /// Creates a t-digest sketch as a setup step, panicking with the error on failure.
    fn make_tdigest_sketch(data: &Path, sketch: &Path, source: Option<&str>, format: &str) {
        cmd_drift_sketch(data, sketch, "tdigest", source, format).expect("Should create sketch");
    }

    /// Reads `path` and parses it as JSON.
    fn read_json(path: &Path) -> serde_json::Value {
        let content = std::fs::read_to_string(path).expect("Should read file");
        serde_json::from_str(&content).expect("Should parse JSON")
    }

    // ---- parsing helpers ------------------------------------------------

    #[test]
    fn test_parse_drift_tests() {
        assert_eq!(parse_drift_tests("ks,psi").len(), 2);
        assert_eq!(parse_drift_tests("ks, chi2, psi, js").len(), 4);
        assert!(parse_drift_tests("invalid").is_empty());
        assert_eq!(parse_drift_tests("KS,PSI").len(), 2);
    }

    #[test]
    fn test_severity_symbol() {
        assert_eq!(severity_symbol(DriftSeverity::None), "\u{2713}");
        assert_eq!(severity_symbol(DriftSeverity::Low), "\u{25CB}");
        assert_eq!(severity_symbol(DriftSeverity::Medium), "\u{25CF}");
        assert_eq!(severity_symbol(DriftSeverity::High), "\u{25B2}");
        assert_eq!(severity_symbol(DriftSeverity::Critical), "\u{2716}");
    }

    // ---- detect / report ------------------------------------------------

    #[test]
    fn test_cmd_drift_detect_same_data() {
        let dir = scratch_dir();
        let path = dir.path().join("data.parquet");
        create_test_parquet(&path, 100);

        let result = cmd_drift_detect(&path, &path, "ks,psi", 0.05, "text");
        assert!(result.is_ok());
    }

    #[test]
    fn test_cmd_drift_detect_json_format() {
        let dir = scratch_dir();
        let path = dir.path().join("data.parquet");
        create_test_parquet(&path, 100);

        let result = cmd_drift_detect(&path, &path, "ks", 0.05, "json");
        assert!(result.is_ok());
    }

    #[test]
    fn test_cmd_drift_detect_invalid_tests() {
        let dir = scratch_dir();
        let path = dir.path().join("data.parquet");
        create_test_parquet(&path, 100);

        let result = cmd_drift_detect(&path, &path, "invalid", 0.05, "text");
        assert!(result.is_err());
    }

    #[test]
    fn test_cmd_drift_report() {
        let dir = scratch_dir();
        let path = dir.path().join("data.parquet");
        create_test_parquet(&path, 100);

        let result = cmd_drift_report(&path, &path, None);
        assert!(result.is_ok());
    }

    #[test]
    fn test_cmd_drift_report_to_file() {
        let dir = scratch_dir();
        let data_path = dir.path().join("data.parquet");
        let output_path = dir.path().join("report.json");
        create_test_parquet(&data_path, 100);

        let result = cmd_drift_report(&data_path, &data_path, Some(&output_path));
        assert!(result.is_ok());
        assert!(output_path.exists());

        let parsed = read_json(&output_path);
        assert!(parsed.get("drift_detected").is_some());
    }

    #[test]
    fn test_parse_sketch_type() {
        assert!(matches!(
            parse_sketch_type("tdigest"),
            Some(SketchType::TDigest)
        ));
        assert!(matches!(
            parse_sketch_type("TDIGEST"),
            Some(SketchType::TDigest)
        ));
        assert!(matches!(
            parse_sketch_type("ddsketch"),
            Some(SketchType::DDSketch)
        ));
        assert!(matches!(
            parse_sketch_type("DDSKETCH"),
            Some(SketchType::DDSketch)
        ));
        assert!(parse_sketch_type("invalid").is_none());
    }

    // ---- sketch ---------------------------------------------------------

    #[test]
    fn test_cmd_drift_sketch_tdigest() {
        let dir = scratch_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.json");
        create_test_float_parquet(&data_path, 100);

        let result = cmd_drift_sketch(&data_path, &sketch_path, "tdigest", None, "json");
        assert!(result.is_ok());
        assert!(sketch_path.exists());

        let parsed = read_json(&sketch_path);
        assert!(parsed.get("sketch_type").is_some());
        assert!(parsed.get("row_count").is_some());
    }

    #[test]
    fn test_cmd_drift_sketch_ddsketch_type() {
        let dir = scratch_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.json");
        create_test_float_parquet(&data_path, 100);

        let result = cmd_drift_sketch(&data_path, &sketch_path, "ddsketch", None, "json");
        assert!(result.is_ok());
        assert!(sketch_path.exists());
    }

    #[test]
    fn test_cmd_drift_sketch_with_source() {
        let dir = scratch_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.json");
        create_test_float_parquet(&data_path, 50);

        let result =
            cmd_drift_sketch(&data_path, &sketch_path, "tdigest", Some("node-1"), "json");
        assert!(result.is_ok());

        let parsed = read_json(&sketch_path);
        assert_eq!(
            parsed.get("source").and_then(|v| v.as_str()),
            Some("node-1")
        );
    }

    #[test]
    fn test_cmd_drift_sketch_binary_format() {
        let dir = scratch_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.bin");
        create_test_float_parquet(&data_path, 100);

        let result = cmd_drift_sketch(&data_path, &sketch_path, "tdigest", None, "binary");
        assert!(result.is_ok());
        assert!(sketch_path.exists());

        let metadata = std::fs::metadata(&sketch_path).expect("Should get metadata");
        assert!(metadata.len() > 0);
    }

    #[test]
    fn test_cmd_drift_sketch_invalid_type() {
        let dir = scratch_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.json");
        create_test_float_parquet(&data_path, 100);

        let result = cmd_drift_sketch(&data_path, &sketch_path, "invalid", None, "json");
        assert!(result.is_err());
    }

    // ---- merge ----------------------------------------------------------

    #[test]
    fn test_cmd_drift_merge_tdigest() {
        let dir = scratch_dir();
        let data1 = dir.path().join("data1.parquet");
        let data2 = dir.path().join("data2.parquet");
        let sketch1 = dir.path().join("sketch1.json");
        let sketch2 = dir.path().join("sketch2.json");
        let merged = dir.path().join("merged.json");

        create_test_float_parquet(&data1, 50);
        create_test_float_parquet(&data2, 50);
        make_tdigest_sketch(&data1, &sketch1, Some("node-1"), "json");
        make_tdigest_sketch(&data2, &sketch2, Some("node-2"), "json");

        let sketches = vec![sketch1, sketch2];
        let result = cmd_drift_merge(&sketches, &merged, "json");
        assert!(result.is_ok());
        assert!(merged.exists());

        let parsed = read_json(&merged);
        assert_eq!(parsed.get("row_count").and_then(|v| v.as_u64()), Some(100));
    }

    #[test]
    fn test_cmd_drift_merge_single_sketch() {
        let dir = scratch_dir();
        let data = dir.path().join("data.parquet");
        let sketch = dir.path().join("sketch.json");
        let merged = dir.path().join("merged.json");

        create_test_float_parquet(&data, 100);
        make_tdigest_sketch(&data, &sketch, None, "json");

        let sketches = vec![sketch];
        let result = cmd_drift_merge(&sketches, &merged, "json");
        assert!(result.is_ok());
    }

    #[test]
    fn test_cmd_drift_merge_empty_fails() {
        let dir = scratch_dir();
        let merged = dir.path().join("merged.json");

        let sketches: Vec<PathBuf> = vec![];
        let result = cmd_drift_merge(&sketches, &merged, "json");
        assert!(result.is_err());
    }

    // ---- compare --------------------------------------------------------

    #[test]
    fn test_cmd_drift_compare_no_drift() {
        let dir = scratch_dir();
        let data = dir.path().join("data.parquet");
        let sketch1 = dir.path().join("sketch1.json");
        let sketch2 = dir.path().join("sketch2.json");

        create_test_float_parquet(&data, 100);
        make_tdigest_sketch(&data, &sketch1, None, "json");
        make_tdigest_sketch(&data, &sketch2, None, "json");

        let result = cmd_drift_compare(&sketch1, &sketch2, 0.1, "text");
        assert!(result.is_ok());
    }

    #[test]
    fn test_cmd_drift_compare_json_format() {
        let dir = scratch_dir();
        let data = dir.path().join("data.parquet");
        let sketch1 = dir.path().join("sketch1.json");
        let sketch2 = dir.path().join("sketch2.json");

        create_test_float_parquet(&data, 100);
        make_tdigest_sketch(&data, &sketch1, None, "json");
        make_tdigest_sketch(&data, &sketch2, None, "json");

        let result = cmd_drift_compare(&sketch1, &sketch2, 0.1, "json");
        assert!(result.is_ok());
    }

    #[test]
    fn test_cmd_drift_merge_binary_format() {
        let dir = scratch_dir();
        let data = dir.path().join("data.parquet");
        let sketch = dir.path().join("sketch.bin");
        let merged = dir.path().join("merged.bin");

        create_test_float_parquet(&data, 100);
        make_tdigest_sketch(&data, &sketch, None, "binary");

        let sketches = vec![sketch];
        let result = cmd_drift_merge(&sketches, &merged, "binary");
        assert!(result.is_ok());
        assert!(merged.exists());
    }

    #[test]
    fn test_load_sketch_invalid_json() {
        let dir = scratch_dir();
        let sketch_path = dir.path().join("invalid.json");
        std::fs::write(&sketch_path, "{ invalid json }").expect("Should write file");

        let result = load_sketch(&sketch_path);
        assert!(result.is_err());
    }

    #[test]
    fn test_cmd_drift_detect_all_tests() {
        let dir = scratch_dir();
        let path = dir.path().join("data.parquet");
        create_test_parquet(&path, 100);

        let result = cmd_drift_detect(&path, &path, "ks,chi2,psi,js", 0.05, "text");
        assert!(result.is_ok());
    }

    #[test]
    fn test_cmd_drift_detect_with_drift() {
        let dir = scratch_dir();
        let ref_path = dir.path().join("ref.parquet");
        let cur_path = dir.path().join("cur.parquet");

        create_test_parquet(&ref_path, 100);
        // Same schema, but ids and names that do not overlap the reference.
        write_id_name_parquet(&cur_path, 500..600, "different");

        let result = cmd_drift_detect(&ref_path, &cur_path, "ks,psi", 0.05, "text");
        assert!(result.is_ok());
    }

    #[test]
    fn test_cmd_drift_compare_empty_results() {
        let dir = scratch_dir();
        let data_path = dir.path().join("strings.parquet");
        let sketch1_path = dir.path().join("sketch1.json");
        let sketch2_path = dir.path().join("sketch2.json");

        // A dataset with only a string column.
        let names: Vec<String> = (0..50).map(|i| format!("name_{}", i)).collect();
        write_parquet(
            &data_path,
            vec![Field::new("name", DataType::Utf8, false)],
            vec![Arc::new(StringArray::from(names)) as ArrayRef],
        );

        make_tdigest_sketch(&data_path, &sketch1_path, None, "json");
        make_tdigest_sketch(&data_path, &sketch2_path, None, "json");

        let result = cmd_drift_compare(&sketch1_path, &sketch2_path, 0.1, "text");
        assert!(result.is_ok());
    }

    // ---- focused parser cases -------------------------------------------

    #[test]
    fn test_parse_drift_tests_single() {
        let tests = parse_drift_tests("ks");
        assert_eq!(tests.len(), 1);
        assert!(matches!(tests[0], DriftTest::KolmogorovSmirnov));
    }

    #[test]
    fn test_parse_drift_tests_multiple() {
        let tests = parse_drift_tests("ks,chi2,psi,js");
        assert_eq!(tests.len(), 4);
    }

    #[test]
    fn test_parse_drift_tests_with_spaces() {
        let tests = parse_drift_tests("ks, chi2 ,psi");
        assert_eq!(tests.len(), 3);
    }

    #[test]
    fn test_parse_drift_tests_unknown() {
        let tests = parse_drift_tests("unknown");
        assert!(tests.is_empty());
    }

    #[test]
    fn test_parse_sketch_type_tdigest() {
        assert!(matches!(
            parse_sketch_type("tdigest"),
            Some(SketchType::TDigest)
        ));
    }

    #[test]
    fn test_parse_sketch_type_ddsketch() {
        assert!(matches!(
            parse_sketch_type("ddsketch"),
            Some(SketchType::DDSketch)
        ));
    }

    #[test]
    fn test_parse_sketch_type_unknown() {
        assert!(parse_sketch_type("unknown").is_none());
    }

    // ---- additional end-to-end cases ------------------------------------

    #[test]
    fn test_cmd_drift_report_basic() {
        let dir = scratch_dir();
        let ref_path = dir.path().join("ref.parquet");
        let cur_path = dir.path().join("cur.parquet");
        create_test_parquet(&ref_path, 100);
        create_test_parquet(&cur_path, 100);

        let result = cmd_drift_report(&ref_path, &cur_path, None);
        assert!(result.is_ok());
    }

    #[test]
    fn test_cmd_drift_report_with_output() {
        let dir = scratch_dir();
        let ref_path = dir.path().join("ref.parquet");
        let cur_path = dir.path().join("cur.parquet");
        let output = dir.path().join("report.html");
        create_test_parquet(&ref_path, 100);
        create_test_parquet(&cur_path, 100);

        let result = cmd_drift_report(&ref_path, &cur_path, Some(&output));
        assert!(result.is_ok());
        assert!(output.exists());
    }

    #[test]
    fn test_cmd_drift_sketch_ddsketch_json_output() {
        let dir = scratch_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.json");
        create_test_parquet(&data_path, 100);

        let result = cmd_drift_sketch(&data_path, &sketch_path, "ddsketch", None, "json");
        assert!(result.is_ok());
    }

    #[test]
    fn test_cmd_drift_merge() {
        let dir = scratch_dir();
        let data1 = dir.path().join("data1.parquet");
        let data2 = dir.path().join("data2.parquet");
        let sketch1 = dir.path().join("sketch1.json");
        let sketch2 = dir.path().join("sketch2.json");
        let merged = dir.path().join("merged.json");

        write_value_parquet(&data1, vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        write_value_parquet(&data2, vec![6.0, 7.0, 8.0, 9.0, 10.0]);

        make_tdigest_sketch(&data1, &sketch1, None, "json");
        make_tdigest_sketch(&data2, &sketch2, None, "json");

        let sketches = vec![sketch1, sketch2];
        let result = cmd_drift_merge(&sketches, &merged, "json");
        assert!(result.is_ok());
    }
}