1use chrono::{DateTime, Datelike, Timelike, Utc};
7use serde::{Deserialize, Serialize};
8use shape_ast::error::{Result, ResultExt, ShapeError};
9use std::collections::HashMap;
10
11use crate::data::DataFrame;
12use crate::semantic::SemanticAnalyzer;
13use crate::{QueryResult as RuntimeQueryResult, Runtime};
14use shape_ast::parser;
15
/// Executes Shape queries against tabular data, combining parsing,
/// semantic analysis, and runtime evaluation behind one entry point.
pub struct QueryExecutor {
    // Pattern-matching runtime; programs are loaded into it and queries run on it.
    runtime: Runtime,
    // Validates parsed programs before they are loaded into the runtime.
    analyzer: SemanticAnalyzer,
}
21
/// Fully assembled output of one query execution, serializable to JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// The original query text as submitted.
    pub query: String,

    /// Keyword-derived classification of the query.
    pub query_type: QueryType,

    /// All pattern matches produced by the runtime.
    pub matches: Vec<PatternMatch>,

    /// Aggregate statistics computed over `matches` and the input data.
    pub statistics: QueryStatistics,

    /// Timing and bookkeeping for this execution.
    pub metadata: ExecutionMetadata,
}
40
/// Coarse classification of a query, inferred from keywords in its text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryType {
    /// Locate pattern occurrences; also the fallback when no keyword matches.
    Find,
    Scan,
    Analyze,
    Alert,
}
49
/// A single pattern occurrence reported by the runtime.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternMatch {
    /// Name of the matched pattern.
    pub pattern_name: String,

    /// Runtime-assigned identifier for this match, when available.
    pub id: Option<String>,

    /// Timestamp of the row where the match occurred.
    pub timestamp: DateTime<Utc>,

    /// Zero-based row index of the match within the source data.
    pub row_index: usize,

    /// Number of rows the pattern spans (currently always 1; see `extract_matches`).
    pub pattern_length: usize,

    /// Runtime confidence score; values above 0.5 are counted as successes.
    pub confidence: f64,

    /// Arbitrary runtime metadata attached to the match.
    pub attributes: serde_json::Value,
}
74
/// Aggregate statistics derived from a query's matches and its input data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStatistics {
    /// Total number of matches found.
    pub total_matches: usize,

    /// Number of distinct pattern names among the matches.
    pub unique_patterns: usize,

    /// Timestamp span of the input data (not of the matches themselves).
    pub time_range: TimeRange,

    /// Confidence-based performance summary.
    pub performance: PerformanceMetrics,

    /// Match count per pattern name.
    pub pattern_frequency: HashMap<String, usize>,

    /// Match counts bucketed by hour, weekday, and month.
    pub time_distribution: TimeDistribution,
}
96
/// Inclusive timestamp span of the input data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
    /// Timestamp of the first row.
    pub start: DateTime<Utc>,
    /// Timestamp of the last row.
    pub end: DateTime<Utc>,
    /// Number of rows covered by the range.
    pub row_count: usize,
}
104
/// Summary metrics computed from match confidences.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Mean confidence across all matches (0.0 when there are none).
    pub avg_confidence: f64,

    /// Fraction of matches with confidence above 0.5.
    pub success_rate: f64,

    /// Mean pattern length in rows, used as a duration proxy.
    pub avg_duration: f64,
}
117
/// Match counts bucketed along three calendar dimensions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeDistribution {
    /// Matches per hour of day (0-23).
    pub hourly: HashMap<u32, usize>,

    /// Matches per weekday name (chrono `Weekday` display, e.g. "Mon").
    pub daily: HashMap<String, usize>,

    /// Matches per month number rendered as a string ("1"-"12").
    pub monthly: HashMap<String, usize>,
}
130
/// Bookkeeping about a single query execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionMetadata {
    /// UTC wall-clock time at which execution began.
    pub executed_at: DateTime<Utc>,

    /// Total wall time in milliseconds, from parse through result assembly.
    pub execution_time_ms: u64,

    /// Number of rows in the input data.
    pub rows_processed: usize,

    /// Non-fatal issues encountered (currently always empty).
    pub warnings: Vec<String>,
}
146
147impl QueryExecutor {
148 pub fn new() -> Self {
150 Self {
151 runtime: Runtime::new(),
152 analyzer: SemanticAnalyzer::new(),
153 }
154 }
155
156 pub fn execute(&mut self, query: &str, data: &DataFrame) -> Result<QueryResult> {
158 let start_time = std::time::Instant::now();
159 let executed_at = Utc::now();
160
161 let program = parser::parse_program(query).with_context("Failed to parse Shape query")?;
163
164 self.analyzer
166 .analyze(&program)
167 .with_context("Semantic analysis failed")?;
168
169 self.runtime
171 .load_program(&program, data)
172 .with_context("Failed to load program")?;
173
174 let query_item = program
176 .items
177 .iter()
178 .find(|item| matches!(item, shape_ast::ast::Item::Query(_, _)))
179 .ok_or_else(|| ShapeError::RuntimeError {
180 message: "No query found in program".to_string(),
181 location: None,
182 })?;
183
184 let runtime_result = self
185 .runtime
186 .execute_query(query_item, data)
187 .with_context("Query execution failed")?;
188
189 let query_result = self.build_query_result(
191 query,
192 runtime_result,
193 data,
194 executed_at,
195 start_time.elapsed(),
196 )?;
197
198 Ok(query_result)
199 }
200
201 pub fn execute_json(&mut self, query: &str, data: &DataFrame) -> Result<String> {
203 let result = self.execute(query, data)?;
204 let json = serde_json::to_string_pretty(&result).map_err(|e| ShapeError::RuntimeError {
205 message: format!("Failed to serialize result to JSON: {}", e),
206 location: None,
207 })?;
208 Ok(json)
209 }
210
211 fn build_query_result(
213 &self,
214 query: &str,
215 runtime_result: RuntimeQueryResult,
216 data: &DataFrame,
217 executed_at: DateTime<Utc>,
218 elapsed: std::time::Duration,
219 ) -> Result<QueryResult> {
220 let matches = self.extract_matches(&runtime_result, data)?;
222
223 let statistics = self.calculate_statistics(&matches, data)?;
225
226 let query_type = self.determine_query_type(query)?;
228
229 let metadata = ExecutionMetadata {
231 executed_at,
232 execution_time_ms: elapsed.as_millis() as u64,
233 rows_processed: data.row_count(),
234 warnings: Vec::new(),
235 };
236
237 Ok(QueryResult {
238 query: query.to_string(),
239 query_type,
240 matches,
241 statistics,
242 metadata,
243 })
244 }
245
246 fn extract_matches(
248 &self,
249 runtime_result: &RuntimeQueryResult,
250 _data: &DataFrame,
251 ) -> Result<Vec<PatternMatch>> {
252 let mut matches = Vec::new();
253
254 if let Some(runtime_matches) = &runtime_result.matches {
255 for pm in runtime_matches {
256 matches.push(PatternMatch {
257 pattern_name: pm.pattern_name.clone(),
258 id: Some(pm.id.clone()),
259 timestamp: pm.timestamp,
260 row_index: pm.index,
261 pattern_length: 1, confidence: pm.confidence,
263 attributes: pm.metadata.clone(),
264 });
265 }
266 }
267
268 Ok(matches)
269 }
270
271 fn calculate_statistics(
273 &self,
274 matches: &[PatternMatch],
275 data: &DataFrame,
276 ) -> Result<QueryStatistics> {
277 let time_range = self.calculate_time_range(data)?;
279
280 let performance = self.calculate_performance_metrics(matches)?;
282
283 let pattern_frequency = self.calculate_pattern_frequency(matches);
285
286 let time_distribution = self.calculate_time_distribution(matches)?;
288
289 Ok(QueryStatistics {
290 total_matches: matches.len(),
291 unique_patterns: pattern_frequency.len(),
292 time_range,
293 performance,
294 pattern_frequency,
295 time_distribution,
296 })
297 }
298
299 fn calculate_time_range(&self, data: &DataFrame) -> Result<TimeRange> {
301 if data.is_empty() {
302 return Err(ShapeError::DataError {
303 message: "No rows in data".to_string(),
304 symbol: None,
305 timeframe: None,
306 });
307 }
308
309 let start_ts = data.get_timestamp(0).unwrap();
310 let last_ts = data.get_timestamp(data.row_count() - 1).unwrap();
311
312 Ok(TimeRange {
313 start: DateTime::from_timestamp(start_ts, 0).unwrap_or_else(Utc::now),
314 end: DateTime::from_timestamp(last_ts, 0).unwrap_or_else(Utc::now),
315 row_count: data.row_count(),
316 })
317 }
318
319 fn calculate_performance_metrics(
321 &self,
322 matches: &[PatternMatch],
323 ) -> Result<PerformanceMetrics> {
324 if matches.is_empty() {
325 return Ok(PerformanceMetrics {
326 avg_confidence: 0.0,
327 success_rate: 0.0,
328 avg_duration: 0.0,
329 });
330 }
331
332 let mut confidences = Vec::new();
333 let mut successes = 0;
334 let mut durations = Vec::new();
335
336 for pattern_match in matches {
337 confidences.push(pattern_match.confidence);
338 if pattern_match.confidence > 0.5 {
339 successes += 1;
340 }
341 durations.push(pattern_match.pattern_length as f64);
342 }
343
344 let avg_confidence = confidences.iter().sum::<f64>() / confidences.len() as f64;
345 let success_rate = successes as f64 / matches.len() as f64;
346 let avg_duration = durations.iter().sum::<f64>() / durations.len() as f64;
347
348 Ok(PerformanceMetrics {
349 avg_confidence,
350 success_rate,
351 avg_duration,
352 })
353 }
354
355 fn calculate_pattern_frequency(&self, matches: &[PatternMatch]) -> HashMap<String, usize> {
357 let mut frequency = HashMap::new();
358 for m in matches {
359 *frequency.entry(m.pattern_name.clone()).or_insert(0) += 1;
360 }
361 frequency
362 }
363
364 fn calculate_time_distribution(&self, matches: &[PatternMatch]) -> Result<TimeDistribution> {
366 let mut hourly = HashMap::new();
367 let mut daily = HashMap::new();
368 let mut monthly = HashMap::new();
369
370 for m in matches {
371 *hourly.entry(m.timestamp.hour()).or_insert(0) += 1;
372 *daily.entry(m.timestamp.weekday().to_string()).or_insert(0) += 1;
373 *monthly.entry(m.timestamp.month().to_string()).or_insert(0) += 1;
374 }
375
376 Ok(TimeDistribution {
377 hourly,
378 daily,
379 monthly,
380 })
381 }
382
383 fn determine_query_type(&self, query: &str) -> Result<QueryType> {
385 let query_lower = query.to_lowercase();
386 if query_lower.contains("find") {
387 Ok(QueryType::Find)
388 } else if query_lower.contains("scan") {
389 Ok(QueryType::Scan)
390 } else if query_lower.contains("analyze") {
391 Ok(QueryType::Analyze)
392 } else if query_lower.contains("alert") {
393 Ok(QueryType::Alert)
394 } else {
395 Ok(QueryType::Find) }
397 }
398}
399
400impl Default for QueryExecutor {
401 fn default() -> Self {
402 Self::new()
403 }
404}