Skip to main content

alimentar/cli/
drift.rs

1//! Drift detection CLI commands.
2
3use std::path::{Path, PathBuf};
4
5use clap::Subcommand;
6
7use super::basic::load_dataset;
8use crate::{
9    drift::{DriftDetector, DriftSeverity, DriftTest},
10    sketch::{DataSketch, DistributedDriftDetector, SketchType},
11    ArrowDataset, Dataset,
12};
13
14/// Drift detection commands.
15#[derive(Subcommand)]
16pub enum DriftCommands {
17    /// Detect drift between reference and current datasets
18    Detect {
19        /// Reference (baseline) dataset
20        #[arg(short, long)]
21        reference: PathBuf,
22        /// Current dataset to compare
23        #[arg(short, long)]
24        current: PathBuf,
25        /// Statistical tests to use (ks, chi2, psi, js)
26        #[arg(short, long, default_value = "ks,psi")]
27        tests: String,
28        /// Significance threshold (alpha)
29        #[arg(short, long, default_value = "0.05")]
30        alpha: f64,
31        /// Output format (text, json)
32        #[arg(short, long, default_value = "text")]
33        format: String,
34    },
35    /// Generate a drift report summary
36    Report {
37        /// Reference (baseline) dataset
38        #[arg(short, long)]
39        reference: PathBuf,
40        /// Current dataset to compare
41        #[arg(short, long)]
42        current: PathBuf,
43        /// Output file for the report (JSON format)
44        #[arg(short, long)]
45        output: Option<PathBuf>,
46    },
47    /// Create a sketch from a dataset for distributed drift detection
48    Sketch {
49        /// Input dataset file
50        input: PathBuf,
51        /// Output sketch file
52        #[arg(short, long)]
53        output: PathBuf,
54        /// Sketch algorithm type (tdigest, ddsketch)
55        #[arg(short = 't', long, default_value = "tdigest")]
56        sketch_type: String,
57        /// Source identifier (e.g., node name)
58        #[arg(short, long)]
59        source: Option<String>,
60        /// Output format (json, binary)
61        #[arg(short, long, default_value = "json")]
62        format: String,
63    },
64    /// Merge multiple sketches into one
65    Merge {
66        /// Sketch files to merge
67        #[arg(required = true)]
68        sketches: Vec<PathBuf>,
69        /// Output merged sketch file
70        #[arg(short, long)]
71        output: PathBuf,
72        /// Output format (json, binary)
73        #[arg(short, long, default_value = "json")]
74        format: String,
75    },
76    /// Compare two sketches for drift
77    Compare {
78        /// Reference (baseline) sketch
79        #[arg(short, long)]
80        reference: PathBuf,
81        /// Current sketch to compare
82        #[arg(short, long)]
83        current: PathBuf,
84        /// Drift detection threshold
85        #[arg(short = 't', long, default_value = "0.1")]
86        threshold: f64,
87        /// Output format (text, json)
88        #[arg(short, long, default_value = "text")]
89        format: String,
90    },
91}
92
93/// Parse drift test types from a comma-separated string.
94pub(crate) fn parse_drift_tests(tests_str: &str) -> Vec<DriftTest> {
95    tests_str
96        .split(',')
97        .filter_map(|t| match t.trim().to_lowercase().as_str() {
98            "ks" => Some(DriftTest::KolmogorovSmirnov),
99            "chi2" | "chisquared" => Some(DriftTest::ChiSquared),
100            "psi" => Some(DriftTest::PSI),
101            "js" | "jensenshannon" => Some(DriftTest::JensenShannon),
102            _ => None,
103        })
104        .collect()
105}
106
107/// Get severity symbol for display.
108pub(crate) fn severity_symbol(severity: DriftSeverity) -> &'static str {
109    match severity {
110        DriftSeverity::None => "\u{2713}",     // checkmark
111        DriftSeverity::Low => "\u{25CB}",      // empty circle
112        DriftSeverity::Medium => "\u{25CF}",   // filled circle
113        DriftSeverity::High => "\u{25B2}",     // triangle
114        DriftSeverity::Critical => "\u{2716}", // X mark
115    }
116}
117
118/// Detect drift between reference and current datasets.
119pub(crate) fn cmd_drift_detect(
120    reference: &Path,
121    current: &Path,
122    tests_str: &str,
123    alpha: f64,
124    format: &str,
125) -> crate::Result<()> {
126    let ref_dataset = load_dataset(reference)?;
127    let cur_dataset = load_dataset(current)?;
128
129    let tests = parse_drift_tests(tests_str);
130    if tests.is_empty() {
131        return Err(crate::Error::invalid_config(
132            "No valid tests specified. Use: ks, chi2, psi, js",
133        ));
134    }
135
136    let report = run_drift_detection(ref_dataset, &cur_dataset, tests, alpha)?;
137
138    if format == "json" {
139        print_drift_report_json(&report)
140    } else {
141        print_drift_report_text(&report, reference, current, alpha);
142        Ok(())
143    }
144}
145
146fn run_drift_detection(
147    ref_dataset: ArrowDataset,
148    cur_dataset: &ArrowDataset,
149    tests: Vec<DriftTest>,
150    alpha: f64,
151) -> crate::Result<crate::drift::DriftReport> {
152    let mut detector = DriftDetector::new(ref_dataset).with_alpha(alpha);
153    for test in tests {
154        detector = detector.with_test(test);
155    }
156    detector.detect(cur_dataset)
157}
158
159fn print_drift_report_json(report: &crate::drift::DriftReport) -> crate::Result<()> {
160    let json = serde_json::json!({
161        "drift_detected": report.drift_detected,
162        "columns": report.column_scores.values().map(|d| {
163            serde_json::json!({
164                "column": d.column,
165                "test": format!("{:?}", d.test),
166                "statistic": d.statistic,
167                "p_value": d.p_value,
168                "drift_detected": d.drift_detected,
169                "severity": format!("{:?}", d.severity),
170            })
171        }).collect::<Vec<_>>()
172    });
173    println!(
174        "{}",
175        serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?
176    );
177    Ok(())
178}
179
180fn print_drift_report_text(
181    report: &crate::drift::DriftReport,
182    reference: &Path,
183    current: &Path,
184    alpha: f64,
185) {
186    println!("Drift Detection Report");
187    println!("======================");
188    println!("Reference: {}", reference.display());
189    println!("Current:   {}", current.display());
190    println!("Alpha:     {}", alpha);
191    println!();
192
193    if report.drift_detected {
194        println!("\u{26A0}\u{FE0F}  DRIFT DETECTED\n");
195    } else {
196        println!("\u{2713} No significant drift detected\n");
197    }
198
199    print_drift_table(report);
200    print_drifted_columns(report);
201}
202
203fn print_drift_table(report: &crate::drift::DriftReport) {
204    println!(
205        "{:<20} {:<15} {:<12} {:<12} {:<10} DRIFT",
206        "COLUMN", "TEST", "STATISTIC", "P-VALUE", "SEVERITY"
207    );
208    println!("{}", "-".repeat(80));
209
210    for drift in report.column_scores.values() {
211        let drift_str = if drift.drift_detected { "YES" } else { "no" };
212        let p_value_str = drift
213            .p_value
214            .map_or_else(|| "N/A".to_string(), |p| format!("{:.4}", p));
215        println!(
216            "{:<20} {:<15} {:<12.4} {:<12} {:<10} {} {}",
217            drift.column,
218            format!("{:?}", drift.test),
219            drift.statistic,
220            p_value_str,
221            format!("{:?}", drift.severity),
222            severity_symbol(drift.severity),
223            drift_str
224        );
225    }
226}
227
228fn print_drifted_columns(report: &crate::drift::DriftReport) {
229    println!();
230    let drifted: Vec<_> = report
231        .column_scores
232        .values()
233        .filter(|d| d.drift_detected)
234        .map(|d| d.column.clone())
235        .collect();
236    if !drifted.is_empty() {
237        println!("Columns with drift: {}", drifted.join(", "));
238    }
239}
240
241/// Generate a drift report summary.
242pub(crate) fn cmd_drift_report(
243    reference: &Path,
244    current: &Path,
245    output: Option<&PathBuf>,
246) -> crate::Result<()> {
247    let ref_dataset = load_dataset(reference)?;
248    let cur_dataset = load_dataset(current)?;
249
250    let detector = DriftDetector::new(ref_dataset)
251        .with_test(DriftTest::KolmogorovSmirnov)
252        .with_test(DriftTest::PSI)
253        .with_test(DriftTest::JensenShannon);
254
255    let report = detector.detect(&cur_dataset)?;
256
257    let drifted_count = report
258        .column_scores
259        .values()
260        .filter(|d| d.drift_detected)
261        .count();
262    let json = serde_json::json!({
263        "reference": reference.display().to_string(),
264        "current": current.display().to_string(),
265        "drift_detected": report.drift_detected,
266        "summary": {
267            "total_columns": report.column_scores.len(),
268            "drifted_columns": drifted_count,
269        },
270        "columns": report.column_scores.values().map(|d| {
271            serde_json::json!({
272                "column": d.column,
273                "test": format!("{:?}", d.test),
274                "statistic": d.statistic,
275                "p_value": d.p_value,
276                "drift_detected": d.drift_detected,
277                "severity": format!("{:?}", d.severity),
278            })
279        }).collect::<Vec<_>>()
280    });
281
282    let json_str =
283        serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?;
284
285    if let Some(output_path) = output {
286        std::fs::write(output_path, &json_str).map_err(|e| crate::Error::io(e, output_path))?;
287        println!("Drift report written to: {}", output_path.display());
288    } else {
289        println!("{}", json_str);
290    }
291
292    Ok(())
293}
294
295/// Parse sketch type from string.
296pub(crate) fn parse_sketch_type(s: &str) -> Option<SketchType> {
297    match s.to_lowercase().as_str() {
298        "tdigest" | "t-digest" => Some(SketchType::TDigest),
299        "ddsketch" | "dd-sketch" => Some(SketchType::DDSketch),
300        _ => None,
301    }
302}
303
304/// Create a sketch from a dataset for distributed drift detection.
305pub(crate) fn cmd_drift_sketch(
306    input: &Path,
307    output: &Path,
308    sketch_type: &str,
309    source: Option<&str>,
310    format: &str,
311) -> crate::Result<()> {
312    let sketch_type = parse_sketch_type(sketch_type).ok_or_else(|| {
313        crate::Error::invalid_config(format!(
314            "Unknown sketch type: {}. Use 'tdigest' or 'ddsketch'",
315            sketch_type
316        ))
317    })?;
318
319    let dataset = load_dataset(input)?;
320    let mut sketch = DataSketch::from_dataset(&dataset, sketch_type)?;
321
322    if let Some(src) = source {
323        sketch = sketch.with_source(src);
324    }
325
326    match format {
327        "binary" | "bin" => {
328            let bytes = sketch.to_bytes()?;
329            std::fs::write(output, bytes).map_err(|e| crate::Error::io(e, output))?;
330        }
331        _ => {
332            // Default to JSON
333            let json = serde_json::to_string_pretty(&sketch)
334                .map_err(|e| crate::Error::Format(e.to_string()))?;
335            std::fs::write(output, json).map_err(|e| crate::Error::io(e, output))?;
336        }
337    }
338
339    println!(
340        "Created {} sketch from {} ({} rows) -> {}",
341        sketch_type.name(),
342        input.display(),
343        dataset.len(),
344        output.display()
345    );
346
347    Ok(())
348}
349
350/// Load a sketch from a file.
351pub(crate) fn load_sketch(path: &Path) -> crate::Result<DataSketch> {
352    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
353
354    match ext {
355        "bin" | "binary" => {
356            let bytes = std::fs::read(path).map_err(|e| crate::Error::io(e, path))?;
357            DataSketch::from_bytes(&bytes)
358        }
359        _ => {
360            // Default to JSON
361            let json = std::fs::read_to_string(path).map_err(|e| crate::Error::io(e, path))?;
362            serde_json::from_str(&json)
363                .map_err(|e| crate::Error::Format(format!("Invalid sketch JSON: {}", e)))
364        }
365    }
366}
367
368/// Merge multiple sketches into one.
369pub(crate) fn cmd_drift_merge(
370    sketches: &[PathBuf],
371    output: &Path,
372    format: &str,
373) -> crate::Result<()> {
374    if sketches.is_empty() {
375        return Err(crate::Error::invalid_config(
376            "No sketches provided to merge",
377        ));
378    }
379
380    let loaded: Vec<DataSketch> = sketches
381        .iter()
382        .map(|p| load_sketch(p))
383        .collect::<Result<Vec<_>, _>>()?;
384
385    let merged = DataSketch::merge(&loaded)?;
386
387    match format {
388        "binary" | "bin" => {
389            let bytes = merged.to_bytes()?;
390            std::fs::write(output, bytes).map_err(|e| crate::Error::io(e, output))?;
391        }
392        _ => {
393            let json = serde_json::to_string_pretty(&merged)
394                .map_err(|e| crate::Error::Format(e.to_string()))?;
395            std::fs::write(output, json).map_err(|e| crate::Error::io(e, output))?;
396        }
397    }
398
399    println!(
400        "Merged {} sketches ({} total rows) -> {}",
401        sketches.len(),
402        merged.row_count,
403        output.display()
404    );
405
406    Ok(())
407}
408
409/// Compare two sketches for drift.
410pub(crate) fn cmd_drift_compare(
411    reference: &Path,
412    current: &Path,
413    threshold: f64,
414    format: &str,
415) -> crate::Result<()> {
416    let ref_sketch = load_sketch(reference)?;
417    let cur_sketch = load_sketch(current)?;
418
419    let detector = DistributedDriftDetector::new()
420        .with_sketch_type(ref_sketch.sketch_type)
421        .with_threshold(threshold);
422
423    let results = detector.compare(&ref_sketch, &cur_sketch)?;
424
425    let drift_detected = results.iter().any(|r| r.severity.is_drift());
426
427    if format == "json" {
428        let json = serde_json::json!({
429            "reference": reference.display().to_string(),
430            "current": current.display().to_string(),
431            "drift_detected": drift_detected,
432            "threshold": threshold,
433            "columns": results.iter().map(|r| {
434                serde_json::json!({
435                    "column": r.column,
436                    "statistic": r.statistic,
437                    "severity": format!("{:?}", r.severity),
438                    "drift_detected": r.severity.is_drift(),
439                })
440            }).collect::<Vec<_>>()
441        });
442
443        let json_str =
444            serde_json::to_string_pretty(&json).map_err(|e| crate::Error::Format(e.to_string()))?;
445        println!("{}", json_str);
446    } else {
447        // Text format
448        println!("Sketch Drift Comparison");
449        println!("=======================");
450        println!("Reference: {}", reference.display());
451        println!("Current:   {}", current.display());
452        println!("Threshold: {}", threshold);
453        println!();
454
455        if results.is_empty() {
456            println!("No numeric columns to compare.");
457        } else {
458            println!(
459                "{:<20} {:>10} {:>10} DRIFT",
460                "COLUMN", "STATISTIC", "SEVERITY"
461            );
462            println!("{}", "-".repeat(55));
463
464            for result in &results {
465                let drift_symbol = if result.severity.is_drift() {
466                    severity_symbol(result.severity)
467                } else {
468                    "\u{2713}"
469                };
470                println!(
471                    "{:<20} {:>10.4} {:>10} {}",
472                    result.column,
473                    result.statistic,
474                    format!("{:?}", result.severity),
475                    drift_symbol
476                );
477            }
478
479            println!();
480            if drift_detected {
481                println!("\u{26A0} Drift detected in one or more columns");
482            } else {
483                println!("\u{2713} No significant drift detected");
484            }
485        }
486    }
487
488    Ok(())
489}
490
491#[cfg(test)]
492#[allow(
493    clippy::cast_possible_truncation,
494    clippy::cast_possible_wrap,
495    clippy::cast_precision_loss,
496    clippy::uninlined_format_args,
497    clippy::unwrap_used,
498    clippy::expect_used,
499    clippy::redundant_clone,
500    clippy::cast_lossless,
501    clippy::redundant_closure_for_method_calls,
502    clippy::too_many_lines,
503    clippy::float_cmp,
504    clippy::similar_names,
505    clippy::needless_late_init,
506    clippy::redundant_pattern_matching
507)]
508mod tests {
509    use std::sync::Arc;
510
511    use arrow::{
512        array::{Float64Array, Int32Array, StringArray},
513        datatypes::{DataType, Field, Schema},
514    };
515
516    use super::*;
517    use crate::ArrowDataset;
518
519    fn create_test_parquet(path: &Path, rows: usize) {
520        let schema = Arc::new(Schema::new(vec![
521            Field::new("id", DataType::Int32, false),
522            Field::new("name", DataType::Utf8, false),
523        ]));
524
525        let ids: Vec<i32> = (0..rows as i32).collect();
526        let names: Vec<String> = ids.iter().map(|i| format!("item_{}", i)).collect();
527
528        let batch = arrow::array::RecordBatch::try_new(
529            schema,
530            vec![
531                Arc::new(Int32Array::from(ids)),
532                Arc::new(StringArray::from(names)),
533            ],
534        )
535        .ok()
536        .unwrap_or_else(|| panic!("Should create batch"));
537
538        let dataset = ArrowDataset::from_batch(batch)
539            .ok()
540            .unwrap_or_else(|| panic!("Should create dataset"));
541
542        dataset
543            .to_parquet(path)
544            .ok()
545            .unwrap_or_else(|| panic!("Should write parquet"));
546    }
547
548    fn create_test_float_parquet(path: &Path, rows: usize) {
549        let schema = Arc::new(Schema::new(vec![
550            Field::new("id", DataType::Int32, false),
551            Field::new("value", DataType::Float64, false),
552        ]));
553
554        let ids: Vec<i32> = (0..rows as i32).collect();
555        let values: Vec<f64> = ids.iter().map(|i| *i as f64 * 1.5).collect();
556
557        let batch = arrow::array::RecordBatch::try_new(
558            schema,
559            vec![
560                Arc::new(Int32Array::from(ids)),
561                Arc::new(Float64Array::from(values)),
562            ],
563        )
564        .ok()
565        .unwrap_or_else(|| panic!("Should create batch"));
566
567        let dataset = ArrowDataset::from_batch(batch)
568            .ok()
569            .unwrap_or_else(|| panic!("Should create dataset"));
570
571        dataset
572            .to_parquet(path)
573            .ok()
574            .unwrap_or_else(|| panic!("Should write parquet"));
575    }
576
577    #[test]
578    fn test_parse_drift_tests() {
579        let tests = parse_drift_tests("ks,psi");
580        assert_eq!(tests.len(), 2);
581
582        let tests = parse_drift_tests("ks, chi2, psi, js");
583        assert_eq!(tests.len(), 4);
584
585        let tests = parse_drift_tests("invalid");
586        assert!(tests.is_empty());
587
588        let tests = parse_drift_tests("KS,PSI");
589        assert_eq!(tests.len(), 2);
590    }
591
592    #[test]
593    fn test_severity_symbol() {
594        assert_eq!(severity_symbol(DriftSeverity::None), "\u{2713}");
595        assert_eq!(severity_symbol(DriftSeverity::Low), "\u{25CB}");
596        assert_eq!(severity_symbol(DriftSeverity::Medium), "\u{25CF}");
597        assert_eq!(severity_symbol(DriftSeverity::High), "\u{25B2}");
598        assert_eq!(severity_symbol(DriftSeverity::Critical), "\u{2716}");
599    }
600
601    #[test]
602    fn test_cmd_drift_detect_same_data() {
603        let temp_dir = tempfile::tempdir()
604            .ok()
605            .unwrap_or_else(|| panic!("Should create temp dir"));
606        let path = temp_dir.path().join("data.parquet");
607        create_test_parquet(&path, 100);
608
609        // Compare dataset with itself - should detect no drift
610        let result = cmd_drift_detect(&path, &path, "ks,psi", 0.05, "text");
611        assert!(result.is_ok());
612    }
613
614    #[test]
615    fn test_cmd_drift_detect_json_format() {
616        let temp_dir = tempfile::tempdir()
617            .ok()
618            .unwrap_or_else(|| panic!("Should create temp dir"));
619        let path = temp_dir.path().join("data.parquet");
620        create_test_parquet(&path, 100);
621
622        let result = cmd_drift_detect(&path, &path, "ks", 0.05, "json");
623        assert!(result.is_ok());
624    }
625
626    #[test]
627    fn test_cmd_drift_detect_invalid_tests() {
628        let temp_dir = tempfile::tempdir()
629            .ok()
630            .unwrap_or_else(|| panic!("Should create temp dir"));
631        let path = temp_dir.path().join("data.parquet");
632        create_test_parquet(&path, 100);
633
634        let result = cmd_drift_detect(&path, &path, "invalid", 0.05, "text");
635        assert!(result.is_err());
636    }
637
638    #[test]
639    fn test_cmd_drift_report() {
640        let temp_dir = tempfile::tempdir()
641            .ok()
642            .unwrap_or_else(|| panic!("Should create temp dir"));
643        let path = temp_dir.path().join("data.parquet");
644        create_test_parquet(&path, 100);
645
646        // Report without output file (prints to stdout)
647        let result = cmd_drift_report(&path, &path, None);
648        assert!(result.is_ok());
649    }
650
651    #[test]
652    fn test_cmd_drift_report_to_file() {
653        let temp_dir = tempfile::tempdir()
654            .ok()
655            .unwrap_or_else(|| panic!("Should create temp dir"));
656        let data_path = temp_dir.path().join("data.parquet");
657        let output_path = temp_dir.path().join("report.json");
658        create_test_parquet(&data_path, 100);
659
660        let result = cmd_drift_report(&data_path, &data_path, Some(&output_path));
661        assert!(result.is_ok());
662        assert!(output_path.exists());
663
664        // Verify JSON is valid
665        let content = std::fs::read_to_string(&output_path)
666            .ok()
667            .unwrap_or_else(|| panic!("Should read file"));
668        let parsed: serde_json::Value = serde_json::from_str(&content)
669            .ok()
670            .unwrap_or_else(|| panic!("Should parse JSON"));
671        assert!(parsed.get("drift_detected").is_some());
672    }
673
674    #[test]
675    fn test_parse_sketch_type() {
676        assert!(matches!(
677            parse_sketch_type("tdigest"),
678            Some(SketchType::TDigest)
679        ));
680        assert!(matches!(
681            parse_sketch_type("TDIGEST"),
682            Some(SketchType::TDigest)
683        ));
684        assert!(matches!(
685            parse_sketch_type("ddsketch"),
686            Some(SketchType::DDSketch)
687        ));
688        assert!(matches!(
689            parse_sketch_type("DDSKETCH"),
690            Some(SketchType::DDSketch)
691        ));
692        assert!(parse_sketch_type("invalid").is_none());
693    }
694
695    #[test]
696    fn test_cmd_drift_sketch_tdigest() {
697        let temp_dir = tempfile::tempdir()
698            .ok()
699            .unwrap_or_else(|| panic!("Should create temp dir"));
700        let data_path = temp_dir.path().join("data.parquet");
701        let sketch_path = temp_dir.path().join("sketch.json");
702        create_test_float_parquet(&data_path, 100);
703
704        let result = cmd_drift_sketch(&data_path, &sketch_path, "tdigest", None, "json");
705        assert!(result.is_ok());
706        assert!(sketch_path.exists());
707
708        // Verify sketch file is valid JSON
709        let content = std::fs::read_to_string(&sketch_path)
710            .ok()
711            .unwrap_or_else(|| panic!("Should read file"));
712        let parsed: serde_json::Value = serde_json::from_str(&content)
713            .ok()
714            .unwrap_or_else(|| panic!("Should parse JSON"));
715        assert!(parsed.get("sketch_type").is_some());
716        assert!(parsed.get("row_count").is_some());
717    }
718
719    #[test]
720    fn test_cmd_drift_sketch_ddsketch_type() {
721        let temp_dir = tempfile::tempdir()
722            .ok()
723            .unwrap_or_else(|| panic!("Should create temp dir"));
724        let data_path = temp_dir.path().join("data.parquet");
725        let sketch_path = temp_dir.path().join("sketch.json");
726        create_test_float_parquet(&data_path, 100);
727
728        let result = cmd_drift_sketch(&data_path, &sketch_path, "ddsketch", None, "json");
729        assert!(result.is_ok());
730        assert!(sketch_path.exists());
731    }
732
733    #[test]
734    fn test_cmd_drift_sketch_with_source() {
735        let temp_dir = tempfile::tempdir()
736            .ok()
737            .unwrap_or_else(|| panic!("Should create temp dir"));
738        let data_path = temp_dir.path().join("data.parquet");
739        let sketch_path = temp_dir.path().join("sketch.json");
740        create_test_float_parquet(&data_path, 50);
741
742        let result = cmd_drift_sketch(&data_path, &sketch_path, "tdigest", Some("node-1"), "json");
743        assert!(result.is_ok());
744
745        // Verify source is in the output
746        let content = std::fs::read_to_string(&sketch_path)
747            .ok()
748            .unwrap_or_else(|| panic!("Should read file"));
749        let parsed: serde_json::Value = serde_json::from_str(&content)
750            .ok()
751            .unwrap_or_else(|| panic!("Should parse JSON"));
752        assert_eq!(
753            parsed.get("source").and_then(|v| v.as_str()),
754            Some("node-1")
755        );
756    }
757
758    #[test]
759    fn test_cmd_drift_sketch_binary_format() {
760        let temp_dir = tempfile::tempdir()
761            .ok()
762            .unwrap_or_else(|| panic!("Should create temp dir"));
763        let data_path = temp_dir.path().join("data.parquet");
764        let sketch_path = temp_dir.path().join("sketch.bin");
765        create_test_float_parquet(&data_path, 100);
766
767        let result = cmd_drift_sketch(&data_path, &sketch_path, "tdigest", None, "binary");
768        assert!(result.is_ok());
769        assert!(sketch_path.exists());
770
771        // Binary file should exist and be non-empty
772        let metadata = std::fs::metadata(&sketch_path)
773            .ok()
774            .unwrap_or_else(|| panic!("Should get metadata"));
775        assert!(metadata.len() > 0);
776    }
777
778    #[test]
779    fn test_cmd_drift_sketch_invalid_type() {
780        let temp_dir = tempfile::tempdir()
781            .ok()
782            .unwrap_or_else(|| panic!("Should create temp dir"));
783        let data_path = temp_dir.path().join("data.parquet");
784        let sketch_path = temp_dir.path().join("sketch.json");
785        create_test_float_parquet(&data_path, 100);
786
787        let result = cmd_drift_sketch(&data_path, &sketch_path, "invalid", None, "json");
788        assert!(result.is_err());
789    }
790
791    #[test]
792    fn test_cmd_drift_merge_tdigest() {
793        let temp_dir = tempfile::tempdir()
794            .ok()
795            .unwrap_or_else(|| panic!("Should create temp dir"));
796
797        // Create two datasets
798        let data1 = temp_dir.path().join("data1.parquet");
799        let data2 = temp_dir.path().join("data2.parquet");
800        let sketch1 = temp_dir.path().join("sketch1.json");
801        let sketch2 = temp_dir.path().join("sketch2.json");
802        let merged = temp_dir.path().join("merged.json");
803
804        create_test_float_parquet(&data1, 50);
805        create_test_float_parquet(&data2, 50);
806
807        // Create sketches
808        cmd_drift_sketch(&data1, &sketch1, "tdigest", Some("node-1"), "json")
809            .ok()
810            .unwrap_or_else(|| panic!("Should create sketch1"));
811        cmd_drift_sketch(&data2, &sketch2, "tdigest", Some("node-2"), "json")
812            .ok()
813            .unwrap_or_else(|| panic!("Should create sketch2"));
814
815        // Merge
816        let sketches = vec![sketch1.clone(), sketch2.clone()];
817        let result = cmd_drift_merge(&sketches, &merged, "json");
818        assert!(result.is_ok());
819        assert!(merged.exists());
820
821        // Verify merged sketch
822        let content = std::fs::read_to_string(&merged)
823            .ok()
824            .unwrap_or_else(|| panic!("Should read file"));
825        let parsed: serde_json::Value = serde_json::from_str(&content)
826            .ok()
827            .unwrap_or_else(|| panic!("Should parse JSON"));
828        assert_eq!(parsed.get("row_count").and_then(|v| v.as_u64()), Some(100));
829    }
830
831    #[test]
832    fn test_cmd_drift_merge_single_sketch() {
833        let temp_dir = tempfile::tempdir()
834            .ok()
835            .unwrap_or_else(|| panic!("Should create temp dir"));
836
837        let data = temp_dir.path().join("data.parquet");
838        let sketch = temp_dir.path().join("sketch.json");
839        let merged = temp_dir.path().join("merged.json");
840
841        create_test_float_parquet(&data, 100);
842        cmd_drift_sketch(&data, &sketch, "tdigest", None, "json")
843            .ok()
844            .unwrap_or_else(|| panic!("Should create sketch"));
845
846        // Merge single sketch
847        let sketches = vec![sketch.clone()];
848        let result = cmd_drift_merge(&sketches, &merged, "json");
849        assert!(result.is_ok());
850    }
851
852    #[test]
853    fn test_cmd_drift_merge_empty_fails() {
854        let temp_dir = tempfile::tempdir()
855            .ok()
856            .unwrap_or_else(|| panic!("Should create temp dir"));
857        let merged = temp_dir.path().join("merged.json");
858
859        let sketches: Vec<PathBuf> = vec![];
860        let result = cmd_drift_merge(&sketches, &merged, "json");
861        assert!(result.is_err());
862    }
863
864    #[test]
865    fn test_cmd_drift_compare_no_drift() {
866        let temp_dir = tempfile::tempdir()
867            .ok()
868            .unwrap_or_else(|| panic!("Should create temp dir"));
869
870        let data = temp_dir.path().join("data.parquet");
871        let sketch1 = temp_dir.path().join("sketch1.json");
872        let sketch2 = temp_dir.path().join("sketch2.json");
873
874        create_test_float_parquet(&data, 100);
875
876        // Create two identical sketches
877        cmd_drift_sketch(&data, &sketch1, "tdigest", None, "json")
878            .ok()
879            .unwrap_or_else(|| panic!("Should create sketch1"));
880        cmd_drift_sketch(&data, &sketch2, "tdigest", None, "json")
881            .ok()
882            .unwrap_or_else(|| panic!("Should create sketch2"));
883
884        let result = cmd_drift_compare(&sketch1, &sketch2, 0.1, "text");
885        assert!(result.is_ok());
886    }
887
888    #[test]
889    fn test_cmd_drift_compare_json_format() {
890        let temp_dir = tempfile::tempdir()
891            .ok()
892            .unwrap_or_else(|| panic!("Should create temp dir"));
893
894        let data = temp_dir.path().join("data.parquet");
895        let sketch1 = temp_dir.path().join("sketch1.json");
896        let sketch2 = temp_dir.path().join("sketch2.json");
897
898        create_test_float_parquet(&data, 100);
899
900        cmd_drift_sketch(&data, &sketch1, "tdigest", None, "json")
901            .ok()
902            .unwrap_or_else(|| panic!("Should create sketch1"));
903        cmd_drift_sketch(&data, &sketch2, "tdigest", None, "json")
904            .ok()
905            .unwrap_or_else(|| panic!("Should create sketch2"));
906
907        let result = cmd_drift_compare(&sketch1, &sketch2, 0.1, "json");
908        assert!(result.is_ok());
909    }
910
911    #[test]
912    fn test_cmd_drift_merge_binary_format() {
913        let temp_dir = tempfile::tempdir()
914            .ok()
915            .unwrap_or_else(|| panic!("Should create temp dir"));
916
917        let data = temp_dir.path().join("data.parquet");
918        let sketch = temp_dir.path().join("sketch.bin");
919        let merged = temp_dir.path().join("merged.bin");
920
921        create_test_float_parquet(&data, 100);
922        cmd_drift_sketch(&data, &sketch, "tdigest", None, "binary")
923            .ok()
924            .unwrap_or_else(|| panic!("Should create sketch"));
925
926        let sketches = vec![sketch.clone()];
927        let result = cmd_drift_merge(&sketches, &merged, "binary");
928        assert!(result.is_ok());
929        assert!(merged.exists());
930    }
931
932    #[test]
933    fn test_load_sketch_invalid_json() {
934        let temp_dir = tempfile::tempdir()
935            .ok()
936            .unwrap_or_else(|| panic!("Should create temp dir"));
937        let sketch_path = temp_dir.path().join("invalid.json");
938
939        std::fs::write(&sketch_path, "{ invalid json }")
940            .ok()
941            .unwrap_or_else(|| panic!("Should write file"));
942
943        let result = load_sketch(&sketch_path);
944        assert!(result.is_err());
945    }
946
947    #[test]
948    fn test_cmd_drift_detect_all_tests() {
949        let temp_dir = tempfile::tempdir()
950            .ok()
951            .unwrap_or_else(|| panic!("Should create temp dir"));
952        let path = temp_dir.path().join("data.parquet");
953        create_test_parquet(&path, 100);
954
955        let result = cmd_drift_detect(&path, &path, "ks,chi2,psi,js", 0.05, "text");
956        assert!(result.is_ok());
957    }
958
959    #[test]
960    fn test_cmd_drift_detect_with_drift() {
961        let temp_dir = tempfile::tempdir()
962            .ok()
963            .unwrap_or_else(|| panic!("Should create temp dir"));
964        let ref_path = temp_dir.path().join("ref.parquet");
965        let cur_path = temp_dir.path().join("cur.parquet");
966
967        create_test_parquet(&ref_path, 100);
968
969        let schema = Arc::new(Schema::new(vec![
970            Field::new("id", DataType::Int32, false),
971            Field::new("name", DataType::Utf8, false),
972        ]));
973
974        let ids: Vec<i32> = (500..600).collect();
975        let names: Vec<String> = ids.iter().map(|i| format!("different_{}", i)).collect();
976
977        let batch = arrow::array::RecordBatch::try_new(
978            schema,
979            vec![
980                Arc::new(Int32Array::from(ids)),
981                Arc::new(StringArray::from(names)),
982            ],
983        )
984        .ok()
985        .unwrap_or_else(|| panic!("Should create batch"));
986
987        let dataset = ArrowDataset::from_batch(batch)
988            .ok()
989            .unwrap_or_else(|| panic!("Should create dataset"));
990
991        dataset
992            .to_parquet(&cur_path)
993            .ok()
994            .unwrap_or_else(|| panic!("Should write parquet"));
995
996        let result = cmd_drift_detect(&ref_path, &cur_path, "ks,psi", 0.05, "text");
997        assert!(result.is_ok());
998    }
999
1000    #[test]
1001    fn test_cmd_drift_compare_empty_results() {
1002        let temp_dir = tempfile::tempdir()
1003            .ok()
1004            .unwrap_or_else(|| panic!("Should create temp dir"));
1005
1006        // Create dataset with only string column (no numeric for sketch)
1007        let data_path = temp_dir.path().join("strings.parquet");
1008        let sketch1_path = temp_dir.path().join("sketch1.json");
1009        let sketch2_path = temp_dir.path().join("sketch2.json");
1010
1011        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
1012
1013        let names: Vec<String> = (0..50).map(|i| format!("name_{}", i)).collect();
1014
1015        let batch =
1016            arrow::array::RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(names))])
1017                .ok()
1018                .unwrap_or_else(|| panic!("Should create batch"));
1019
1020        let dataset = ArrowDataset::from_batch(batch)
1021            .ok()
1022            .unwrap_or_else(|| panic!("Should create dataset"));
1023
1024        dataset
1025            .to_parquet(&data_path)
1026            .ok()
1027            .unwrap_or_else(|| panic!("Should write parquet"));
1028
1029        cmd_drift_sketch(&data_path, &sketch1_path, "tdigest", None, "json")
1030            .ok()
1031            .unwrap_or_else(|| panic!("Should create sketch1"));
1032        cmd_drift_sketch(&data_path, &sketch2_path, "tdigest", None, "json")
1033            .ok()
1034            .unwrap_or_else(|| panic!("Should create sketch2"));
1035
1036        let result = cmd_drift_compare(&sketch1_path, &sketch2_path, 0.1, "text");
1037        assert!(result.is_ok());
1038    }
1039
1040    #[test]
1041    fn test_parse_drift_tests_single() {
1042        let tests = parse_drift_tests("ks");
1043        assert_eq!(tests.len(), 1);
1044        assert!(matches!(tests[0], DriftTest::KolmogorovSmirnov));
1045    }
1046
1047    #[test]
1048    fn test_parse_drift_tests_multiple() {
1049        let tests = parse_drift_tests("ks,chi2,psi,js");
1050        assert_eq!(tests.len(), 4);
1051    }
1052
1053    #[test]
1054    fn test_parse_drift_tests_with_spaces() {
1055        let tests = parse_drift_tests("ks, chi2 ,psi");
1056        assert_eq!(tests.len(), 3);
1057    }
1058
1059    #[test]
1060    fn test_parse_drift_tests_unknown() {
1061        let tests = parse_drift_tests("unknown");
1062        assert!(tests.is_empty());
1063    }
1064
1065    #[test]
1066    fn test_parse_sketch_type_tdigest() {
1067        assert!(matches!(
1068            parse_sketch_type("tdigest"),
1069            Some(SketchType::TDigest)
1070        ));
1071    }
1072
1073    #[test]
1074    fn test_parse_sketch_type_ddsketch() {
1075        assert!(matches!(
1076            parse_sketch_type("ddsketch"),
1077            Some(SketchType::DDSketch)
1078        ));
1079    }
1080
1081    #[test]
1082    fn test_parse_sketch_type_unknown() {
1083        assert!(parse_sketch_type("unknown").is_none());
1084    }
1085
1086    #[test]
1087    fn test_cmd_drift_report_basic() {
1088        let temp_dir = tempfile::tempdir()
1089            .ok()
1090            .unwrap_or_else(|| panic!("Should create temp dir"));
1091        let ref_path = temp_dir.path().join("ref.parquet");
1092        let cur_path = temp_dir.path().join("cur.parquet");
1093        create_test_parquet(&ref_path, 100);
1094        create_test_parquet(&cur_path, 100);
1095
1096        let result = cmd_drift_report(&ref_path, &cur_path, None);
1097        assert!(result.is_ok());
1098    }
1099
1100    #[test]
1101    fn test_cmd_drift_report_with_output() {
1102        let temp_dir = tempfile::tempdir()
1103            .ok()
1104            .unwrap_or_else(|| panic!("Should create temp dir"));
1105        let ref_path = temp_dir.path().join("ref.parquet");
1106        let cur_path = temp_dir.path().join("cur.parquet");
1107        let output = temp_dir.path().join("report.html");
1108        create_test_parquet(&ref_path, 100);
1109        create_test_parquet(&cur_path, 100);
1110
1111        let result = cmd_drift_report(&ref_path, &cur_path, Some(&output));
1112        assert!(result.is_ok());
1113        assert!(output.exists());
1114    }
1115
1116    #[test]
1117    fn test_cmd_drift_sketch_ddsketch_json_output() {
1118        let temp_dir = tempfile::tempdir()
1119            .ok()
1120            .unwrap_or_else(|| panic!("Should create temp dir"));
1121        let data_path = temp_dir.path().join("data.parquet");
1122        let sketch_path = temp_dir.path().join("sketch.json");
1123        create_test_parquet(&data_path, 100);
1124
1125        let result = cmd_drift_sketch(&data_path, &sketch_path, "ddsketch", None, "json");
1126        assert!(result.is_ok());
1127    }
1128
1129    #[test]
1130    fn test_cmd_drift_merge() {
1131        let temp_dir = tempfile::tempdir()
1132            .ok()
1133            .unwrap_or_else(|| panic!("Should create temp dir"));
1134        let data1 = temp_dir.path().join("data1.parquet");
1135        let data2 = temp_dir.path().join("data2.parquet");
1136        let sketch1 = temp_dir.path().join("sketch1.json");
1137        let sketch2 = temp_dir.path().join("sketch2.json");
1138        let merged = temp_dir.path().join("merged.json");
1139
1140        // Create datasets with float column
1141        let schema = Arc::new(Schema::new(vec![Field::new(
1142            "value",
1143            DataType::Float64,
1144            false,
1145        )]));
1146
1147        let batch1 = arrow::array::RecordBatch::try_new(
1148            schema.clone(),
1149            vec![Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]))],
1150        )
1151        .unwrap();
1152        let dataset1 = ArrowDataset::from_batch(batch1).unwrap();
1153        dataset1.to_parquet(&data1).unwrap();
1154
1155        let batch2 = arrow::array::RecordBatch::try_new(
1156            schema,
1157            vec![Arc::new(Float64Array::from(vec![6.0, 7.0, 8.0, 9.0, 10.0]))],
1158        )
1159        .unwrap();
1160        let dataset2 = ArrowDataset::from_batch(batch2).unwrap();
1161        dataset2.to_parquet(&data2).unwrap();
1162
1163        cmd_drift_sketch(&data1, &sketch1, "tdigest", None, "json").unwrap();
1164        cmd_drift_sketch(&data2, &sketch2, "tdigest", None, "json").unwrap();
1165
1166        let sketches = vec![sketch1, sketch2];
1167        let result = cmd_drift_merge(&sketches, &merged, "json");
1168        assert!(result.is_ok());
1169    }
1170}