1use std::path::{Path, PathBuf};
4
5use clap::Subcommand;
6
7use super::basic::load_dataset;
8use crate::{
9 drift::{DriftDetector, DriftSeverity, DriftTest},
10 sketch::{DataSketch, DistributedDriftDetector, SketchType},
11 Dataset,
12};
13
/// Drift-detection subcommands.
#[derive(Subcommand)]
pub enum DriftCommands {
    /// Detect drift between a reference dataset and a current dataset
    Detect {
        /// Reference (baseline) dataset file
        #[arg(short, long)]
        reference: PathBuf,
        /// Current dataset file compared against the reference
        #[arg(short, long)]
        current: PathBuf,
        /// Comma-separated tests to run: ks, chi2 (chisquared), psi, js (jensenshannon)
        #[arg(short, long, default_value = "ks,psi")]
        tests: String,
        /// Significance level (alpha) for the drift tests
        #[arg(short, long, default_value = "0.05")]
        alpha: f64,
        /// Output format: json or text
        #[arg(short, long, default_value = "text")]
        format: String,
    },
    /// Write a JSON drift report using the KS, PSI and Jensen-Shannon tests
    Report {
        /// Reference (baseline) dataset file
        #[arg(short, long)]
        reference: PathBuf,
        /// Current dataset file compared against the reference
        #[arg(short, long)]
        current: PathBuf,
        /// File to write the report to (printed to stdout when omitted)
        #[arg(short, long)]
        output: Option<PathBuf>,
    },
    /// Build a distribution sketch from a dataset
    Sketch {
        /// Input dataset file
        input: PathBuf,
        /// File to write the sketch to
        #[arg(short, long)]
        output: PathBuf,
        /// Sketch algorithm: tdigest or ddsketch
        #[arg(short = 't', long, default_value = "tdigest")]
        sketch_type: String,
        /// Optional source label stored with the sketch
        #[arg(short, long)]
        source: Option<String>,
        /// Output encoding: json or binary
        #[arg(short, long, default_value = "json")]
        format: String,
    },
    /// Merge multiple sketch files into a single sketch
    Merge {
        /// Sketch files to merge
        #[arg(required = true)]
        sketches: Vec<PathBuf>,
        /// File to write the merged sketch to
        #[arg(short, long)]
        output: PathBuf,
        /// Output encoding: json or binary
        #[arg(short, long, default_value = "json")]
        format: String,
    },
    /// Compare two sketches for drift
    Compare {
        /// Reference sketch file
        #[arg(short, long)]
        reference: PathBuf,
        /// Current sketch file
        #[arg(short, long)]
        current: PathBuf,
        /// Drift threshold for the sketch comparison
        #[arg(short = 't', long, default_value = "0.1")]
        threshold: f64,
        /// Output format: json or text
        #[arg(short, long, default_value = "text")]
        format: String,
    },
}
92
93pub(crate) fn parse_drift_tests(tests_str: &str) -> Vec<DriftTest> {
95 tests_str
96 .split(',')
97 .filter_map(|t| match t.trim().to_lowercase().as_str() {
98 "ks" => Some(DriftTest::KolmogorovSmirnov),
99 "chi2" | "chisquared" => Some(DriftTest::ChiSquared),
100 "psi" => Some(DriftTest::PSI),
101 "js" | "jensenshannon" => Some(DriftTest::JensenShannon),
102 _ => None,
103 })
104 .collect()
105}
106
107pub(crate) fn severity_symbol(severity: DriftSeverity) -> &'static str {
109 match severity {
110 DriftSeverity::None => "\u{2713}", DriftSeverity::Low => "\u{25CB}", DriftSeverity::Medium => "\u{25CF}", DriftSeverity::High => "\u{25B2}", DriftSeverity::Critical => "\u{2716}", }
116}
117
118pub(crate) fn cmd_drift_detect(
120 reference: &Path,
121 current: &Path,
122 tests_str: &str,
123 alpha: f64,
124 format: &str,
125) -> crate::Result<()> {
126 let ref_dataset = load_dataset(reference)?;
127 let cur_dataset = load_dataset(current)?;
128
129 let tests = parse_drift_tests(tests_str);
130 if tests.is_empty() {
131 return Err(crate::Error::invalid_config(
132 "No valid tests specified. Use: ks, chi2, psi, js",
133 ));
134 }
135
136 let mut detector = DriftDetector::new(ref_dataset).with_alpha(alpha);
137 for test in tests {
138 detector = detector.with_test(test);
139 }
140
141 let report = detector.detect(&cur_dataset)?;
142
143 if format == "json" {
144 let json = serde_json::json!({
146 "drift_detected": report.drift_detected,
147 "columns": report.column_scores.values().map(|d| {
148 serde_json::json!({
149 "column": d.column,
150 "test": format!("{:?}", d.test),
151 "statistic": d.statistic,
152 "p_value": d.p_value,
153 "drift_detected": d.drift_detected,
154 "severity": format!("{:?}", d.severity),
155 })
156 }).collect::<Vec<_>>()
157 });
158 println!(
159 "{}",
160 serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?
161 );
162 } else {
163 println!("Drift Detection Report");
165 println!("======================");
166 println!("Reference: {}", reference.display());
167 println!("Current: {}", current.display());
168 println!("Alpha: {}", alpha);
169 println!();
170
171 if report.drift_detected {
172 println!("\u{26A0}\u{FE0F} DRIFT DETECTED\n");
173 } else {
174 println!("\u{2713} No significant drift detected\n");
175 }
176
177 println!(
178 "{:<20} {:<15} {:<12} {:<12} {:<10} DRIFT",
179 "COLUMN", "TEST", "STATISTIC", "P-VALUE", "SEVERITY"
180 );
181 println!("{}", "-".repeat(80));
182
183 for drift in report.column_scores.values() {
184 let drift_str = if drift.drift_detected { "YES" } else { "no" };
185 let p_value_str = drift
186 .p_value
187 .map_or_else(|| "N/A".to_string(), |p| format!("{:.4}", p));
188 println!(
189 "{:<20} {:<15} {:<12.4} {:<12} {:<10} {} {}",
190 drift.column,
191 format!("{:?}", drift.test),
192 drift.statistic,
193 p_value_str,
194 format!("{:?}", drift.severity),
195 severity_symbol(drift.severity),
196 drift_str
197 );
198 }
199
200 println!();
201 let drifted: Vec<_> = report
202 .column_scores
203 .values()
204 .filter(|d| d.drift_detected)
205 .map(|d| d.column.clone())
206 .collect();
207 if !drifted.is_empty() {
208 println!("Columns with drift: {}", drifted.join(", "));
209 }
210 }
211
212 Ok(())
213}
214
215pub(crate) fn cmd_drift_report(
217 reference: &Path,
218 current: &Path,
219 output: Option<&PathBuf>,
220) -> crate::Result<()> {
221 let ref_dataset = load_dataset(reference)?;
222 let cur_dataset = load_dataset(current)?;
223
224 let detector = DriftDetector::new(ref_dataset)
225 .with_test(DriftTest::KolmogorovSmirnov)
226 .with_test(DriftTest::PSI)
227 .with_test(DriftTest::JensenShannon);
228
229 let report = detector.detect(&cur_dataset)?;
230
231 let drifted_count = report
232 .column_scores
233 .values()
234 .filter(|d| d.drift_detected)
235 .count();
236 let json = serde_json::json!({
237 "reference": reference.display().to_string(),
238 "current": current.display().to_string(),
239 "drift_detected": report.drift_detected,
240 "summary": {
241 "total_columns": report.column_scores.len(),
242 "drifted_columns": drifted_count,
243 },
244 "columns": report.column_scores.values().map(|d| {
245 serde_json::json!({
246 "column": d.column,
247 "test": format!("{:?}", d.test),
248 "statistic": d.statistic,
249 "p_value": d.p_value,
250 "drift_detected": d.drift_detected,
251 "severity": format!("{:?}", d.severity),
252 })
253 }).collect::<Vec<_>>()
254 });
255
256 let json_str =
257 serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?;
258
259 if let Some(output_path) = output {
260 std::fs::write(output_path, &json_str).map_err(|e| crate::Error::io(e, output_path))?;
261 println!("Drift report written to: {}", output_path.display());
262 } else {
263 println!("{}", json_str);
264 }
265
266 Ok(())
267}
268
269pub(crate) fn parse_sketch_type(s: &str) -> Option<SketchType> {
271 match s.to_lowercase().as_str() {
272 "tdigest" | "t-digest" => Some(SketchType::TDigest),
273 "ddsketch" | "dd-sketch" => Some(SketchType::DDSketch),
274 _ => None,
275 }
276}
277
278pub(crate) fn cmd_drift_sketch(
280 input: &Path,
281 output: &Path,
282 sketch_type: &str,
283 source: Option<&str>,
284 format: &str,
285) -> crate::Result<()> {
286 let sketch_type = parse_sketch_type(sketch_type).ok_or_else(|| {
287 crate::Error::invalid_config(format!(
288 "Unknown sketch type: {}. Use 'tdigest' or 'ddsketch'",
289 sketch_type
290 ))
291 })?;
292
293 let dataset = load_dataset(input)?;
294 let mut sketch = DataSketch::from_dataset(&dataset, sketch_type)?;
295
296 if let Some(src) = source {
297 sketch = sketch.with_source(src);
298 }
299
300 match format {
301 "binary" | "bin" => {
302 let bytes = sketch.to_bytes()?;
303 std::fs::write(output, bytes).map_err(|e| crate::Error::io(e, output))?;
304 }
305 _ => {
306 let json = serde_json::to_string_pretty(&sketch)
308 .map_err(|e| crate::Error::Format(e.to_string()))?;
309 std::fs::write(output, json).map_err(|e| crate::Error::io(e, output))?;
310 }
311 }
312
313 println!(
314 "Created {} sketch from {} ({} rows) -> {}",
315 sketch_type.name(),
316 input.display(),
317 dataset.len(),
318 output.display()
319 );
320
321 Ok(())
322}
323
324pub(crate) fn load_sketch(path: &Path) -> crate::Result<DataSketch> {
326 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
327
328 match ext {
329 "bin" | "binary" => {
330 let bytes = std::fs::read(path).map_err(|e| crate::Error::io(e, path))?;
331 DataSketch::from_bytes(&bytes)
332 }
333 _ => {
334 let json = std::fs::read_to_string(path).map_err(|e| crate::Error::io(e, path))?;
336 serde_json::from_str(&json)
337 .map_err(|e| crate::Error::Format(format!("Invalid sketch JSON: {}", e)))
338 }
339 }
340}
341
342pub(crate) fn cmd_drift_merge(
344 sketches: &[PathBuf],
345 output: &Path,
346 format: &str,
347) -> crate::Result<()> {
348 if sketches.is_empty() {
349 return Err(crate::Error::invalid_config(
350 "No sketches provided to merge",
351 ));
352 }
353
354 let loaded: Vec<DataSketch> = sketches
355 .iter()
356 .map(|p| load_sketch(p))
357 .collect::<Result<Vec<_>, _>>()?;
358
359 let merged = DataSketch::merge(&loaded)?;
360
361 match format {
362 "binary" | "bin" => {
363 let bytes = merged.to_bytes()?;
364 std::fs::write(output, bytes).map_err(|e| crate::Error::io(e, output))?;
365 }
366 _ => {
367 let json = serde_json::to_string_pretty(&merged)
368 .map_err(|e| crate::Error::Format(e.to_string()))?;
369 std::fs::write(output, json).map_err(|e| crate::Error::io(e, output))?;
370 }
371 }
372
373 println!(
374 "Merged {} sketches ({} total rows) -> {}",
375 sketches.len(),
376 merged.row_count,
377 output.display()
378 );
379
380 Ok(())
381}
382
383pub(crate) fn cmd_drift_compare(
385 reference: &Path,
386 current: &Path,
387 threshold: f64,
388 format: &str,
389) -> crate::Result<()> {
390 let ref_sketch = load_sketch(reference)?;
391 let cur_sketch = load_sketch(current)?;
392
393 let detector = DistributedDriftDetector::new()
394 .with_sketch_type(ref_sketch.sketch_type)
395 .with_threshold(threshold);
396
397 let results = detector.compare(&ref_sketch, &cur_sketch)?;
398
399 let drift_detected = results.iter().any(|r| r.severity.is_drift());
400
401 if format == "json" {
402 let json = serde_json::json!({
403 "reference": reference.display().to_string(),
404 "current": current.display().to_string(),
405 "drift_detected": drift_detected,
406 "threshold": threshold,
407 "columns": results.iter().map(|r| {
408 serde_json::json!({
409 "column": r.column,
410 "statistic": r.statistic,
411 "severity": format!("{:?}", r.severity),
412 "drift_detected": r.severity.is_drift(),
413 })
414 }).collect::<Vec<_>>()
415 });
416
417 let json_str =
418 serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?;
419 println!("{}", json_str);
420 } else {
421 println!("Sketch Drift Comparison");
423 println!("=======================");
424 println!("Reference: {}", reference.display());
425 println!("Current: {}", current.display());
426 println!("Threshold: {}", threshold);
427 println!();
428
429 if results.is_empty() {
430 println!("No numeric columns to compare.");
431 } else {
432 println!(
433 "{:<20} {:>10} {:>10} DRIFT",
434 "COLUMN", "STATISTIC", "SEVERITY"
435 );
436 println!("{}", "-".repeat(55));
437
438 for result in &results {
439 let drift_symbol = if result.severity.is_drift() {
440 severity_symbol(result.severity)
441 } else {
442 "\u{2713}"
443 };
444 println!(
445 "{:<20} {:>10.4} {:>10} {}",
446 result.column,
447 result.statistic,
448 format!("{:?}", result.severity),
449 drift_symbol
450 );
451 }
452
453 println!();
454 if drift_detected {
455 println!("\u{26A0} Drift detected in one or more columns");
456 } else {
457 println!("\u{2713} No significant drift detected");
458 }
459 }
460 }
461
462 Ok(())
463}
464
#[cfg(test)]
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_possible_wrap,
    clippy::cast_precision_loss,
    clippy::uninlined_format_args,
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::redundant_clone,
    clippy::cast_lossless,
    clippy::redundant_closure_for_method_calls,
    clippy::too_many_lines,
    clippy::float_cmp,
    clippy::similar_names,
    clippy::needless_late_init,
    clippy::redundant_pattern_matching
)]
mod tests {
    //! End-to-end tests for the `drift` subcommands.
    //!
    //! Each test writes small Parquet fixtures into its own temporary
    //! directory and drives the `cmd_*` functions directly. Fallible setup
    //! uses `expect`, so a failure reports the underlying error rather than
    //! only a fixed message.

