1use std::collections::HashMap;
2
3use anyhow::{anyhow, bail, Result};
4use chrono::{DateTime, Duration, Utc};
5use rust_decimal::{prelude::ToPrimitive, Decimal};
6use tesser_core::{Candle, Interval, Symbol};
7
/// Tunable thresholds and switches for [`validate_dataset`].
#[derive(Debug, Clone, Copy)]
pub struct ValidationConfig {
    /// Relative close-to-close change at or above which a candle is reported as a price spike.
    pub price_jump_threshold: f64,
    /// Relative difference from the reference close at or above which a candle is reported
    /// as a cross-source mismatch.
    pub reference_tolerance: f64,
    /// When `true`, gaps in the series are filled with synthetic zero-volume candles.
    pub repair_missing: bool,
}
15
16#[derive(Clone, Debug)]
18pub struct ValidationSummary {
19 pub symbol: Symbol,
20 pub interval: Interval,
21 pub rows: usize,
22 pub start: DateTime<Utc>,
23 pub end: DateTime<Utc>,
24 pub missing_candles: usize,
25 pub duplicate_candles: usize,
26 pub zero_volume_candles: usize,
27 pub price_spike_count: usize,
28 pub cross_mismatch_count: usize,
29 pub repaired_candles: usize,
30}
31
32#[derive(Clone, Debug)]
34pub struct GapRecord {
35 pub start: DateTime<Utc>,
36 pub end: DateTime<Utc>,
37 pub missing: usize,
38}
39
40#[derive(Clone, Debug)]
42pub struct SpikeRecord {
43 pub timestamp: DateTime<Utc>,
44 pub change_fraction: f64,
45}
46
47#[derive(Clone, Debug)]
49pub struct CrossMismatch {
50 pub timestamp: DateTime<Utc>,
51 pub primary_close: f64,
52 pub reference_close: f64,
53 pub delta_fraction: f64,
54}
55
56#[derive(Clone, Debug)]
58pub struct ValidationOutcome {
59 pub summary: ValidationSummary,
60 pub gaps: Vec<GapRecord>,
61 pub price_spikes: Vec<SpikeRecord>,
62 pub cross_mismatches: Vec<CrossMismatch>,
63 pub repaired: Vec<Candle>,
64}
65
66pub fn validate_dataset(
68 mut candles: Vec<Candle>,
69 reference: Option<Vec<Candle>>,
70 config: ValidationConfig,
71) -> Result<ValidationOutcome> {
72 if candles.is_empty() {
73 bail!("no candles supplied for validation");
74 }
75 candles.sort_by_key(|c| c.timestamp);
76
77 let symbol = ensure_single_symbol(&candles)?;
78 let interval = candles.first().expect("checked len").interval;
79 let interval_duration = interval.as_duration();
80 let expected_ms = interval_duration.num_milliseconds().max(1); let mut gaps = Vec::new();
83 let mut price_spikes = Vec::new();
84 let mut cross_mismatches = Vec::new();
85 let mut duplicate_candles = 0usize;
86 let mut missing_candles = 0usize;
87 let mut zero_volume_candles = 0usize;
88
89 let mut prev_close: Option<f64> = None;
90 let mut prev_timestamp: Option<DateTime<Utc>> = None;
91 for candle in &candles {
92 let volume = candle.volume.to_f64().unwrap_or(0.0);
93 if volume <= 0.0 {
94 zero_volume_candles += 1;
95 }
96 let close = candle.close.to_f64().unwrap_or(0.0);
97 if let (Some(last_close), Some(last_ts)) = (prev_close, prev_timestamp) {
98 let delta: Duration = candle.timestamp - last_ts;
99 let delta_ms = delta.num_milliseconds();
100 if delta_ms < expected_ms {
101 duplicate_candles += 1;
102 } else if delta_ms > expected_ms {
103 let missing = ((delta_ms / expected_ms) as usize).saturating_sub(1);
104 if missing > 0 {
105 missing_candles += missing;
106 gaps.push(GapRecord {
107 start: last_ts,
108 end: candle.timestamp,
109 missing,
110 });
111 }
112 }
113
114 let denom = last_close.abs().max(f64::EPSILON);
115 let price_change = (close - last_close).abs() / denom;
116 if price_change >= config.price_jump_threshold {
117 price_spikes.push(SpikeRecord {
118 timestamp: candle.timestamp,
119 change_fraction: price_change,
120 });
121 }
122 }
123
124 prev_close = Some(close);
125 prev_timestamp = Some(candle.timestamp);
126 }
127
128 if let Some(reference_data) = reference.as_ref() {
129 let mut map = HashMap::with_capacity(reference_data.len());
130 for candle in reference_data {
131 map.insert(
132 candle.timestamp.timestamp_millis(),
133 candle.close.to_f64().unwrap_or(0.0),
134 );
135 }
136 for candle in &candles {
137 if let Some(reference_close) = map.get(&candle.timestamp.timestamp_millis()) {
138 let close = candle.close.to_f64().unwrap_or(0.0);
139 let denom = reference_close.abs().max(f64::EPSILON);
140 let diff = (close - reference_close).abs() / denom;
141 if diff >= config.reference_tolerance {
142 cross_mismatches.push(CrossMismatch {
143 timestamp: candle.timestamp,
144 primary_close: close,
145 reference_close: *reference_close,
146 delta_fraction: diff,
147 });
148 }
149 }
150 }
151 }
152
153 let mut repaired = candles.clone();
154 let mut repaired_candles = 0usize;
155 if config.repair_missing {
156 let mut idx = 0usize;
157 while idx + 1 < repaired.len() {
158 let current_ts = repaired[idx].timestamp;
159 let next_ts = repaired[idx + 1].timestamp;
160 let delta: Duration = next_ts - current_ts;
161 let delta_ms = delta.num_milliseconds();
162 if delta_ms > expected_ms {
163 let missing = ((delta_ms / expected_ms) as usize).saturating_sub(1);
164 if missing > 0 {
165 for step in 1..=missing {
166 let ts = current_ts + Duration::milliseconds(expected_ms * step as i64);
167 let fill_price = repaired[idx].close;
168 let fill = Candle {
169 symbol: repaired[idx].symbol,
170 interval,
171 open: fill_price,
172 high: fill_price,
173 low: fill_price,
174 close: fill_price,
175 volume: Decimal::ZERO,
176 timestamp: ts,
177 };
178 repaired.insert(idx + step, fill);
179 repaired_candles += 1;
180 }
181 idx += missing;
182 }
183 }
184 idx += 1;
185 }
186 }
187
188 let summary = ValidationSummary {
189 symbol,
190 interval,
191 rows: candles.len(),
192 start: candles.first().expect("len checked").timestamp,
193 end: candles.last().expect("len checked").timestamp,
194 missing_candles,
195 duplicate_candles,
196 zero_volume_candles,
197 price_spike_count: price_spikes.len(),
198 cross_mismatch_count: cross_mismatches.len(),
199 repaired_candles,
200 };
201
202 Ok(ValidationOutcome {
203 summary,
204 gaps,
205 price_spikes,
206 cross_mismatches,
207 repaired,
208 })
209}
210
211fn ensure_single_symbol(candles: &[Candle]) -> Result<Symbol> {
212 let mut iter = candles.iter();
213 let first = iter
214 .next()
215 .ok_or_else(|| anyhow!("no candles to inspect for symbol"))?;
216 let symbol = first.symbol;
217 if iter.any(|c| c.symbol != symbol) {
218 bail!("validation currently supports a single symbol per run");
219 }
220 Ok(symbol)
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal::prelude::FromPrimitive;
    use rust_decimal::Decimal;

    /// Builds a flat one-minute BTCUSDT candle `minute` minutes after the Unix epoch.
    ///
    /// The base time is fixed rather than taken from the wall clock: timestamps built from
    /// separate `Utc::now()` calls can fall on different milliseconds, which makes the
    /// timestamp-keyed reference lookup miss intermittently.
    fn candle_at(minute: i64, close: f64, volume: f64) -> Candle {
        let close = Decimal::from_f64(close).expect("close convertible");
        let volume = Decimal::from_f64(volume).expect("volume convertible");
        let base = DateTime::<Utc>::from(std::time::UNIX_EPOCH);
        Candle {
            symbol: Symbol::from("BTCUSDT"),
            interval: Interval::OneMinute,
            open: close,
            high: close,
            low: close,
            close,
            volume,
            timestamp: base + Duration::minutes(minute),
        }
    }

    #[test]
    fn detects_gaps_and_repairs() {
        let candles = vec![candle_at(0, 100.0, 10.0), candle_at(2, 101.0, 11.0)];
        let cfg = ValidationConfig {
            price_jump_threshold: 0.05,
            reference_tolerance: 0.01,
            repair_missing: true,
        };
        let outcome = validate_dataset(candles, None, cfg).expect("ok");
        assert_eq!(outcome.summary.missing_candles, 1);
        assert_eq!(outcome.summary.repaired_candles, 1);
        assert_eq!(outcome.gaps.len(), 1);
        assert_eq!(outcome.repaired.len(), 3);
        // The synthetic candle sits between the two originals and carries no volume.
        assert_eq!(outcome.repaired[1].volume, Decimal::ZERO);
    }

    #[test]
    fn detects_zero_volume_and_spikes() {
        let candles = vec![
            candle_at(0, 100.0, 10.0),
            candle_at(1, 150.0, 0.0),
            candle_at(2, 160.0, 12.0),
        ];
        let cfg = ValidationConfig {
            price_jump_threshold: 0.2,
            reference_tolerance: 0.01,
            repair_missing: false,
        };
        let outcome = validate_dataset(candles, None, cfg).expect("ok");
        assert_eq!(outcome.summary.zero_volume_candles, 1);
        assert_eq!(outcome.summary.price_spike_count, 1);
    }

    #[test]
    fn detects_cross_source_mismatches() {
        let primary = vec![candle_at(0, 100.0, 1.0), candle_at(1, 102.0, 1.0)];
        let reference = vec![candle_at(0, 100.0, 1.0), candle_at(1, 100.0, 1.0)];
        let cfg = ValidationConfig {
            price_jump_threshold: 0.2,
            reference_tolerance: 0.01,
            repair_missing: false,
        };
        let outcome = validate_dataset(primary, Some(reference), cfg).expect("ok");
        assert_eq!(outcome.summary.cross_mismatch_count, 1);
    }
}