rangebar_batch/
engine.rs

//! Batch analysis engine powered by Polars
//!
//! High-performance batch processing for historical analysis,
//! backtesting, and research with exception-only failure handling.
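//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; `load_range_bars` is a placeholder
//! for whatever upstream step produces the bars, and the import paths assume the
//! engine is reachable as `rangebar_batch::engine`):
//!
//! ```ignore
//! use rangebar_batch::engine::BatchAnalysisEngine;
//! use rangebar_core::RangeBar;
//!
//! let range_bars: Vec<RangeBar> = load_range_bars();
//! let engine = BatchAnalysisEngine::new();
//! let result = engine
//!     .analyze_single_symbol(&range_bars, "BTCUSDT")
//!     .expect("analysis failed");
//! println!("bars analyzed: {}", result.analysis.basic_stats.total_bars);
//! ```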

use polars::frame::row::Row;
use polars::prelude::*;
use rangebar_core::RangeBar;
use rangebar_io::formats::{ConversionError, DataFrameConverter};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use thiserror::Error;

/// Configuration for batch analysis operations
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchConfig {
    /// Maximum memory usage in bytes (for streaming operations)
    pub max_memory_bytes: usize,

    /// Chunk size for processing large datasets
    pub chunk_size: usize,

    /// Number of parallel threads for computation
    pub parallel_threads: Option<usize>,

    /// Enable lazy evaluation for better memory efficiency
    pub use_lazy_evaluation: bool,

    /// Statistical computations configuration
    pub statistics_config: StatisticsConfig,
}

/// Statistical computations configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatisticsConfig {
    /// Compute rolling statistics
    pub enable_rolling_stats: bool,

    /// Window size for rolling statistics
    pub rolling_window_size: usize,

    /// Compute quantiles and percentiles
    pub enable_quantiles: bool,

    /// Quantile levels to compute (e.g., [0.1, 0.25, 0.5, 0.75, 0.9])
    pub quantile_levels: Vec<f64>,

    /// Enable correlation analysis
    pub enable_correlations: bool,

    /// Enable time-series resampling
    pub enable_resampling: bool,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            max_memory_bytes: 1_000_000_000, // 1GB
            chunk_size: 100_000,
            parallel_threads: None, // Use all available cores
            use_lazy_evaluation: true,
            statistics_config: StatisticsConfig {
                enable_rolling_stats: true,
                rolling_window_size: 20,
                enable_quantiles: true,
                quantile_levels: vec![0.1, 0.25, 0.5, 0.75, 0.9],
                enable_correlations: true,
                enable_resampling: true,
            },
        }
    }
}

/// Main batch analysis engine
#[derive(Debug)]
pub struct BatchAnalysisEngine {
    config: BatchConfig,
}

impl BatchAnalysisEngine {
    /// Create new batch analysis engine with default configuration
    pub fn new() -> Self {
        Self {
            config: BatchConfig::default(),
        }
    }

    /// Create new batch analysis engine with custom configuration
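    ///
    /// A brief sketch (illustrative; mirrors the test configuration below):
    ///
    /// ```ignore
    /// let config = BatchConfig {
    ///     chunk_size: 50_000,
    ///     ..Default::default()
    /// };
    /// let engine = BatchAnalysisEngine::with_config(config);
    /// ```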
    pub fn with_config(config: BatchConfig) -> Self {
        Self { config }
    }

    /// Analyze single symbol range bar data
    pub fn analyze_single_symbol(
        &self,
        range_bars: &[RangeBar],
        symbol: &str,
    ) -> Result<BatchResult, BatchError> {
        if range_bars.is_empty() {
            return Err(BatchError::EmptyData {
                symbol: symbol.to_string(),
            });
        }

        // Convert to DataFrame for analysis
        let df = range_bars.to_vec().to_polars_dataframe().map_err(|e| {
            BatchError::ConversionFailed {
                symbol: symbol.to_string(),
                source: ConversionError::PolarsError(e),
            }
        })?;

        let mut analysis_report = AnalysisReport::new(symbol.to_string());

        // Basic statistics
        analysis_report.basic_stats = self.compute_basic_statistics(&df)?;

        // Rolling statistics (if enabled)
        if self.config.statistics_config.enable_rolling_stats {
            analysis_report.rolling_stats = Some(self.compute_rolling_statistics(&df)?);
        }

        // Quantile analysis (if enabled)
        if self.config.statistics_config.enable_quantiles {
            analysis_report.quantiles = Some(self.compute_quantiles(&df)?);
        }

        // Price movement analysis
        analysis_report.price_analysis = self.compute_price_analysis(&df)?;

        // Volume analysis
        analysis_report.volume_analysis = self.compute_volume_analysis(&df)?;

        // Microstructure analysis
        analysis_report.microstructure = self.compute_microstructure_analysis(&df)?;

        Ok(BatchResult {
            symbol: symbol.to_string(),
            records_processed: range_bars.len(),
            analysis: analysis_report,
        })
    }

    /// Analyze multiple symbols for cross-symbol analysis
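    ///
    /// Illustrative sketch (assumes an `engine` and per-symbol bar vectors such
    /// as `btc_bars`/`eth_bars` are already available):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let mut symbol_data = HashMap::new();
    /// symbol_data.insert("BTCUSDT".to_string(), btc_bars);
    /// symbol_data.insert("ETHUSDT".to_string(), eth_bars);
    /// let results = engine.analyze_multiple_symbols(symbol_data).expect("analysis failed");
    /// ```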
    pub fn analyze_multiple_symbols(
        &self,
        symbol_data: HashMap<String, Vec<RangeBar>>,
    ) -> Result<Vec<BatchResult>, BatchError> {
        if symbol_data.is_empty() {
            return Err(BatchError::NoSymbolData);
        }

        let mut results = Vec::with_capacity(symbol_data.len());

        // Process each symbol
        for (symbol, range_bars) in symbol_data {
            let result = self.analyze_single_symbol(&range_bars, &symbol)?;
            results.push(result);
        }

        Ok(results)
    }

    /// Compute basic statistics for range bar data
    fn compute_basic_statistics(&self, df: &DataFrame) -> Result<BasicStatistics, BatchError> {
        let lazy_df = df.clone().lazy();

        // Compute basic aggregations using Polars expressions
        let stats_df = lazy_df
            .select([
                // Price statistics
                col("close").mean().alias("close_mean"),
                col("close").std(1).alias("close_std"),
                col("close").min().alias("close_min"),
                col("close").max().alias("close_max"),
                col("close").median().alias("close_median"),
                // Volume statistics
                col("volume").mean().alias("volume_mean"),
                col("volume").std(1).alias("volume_std"),
                col("volume").sum().alias("volume_total"),
                // Count statistics
                len().alias("total_bars"),
                col("individual_trade_count").sum().alias("total_trades"),
                // Time span
                col("open_time").min().alias("first_time"),
                col("close_time").max().alias("last_time"),
            ])
            .collect()
            .map_err(|e| BatchError::ComputationFailed {
                operation: "basic_statistics".to_string(),
                source: e.into(),
            })?;

        // Extract values from the result
        let row = stats_df
            .get_row(0)
            .map_err(|e| BatchError::ComputationFailed {
                operation: "extract_basic_stats".to_string(),
                source: e.into(),
            })?;

        Ok(BasicStatistics {
            close_mean: extract_f64_value(&row, 0)?,
            close_std: extract_f64_value(&row, 1)?,
            close_min: extract_f64_value(&row, 2)?,
            close_max: extract_f64_value(&row, 3)?,
            close_median: extract_f64_value(&row, 4)?,
            volume_mean: extract_f64_value(&row, 5)?,
            volume_std: extract_f64_value(&row, 6)?,
            volume_total: extract_f64_value(&row, 7)?,
            total_bars: extract_i64_value(&row, 8)? as usize,
            total_trades: extract_i64_value(&row, 9)? as usize,
            first_time: extract_i64_value(&row, 10)?,
            last_time: extract_i64_value(&row, 11)?,
        })
    }

    /// Compute rolling statistics
    fn compute_rolling_statistics(&self, df: &DataFrame) -> Result<RollingStatistics, BatchError> {
        let window = self.config.statistics_config.rolling_window_size;

        let rolling_df = df
            .clone()
            .lazy()
            .with_columns([
                // Rolling price statistics
                col("close")
                    .rolling_mean(RollingOptionsFixedWindow {
                        window_size: window,
                        min_periods: 1,
                        ..Default::default()
                    })
                    .alias("close_sma"),
                col("close")
                    .rolling_std(RollingOptionsFixedWindow {
                        window_size: window,
                        min_periods: 1,
                        ..Default::default()
                    })
                    .alias("close_rolling_std"),
                // Rolling volume statistics
                col("volume")
                    .rolling_mean(RollingOptionsFixedWindow {
                        window_size: window,
                        min_periods: 1,
                        ..Default::default()
                    })
                    .alias("volume_sma"),
                col("volume")
                    .rolling_std(RollingOptionsFixedWindow {
                        window_size: window,
                        min_periods: 1,
                        ..Default::default()
                    })
                    .alias("volume_rolling_std"),
                // Price returns
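                // Simple per-bar returns: close_t / close_{t-1} - 1. The first row
                // is null (shift(1) has no prior value) and is skipped by the
                // std(1) aggregation below.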
                (col("close") / col("close").shift(lit(1)) - lit(1.0)).alias("returns"),
            ])
            .select([
                col("close_sma").mean().alias("avg_sma"),
                col("close_rolling_std").mean().alias("avg_volatility"),
                col("volume_sma").mean().alias("avg_volume_sma"),
                col("returns").std(1).alias("returns_volatility"),
            ])
            .collect()
            .map_err(|e| BatchError::ComputationFailed {
                operation: "rolling_statistics".to_string(),
                source: e.into(),
            })?;

        let row = rolling_df
            .get_row(0)
            .map_err(|e| BatchError::ComputationFailed {
                operation: "extract_rolling_stats".to_string(),
                source: e.into(),
            })?;

        Ok(RollingStatistics {
            window_size: window,
            avg_sma: extract_f64_value(&row, 0)?,
            avg_volatility: extract_f64_value(&row, 1)?,
            avg_volume_sma: extract_f64_value(&row, 2)?,
            returns_volatility: extract_f64_value(&row, 3)?,
        })
    }

    /// Compute quantiles and percentiles
    fn compute_quantiles(&self, df: &DataFrame) -> Result<QuantileAnalysis, BatchError> {
        let quantile_levels = &self.config.statistics_config.quantile_levels;
        let mut price_quantiles = Vec::new();
        let mut volume_quantiles = Vec::new();

        for &level in quantile_levels {
            // Price quantiles
            let price_q = df
                .clone()
                .lazy()
                .select([col("close").quantile(lit(level), QuantileMethod::Linear)])
                .collect()
                .map_err(|e| BatchError::ComputationFailed {
                    operation: format!("price_quantile_{}", level),
                    source: e.into(),
                })?;

            let price_value = price_q
                .get_row(0)
                .and_then(|row| {
                    extract_f64_value(&row, 0)
                        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))
                })
                .map_err(|e| BatchError::ComputationFailed {
                    operation: format!("extract_price_quantile_{}", level),
                    source: e.into(),
                })?;

            price_quantiles.push((level, price_value));

            // Volume quantiles
            let volume_q = df
                .clone()
                .lazy()
                .select([col("volume").quantile(lit(level), QuantileMethod::Linear)])
                .collect()
                .map_err(|e| BatchError::ComputationFailed {
                    operation: format!("volume_quantile_{}", level),
                    source: e.into(),
                })?;

            let volume_value = volume_q
                .get_row(0)
                .and_then(|row| {
                    extract_f64_value(&row, 0)
                        .map_err(|e| PolarsError::ComputeError(e.to_string().into()))
                })
                .map_err(|e| BatchError::ComputationFailed {
                    operation: format!("extract_volume_quantile_{}", level),
                    source: e.into(),
                })?;

            volume_quantiles.push((level, volume_value));
        }

        Ok(QuantileAnalysis {
            levels: quantile_levels.clone(),
            price_quantiles,
            volume_quantiles,
        })
    }

    /// Compute price movement analysis
    fn compute_price_analysis(&self, df: &DataFrame) -> Result<PriceAnalysis, BatchError> {
        let price_df = df
            .clone()
            .lazy()
            .with_columns([
                // Price ranges
                (col("high") - col("low")).alias("price_range"),
                (col("close") - col("open")).alias("price_change"),
                ((col("close") - col("open")) / col("open") * lit(100.0)).alias("price_change_pct"),
                // OHLC analysis
                col("close").gt(col("open")).alias("is_bullish"),
                col("high").eq(col("close")).alias("is_high_close"),
                col("low").eq(col("close")).alias("is_low_close"),
            ])
            .select([
                col("price_range").mean().alias("avg_range"),
                col("price_range").std(1).alias("range_volatility"),
                col("price_change").mean().alias("avg_change"),
                col("price_change_pct").std(1).alias("price_volatility"),
                col("is_bullish").mean().alias("bullish_ratio"),
                col("is_high_close").mean().alias("high_close_ratio"),
                col("is_low_close").mean().alias("low_close_ratio"),
            ])
            .collect()
            .map_err(|e| BatchError::ComputationFailed {
                operation: "price_analysis".to_string(),
                source: e.into(),
            })?;

        let row = price_df
            .get_row(0)
            .map_err(|e| BatchError::ComputationFailed {
                operation: "extract_price_analysis".to_string(),
                source: e.into(),
            })?;

        Ok(PriceAnalysis {
            avg_range: extract_f64_value(&row, 0)?,
            range_volatility: extract_f64_value(&row, 1)?,
            avg_change: extract_f64_value(&row, 2)?,
            price_volatility: extract_f64_value(&row, 3)?,
            bullish_ratio: extract_f64_value(&row, 4)?,
            high_close_ratio: extract_f64_value(&row, 5)?,
            low_close_ratio: extract_f64_value(&row, 6)?,
        })
    }

    /// Compute volume analysis
    fn compute_volume_analysis(&self, df: &DataFrame) -> Result<VolumeAnalysis, BatchError> {
        let volume_df = df
            .clone()
            .lazy()
            .with_columns([
                // Buy/sell analysis
                (col("buy_volume") / col("volume")).alias("buy_ratio"),
                (col("sell_volume") / col("volume")).alias("sell_ratio"),
                col("vwap").alias("volume_weighted_price"),
            ])
            .select([
                col("buy_ratio").mean().alias("avg_buy_ratio"),
                col("sell_ratio").mean().alias("avg_sell_ratio"),
                col("buy_ratio").std(1).alias("buy_ratio_volatility"),
                col("volume_weighted_price").mean().alias("avg_vwap"),
                col("volume").sum().alias("total_volume"),
                col("buy_volume").sum().alias("total_buy_volume"),
                col("sell_volume").sum().alias("total_sell_volume"),
            ])
            .collect()
            .map_err(|e| BatchError::ComputationFailed {
                operation: "volume_analysis".to_string(),
                source: e.into(),
            })?;

        let row = volume_df
            .get_row(0)
            .map_err(|e| BatchError::ComputationFailed {
                operation: "extract_volume_analysis".to_string(),
                source: e.into(),
            })?;

        Ok(VolumeAnalysis {
            avg_buy_ratio: extract_f64_value(&row, 0)?,
            avg_sell_ratio: extract_f64_value(&row, 1)?,
            buy_ratio_volatility: extract_f64_value(&row, 2)?,
            avg_vwap: extract_f64_value(&row, 3)?,
            total_volume: extract_f64_value(&row, 4)?,
            total_buy_volume: extract_f64_value(&row, 5)?,
            total_sell_volume: extract_f64_value(&row, 6)?,
        })
    }

    /// Compute microstructure analysis
    fn compute_microstructure_analysis(
        &self,
        df: &DataFrame,
    ) -> Result<MicrostructureAnalysis, BatchError> {
        let micro_df = df
            .clone()
            .lazy()
            .with_columns([
                // AggTrade intensity
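                // Assumes open_time/close_time are millisecond timestamps, so the
                // bar duration is divided by 1000.0 to obtain seconds.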
                (col("individual_trade_count")
                    / ((col("close_time") - col("open_time")) / lit(1000.0)))
                .alias("aggtrades_per_second"),
                // Order flow imbalance
                ((col("buy_volume") - col("sell_volume")) / col("volume"))
                    .alias("order_flow_imbalance"),
                // VWAP deviation
                ((col("close") - col("vwap")) / col("vwap") * lit(100.0)).alias("vwap_deviation"),
            ])
            .select([
                col("aggtrades_per_second")
                    .mean()
                    .alias("avg_aggtrade_intensity"),
                col("order_flow_imbalance")
                    .mean()
                    .alias("avg_order_flow_imbalance"),
                col("order_flow_imbalance")
                    .std(1)
                    .alias("order_flow_volatility"),
                col("vwap_deviation").mean().alias("avg_vwap_deviation"),
                col("vwap_deviation")
                    .std(1)
                    .alias("vwap_deviation_volatility"),
            ])
            .collect()
            .map_err(|e| BatchError::ComputationFailed {
                operation: "microstructure_analysis".to_string(),
                source: e.into(),
            })?;

        let row = micro_df
            .get_row(0)
            .map_err(|e| BatchError::ComputationFailed {
                operation: "extract_microstructure_analysis".to_string(),
                source: e.into(),
            })?;

        Ok(MicrostructureAnalysis {
            avg_trade_intensity: extract_f64_value(&row, 0)?,
            avg_order_flow_imbalance: extract_f64_value(&row, 1)?,
            order_flow_volatility: extract_f64_value(&row, 2)?,
            avg_vwap_deviation: extract_f64_value(&row, 3)?,
            vwap_deviation_volatility: extract_f64_value(&row, 4)?,
        })
    }
}

impl Default for BatchAnalysisEngine {
    fn default() -> Self {
        Self::new()
    }
}

/// Batch analysis result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchResult {
    pub symbol: String,
    pub records_processed: usize,
    pub analysis: AnalysisReport,
}

/// Comprehensive analysis report
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalysisReport {
    pub symbol: String,
    pub basic_stats: BasicStatistics,
    pub rolling_stats: Option<RollingStatistics>,
    pub quantiles: Option<QuantileAnalysis>,
    pub price_analysis: PriceAnalysis,
    pub volume_analysis: VolumeAnalysis,
    pub microstructure: MicrostructureAnalysis,
}

impl AnalysisReport {
    fn new(symbol: String) -> Self {
        Self {
            symbol,
            basic_stats: BasicStatistics::default(),
            rolling_stats: None,
            quantiles: None,
            price_analysis: PriceAnalysis::default(),
            volume_analysis: VolumeAnalysis::default(),
            microstructure: MicrostructureAnalysis::default(),
        }
    }
}

/// Basic statistical measures
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BasicStatistics {
    pub close_mean: f64,
    pub close_std: f64,
    pub close_min: f64,
    pub close_max: f64,
    pub close_median: f64,
    pub volume_mean: f64,
    pub volume_std: f64,
    pub volume_total: f64,
    pub total_bars: usize,
    pub total_trades: usize,
    pub first_time: i64,
    pub last_time: i64,
}

/// Rolling window statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RollingStatistics {
    pub window_size: usize,
    pub avg_sma: f64,
    pub avg_volatility: f64,
    pub avg_volume_sma: f64,
    pub returns_volatility: f64,
}

/// Quantile analysis results
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuantileAnalysis {
    pub levels: Vec<f64>,
    pub price_quantiles: Vec<(f64, f64)>,
    pub volume_quantiles: Vec<(f64, f64)>,
}

/// Price movement analysis
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PriceAnalysis {
    pub avg_range: f64,
    pub range_volatility: f64,
    pub avg_change: f64,
    pub price_volatility: f64,
    pub bullish_ratio: f64,
    pub high_close_ratio: f64,
    pub low_close_ratio: f64,
}

/// Volume analysis results
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct VolumeAnalysis {
    pub avg_buy_ratio: f64,
    pub avg_sell_ratio: f64,
    pub buy_ratio_volatility: f64,
    pub avg_vwap: f64,
    pub total_volume: f64,
    pub total_buy_volume: f64,
    pub total_sell_volume: f64,
}

/// Market microstructure analysis
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MicrostructureAnalysis {
    pub avg_trade_intensity: f64,
    pub avg_order_flow_imbalance: f64,
    pub order_flow_volatility: f64,
    pub avg_vwap_deviation: f64,
    pub vwap_deviation_volatility: f64,
}

/// Batch processing errors with rich context
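///
/// Callers typically match on the variant; an illustrative sketch:
///
/// ```ignore
/// match engine.analyze_single_symbol(&bars, "BTCUSDT") {
///     Ok(result) => println!("{} bars processed", result.records_processed),
///     Err(BatchError::EmptyData { symbol }) => eprintln!("no data for {symbol}"),
///     Err(e) => eprintln!("analysis failed: {e}"),
/// }
/// ```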
#[derive(Debug, Error)]
pub enum BatchError {
    #[error("No data provided for symbol: {symbol}")]
    EmptyData { symbol: String },

    #[error("No symbol data provided")]
    NoSymbolData,

    #[error("Data conversion failed for symbol '{symbol}'")]
    ConversionFailed {
        symbol: String,
        #[source]
        source: ConversionError,
    },

    #[error("Computation failed for operation '{operation}'")]
    ComputationFailed {
        operation: String,
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    #[error("Value extraction failed for {operation}")]
    ValueExtractionFailed {
        operation: String,
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}

/// Helper function to extract f64 values from Polars rows
fn extract_f64_value(row: &Row, index: usize) -> Result<f64, BatchError> {
    match row.0.get(index) {
        Some(AnyValue::Float64(val)) => Ok(*val),
        Some(AnyValue::Float32(val)) => Ok(*val as f64),
        Some(AnyValue::Int64(val)) => Ok(*val as f64),
        Some(AnyValue::Int32(val)) => Ok(*val as f64),
        Some(AnyValue::Null) => Ok(0.0), // Return 0.0 for null values
        Some(other) => Err(BatchError::ValueExtractionFailed {
            operation: format!("extract_f64_at_index_{}", index),
            source: format!("Unexpected type: {:?}", other).into(),
        }),
        None => Err(BatchError::ValueExtractionFailed {
            operation: format!("extract_f64_at_index_{}", index),
            source: "Value not found".into(),
        }),
    }
}

/// Helper function to extract i64 values from Polars rows
fn extract_i64_value(row: &Row, index: usize) -> Result<i64, BatchError> {
    match row.0.get(index) {
        Some(AnyValue::Int64(val)) => Ok(*val),
        Some(AnyValue::Int32(val)) => Ok(*val as i64),
        Some(AnyValue::UInt64(val)) => Ok(*val as i64),
        Some(AnyValue::UInt32(val)) => Ok(*val as i64),
        Some(other) => Err(BatchError::ValueExtractionFailed {
            operation: format!("extract_i64_at_index_{}", index),
            source: format!("Unexpected type: {:?}", other).into(),
        }),
        None => Err(BatchError::ValueExtractionFailed {
            operation: format!("extract_i64_at_index_{}", index),
            source: "Value not found".into(),
        }),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use rangebar_core::{DataSource, FixedPoint, RangeBar};

    fn create_test_range_bars() -> Vec<RangeBar> {
        vec![
            RangeBar {
                open_time: 1000000,
                close_time: 1000001,
                open: FixedPoint(100000000),
                high: FixedPoint(110000000),
                low: FixedPoint(90000000),
                close: FixedPoint(105000000),
                volume: FixedPoint(1000000000),
                turnover: 1050000000,
                individual_trade_count: 5,
                agg_record_count: 1,
                first_trade_id: 1,
                last_trade_id: 5,
                data_source: DataSource::default(),
                buy_volume: FixedPoint(600000000),
                sell_volume: FixedPoint(400000000),
                buy_trade_count: 3,
                sell_trade_count: 2,
                vwap: FixedPoint(105000000),
                buy_turnover: 630000000,
                sell_turnover: 420000000,
            },
            RangeBar {
                open_time: 1000002,
                close_time: 1000003,
                open: FixedPoint(105000000),
                high: FixedPoint(115000000),
                low: FixedPoint(95000000),
                close: FixedPoint(110000000),
                volume: FixedPoint(2000000000),
                turnover: 2200000000,
                individual_trade_count: 8,
                agg_record_count: 1,
                first_trade_id: 6,
                last_trade_id: 13,
                data_source: DataSource::default(),
                buy_volume: FixedPoint(1200000000),
                sell_volume: FixedPoint(800000000),
                buy_trade_count: 5,
                sell_trade_count: 3,
                vwap: FixedPoint(110000000),
                buy_turnover: 1320000000,
                sell_turnover: 880000000,
            },
        ]
    }

    #[test]
    fn test_batch_analysis_engine_creation() {
        let engine = BatchAnalysisEngine::new();
        assert_eq!(engine.config.chunk_size, 100_000);

        let custom_config = BatchConfig {
            chunk_size: 50_000,
            ..Default::default()
        };
        let custom_engine = BatchAnalysisEngine::with_config(custom_config);
        assert_eq!(custom_engine.config.chunk_size, 50_000);
    }

    #[test]
    fn test_single_symbol_analysis() {
        let engine = BatchAnalysisEngine::new();
        let range_bars = create_test_range_bars();

        let result = engine
            .analyze_single_symbol(&range_bars, "BTCUSDT")
            .unwrap();

        assert_eq!(result.symbol, "BTCUSDT");
        assert_eq!(result.records_processed, 2);
        assert_eq!(result.analysis.basic_stats.total_bars, 2);
        assert_eq!(result.analysis.basic_stats.total_trades, 13);
    }

    #[test]
    fn test_multiple_symbols_analysis() {
        let engine = BatchAnalysisEngine::new();
        let mut symbol_data = HashMap::new();
        symbol_data.insert("BTCUSDT".to_string(), create_test_range_bars());
        symbol_data.insert("ETHUSDT".to_string(), create_test_range_bars());

        let results = engine.analyze_multiple_symbols(symbol_data).unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().any(|r| r.symbol == "BTCUSDT"));
        assert!(results.iter().any(|r| r.symbol == "ETHUSDT"));
    }

    #[test]
    fn test_empty_data_error() {
        let engine = BatchAnalysisEngine::new();
        let empty_bars: Vec<RangeBar> = vec![];

        let result = engine.analyze_single_symbol(&empty_bars, "BTCUSDT");
        assert!(matches!(result, Err(BatchError::EmptyData { .. })));
    }

    #[test]
    fn test_no_symbol_data_error() {
        let engine = BatchAnalysisEngine::new();
        let empty_data = HashMap::new();

        let result = engine.analyze_multiple_symbols(empty_data);
        assert!(matches!(result, Err(BatchError::NoSymbolData)));
    }
}