    use std::sync::Arc;

    use arrow::{
        array::{ArrayRef, Float64Array, Int32Array, RecordBatch, StringArray},
        datatypes::{DataType, Field, Schema},
    };

    use super::*;
    use crate::ArrowDataset;

    /// Create a temporary directory that is removed when the guard is dropped.
    fn temp_dir() -> tempfile::TempDir {
        tempfile::tempdir().expect("Should create temp dir")
    }

    /// Write `batch` to `path` as Parquet through [`ArrowDataset`].
    fn write_parquet(path: &Path, batch: RecordBatch) {
        ArrowDataset::from_batch(batch)
            .expect("Should create dataset")
            .to_parquet(path)
            .expect("Should write parquet");
    }

    /// Write a two-column file (`id: Int32`, `name: Utf8`) from the given values.
    fn write_id_name_parquet(path: &Path, ids: Vec<i32>, names: Vec<String>) {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(ids)),
                Arc::new(StringArray::from(names)),
            ],
        )
        .expect("Should create batch");
        write_parquet(path, batch);
    }

    /// Write a one-column file holding `column` under `field`.
    fn write_single_column_parquet(path: &Path, field: Field, column: ArrayRef) {
        let schema = Arc::new(Schema::new(vec![field]));
        let batch = RecordBatch::try_new(schema, vec![column]).expect("Should create batch");
        write_parquet(path, batch);
    }

    /// Write `rows` rows with `id` = 0..rows and `name` = `item_{id}`.
    fn create_test_parquet(path: &Path, rows: usize) {
        let ids: Vec<i32> = (0..rows as i32).collect();
        let names = ids.iter().map(|i| format!("item_{}", i)).collect();
        write_id_name_parquet(path, ids, names);
    }

    /// Write `rows` rows with `id` = 0..rows and `value: Float64` = 1.5 * id.
    fn create_test_float_parquet(path: &Path, rows: usize) {
        let ids: Vec<i32> = (0..rows as i32).collect();
        let values: Vec<f64> = ids.iter().map(|i| *i as f64 * 1.5).collect();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .expect("Should create batch");
        write_parquet(path, batch);
    }

    /// Build a t-digest sketch of `data` at `out`, panicking with the error on failure.
    fn tdigest_sketch(data: &Path, out: &Path, source: Option<&str>, format: &str) {
        cmd_drift_sketch(data, out, "tdigest", source, format).expect("Should create sketch");
    }

    /// Read `path` and parse it as JSON.
    fn read_json(path: &Path) -> serde_json::Value {
        let content = std::fs::read_to_string(path).expect("Should read file");
        serde_json::from_str(&content).expect("Should parse JSON")
    }

    #[test]
    fn test_parse_drift_tests() {
        assert_eq!(parse_drift_tests("ks,psi").len(), 2);
        assert_eq!(parse_drift_tests("ks, chi2, psi, js").len(), 4);
        assert!(parse_drift_tests("invalid").is_empty());
        assert_eq!(parse_drift_tests("KS,PSI").len(), 2);
    }

    #[test]
    fn test_severity_symbol() {
        assert_eq!(severity_symbol(DriftSeverity::None), "\u{2713}");
        assert_eq!(severity_symbol(DriftSeverity::Low), "\u{25CB}");
        assert_eq!(severity_symbol(DriftSeverity::Medium), "\u{25CF}");
        assert_eq!(severity_symbol(DriftSeverity::High), "\u{25B2}");
        assert_eq!(severity_symbol(DriftSeverity::Critical), "\u{2716}");
    }

    #[test]
    fn test_cmd_drift_detect_same_data() {
        let dir = temp_dir();
        let path = dir.path().join("data.parquet");
        create_test_parquet(&path, 100);

        cmd_drift_detect(&path, &path, "ks,psi", 0.05, "text").expect("detect should succeed");
    }

    #[test]
    fn test_cmd_drift_detect_json_format() {
        let dir = temp_dir();
        let path = dir.path().join("data.parquet");
        create_test_parquet(&path, 100);

        cmd_drift_detect(&path, &path, "ks", 0.05, "json").expect("detect should succeed");
    }

    #[test]
    fn test_cmd_drift_detect_invalid_tests() {
        let dir = temp_dir();
        let path = dir.path().join("data.parquet");
        create_test_parquet(&path, 100);

        assert!(cmd_drift_detect(&path, &path, "invalid", 0.05, "text").is_err());
    }

    #[test]
    fn test_cmd_drift_report() {
        let dir = temp_dir();
        let path = dir.path().join("data.parquet");
        create_test_parquet(&path, 100);

        cmd_drift_report(&path, &path, None).expect("report should succeed");
    }

    #[test]
    fn test_cmd_drift_report_to_file() {
        let dir = temp_dir();
        let data_path = dir.path().join("data.parquet");
        let output_path = dir.path().join("report.json");
        create_test_parquet(&data_path, 100);

        cmd_drift_report(&data_path, &data_path, Some(&output_path))
            .expect("report should succeed");
        assert!(output_path.exists());
        assert!(read_json(&output_path).get("drift_detected").is_some());
    }

    #[test]
    fn test_parse_sketch_type() {
        assert!(matches!(parse_sketch_type("tdigest"), Some(SketchType::TDigest)));
        assert!(matches!(parse_sketch_type("TDIGEST"), Some(SketchType::TDigest)));
        assert!(matches!(parse_sketch_type("ddsketch"), Some(SketchType::DDSketch)));
        assert!(matches!(parse_sketch_type("DDSKETCH"), Some(SketchType::DDSketch)));
        assert!(parse_sketch_type("invalid").is_none());
    }

    #[test]
    fn test_cmd_drift_sketch_tdigest() {
        let dir = temp_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.json");
        create_test_float_parquet(&data_path, 100);

        cmd_drift_sketch(&data_path, &sketch_path, "tdigest", None, "json")
            .expect("sketch should succeed");
        assert!(sketch_path.exists());

        let parsed = read_json(&sketch_path);
        assert!(parsed.get("sketch_type").is_some());
        assert!(parsed.get("row_count").is_some());
    }

    #[test]
    fn test_cmd_drift_sketch_ddsketch_type() {
        let dir = temp_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.json");
        create_test_float_parquet(&data_path, 100);

        cmd_drift_sketch(&data_path, &sketch_path, "ddsketch", None, "json")
            .expect("sketch should succeed");
        assert!(sketch_path.exists());
    }

    #[test]
    fn test_cmd_drift_sketch_with_source() {
        let dir = temp_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.json");
        create_test_float_parquet(&data_path, 50);

        cmd_drift_sketch(&data_path, &sketch_path, "tdigest", Some("node-1"), "json")
            .expect("sketch should succeed");

        let parsed = read_json(&sketch_path);
        assert_eq!(
            parsed.get("source").and_then(|v| v.as_str()),
            Some("node-1")
        );
    }

    #[test]
    fn test_cmd_drift_sketch_binary_format() {
        let dir = temp_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.bin");
        create_test_float_parquet(&data_path, 100);

        cmd_drift_sketch(&data_path, &sketch_path, "tdigest", None, "binary")
            .expect("sketch should succeed");
        assert!(sketch_path.exists());

        let metadata = std::fs::metadata(&sketch_path).expect("Should get metadata");
        assert!(metadata.len() > 0);
    }

    #[test]
    fn test_cmd_drift_sketch_invalid_type() {
        let dir = temp_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.json");
        create_test_float_parquet(&data_path, 100);

        assert!(cmd_drift_sketch(&data_path, &sketch_path, "invalid", None, "json").is_err());
    }

    #[test]
    fn test_cmd_drift_merge_tdigest() {
        let dir = temp_dir();
        let data1 = dir.path().join("data1.parquet");
        let data2 = dir.path().join("data2.parquet");
        let sketch1 = dir.path().join("sketch1.json");
        let sketch2 = dir.path().join("sketch2.json");
        let merged = dir.path().join("merged.json");

        create_test_float_parquet(&data1, 50);
        create_test_float_parquet(&data2, 50);
        tdigest_sketch(&data1, &sketch1, Some("node-1"), "json");
        tdigest_sketch(&data2, &sketch2, Some("node-2"), "json");

        let sketches = [sketch1, sketch2];
        cmd_drift_merge(&sketches, &merged, "json").expect("merge should succeed");
        assert!(merged.exists());

        // Two 50-row inputs must merge into a 100-row sketch.
        let parsed = read_json(&merged);
        assert_eq!(parsed.get("row_count").and_then(|v| v.as_u64()), Some(100));
    }

    #[test]
    fn test_cmd_drift_merge_single_sketch() {
        let dir = temp_dir();
        let data = dir.path().join("data.parquet");
        let sketch = dir.path().join("sketch.json");
        let merged = dir.path().join("merged.json");

        create_test_float_parquet(&data, 100);
        tdigest_sketch(&data, &sketch, None, "json");

        cmd_drift_merge(&[sketch], &merged, "json").expect("merge should succeed");
    }

    #[test]
    fn test_cmd_drift_merge_empty_fails() {
        let dir = temp_dir();
        let merged = dir.path().join("merged.json");

        let sketches: Vec<PathBuf> = vec![];
        assert!(cmd_drift_merge(&sketches, &merged, "json").is_err());
    }

    #[test]
    fn test_cmd_drift_compare_no_drift() {
        let dir = temp_dir();
        let data = dir.path().join("data.parquet");
        let sketch1 = dir.path().join("sketch1.json");
        let sketch2 = dir.path().join("sketch2.json");

        create_test_float_parquet(&data, 100);
        tdigest_sketch(&data, &sketch1, None, "json");
        tdigest_sketch(&data, &sketch2, None, "json");

        cmd_drift_compare(&sketch1, &sketch2, 0.1, "text").expect("compare should succeed");
    }

    #[test]
    fn test_cmd_drift_compare_json_format() {
        let dir = temp_dir();
        let data = dir.path().join("data.parquet");
        let sketch1 = dir.path().join("sketch1.json");
        let sketch2 = dir.path().join("sketch2.json");

        create_test_float_parquet(&data, 100);
        tdigest_sketch(&data, &sketch1, None, "json");
        tdigest_sketch(&data, &sketch2, None, "json");

        cmd_drift_compare(&sketch1, &sketch2, 0.1, "json").expect("compare should succeed");
    }

    #[test]
    fn test_cmd_drift_merge_binary_format() {
        let dir = temp_dir();
        let data = dir.path().join("data.parquet");
        let sketch = dir.path().join("sketch.bin");
        let merged = dir.path().join("merged.bin");

        create_test_float_parquet(&data, 100);
        tdigest_sketch(&data, &sketch, None, "binary");

        cmd_drift_merge(&[sketch], &merged, "binary").expect("merge should succeed");
        assert!(merged.exists());
    }

    #[test]
    fn test_load_sketch_invalid_json() {
        let dir = temp_dir();
        let sketch_path = dir.path().join("invalid.json");
        std::fs::write(&sketch_path, "{ invalid json }").expect("Should write file");

        assert!(load_sketch(&sketch_path).is_err());
    }

    #[test]
    fn test_cmd_drift_detect_all_tests() {
        let dir = temp_dir();
        let path = dir.path().join("data.parquet");
        create_test_parquet(&path, 100);

        cmd_drift_detect(&path, &path, "ks,chi2,psi,js", 0.05, "text")
            .expect("detect should succeed");
    }

    #[test]
    fn test_cmd_drift_detect_with_drift() {
        let dir = temp_dir();
        let ref_path = dir.path().join("ref.parquet");
        let cur_path = dir.path().join("cur.parquet");
        create_test_parquet(&ref_path, 100);

        // Shifted ids and different names, so the current file differs from the reference.
        let ids: Vec<i32> = (500..600).collect();
        let names = ids.iter().map(|i| format!("different_{}", i)).collect();
        write_id_name_parquet(&cur_path, ids, names);

        cmd_drift_detect(&ref_path, &cur_path, "ks,psi", 0.05, "text")
            .expect("detect should succeed");
    }

    #[test]
    fn test_cmd_drift_compare_empty_results() {
        let dir = temp_dir();
        let data_path = dir.path().join("strings.parquet");
        let sketch1_path = dir.path().join("sketch1.json");
        let sketch2_path = dir.path().join("sketch2.json");

        // String-only data leaves nothing numeric to sketch or compare.
        let names: Vec<String> = (0..50).map(|i| format!("name_{}", i)).collect();
        write_single_column_parquet(
            &data_path,
            Field::new("name", DataType::Utf8, false),
            Arc::new(StringArray::from(names)),
        );

        tdigest_sketch(&data_path, &sketch1_path, None, "json");
        tdigest_sketch(&data_path, &sketch2_path, None, "json");

        cmd_drift_compare(&sketch1_path, &sketch2_path, 0.1, "text")
            .expect("compare should succeed");
    }

    #[test]
    fn test_parse_drift_tests_single() {
        let tests = parse_drift_tests("ks");
        assert_eq!(tests.len(), 1);
        assert!(matches!(tests[0], DriftTest::KolmogorovSmirnov));
    }

    #[test]
    fn test_parse_drift_tests_multiple() {
        assert_eq!(parse_drift_tests("ks,chi2,psi,js").len(), 4);
    }

    #[test]
    fn test_parse_drift_tests_with_spaces() {
        assert_eq!(parse_drift_tests("ks, chi2 ,psi").len(), 3);
    }

    #[test]
    fn test_parse_drift_tests_unknown() {
        assert!(parse_drift_tests("unknown").is_empty());
    }

    #[test]
    fn test_parse_sketch_type_tdigest() {
        assert!(matches!(parse_sketch_type("tdigest"), Some(SketchType::TDigest)));
    }

    #[test]
    fn test_parse_sketch_type_ddsketch() {
        assert!(matches!(parse_sketch_type("ddsketch"), Some(SketchType::DDSketch)));
    }

    #[test]
    fn test_parse_sketch_type_unknown() {
        assert!(parse_sketch_type("unknown").is_none());
    }

    #[test]
    fn test_cmd_drift_report_basic() {
        let dir = temp_dir();
        let ref_path = dir.path().join("ref.parquet");
        let cur_path = dir.path().join("cur.parquet");
        create_test_parquet(&ref_path, 100);
        create_test_parquet(&cur_path, 100);

        cmd_drift_report(&ref_path, &cur_path, None).expect("report should succeed");
    }

    #[test]
    fn test_cmd_drift_report_with_output() {
        let dir = temp_dir();
        let ref_path = dir.path().join("ref.parquet");
        let cur_path = dir.path().join("cur.parquet");
        // The report is JSON, so give the output file a matching extension.
        let output = dir.path().join("report.json");
        create_test_parquet(&ref_path, 100);
        create_test_parquet(&cur_path, 100);

        cmd_drift_report(&ref_path, &cur_path, Some(&output)).expect("report should succeed");
        assert!(output.exists());
    }

    #[test]
    fn test_cmd_drift_sketch_ddsketch_json_output() {
        let dir = temp_dir();
        let data_path = dir.path().join("data.parquet");
        let sketch_path = dir.path().join("sketch.json");
        create_test_parquet(&data_path, 100);

        cmd_drift_sketch(&data_path, &sketch_path, "ddsketch", None, "json")
            .expect("sketch should succeed");
    }

    #[test]
    fn test_cmd_drift_merge() {
        let dir = temp_dir();
        let data1 = dir.path().join("data1.parquet");
        let data2 = dir.path().join("data2.parquet");
        let sketch1 = dir.path().join("sketch1.json");
        let sketch2 = dir.path().join("sketch2.json");
        let merged = dir.path().join("merged.json");

        write_single_column_parquet(
            &data1,
            Field::new("value", DataType::Float64, false),
            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0])),
        );
        write_single_column_parquet(
            &data2,
            Field::new("value", DataType::Float64, false),
            Arc::new(Float64Array::from(vec![6.0, 7.0, 8.0, 9.0, 10.0])),
        );

        tdigest_sketch(&data1, &sketch1, None, "json");
        tdigest_sketch(&data2, &sketch2, None, "json");

        let sketches = vec![sketch1, sketch2];
        cmd_drift_merge(&sketches, &merged, "json").expect("merge should succeed");
    }
}