// alimentar/cli/drift.rs
1//! Drift detection CLI commands.
2
3use std::path::{Path, PathBuf};
4
5use clap::Subcommand;
6
7use super::basic::load_dataset;
8use crate::{
9    drift::{DriftDetector, DriftSeverity, DriftTest},
10    sketch::{DataSketch, DistributedDriftDetector, SketchType},
11    Dataset,
12};
13
/// Drift detection commands.
// NOTE: the `///` doc comments on variants and fields below double as clap
// help text shown to users, so they are part of runtime behavior.
#[derive(Subcommand)]
pub enum DriftCommands {
    /// Detect drift between reference and current datasets
    Detect {
        /// Reference (baseline) dataset
        #[arg(short, long)]
        reference: PathBuf,
        /// Current dataset to compare
        #[arg(short, long)]
        current: PathBuf,
        /// Statistical tests to use (ks, chi2, psi, js)
        // Parsed by `parse_drift_tests`; unrecognized names are dropped.
        #[arg(short, long, default_value = "ks,psi")]
        tests: String,
        /// Significance threshold (alpha)
        #[arg(short, long, default_value = "0.05")]
        alpha: f64,
        /// Output format (text, json)
        #[arg(short, long, default_value = "text")]
        format: String,
    },
    /// Generate a drift report summary
    Report {
        /// Reference (baseline) dataset
        #[arg(short, long)]
        reference: PathBuf,
        /// Current dataset to compare
        #[arg(short, long)]
        current: PathBuf,
        /// Output file for the report (JSON format)
        // When omitted, the report is printed to stdout.
        #[arg(short, long)]
        output: Option<PathBuf>,
    },
    /// Create a sketch from a dataset for distributed drift detection
    Sketch {
        /// Input dataset file
        input: PathBuf,
        /// Output sketch file
        #[arg(short, long)]
        output: PathBuf,
        /// Sketch algorithm type (tdigest, ddsketch)
        // Explicit `-t` so the short flag does not clash with `--source`'s `-s`.
        #[arg(short = 't', long, default_value = "tdigest")]
        sketch_type: String,
        /// Source identifier (e.g., node name)
        #[arg(short, long)]
        source: Option<String>,
        /// Output format (json, binary)
        #[arg(short, long, default_value = "json")]
        format: String,
    },
    /// Merge multiple sketches into one
    Merge {
        /// Sketch files to merge
        // At least one sketch path is required by clap; `cmd_drift_merge`
        // also guards against an empty list defensively.
        #[arg(required = true)]
        sketches: Vec<PathBuf>,
        /// Output merged sketch file
        #[arg(short, long)]
        output: PathBuf,
        /// Output format (json, binary)
        #[arg(short, long, default_value = "json")]
        format: String,
    },
    /// Compare two sketches for drift
    Compare {
        /// Reference (baseline) sketch
        #[arg(short, long)]
        reference: PathBuf,
        /// Current sketch to compare
        #[arg(short, long)]
        current: PathBuf,
        /// Drift detection threshold
        #[arg(short = 't', long, default_value = "0.1")]
        threshold: f64,
        /// Output format (text, json)
        #[arg(short, long, default_value = "text")]
        format: String,
    },
}
92
93/// Parse drift test types from a comma-separated string.
94pub(crate) fn parse_drift_tests(tests_str: &str) -> Vec<DriftTest> {
95    tests_str
96        .split(',')
97        .filter_map(|t| match t.trim().to_lowercase().as_str() {
98            "ks" => Some(DriftTest::KolmogorovSmirnov),
99            "chi2" | "chisquared" => Some(DriftTest::ChiSquared),
100            "psi" => Some(DriftTest::PSI),
101            "js" | "jensenshannon" => Some(DriftTest::JensenShannon),
102            _ => None,
103        })
104        .collect()
105}
106
107/// Get severity symbol for display.
108pub(crate) fn severity_symbol(severity: DriftSeverity) -> &'static str {
109    match severity {
110        DriftSeverity::None => "\u{2713}",     // checkmark
111        DriftSeverity::Low => "\u{25CB}",      // empty circle
112        DriftSeverity::Medium => "\u{25CF}",   // filled circle
113        DriftSeverity::High => "\u{25B2}",     // triangle
114        DriftSeverity::Critical => "\u{2716}", // X mark
115    }
116}
117
/// Detect drift between reference and current datasets.
///
/// Loads both datasets, runs the requested statistical tests via
/// `DriftDetector`, and prints per-column results either as a
/// human-readable table (`format == "text"`, the default branch) or as
/// JSON (`format == "json"`).
///
/// # Errors
///
/// Returns an error if either dataset fails to load, if `tests_str`
/// contains no recognized test names, if detection fails, or if JSON
/// serialization of the report fails.
pub(crate) fn cmd_drift_detect(
    reference: &Path,
    current: &Path,
    tests_str: &str,
    alpha: f64,
    format: &str,
) -> crate::Result<()> {
    let ref_dataset = load_dataset(reference)?;
    let cur_dataset = load_dataset(current)?;

    // Fail fast rather than silently running zero tests.
    let tests = parse_drift_tests(tests_str);
    if tests.is_empty() {
        return Err(crate::Error::invalid_config(
            "No valid tests specified. Use: ks, chi2, psi, js",
        ));
    }

    let mut detector = DriftDetector::new(ref_dataset).with_alpha(alpha);
    for test in tests {
        detector = detector.with_test(test);
    }

    let report = detector.detect(&cur_dataset)?;

    if format == "json" {
        // JSON output
        let json = serde_json::json!({
            "drift_detected": report.drift_detected,
            "columns": report.column_scores.values().map(|d| {
                serde_json::json!({
                    "column": d.column,
                    "test": format!("{:?}", d.test),
                    "statistic": d.statistic,
                    "p_value": d.p_value,
                    "drift_detected": d.drift_detected,
                    "severity": format!("{:?}", d.severity),
                })
            }).collect::<Vec<_>>()
        });
        println!(
            "{}",
            serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?
        );
    } else {
        // Text output
        println!("Drift Detection Report");
        println!("======================");
        println!("Reference: {}", reference.display());
        println!("Current:   {}", current.display());
        println!("Alpha:     {}", alpha);
        println!();

        if report.drift_detected {
            println!("\u{26A0}\u{FE0F}  DRIFT DETECTED\n");
        } else {
            println!("\u{2713} No significant drift detected\n");
        }

        println!(
            "{:<20} {:<15} {:<12} {:<12} {:<10} DRIFT",
            "COLUMN", "TEST", "STATISTIC", "P-VALUE", "SEVERITY"
        );
        println!("{}", "-".repeat(80));

        for drift in report.column_scores.values() {
            let drift_str = if drift.drift_detected { "YES" } else { "no" };
            // p-value is optional: some tests (e.g. PSI) do not produce one.
            let p_value_str = drift
                .p_value
                .map_or_else(|| "N/A".to_string(), |p| format!("{:.4}", p));
            println!(
                "{:<20} {:<15} {:<12.4} {:<12} {:<10} {} {}",
                drift.column,
                format!("{:?}", drift.test),
                drift.statistic,
                p_value_str,
                format!("{:?}", drift.severity),
                severity_symbol(drift.severity),
                drift_str
            );
        }

        println!();
        // Summarize the drifted columns by name at the bottom of the table.
        let drifted: Vec<_> = report
            .column_scores
            .values()
            .filter(|d| d.drift_detected)
            .map(|d| d.column.clone())
            .collect();
        if !drifted.is_empty() {
            println!("Columns with drift: {}", drifted.join(", "));
        }
    }

    Ok(())
}
214
215/// Generate a drift report summary.
216pub(crate) fn cmd_drift_report(
217    reference: &Path,
218    current: &Path,
219    output: Option<&PathBuf>,
220) -> crate::Result<()> {
221    let ref_dataset = load_dataset(reference)?;
222    let cur_dataset = load_dataset(current)?;
223
224    let detector = DriftDetector::new(ref_dataset)
225        .with_test(DriftTest::KolmogorovSmirnov)
226        .with_test(DriftTest::PSI)
227        .with_test(DriftTest::JensenShannon);
228
229    let report = detector.detect(&cur_dataset)?;
230
231    let drifted_count = report
232        .column_scores
233        .values()
234        .filter(|d| d.drift_detected)
235        .count();
236    let json = serde_json::json!({
237        "reference": reference.display().to_string(),
238        "current": current.display().to_string(),
239        "drift_detected": report.drift_detected,
240        "summary": {
241            "total_columns": report.column_scores.len(),
242            "drifted_columns": drifted_count,
243        },
244        "columns": report.column_scores.values().map(|d| {
245            serde_json::json!({
246                "column": d.column,
247                "test": format!("{:?}", d.test),
248                "statistic": d.statistic,
249                "p_value": d.p_value,
250                "drift_detected": d.drift_detected,
251                "severity": format!("{:?}", d.severity),
252            })
253        }).collect::<Vec<_>>()
254    });
255
256    let json_str =
257        serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?;
258
259    if let Some(output_path) = output {
260        std::fs::write(output_path, &json_str).map_err(|e| crate::Error::io(e, output_path))?;
261        println!("Drift report written to: {}", output_path.display());
262    } else {
263        println!("{}", json_str);
264    }
265
266    Ok(())
267}
268
269/// Parse sketch type from string.
270pub(crate) fn parse_sketch_type(s: &str) -> Option<SketchType> {
271    match s.to_lowercase().as_str() {
272        "tdigest" | "t-digest" => Some(SketchType::TDigest),
273        "ddsketch" | "dd-sketch" => Some(SketchType::DDSketch),
274        _ => None,
275    }
276}
277
278/// Create a sketch from a dataset for distributed drift detection.
279pub(crate) fn cmd_drift_sketch(
280    input: &Path,
281    output: &Path,
282    sketch_type: &str,
283    source: Option<&str>,
284    format: &str,
285) -> crate::Result<()> {
286    let sketch_type = parse_sketch_type(sketch_type).ok_or_else(|| {
287        crate::Error::invalid_config(format!(
288            "Unknown sketch type: {}. Use 'tdigest' or 'ddsketch'",
289            sketch_type
290        ))
291    })?;
292
293    let dataset = load_dataset(input)?;
294    let mut sketch = DataSketch::from_dataset(&dataset, sketch_type)?;
295
296    if let Some(src) = source {
297        sketch = sketch.with_source(src);
298    }
299
300    match format {
301        "binary" | "bin" => {
302            let bytes = sketch.to_bytes()?;
303            std::fs::write(output, bytes).map_err(|e| crate::Error::io(e, output))?;
304        }
305        _ => {
306            // Default to JSON
307            let json = serde_json::to_string_pretty(&sketch)
308                .map_err(|e| crate::Error::Format(e.to_string()))?;
309            std::fs::write(output, json).map_err(|e| crate::Error::io(e, output))?;
310        }
311    }
312
313    println!(
314        "Created {} sketch from {} ({} rows) -> {}",
315        sketch_type.name(),
316        input.display(),
317        dataset.len(),
318        output.display()
319    );
320
321    Ok(())
322}
323
324/// Load a sketch from a file.
325pub(crate) fn load_sketch(path: &Path) -> crate::Result<DataSketch> {
326    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
327
328    match ext {
329        "bin" | "binary" => {
330            let bytes = std::fs::read(path).map_err(|e| crate::Error::io(e, path))?;
331            DataSketch::from_bytes(&bytes)
332        }
333        _ => {
334            // Default to JSON
335            let json = std::fs::read_to_string(path).map_err(|e| crate::Error::io(e, path))?;
336            serde_json::from_str(&json)
337                .map_err(|e| crate::Error::Format(format!("Invalid sketch JSON: {}", e)))
338        }
339    }
340}
341
342/// Merge multiple sketches into one.
343pub(crate) fn cmd_drift_merge(
344    sketches: &[PathBuf],
345    output: &Path,
346    format: &str,
347) -> crate::Result<()> {
348    if sketches.is_empty() {
349        return Err(crate::Error::invalid_config(
350            "No sketches provided to merge",
351        ));
352    }
353
354    let loaded: Vec<DataSketch> = sketches
355        .iter()
356        .map(|p| load_sketch(p))
357        .collect::<Result<Vec<_>, _>>()?;
358
359    let merged = DataSketch::merge(&loaded)?;
360
361    match format {
362        "binary" | "bin" => {
363            let bytes = merged.to_bytes()?;
364            std::fs::write(output, bytes).map_err(|e| crate::Error::io(e, output))?;
365        }
366        _ => {
367            let json = serde_json::to_string_pretty(&merged)
368                .map_err(|e| crate::Error::Format(e.to_string()))?;
369            std::fs::write(output, json).map_err(|e| crate::Error::io(e, output))?;
370        }
371    }
372
373    println!(
374        "Merged {} sketches ({} total rows) -> {}",
375        sketches.len(),
376        merged.row_count,
377        output.display()
378    );
379
380    Ok(())
381}
382
/// Compare two sketches for drift.
///
/// Loads both sketch files, compares them column-by-column with a
/// `DistributedDriftDetector` configured from the reference sketch's
/// algorithm and the given `threshold`, then prints the outcome as a text
/// table (default) or JSON.
///
/// # Errors
///
/// Returns an error if either sketch fails to load, the comparison fails,
/// or JSON serialization fails.
pub(crate) fn cmd_drift_compare(
    reference: &Path,
    current: &Path,
    threshold: f64,
    format: &str,
) -> crate::Result<()> {
    let ref_sketch = load_sketch(reference)?;
    let cur_sketch = load_sketch(current)?;

    // Use the reference sketch's algorithm so both sides are interpreted
    // consistently.
    let detector = DistributedDriftDetector::new()
        .with_sketch_type(ref_sketch.sketch_type)
        .with_threshold(threshold);

    let results = detector.compare(&ref_sketch, &cur_sketch)?;

    // Overall verdict: drift if any single column reports drift.
    let drift_detected = results.iter().any(|r| r.severity.is_drift());

    if format == "json" {
        let json = serde_json::json!({
            "reference": reference.display().to_string(),
            "current": current.display().to_string(),
            "drift_detected": drift_detected,
            "threshold": threshold,
            "columns": results.iter().map(|r| {
                serde_json::json!({
                    "column": r.column,
                    "statistic": r.statistic,
                    "severity": format!("{:?}", r.severity),
                    "drift_detected": r.severity.is_drift(),
                })
            }).collect::<Vec<_>>()
        });

        let json_str =
            serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?;
        println!("{}", json_str);
    } else {
        // Text format
        println!("Sketch Drift Comparison");
        println!("=======================");
        println!("Reference: {}", reference.display());
        println!("Current:   {}", current.display());
        println!("Threshold: {}", threshold);
        println!();

        // Sketch comparison only covers numeric columns; nothing to show
        // when the sketches contain none.
        if results.is_empty() {
            println!("No numeric columns to compare.");
        } else {
            println!(
                "{:<20} {:>10} {:>10} DRIFT",
                "COLUMN", "STATISTIC", "SEVERITY"
            );
            println!("{}", "-".repeat(55));

            for result in &results {
                let drift_symbol = if result.severity.is_drift() {
                    severity_symbol(result.severity)
                } else {
                    "\u{2713}"
                };
                println!(
                    "{:<20} {:>10.4} {:>10} {}",
                    result.column,
                    result.statistic,
                    format!("{:?}", result.severity),
                    drift_symbol
                );
            }

            println!();
            if drift_detected {
                println!("\u{26A0} Drift detected in one or more columns");
            } else {
                println!("\u{2713} No significant drift detected");
            }
        }
    }

    Ok(())
}
464
465#[cfg(test)]
466#[allow(
467    clippy::cast_possible_truncation,
468    clippy::cast_possible_wrap,
469    clippy::cast_precision_loss,
470    clippy::uninlined_format_args,
471    clippy::unwrap_used,
472    clippy::expect_used,
473    clippy::redundant_clone,
474    clippy::cast_lossless,
475    clippy::redundant_closure_for_method_calls,
476    clippy::too_many_lines,
477    clippy::float_cmp,
478    clippy::similar_names,
479    clippy::needless_late_init,
480    clippy::redundant_pattern_matching
481)]
482mod tests {
483    use std::sync::Arc;
484
485    use arrow::{
486        array::{Float64Array, Int32Array, StringArray},
487        datatypes::{DataType, Field, Schema},
488    };
489
490    use super::*;
491    use crate::ArrowDataset;
492
493    fn create_test_parquet(path: &Path, rows: usize) {
494        let schema = Arc::new(Schema::new(vec![
495            Field::new("id", DataType::Int32, false),
496            Field::new("name", DataType::Utf8, false),
497        ]));
498
499        let ids: Vec<i32> = (0..rows as i32).collect();
500        let names: Vec<String> = ids.iter().map(|i| format!("item_{}", i)).collect();
501
502        let batch = arrow::array::RecordBatch::try_new(
503            schema,
504            vec![
505                Arc::new(Int32Array::from(ids)),
506                Arc::new(StringArray::from(names)),
507            ],
508        )
509        .ok()
510        .unwrap_or_else(|| panic!("Should create batch"));
511
512        let dataset = ArrowDataset::from_batch(batch)
513            .ok()
514            .unwrap_or_else(|| panic!("Should create dataset"));
515
516        dataset
517            .to_parquet(path)
518            .ok()
519            .unwrap_or_else(|| panic!("Should write parquet"));
520    }
521
522    fn create_test_float_parquet(path: &Path, rows: usize) {
523        let schema = Arc::new(Schema::new(vec![
524            Field::new("id", DataType::Int32, false),
525            Field::new("value", DataType::Float64, false),
526        ]));
527
528        let ids: Vec<i32> = (0..rows as i32).collect();
529        let values: Vec<f64> = ids.iter().map(|i| *i as f64 * 1.5).collect();
530
531        let batch = arrow::array::RecordBatch::try_new(
532            schema,
533            vec![
534                Arc::new(Int32Array::from(ids)),
535                Arc::new(Float64Array::from(values)),
536            ],
537        )
538        .ok()
539        .unwrap_or_else(|| panic!("Should create batch"));
540
541        let dataset = ArrowDataset::from_batch(batch)
542            .ok()
543            .unwrap_or_else(|| panic!("Should create dataset"));
544
545        dataset
546            .to_parquet(path)
547            .ok()
548            .unwrap_or_else(|| panic!("Should write parquet"));
549    }
550
551    #[test]
552    fn test_parse_drift_tests() {
553        let tests = parse_drift_tests("ks,psi");
554        assert_eq!(tests.len(), 2);
555
556        let tests = parse_drift_tests("ks, chi2, psi, js");
557        assert_eq!(tests.len(), 4);
558
559        let tests = parse_drift_tests("invalid");
560        assert!(tests.is_empty());
561
562        let tests = parse_drift_tests("KS,PSI");
563        assert_eq!(tests.len(), 2);
564    }
565
566    #[test]
567    fn test_severity_symbol() {
568        assert_eq!(severity_symbol(DriftSeverity::None), "\u{2713}");
569        assert_eq!(severity_symbol(DriftSeverity::Low), "\u{25CB}");
570        assert_eq!(severity_symbol(DriftSeverity::Medium), "\u{25CF}");
571        assert_eq!(severity_symbol(DriftSeverity::High), "\u{25B2}");
572        assert_eq!(severity_symbol(DriftSeverity::Critical), "\u{2716}");
573    }
574
575    #[test]
576    fn test_cmd_drift_detect_same_data() {
577        let temp_dir = tempfile::tempdir()
578            .ok()
579            .unwrap_or_else(|| panic!("Should create temp dir"));
580        let path = temp_dir.path().join("data.parquet");
581        create_test_parquet(&path, 100);
582
583        // Compare dataset with itself - should detect no drift
584        let result = cmd_drift_detect(&path, &path, "ks,psi", 0.05, "text");
585        assert!(result.is_ok());
586    }
587
588    #[test]
589    fn test_cmd_drift_detect_json_format() {
590        let temp_dir = tempfile::tempdir()
591            .ok()
592            .unwrap_or_else(|| panic!("Should create temp dir"));
593        let path = temp_dir.path().join("data.parquet");
594        create_test_parquet(&path, 100);
595
596        let result = cmd_drift_detect(&path, &path, "ks", 0.05, "json");
597        assert!(result.is_ok());
598    }
599
600    #[test]
601    fn test_cmd_drift_detect_invalid_tests() {
602        let temp_dir = tempfile::tempdir()
603            .ok()
604            .unwrap_or_else(|| panic!("Should create temp dir"));
605        let path = temp_dir.path().join("data.parquet");
606        create_test_parquet(&path, 100);
607
608        let result = cmd_drift_detect(&path, &path, "invalid", 0.05, "text");
609        assert!(result.is_err());
610    }
611
612    #[test]
613    fn test_cmd_drift_report() {
614        let temp_dir = tempfile::tempdir()
615            .ok()
616            .unwrap_or_else(|| panic!("Should create temp dir"));
617        let path = temp_dir.path().join("data.parquet");
618        create_test_parquet(&path, 100);
619
620        // Report without output file (prints to stdout)
621        let result = cmd_drift_report(&path, &path, None);
622        assert!(result.is_ok());
623    }
624
625    #[test]
626    fn test_cmd_drift_report_to_file() {
627        let temp_dir = tempfile::tempdir()
628            .ok()
629            .unwrap_or_else(|| panic!("Should create temp dir"));
630        let data_path = temp_dir.path().join("data.parquet");
631        let output_path = temp_dir.path().join("report.json");
632        create_test_parquet(&data_path, 100);
633
634        let result = cmd_drift_report(&data_path, &data_path, Some(&output_path));
635        assert!(result.is_ok());
636        assert!(output_path.exists());
637
638        // Verify JSON is valid
639        let content = std::fs::read_to_string(&output_path)
640            .ok()
641            .unwrap_or_else(|| panic!("Should read file"));
642        let parsed: serde_json::Value = serde_json::from_str(&content)
643            .ok()
644            .unwrap_or_else(|| panic!("Should parse JSON"));
645        assert!(parsed.get("drift_detected").is_some());
646    }
647
648    #[test]
649    fn test_parse_sketch_type() {
650        assert!(matches!(
651            parse_sketch_type("tdigest"),
652            Some(SketchType::TDigest)
653        ));
654        assert!(matches!(
655            parse_sketch_type("TDIGEST"),
656            Some(SketchType::TDigest)
657        ));
658        assert!(matches!(
659            parse_sketch_type("ddsketch"),
660            Some(SketchType::DDSketch)
661        ));
662        assert!(matches!(
663            parse_sketch_type("DDSKETCH"),
664            Some(SketchType::DDSketch)
665        ));
666        assert!(parse_sketch_type("invalid").is_none());
667    }
668
669    #[test]
670    fn test_cmd_drift_sketch_tdigest() {
671        let temp_dir = tempfile::tempdir()
672            .ok()
673            .unwrap_or_else(|| panic!("Should create temp dir"));
674        let data_path = temp_dir.path().join("data.parquet");
675        let sketch_path = temp_dir.path().join("sketch.json");
676        create_test_float_parquet(&data_path, 100);
677
678        let result = cmd_drift_sketch(&data_path, &sketch_path, "tdigest", None, "json");
679        assert!(result.is_ok());
680        assert!(sketch_path.exists());
681
682        // Verify sketch file is valid JSON
683        let content = std::fs::read_to_string(&sketch_path)
684            .ok()
685            .unwrap_or_else(|| panic!("Should read file"));
686        let parsed: serde_json::Value = serde_json::from_str(&content)
687            .ok()
688            .unwrap_or_else(|| panic!("Should parse JSON"));
689        assert!(parsed.get("sketch_type").is_some());
690        assert!(parsed.get("row_count").is_some());
691    }
692
693    #[test]
694    fn test_cmd_drift_sketch_ddsketch_type() {
695        let temp_dir = tempfile::tempdir()
696            .ok()
697            .unwrap_or_else(|| panic!("Should create temp dir"));
698        let data_path = temp_dir.path().join("data.parquet");
699        let sketch_path = temp_dir.path().join("sketch.json");
700        create_test_float_parquet(&data_path, 100);
701
702        let result = cmd_drift_sketch(&data_path, &sketch_path, "ddsketch", None, "json");
703        assert!(result.is_ok());
704        assert!(sketch_path.exists());
705    }
706
707    #[test]
708    fn test_cmd_drift_sketch_with_source() {
709        let temp_dir = tempfile::tempdir()
710            .ok()
711            .unwrap_or_else(|| panic!("Should create temp dir"));
712        let data_path = temp_dir.path().join("data.parquet");
713        let sketch_path = temp_dir.path().join("sketch.json");
714        create_test_float_parquet(&data_path, 50);
715
716        let result = cmd_drift_sketch(&data_path, &sketch_path, "tdigest", Some("node-1"), "json");
717        assert!(result.is_ok());
718
719        // Verify source is in the output
720        let content = std::fs::read_to_string(&sketch_path)
721            .ok()
722            .unwrap_or_else(|| panic!("Should read file"));
723        let parsed: serde_json::Value = serde_json::from_str(&content)
724            .ok()
725            .unwrap_or_else(|| panic!("Should parse JSON"));
726        assert_eq!(
727            parsed.get("source").and_then(|v| v.as_str()),
728            Some("node-1")
729        );
730    }
731
732    #[test]
733    fn test_cmd_drift_sketch_binary_format() {
734        let temp_dir = tempfile::tempdir()
735            .ok()
736            .unwrap_or_else(|| panic!("Should create temp dir"));
737        let data_path = temp_dir.path().join("data.parquet");
738        let sketch_path = temp_dir.path().join("sketch.bin");
739        create_test_float_parquet(&data_path, 100);
740
741        let result = cmd_drift_sketch(&data_path, &sketch_path, "tdigest", None, "binary");
742        assert!(result.is_ok());
743        assert!(sketch_path.exists());
744
745        // Binary file should exist and be non-empty
746        let metadata = std::fs::metadata(&sketch_path)
747            .ok()
748            .unwrap_or_else(|| panic!("Should get metadata"));
749        assert!(metadata.len() > 0);
750    }
751
752    #[test]
753    fn test_cmd_drift_sketch_invalid_type() {
754        let temp_dir = tempfile::tempdir()
755            .ok()
756            .unwrap_or_else(|| panic!("Should create temp dir"));
757        let data_path = temp_dir.path().join("data.parquet");
758        let sketch_path = temp_dir.path().join("sketch.json");
759        create_test_float_parquet(&data_path, 100);
760
761        let result = cmd_drift_sketch(&data_path, &sketch_path, "invalid", None, "json");
762        assert!(result.is_err());
763    }
764
765    #[test]
766    fn test_cmd_drift_merge_tdigest() {
767        let temp_dir = tempfile::tempdir()
768            .ok()
769            .unwrap_or_else(|| panic!("Should create temp dir"));
770
771        // Create two datasets
772        let data1 = temp_dir.path().join("data1.parquet");
773        let data2 = temp_dir.path().join("data2.parquet");
774        let sketch1 = temp_dir.path().join("sketch1.json");
775        let sketch2 = temp_dir.path().join("sketch2.json");
776        let merged = temp_dir.path().join("merged.json");
777
778        create_test_float_parquet(&data1, 50);
779        create_test_float_parquet(&data2, 50);
780
781        // Create sketches
782        cmd_drift_sketch(&data1, &sketch1, "tdigest", Some("node-1"), "json")
783            .ok()
784            .unwrap_or_else(|| panic!("Should create sketch1"));
785        cmd_drift_sketch(&data2, &sketch2, "tdigest", Some("node-2"), "json")
786            .ok()
787            .unwrap_or_else(|| panic!("Should create sketch2"));
788
789        // Merge
790        let sketches = vec![sketch1.clone(), sketch2.clone()];
791        let result = cmd_drift_merge(&sketches, &merged, "json");
792        assert!(result.is_ok());
793        assert!(merged.exists());
794
795        // Verify merged sketch
796        let content = std::fs::read_to_string(&merged)
797            .ok()
798            .unwrap_or_else(|| panic!("Should read file"));
799        let parsed: serde_json::Value = serde_json::from_str(&content)
800            .ok()
801            .unwrap_or_else(|| panic!("Should parse JSON"));
802        assert_eq!(parsed.get("row_count").and_then(|v| v.as_u64()), Some(100));
803    }
804
805    #[test]
806    fn test_cmd_drift_merge_single_sketch() {
807        let temp_dir = tempfile::tempdir()
808            .ok()
809            .unwrap_or_else(|| panic!("Should create temp dir"));
810
811        let data = temp_dir.path().join("data.parquet");
812        let sketch = temp_dir.path().join("sketch.json");
813        let merged = temp_dir.path().join("merged.json");
814
815        create_test_float_parquet(&data, 100);
816        cmd_drift_sketch(&data, &sketch, "tdigest", None, "json")
817            .ok()
818            .unwrap_or_else(|| panic!("Should create sketch"));
819
820        // Merge single sketch
821        let sketches = vec![sketch.clone()];
822        let result = cmd_drift_merge(&sketches, &merged, "json");
823        assert!(result.is_ok());
824    }
825
826    #[test]
827    fn test_cmd_drift_merge_empty_fails() {
828        let temp_dir = tempfile::tempdir()
829            .ok()
830            .unwrap_or_else(|| panic!("Should create temp dir"));
831        let merged = temp_dir.path().join("merged.json");
832
833        let sketches: Vec<PathBuf> = vec![];
834        let result = cmd_drift_merge(&sketches, &merged, "json");
835        assert!(result.is_err());
836    }
837
838    #[test]
839    fn test_cmd_drift_compare_no_drift() {
840        let temp_dir = tempfile::tempdir()
841            .ok()
842            .unwrap_or_else(|| panic!("Should create temp dir"));
843
844        let data = temp_dir.path().join("data.parquet");
845        let sketch1 = temp_dir.path().join("sketch1.json");
846        let sketch2 = temp_dir.path().join("sketch2.json");
847
848        create_test_float_parquet(&data, 100);
849
850        // Create two identical sketches
851        cmd_drift_sketch(&data, &sketch1, "tdigest", None, "json")
852            .ok()
853            .unwrap_or_else(|| panic!("Should create sketch1"));
854        cmd_drift_sketch(&data, &sketch2, "tdigest", None, "json")
855            .ok()
856            .unwrap_or_else(|| panic!("Should create sketch2"));
857
858        let result = cmd_drift_compare(&sketch1, &sketch2, 0.1, "text");
859        assert!(result.is_ok());
860    }
861
862    #[test]
863    fn test_cmd_drift_compare_json_format() {
864        let temp_dir = tempfile::tempdir()
865            .ok()
866            .unwrap_or_else(|| panic!("Should create temp dir"));
867
868        let data = temp_dir.path().join("data.parquet");
869        let sketch1 = temp_dir.path().join("sketch1.json");
870        let sketch2 = temp_dir.path().join("sketch2.json");
871
872        create_test_float_parquet(&data, 100);
873
874        cmd_drift_sketch(&data, &sketch1, "tdigest", None, "json")
875            .ok()
876            .unwrap_or_else(|| panic!("Should create sketch1"));
877        cmd_drift_sketch(&data, &sketch2, "tdigest", None, "json")
878            .ok()
879            .unwrap_or_else(|| panic!("Should create sketch2"));
880
881        let result = cmd_drift_compare(&sketch1, &sketch2, 0.1, "json");
882        assert!(result.is_ok());
883    }
884
885    #[test]
886    fn test_cmd_drift_merge_binary_format() {
887        let temp_dir = tempfile::tempdir()
888            .ok()
889            .unwrap_or_else(|| panic!("Should create temp dir"));
890
891        let data = temp_dir.path().join("data.parquet");
892        let sketch = temp_dir.path().join("sketch.bin");
893        let merged = temp_dir.path().join("merged.bin");
894
895        create_test_float_parquet(&data, 100);
896        cmd_drift_sketch(&data, &sketch, "tdigest", None, "binary")
897            .ok()
898            .unwrap_or_else(|| panic!("Should create sketch"));
899
900        let sketches = vec![sketch.clone()];
901        let result = cmd_drift_merge(&sketches, &merged, "binary");
902        assert!(result.is_ok());
903        assert!(merged.exists());
904    }
905
906    #[test]
907    fn test_load_sketch_invalid_json() {
908        let temp_dir = tempfile::tempdir()
909            .ok()
910            .unwrap_or_else(|| panic!("Should create temp dir"));
911        let sketch_path = temp_dir.path().join("invalid.json");
912
913        std::fs::write(&sketch_path, "{ invalid json }")
914            .ok()
915            .unwrap_or_else(|| panic!("Should write file"));
916
917        let result = load_sketch(&sketch_path);
918        assert!(result.is_err());
919    }
920
921    #[test]
922    fn test_cmd_drift_detect_all_tests() {
923        let temp_dir = tempfile::tempdir()
924            .ok()
925            .unwrap_or_else(|| panic!("Should create temp dir"));
926        let path = temp_dir.path().join("data.parquet");
927        create_test_parquet(&path, 100);
928
929        let result = cmd_drift_detect(&path, &path, "ks,chi2,psi,js", 0.05, "text");
930        assert!(result.is_ok());
931    }
932
933    #[test]
934    fn test_cmd_drift_detect_with_drift() {
935        let temp_dir = tempfile::tempdir()
936            .ok()
937            .unwrap_or_else(|| panic!("Should create temp dir"));
938        let ref_path = temp_dir.path().join("ref.parquet");
939        let cur_path = temp_dir.path().join("cur.parquet");
940
941        create_test_parquet(&ref_path, 100);
942
943        let schema = Arc::new(Schema::new(vec![
944            Field::new("id", DataType::Int32, false),
945            Field::new("name", DataType::Utf8, false),
946        ]));
947
948        let ids: Vec<i32> = (500..600).collect();
949        let names: Vec<String> = ids.iter().map(|i| format!("different_{}", i)).collect();
950
951        let batch = arrow::array::RecordBatch::try_new(
952            schema,
953            vec![
954                Arc::new(Int32Array::from(ids)),
955                Arc::new(StringArray::from(names)),
956            ],
957        )
958        .ok()
959        .unwrap_or_else(|| panic!("Should create batch"));
960
961        let dataset = ArrowDataset::from_batch(batch)
962            .ok()
963            .unwrap_or_else(|| panic!("Should create dataset"));
964
965        dataset
966            .to_parquet(&cur_path)
967            .ok()
968            .unwrap_or_else(|| panic!("Should write parquet"));
969
970        let result = cmd_drift_detect(&ref_path, &cur_path, "ks,psi", 0.05, "text");
971        assert!(result.is_ok());
972    }
973
974    #[test]
975    fn test_cmd_drift_compare_empty_results() {
976        let temp_dir = tempfile::tempdir()
977            .ok()
978            .unwrap_or_else(|| panic!("Should create temp dir"));
979
980        // Create dataset with only string column (no numeric for sketch)
981        let data_path = temp_dir.path().join("strings.parquet");
982        let sketch1_path = temp_dir.path().join("sketch1.json");
983        let sketch2_path = temp_dir.path().join("sketch2.json");
984
985        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
986
987        let names: Vec<String> = (0..50).map(|i| format!("name_{}", i)).collect();
988
989        let batch =
990            arrow::array::RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(names))])
991                .ok()
992                .unwrap_or_else(|| panic!("Should create batch"));
993
994        let dataset = ArrowDataset::from_batch(batch)
995            .ok()
996            .unwrap_or_else(|| panic!("Should create dataset"));
997
998        dataset
999            .to_parquet(&data_path)
1000            .ok()
1001            .unwrap_or_else(|| panic!("Should write parquet"));
1002
1003        cmd_drift_sketch(&data_path, &sketch1_path, "tdigest", None, "json")
1004            .ok()
1005            .unwrap_or_else(|| panic!("Should create sketch1"));
1006        cmd_drift_sketch(&data_path, &sketch2_path, "tdigest", None, "json")
1007            .ok()
1008            .unwrap_or_else(|| panic!("Should create sketch2"));
1009
1010        let result = cmd_drift_compare(&sketch1_path, &sketch2_path, 0.1, "text");
1011        assert!(result.is_ok());
1012    }
1013
1014    #[test]
1015    fn test_parse_drift_tests_single() {
1016        let tests = parse_drift_tests("ks");
1017        assert_eq!(tests.len(), 1);
1018        assert!(matches!(tests[0], DriftTest::KolmogorovSmirnov));
1019    }
1020
1021    #[test]
1022    fn test_parse_drift_tests_multiple() {
1023        let tests = parse_drift_tests("ks,chi2,psi,js");
1024        assert_eq!(tests.len(), 4);
1025    }
1026
1027    #[test]
1028    fn test_parse_drift_tests_with_spaces() {
1029        let tests = parse_drift_tests("ks, chi2 ,psi");
1030        assert_eq!(tests.len(), 3);
1031    }
1032
1033    #[test]
1034    fn test_parse_drift_tests_unknown() {
1035        let tests = parse_drift_tests("unknown");
1036        assert!(tests.is_empty());
1037    }
1038
1039    #[test]
1040    fn test_parse_sketch_type_tdigest() {
1041        assert!(matches!(
1042            parse_sketch_type("tdigest"),
1043            Some(SketchType::TDigest)
1044        ));
1045    }
1046
1047    #[test]
1048    fn test_parse_sketch_type_ddsketch() {
1049        assert!(matches!(
1050            parse_sketch_type("ddsketch"),
1051            Some(SketchType::DDSketch)
1052        ));
1053    }
1054
1055    #[test]
1056    fn test_parse_sketch_type_unknown() {
1057        assert!(parse_sketch_type("unknown").is_none());
1058    }
1059
1060    #[test]
1061    fn test_cmd_drift_report_basic() {
1062        let temp_dir = tempfile::tempdir()
1063            .ok()
1064            .unwrap_or_else(|| panic!("Should create temp dir"));
1065        let ref_path = temp_dir.path().join("ref.parquet");
1066        let cur_path = temp_dir.path().join("cur.parquet");
1067        create_test_parquet(&ref_path, 100);
1068        create_test_parquet(&cur_path, 100);
1069
1070        let result = cmd_drift_report(&ref_path, &cur_path, None);
1071        assert!(result.is_ok());
1072    }
1073
1074    #[test]
1075    fn test_cmd_drift_report_with_output() {
1076        let temp_dir = tempfile::tempdir()
1077            .ok()
1078            .unwrap_or_else(|| panic!("Should create temp dir"));
1079        let ref_path = temp_dir.path().join("ref.parquet");
1080        let cur_path = temp_dir.path().join("cur.parquet");
1081        let output = temp_dir.path().join("report.html");
1082        create_test_parquet(&ref_path, 100);
1083        create_test_parquet(&cur_path, 100);
1084
1085        let result = cmd_drift_report(&ref_path, &cur_path, Some(&output));
1086        assert!(result.is_ok());
1087        assert!(output.exists());
1088    }
1089
1090    #[test]
1091    fn test_cmd_drift_sketch_ddsketch_json_output() {
1092        let temp_dir = tempfile::tempdir()
1093            .ok()
1094            .unwrap_or_else(|| panic!("Should create temp dir"));
1095        let data_path = temp_dir.path().join("data.parquet");
1096        let sketch_path = temp_dir.path().join("sketch.json");
1097        create_test_parquet(&data_path, 100);
1098
1099        let result = cmd_drift_sketch(&data_path, &sketch_path, "ddsketch", None, "json");
1100        assert!(result.is_ok());
1101    }
1102
1103    #[test]
1104    fn test_cmd_drift_merge() {
1105        let temp_dir = tempfile::tempdir()
1106            .ok()
1107            .unwrap_or_else(|| panic!("Should create temp dir"));
1108        let data1 = temp_dir.path().join("data1.parquet");
1109        let data2 = temp_dir.path().join("data2.parquet");
1110        let sketch1 = temp_dir.path().join("sketch1.json");
1111        let sketch2 = temp_dir.path().join("sketch2.json");
1112        let merged = temp_dir.path().join("merged.json");
1113
1114        // Create datasets with float column
1115        let schema = Arc::new(Schema::new(vec![Field::new(
1116            "value",
1117            DataType::Float64,
1118            false,
1119        )]));
1120
1121        let batch1 = arrow::array::RecordBatch::try_new(
1122            schema.clone(),
1123            vec![Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]))],
1124        )
1125        .unwrap();
1126        let dataset1 = ArrowDataset::from_batch(batch1).unwrap();
1127        dataset1.to_parquet(&data1).unwrap();
1128
1129        let batch2 = arrow::array::RecordBatch::try_new(
1130            schema,
1131            vec![Arc::new(Float64Array::from(vec![6.0, 7.0, 8.0, 9.0, 10.0]))],
1132        )
1133        .unwrap();
1134        let dataset2 = ArrowDataset::from_batch(batch2).unwrap();
1135        dataset2.to_parquet(&data2).unwrap();
1136
1137        cmd_drift_sketch(&data1, &sketch1, "tdigest", None, "json").unwrap();
1138        cmd_drift_sketch(&data2, &sketch2, "tdigest", None, "json").unwrap();
1139
1140        let sketches = vec![sketch1, sketch2];
1141        let result = cmd_drift_merge(&sketches, &merged, "json");
1142        assert!(result.is_ok());
1143    }
1144}