tesser_cli/
data_validation.rs

1use std::collections::HashMap;
2
3use anyhow::{anyhow, bail, Result};
4use chrono::{DateTime, Duration, Utc};
5use rust_decimal::{prelude::ToPrimitive, Decimal};
6use tesser_core::{Candle, Interval, Symbol};
7
/// Tunable thresholds and switches that control a validation run.
#[derive(Debug, Clone, Copy)]
pub struct ValidationConfig {
    /// Relative close-to-close change (as a fraction of the previous close)
    /// at or above which a candle is recorded as a price spike.
    pub price_jump_threshold: f64,
    /// Relative difference between the primary close and the reference close
    /// (as a fraction of the reference close) at or above which a
    /// cross-source mismatch is recorded.
    pub reference_tolerance: f64,
    /// When `true`, gaps are filled with synthetic zero-volume candles in the
    /// repaired output.
    pub repair_missing: bool,
}
15
16/// Aggregated statistics describing the validation results.
17#[derive(Clone, Debug)]
18pub struct ValidationSummary {
19    pub symbol: Symbol,
20    pub interval: Interval,
21    pub rows: usize,
22    pub start: DateTime<Utc>,
23    pub end: DateTime<Utc>,
24    pub missing_candles: usize,
25    pub duplicate_candles: usize,
26    pub zero_volume_candles: usize,
27    pub price_spike_count: usize,
28    pub cross_mismatch_count: usize,
29    pub repaired_candles: usize,
30}
31
32/// Describes a gap in the time-series.
33#[derive(Clone, Debug)]
34pub struct GapRecord {
35    pub start: DateTime<Utc>,
36    pub end: DateTime<Utc>,
37    pub missing: usize,
38}
39
40/// Describes a suspicious price jump.
41#[derive(Clone, Debug)]
42pub struct SpikeRecord {
43    pub timestamp: DateTime<Utc>,
44    pub change_fraction: f64,
45}
46
47/// Describes a mismatch between the primary data source and a reference feed.
48#[derive(Clone, Debug)]
49pub struct CrossMismatch {
50    pub timestamp: DateTime<Utc>,
51    pub primary_close: f64,
52    pub reference_close: f64,
53    pub delta_fraction: f64,
54}
55
56/// Result of a validation run.
57#[derive(Clone, Debug)]
58pub struct ValidationOutcome {
59    pub summary: ValidationSummary,
60    pub gaps: Vec<GapRecord>,
61    pub price_spikes: Vec<SpikeRecord>,
62    pub cross_mismatches: Vec<CrossMismatch>,
63    pub repaired: Vec<Candle>,
64}
65
66/// Validate a group of candles, optionally repairing missing entries.
67pub fn validate_dataset(
68    mut candles: Vec<Candle>,
69    reference: Option<Vec<Candle>>,
70    config: ValidationConfig,
71) -> Result<ValidationOutcome> {
72    if candles.is_empty() {
73        bail!("no candles supplied for validation");
74    }
75    candles.sort_by_key(|c| c.timestamp);
76
77    let symbol = ensure_single_symbol(&candles)?;
78    let interval = candles.first().expect("checked len").interval;
79    let interval_duration = interval.as_duration();
80    let expected_ms = interval_duration.num_milliseconds().max(1); // guard against zero-duration intervals
81
82    let mut gaps = Vec::new();
83    let mut price_spikes = Vec::new();
84    let mut cross_mismatches = Vec::new();
85    let mut duplicate_candles = 0usize;
86    let mut missing_candles = 0usize;
87    let mut zero_volume_candles = 0usize;
88
89    let mut prev_close: Option<f64> = None;
90    let mut prev_timestamp: Option<DateTime<Utc>> = None;
91    for candle in &candles {
92        let volume = candle.volume.to_f64().unwrap_or(0.0);
93        if volume <= 0.0 {
94            zero_volume_candles += 1;
95        }
96        let close = candle.close.to_f64().unwrap_or(0.0);
97        if let (Some(last_close), Some(last_ts)) = (prev_close, prev_timestamp) {
98            let delta: Duration = candle.timestamp - last_ts;
99            let delta_ms = delta.num_milliseconds();
100            if delta_ms < expected_ms {
101                duplicate_candles += 1;
102            } else if delta_ms > expected_ms {
103                let missing = ((delta_ms / expected_ms) as usize).saturating_sub(1);
104                if missing > 0 {
105                    missing_candles += missing;
106                    gaps.push(GapRecord {
107                        start: last_ts,
108                        end: candle.timestamp,
109                        missing,
110                    });
111                }
112            }
113
114            let denom = last_close.abs().max(f64::EPSILON);
115            let price_change = (close - last_close).abs() / denom;
116            if price_change >= config.price_jump_threshold {
117                price_spikes.push(SpikeRecord {
118                    timestamp: candle.timestamp,
119                    change_fraction: price_change,
120                });
121            }
122        }
123
124        prev_close = Some(close);
125        prev_timestamp = Some(candle.timestamp);
126    }
127
128    if let Some(reference_data) = reference.as_ref() {
129        let mut map = HashMap::with_capacity(reference_data.len());
130        for candle in reference_data {
131            map.insert(
132                candle.timestamp.timestamp_millis(),
133                candle.close.to_f64().unwrap_or(0.0),
134            );
135        }
136        for candle in &candles {
137            if let Some(reference_close) = map.get(&candle.timestamp.timestamp_millis()) {
138                let close = candle.close.to_f64().unwrap_or(0.0);
139                let denom = reference_close.abs().max(f64::EPSILON);
140                let diff = (close - reference_close).abs() / denom;
141                if diff >= config.reference_tolerance {
142                    cross_mismatches.push(CrossMismatch {
143                        timestamp: candle.timestamp,
144                        primary_close: close,
145                        reference_close: *reference_close,
146                        delta_fraction: diff,
147                    });
148                }
149            }
150        }
151    }
152
153    let mut repaired = candles.clone();
154    let mut repaired_candles = 0usize;
155    if config.repair_missing {
156        let mut idx = 0usize;
157        while idx + 1 < repaired.len() {
158            let current_ts = repaired[idx].timestamp;
159            let next_ts = repaired[idx + 1].timestamp;
160            let delta: Duration = next_ts - current_ts;
161            let delta_ms = delta.num_milliseconds();
162            if delta_ms > expected_ms {
163                let missing = ((delta_ms / expected_ms) as usize).saturating_sub(1);
164                if missing > 0 {
165                    for step in 1..=missing {
166                        let ts = current_ts + Duration::milliseconds(expected_ms * step as i64);
167                        let fill_price = repaired[idx].close;
168                        let fill = Candle {
169                            symbol: repaired[idx].symbol,
170                            interval,
171                            open: fill_price,
172                            high: fill_price,
173                            low: fill_price,
174                            close: fill_price,
175                            volume: Decimal::ZERO,
176                            timestamp: ts,
177                        };
178                        repaired.insert(idx + step, fill);
179                        repaired_candles += 1;
180                    }
181                    idx += missing;
182                }
183            }
184            idx += 1;
185        }
186    }
187
188    let summary = ValidationSummary {
189        symbol,
190        interval,
191        rows: candles.len(),
192        start: candles.first().expect("len checked").timestamp,
193        end: candles.last().expect("len checked").timestamp,
194        missing_candles,
195        duplicate_candles,
196        zero_volume_candles,
197        price_spike_count: price_spikes.len(),
198        cross_mismatch_count: cross_mismatches.len(),
199        repaired_candles,
200    };
201
202    Ok(ValidationOutcome {
203        summary,
204        gaps,
205        price_spikes,
206        cross_mismatches,
207        repaired,
208    })
209}
210
211fn ensure_single_symbol(candles: &[Candle]) -> Result<Symbol> {
212    let mut iter = candles.iter();
213    let first = iter
214        .next()
215        .ok_or_else(|| anyhow!("no candles to inspect for symbol"))?;
216    let symbol = first.symbol;
217    if iter.any(|c| c.symbol != symbol) {
218        bail!("validation currently supports a single symbol per run");
219    }
220    Ok(symbol)
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use rust_decimal::prelude::FromPrimitive;
    use rust_decimal::Decimal;

    /// Fixed reference instant shared by every test candle.
    ///
    /// A constant is used instead of `Utc::now()` so that two candles built
    /// for the same minute always carry the same millisecond timestamp; the
    /// cross-source check matches candles by `timestamp_millis()`.
    fn base_time() -> DateTime<Utc> {
        Utc.timestamp_opt(1_700_000_000, 0)
            .single()
            .expect("fixed timestamp is valid")
    }

    /// Build a flat one-minute candle `minute` minutes after `base_time()`.
    fn candle_at(minute: i64, close: f64, volume: f64) -> Candle {
        let close = Decimal::from_f64(close).expect("close convertible");
        let volume = Decimal::from_f64(volume).expect("volume convertible");
        Candle {
            symbol: Symbol::from("BTCUSDT"),
            interval: Interval::OneMinute,
            open: close,
            high: close,
            low: close,
            close,
            volume,
            timestamp: base_time() + Duration::minutes(minute),
        }
    }

    #[test]
    fn detects_gaps_and_repairs() {
        let candles = vec![candle_at(0, 100.0, 10.0), candle_at(2, 101.0, 11.0)];
        let cfg = ValidationConfig {
            price_jump_threshold: 0.05,
            reference_tolerance: 0.01,
            repair_missing: true,
        };
        let outcome = validate_dataset(candles, None, cfg).expect("ok");
        assert_eq!(outcome.summary.missing_candles, 1);
        assert_eq!(outcome.summary.repaired_candles, 1);
        assert_eq!(outcome.gaps.len(), 1);
        assert_eq!(outcome.repaired.len(), 3);

        // The synthetic candle sits one interval after the gap start, carries
        // the preceding close forward and has no volume.
        let fill = &outcome.repaired[1];
        assert_eq!(fill.timestamp, base_time() + Duration::minutes(1));
        assert_eq!(fill.close, Decimal::from_f64(100.0).expect("convertible"));
        assert_eq!(fill.volume, Decimal::ZERO);
    }

    #[test]
    fn leaves_gaps_unfilled_when_repair_disabled() {
        let candles = vec![candle_at(0, 100.0, 10.0), candle_at(3, 100.0, 11.0)];
        let cfg = ValidationConfig {
            price_jump_threshold: 0.05,
            reference_tolerance: 0.01,
            repair_missing: false,
        };
        let outcome = validate_dataset(candles, None, cfg).expect("ok");
        assert_eq!(outcome.summary.missing_candles, 2);
        assert_eq!(outcome.summary.repaired_candles, 0);
        assert_eq!(outcome.repaired.len(), 2);
    }

    #[test]
    fn detects_duplicates() {
        let candles = vec![
            candle_at(0, 100.0, 10.0),
            candle_at(0, 100.0, 10.0),
            candle_at(1, 100.0, 10.0),
        ];
        let cfg = ValidationConfig {
            price_jump_threshold: 0.05,
            reference_tolerance: 0.01,
            repair_missing: false,
        };
        let outcome = validate_dataset(candles, None, cfg).expect("ok");
        assert_eq!(outcome.summary.duplicate_candles, 1);
        assert_eq!(outcome.summary.missing_candles, 0);
    }

    #[test]
    fn detects_zero_volume_and_spikes() {
        let candles = vec![
            candle_at(0, 100.0, 10.0),
            candle_at(1, 150.0, 0.0),
            candle_at(2, 160.0, 12.0),
        ];
        let cfg = ValidationConfig {
            price_jump_threshold: 0.2,
            reference_tolerance: 0.01,
            repair_missing: false,
        };
        let outcome = validate_dataset(candles, None, cfg).expect("ok");
        assert_eq!(outcome.summary.zero_volume_candles, 1);
        assert_eq!(outcome.summary.price_spike_count, 1);
    }

    #[test]
    fn detects_cross_source_mismatches() {
        let primary = vec![candle_at(0, 100.0, 1.0), candle_at(1, 102.0, 1.0)];
        let reference = vec![candle_at(0, 100.0, 1.0), candle_at(1, 100.0, 1.0)];
        let cfg = ValidationConfig {
            price_jump_threshold: 0.2,
            reference_tolerance: 0.01,
            repair_missing: false,
        };
        let outcome = validate_dataset(primary, Some(reference), cfg).expect("ok");
        assert_eq!(outcome.summary.cross_mismatch_count, 1);
    }
}