Skip to main content

shape_runtime/
query_executor.rs

1//! High-level query execution API for Shape
2//!
3//! This module provides the main interface for executing Shape queries
4//! against data and generating results with statistics.
5
6use chrono::{DateTime, Datelike, Timelike, Utc};
7use serde::{Deserialize, Serialize};
8use shape_ast::error::{Result, ResultExt, ShapeError};
9use std::collections::HashMap;
10
11use crate::data::DataFrame;
12use crate::semantic::SemanticAnalyzer;
13use crate::{QueryResult as RuntimeQueryResult, Runtime};
14use shape_ast::parser;
15
/// Main query executor that orchestrates the entire Shape pipeline.
///
/// Owns the [`Runtime`] that programs are loaded into and the
/// [`SemanticAnalyzer`] that validates them. Both are kept as fields and
/// reused across calls to `execute`, so state may carry over between
/// queries — NOTE(review): confirm whether that is intended.
pub struct QueryExecutor {
    /// Runtime used to load programs and execute queries.
    runtime: Runtime,
    /// Analyzer run over every parsed program before it is loaded.
    analyzer: SemanticAnalyzer,
}
21
/// Result of executing a Shape query.
///
/// Produced by `QueryExecutor::execute` and serialized as-is by
/// `QueryExecutor::execute_json`; the field names and their declaration
/// order therefore define the JSON output shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// The original query string, exactly as passed in
    pub query: String,

    /// Type of query executed, inferred from keywords in the query text
    pub query_type: QueryType,

    /// Pattern matches reported by the runtime, in the order it returned them
    pub matches: Vec<PatternMatch>,

    /// Statistics computed over `matches` and the input data
    pub statistics: QueryStatistics,

    /// Execution metadata (timing, rows processed, warnings)
    pub metadata: ExecutionMetadata,
}
40
41/// Types of Shape queries
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub enum QueryType {
44    Find,
45    Scan,
46    Analyze,
47    Alert,
48}
49
/// A single pattern match result.
///
/// Built by `QueryExecutor` from the runtime's match records; field order
/// defines the serialized (JSON) layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternMatch {
    /// Name of the pattern that matched
    pub pattern_name: String,

    /// Identifier of the match (if applicable). Always `Some` when the match
    /// was produced by `QueryExecutor`.
    pub id: Option<String>,

    /// Time at which the pattern was found
    pub timestamp: DateTime<Utc>,

    /// Row index in the input data where the pattern starts
    pub row_index: usize,

    /// Number of elements spanned by the pattern. `QueryExecutor` currently
    /// hard-codes this to `1`.
    pub pattern_length: usize,

    /// Match confidence, expected to lie in `0.0..=1.0`
    pub confidence: f64,

    /// Additional pattern-specific data (the runtime match's metadata)
    pub attributes: serde_json::Value,
}
74
/// Statistics about query results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStatistics {
    /// Total number of matches
    pub total_matches: usize,

    /// Number of distinct pattern names among the matches
    pub unique_patterns: usize,

    /// Time range of the whole input data (not just of the matches)
    pub time_range: TimeRange,

    /// Aggregate confidence/duration metrics over the matches
    pub performance: PerformanceMetrics,

    /// Number of matches per pattern name
    pub pattern_frequency: HashMap<String, usize>,

    /// Distribution of match timestamps by hour, weekday and month
    pub time_distribution: TimeDistribution,
}
96
/// Time range information for the analyzed data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
    /// Timestamp of the first row of the data
    pub start: DateTime<Utc>,
    /// Timestamp of the last row of the data
    pub end: DateTime<Utc>,
    /// Number of rows in the data
    pub row_count: usize,
}
104
/// Generic aggregate metrics over pattern matches.
///
/// All fields are `0.0` when there are no matches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Mean confidence of all matches
    pub avg_confidence: f64,

    /// Fraction of matches whose confidence is strictly above the
    /// success threshold (0.5)
    pub success_rate: f64,

    /// Mean `pattern_length` of the matches, in elements
    pub avg_duration: f64,
}
117
/// Time distribution of pattern matches (counts of matches per bucket).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeDistribution {
    /// Matches by hour of day (`0..=23`, in UTC)
    pub hourly: HashMap<u32, usize>,

    /// Matches by day of week, keyed by chrono's `Weekday` display string
    /// (e.g. `"Mon"`)
    pub daily: HashMap<String, usize>,

    /// Matches by month, keyed by the month number as a string (`"1"`..`"12"`)
    pub monthly: HashMap<String, usize>,
}
130
/// Metadata about query execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionMetadata {
    /// Wall-clock time at which execution started
    pub executed_at: DateTime<Utc>,

    /// Time spent parsing, analyzing, loading and executing the query, in
    /// milliseconds (result post-processing is not included)
    pub execution_time_ms: u64,

    /// Number of rows in the input data
    pub rows_processed: usize,

    /// Any warnings during execution (currently never populated by
    /// `QueryExecutor`)
    pub warnings: Vec<String>,
}
146
147impl QueryExecutor {
148    /// Create a new query executor
149    pub fn new() -> Self {
150        Self {
151            runtime: Runtime::new(),
152            analyzer: SemanticAnalyzer::new(),
153        }
154    }
155
156    /// Execute a Shape query against data
157    pub fn execute(&mut self, query: &str, data: &DataFrame) -> Result<QueryResult> {
158        let start_time = std::time::Instant::now();
159        let executed_at = Utc::now();
160
161        // Parse the query
162        let program = parser::parse_program(query).with_context("Failed to parse Shape query")?;
163
164        // Analyze semantically
165        self.analyzer
166            .analyze(&program)
167            .with_context("Semantic analysis failed")?;
168
169        // Load the program first
170        self.runtime
171            .load_program(&program, data)
172            .with_context("Failed to load program")?;
173
174        // Find and execute the first query item
175        let query_item = program
176            .items
177            .iter()
178            .find(|item| matches!(item, shape_ast::ast::Item::Query(_, _)))
179            .ok_or_else(|| ShapeError::RuntimeError {
180                message: "No query found in program".to_string(),
181                location: None,
182            })?;
183
184        let runtime_result = self
185            .runtime
186            .execute_query(query_item, data)
187            .with_context("Query execution failed")?;
188
189        // Convert runtime results to our result format
190        let query_result = self.build_query_result(
191            query,
192            runtime_result,
193            data,
194            executed_at,
195            start_time.elapsed(),
196        )?;
197
198        Ok(query_result)
199    }
200
201    /// Execute a query and return results in JSON format
202    pub fn execute_json(&mut self, query: &str, data: &DataFrame) -> Result<String> {
203        let result = self.execute(query, data)?;
204        let json = serde_json::to_string_pretty(&result).map_err(|e| ShapeError::RuntimeError {
205            message: format!("Failed to serialize result to JSON: {}", e),
206            location: None,
207        })?;
208        Ok(json)
209    }
210
211    /// Build the final query result from runtime results
212    fn build_query_result(
213        &self,
214        query: &str,
215        runtime_result: RuntimeQueryResult,
216        data: &DataFrame,
217        executed_at: DateTime<Utc>,
218        elapsed: std::time::Duration,
219    ) -> Result<QueryResult> {
220        // Extract matches from runtime result
221        let matches = self.extract_matches(&runtime_result, data)?;
222
223        // Calculate statistics
224        let statistics = self.calculate_statistics(&matches, data)?;
225
226        // Determine query type
227        let query_type = self.determine_query_type(query)?;
228
229        // Build metadata
230        let metadata = ExecutionMetadata {
231            executed_at,
232            execution_time_ms: elapsed.as_millis() as u64,
233            rows_processed: data.row_count(),
234            warnings: Vec::new(),
235        };
236
237        Ok(QueryResult {
238            query: query.to_string(),
239            query_type,
240            matches,
241            statistics,
242            metadata,
243        })
244    }
245
246    /// Extract pattern matches from runtime results
247    fn extract_matches(
248        &self,
249        runtime_result: &RuntimeQueryResult,
250        _data: &DataFrame,
251    ) -> Result<Vec<PatternMatch>> {
252        let mut matches = Vec::new();
253
254        if let Some(runtime_matches) = &runtime_result.matches {
255            for pm in runtime_matches {
256                matches.push(PatternMatch {
257                    pattern_name: pm.pattern_name.clone(),
258                    id: Some(pm.id.clone()),
259                    timestamp: pm.timestamp,
260                    row_index: pm.index,
261                    pattern_length: 1, // Default
262                    confidence: pm.confidence,
263                    attributes: pm.metadata.clone(),
264                });
265            }
266        }
267
268        Ok(matches)
269    }
270
271    /// Calculate statistics from matches
272    fn calculate_statistics(
273        &self,
274        matches: &[PatternMatch],
275        data: &DataFrame,
276    ) -> Result<QueryStatistics> {
277        // Calculate time range
278        let time_range = self.calculate_time_range(data)?;
279
280        // Calculate performance metrics
281        let performance = self.calculate_performance_metrics(matches)?;
282
283        // Calculate pattern frequency
284        let pattern_frequency = self.calculate_pattern_frequency(matches);
285
286        // Calculate time distribution
287        let time_distribution = self.calculate_time_distribution(matches)?;
288
289        Ok(QueryStatistics {
290            total_matches: matches.len(),
291            unique_patterns: pattern_frequency.len(),
292            time_range,
293            performance,
294            pattern_frequency,
295            time_distribution,
296        })
297    }
298
299    /// Calculate time range of data
300    fn calculate_time_range(&self, data: &DataFrame) -> Result<TimeRange> {
301        if data.is_empty() {
302            return Err(ShapeError::DataError {
303                message: "No rows in data".to_string(),
304                symbol: None,
305                timeframe: None,
306            });
307        }
308
309        let start_ts = data.get_timestamp(0).unwrap();
310        let last_ts = data.get_timestamp(data.row_count() - 1).unwrap();
311
312        Ok(TimeRange {
313            start: DateTime::from_timestamp(start_ts, 0).unwrap_or_else(Utc::now),
314            end: DateTime::from_timestamp(last_ts, 0).unwrap_or_else(Utc::now),
315            row_count: data.row_count(),
316        })
317    }
318
319    /// Calculate metrics from matches
320    fn calculate_performance_metrics(
321        &self,
322        matches: &[PatternMatch],
323    ) -> Result<PerformanceMetrics> {
324        if matches.is_empty() {
325            return Ok(PerformanceMetrics {
326                avg_confidence: 0.0,
327                success_rate: 0.0,
328                avg_duration: 0.0,
329            });
330        }
331
332        let mut confidences = Vec::new();
333        let mut successes = 0;
334        let mut durations = Vec::new();
335
336        for pattern_match in matches {
337            confidences.push(pattern_match.confidence);
338            if pattern_match.confidence > 0.5 {
339                successes += 1;
340            }
341            durations.push(pattern_match.pattern_length as f64);
342        }
343
344        let avg_confidence = confidences.iter().sum::<f64>() / confidences.len() as f64;
345        let success_rate = successes as f64 / matches.len() as f64;
346        let avg_duration = durations.iter().sum::<f64>() / durations.len() as f64;
347
348        Ok(PerformanceMetrics {
349            avg_confidence,
350            success_rate,
351            avg_duration,
352        })
353    }
354
355    /// Calculate pattern frequency
356    fn calculate_pattern_frequency(&self, matches: &[PatternMatch]) -> HashMap<String, usize> {
357        let mut frequency = HashMap::new();
358        for m in matches {
359            *frequency.entry(m.pattern_name.clone()).or_insert(0) += 1;
360        }
361        frequency
362    }
363
364    /// Calculate time distribution of matches
365    fn calculate_time_distribution(&self, matches: &[PatternMatch]) -> Result<TimeDistribution> {
366        let mut hourly = HashMap::new();
367        let mut daily = HashMap::new();
368        let mut monthly = HashMap::new();
369
370        for m in matches {
371            *hourly.entry(m.timestamp.hour()).or_insert(0) += 1;
372            *daily.entry(m.timestamp.weekday().to_string()).or_insert(0) += 1;
373            *monthly.entry(m.timestamp.month().to_string()).or_insert(0) += 1;
374        }
375
376        Ok(TimeDistribution {
377            hourly,
378            daily,
379            monthly,
380        })
381    }
382
383    /// Determine query type from query string
384    fn determine_query_type(&self, query: &str) -> Result<QueryType> {
385        let query_lower = query.to_lowercase();
386        if query_lower.contains("find") {
387            Ok(QueryType::Find)
388        } else if query_lower.contains("scan") {
389            Ok(QueryType::Scan)
390        } else if query_lower.contains("analyze") {
391            Ok(QueryType::Analyze)
392        } else if query_lower.contains("alert") {
393            Ok(QueryType::Alert)
394        } else {
395            Ok(QueryType::Find) // Default
396        }
397    }
398}
399
400impl Default for QueryExecutor {
401    fn default() -> Self {
402        Self::new()
403    }
404}