use chrono::{DateTime, Datelike, Timelike, Utc};
use serde::{Deserialize, Serialize};
use shape_ast::error::{Result, ResultExt, ShapeError};
use std::collections::HashMap;
use crate::data::DataFrame;
use crate::{QueryResult as RuntimeQueryResult, Runtime};
use shape_ast::parser;
/// Runs Shape queries against a `DataFrame` and packages the outcome as a [`QueryResult`].
pub struct QueryExecutor {
/// Runtime that programs are loaded into and that queries are executed on.
runtime: Runtime,
}
/// Serializable outcome of a single query execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
/// The query text as it was handed to the executor.
pub query: String,
/// Category derived from the query text.
pub query_type: QueryType,
/// Pattern matches converted from the runtime's result.
pub matches: Vec<PatternMatch>,
/// Aggregates computed from `matches` and the input data.
pub statistics: QueryStatistics,
/// Timing and bookkeeping for this execution.
pub metadata: ExecutionMetadata,
}
/// Category assigned to a query.
///
/// `QueryExecutor::determine_query_type` picks the variant by looking for the
/// lower-cased keywords `find`, `scan`, `analyze`, `alert` (in that order) anywhere
/// in the query text; the first keyword present wins.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryType {
/// Keyword `find`; also the fallback when no keyword is present.
Find,
/// Keyword `scan`.
Scan,
/// Keyword `analyze`.
Analyze,
/// Keyword `alert`.
Alert,
}
/// One pattern occurrence, converted from a match reported by the runtime.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternMatch {
/// Name of the pattern that matched.
pub pattern_name: String,
/// Identifier of the match; `extract_matches` always fills this with `Some`.
pub id: Option<String>,
/// Timestamp carried by the runtime match (UTC).
pub timestamp: DateTime<Utc>,
/// Copied from the runtime match's `index` field — presumably the row in the input
/// data where the match was found; confirm against the runtime.
pub row_index: usize,
/// Length of the match; `extract_matches` currently hard-codes this to 1.
pub pattern_length: usize,
/// Confidence score of the match; values above 0.5 count as successes in
/// `PerformanceMetrics::success_rate`.
pub confidence: f64,
/// JSON value copied from the runtime match's `metadata` field.
pub attributes: serde_json::Value,
}
/// Aggregate figures computed over the matches of one query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStatistics {
/// Number of matches.
pub total_matches: usize,
/// Number of distinct pattern names among the matches.
pub unique_patterns: usize,
/// Span of the input data (not of the matches).
pub time_range: TimeRange,
/// Confidence and length averages over the matches.
pub performance: PerformanceMetrics,
/// Match count per pattern name.
pub pattern_frequency: HashMap<String, usize>,
/// Match counts bucketed by hour, weekday and month.
pub time_distribution: TimeDistribution,
}
/// Time span covered by the input data.
///
/// `start` and `end` are built with `DateTime::from_timestamp(ts, 0)`; when that
/// conversion fails the current time is stored instead.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
/// Timestamp of the first row.
pub start: DateTime<Utc>,
/// Timestamp of the last row.
pub end: DateTime<Utc>,
/// Number of rows in the input data.
pub row_count: usize,
}
/// Averages over a set of matches; every field is `0.0` when there are no matches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
/// Arithmetic mean of the matches' `confidence`.
pub avg_confidence: f64,
/// Fraction of matches whose `confidence` is strictly greater than 0.5.
pub success_rate: f64,
/// Arithmetic mean of the matches' `pattern_length`.
pub avg_duration: f64,
}
/// Match counts bucketed by components of each match's UTC timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeDistribution {
/// Count per hour of day, keyed by `timestamp.hour()`.
pub hourly: HashMap<u32, usize>,
/// Count per weekday, keyed by `timestamp.weekday().to_string()`.
pub daily: HashMap<String, usize>,
/// Count per month, keyed by the month number rendered as a string
/// (`timestamp.month().to_string()`); the year is not part of the key.
pub monthly: HashMap<String, usize>,
}
/// Bookkeeping recorded for one query execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionMetadata {
/// Wall-clock time (UTC) captured at the start of `QueryExecutor::execute`.
pub executed_at: DateTime<Utc>,
/// Milliseconds elapsed from the start of `execute` until just before the result is
/// assembled, so statistics computation is not included.
pub execution_time_ms: u64,
/// Row count of the input data.
pub rows_processed: usize,
/// Warnings for the caller; `build_query_result` currently always leaves this empty.
pub warnings: Vec<String>,
}
impl QueryExecutor {
/// Creates an executor backed by a freshly constructed [`Runtime`].
pub fn new() -> Self {
    let runtime = Runtime::new();
    Self { runtime }
}
/// Parses `query`, runs its first query item against `data`, and returns the assembled
/// [`QueryResult`].
///
/// # Errors
/// Fails when the text does not parse, the program cannot be loaded into the runtime,
/// the program contains no query item, the runtime fails to execute the query, or the
/// result cannot be assembled.
pub fn execute(&mut self, query: &str, data: &DataFrame) -> Result<QueryResult> {
    // Both clocks start before parsing, so parse time counts toward the reported duration.
    let timer = std::time::Instant::now();
    let executed_at = Utc::now();

    let program = parser::parse_program(query).with_context("Failed to parse Shape query")?;
    self.runtime
        .load_program(&program, data)
        .with_context("Failed to load program")?;

    // Only the first query item of the program is executed.
    let mut items = program.items.iter();
    let query_item = match items.find(|item| matches!(item, shape_ast::ast::Item::Query(_, _))) {
        Some(item) => item,
        None => {
            return Err(ShapeError::RuntimeError {
                message: "No query found in program".to_string(),
                location: None,
            });
        }
    };

    let runtime_result = self
        .runtime
        .execute_query(query_item, data)
        .with_context("Query execution failed")?;

    // Sampled here, i.e. before the result (matches, statistics) is built.
    let elapsed = timer.elapsed();
    self.build_query_result(query, runtime_result, data, executed_at, elapsed)
}
/// Runs [`execute`](Self::execute) and renders the result as pretty-printed JSON.
///
/// # Errors
/// Propagates any error from `execute`; a serialization failure is reported as
/// `ShapeError::RuntimeError`.
pub fn execute_json(&mut self, query: &str, data: &DataFrame) -> Result<String> {
    let result = self.execute(query, data)?;
    match serde_json::to_string_pretty(&result) {
        Ok(json) => Ok(json),
        Err(e) => Err(ShapeError::RuntimeError {
            message: format!("Failed to serialize result to JSON: {}", e),
            location: None,
        }),
    }
}
/// Assembles the public [`QueryResult`] from the runtime's raw result.
///
/// Converts the matches, computes statistics over them, classifies the query text, and
/// records `executed_at`, `elapsed` (as whole milliseconds) and the row count of `data`.
///
/// # Errors
/// Propagates failures from match extraction, statistics calculation, or query
/// classification.
fn build_query_result(
    &self,
    query: &str,
    runtime_result: RuntimeQueryResult,
    data: &DataFrame,
    executed_at: DateTime<Utc>,
    elapsed: std::time::Duration,
) -> Result<QueryResult> {
    let matches = self.extract_matches(&runtime_result, data)?;
    let statistics = self.calculate_statistics(&matches, data)?;
    let query_type = self.determine_query_type(query)?;

    Ok(QueryResult {
        query: query.to_owned(),
        query_type,
        matches,
        statistics,
        metadata: ExecutionMetadata {
            executed_at,
            execution_time_ms: elapsed.as_millis() as u64,
            rows_processed: data.row_count(),
            // No warnings are produced at present.
            warnings: Vec::new(),
        },
    })
}
/// Converts the runtime's matches into this module's [`PatternMatch`] records.
///
/// Yields an empty vector when the runtime result carries no match list. `_data` is
/// accepted but not consulted. Never returns `Err`.
fn extract_matches(
    &self,
    runtime_result: &RuntimeQueryResult,
    _data: &DataFrame,
) -> Result<Vec<PatternMatch>> {
    let converted: Vec<PatternMatch> = runtime_result
        .matches
        .iter()
        .flatten()
        .map(|found| PatternMatch {
            pattern_name: found.pattern_name.clone(),
            id: Some(found.id.clone()),
            timestamp: found.timestamp,
            row_index: found.index,
            // The length is hard-coded; the runtime match does not supply one here.
            pattern_length: 1,
            confidence: found.confidence,
            attributes: found.metadata.clone(),
        })
        .collect();
    Ok(converted)
}
/// Computes every aggregate in [`QueryStatistics`] for `matches` over `data`.
///
/// # Errors
/// Propagates the error from the time-range calculation, which fails on empty data.
fn calculate_statistics(
    &self,
    matches: &[PatternMatch],
    data: &DataFrame,
) -> Result<QueryStatistics> {
    let time_range = self.calculate_time_range(data)?;
    let pattern_frequency = self.calculate_pattern_frequency(matches);
    // Read the distinct-name count before the map is moved into the result.
    let unique_patterns = pattern_frequency.len();

    Ok(QueryStatistics {
        total_matches: matches.len(),
        unique_patterns,
        time_range,
        performance: self.calculate_performance_metrics(matches)?,
        pattern_frequency,
        time_distribution: self.calculate_time_distribution(matches)?,
    })
}
/// Reports the span covered by `data`: first-row and last-row timestamps plus row count.
///
/// Each timestamp is passed to `DateTime::from_timestamp` with zero nanoseconds; a value
/// that conversion rejects is replaced by the current time instead of raising an error.
///
/// # Errors
/// Returns `ShapeError::DataError` when `data` is empty.
///
/// # Panics
/// Panics if `get_timestamp` yields no value for the first or the last row.
fn calculate_time_range(&self, data: &DataFrame) -> Result<TimeRange> {
    if data.is_empty() {
        return Err(ShapeError::DataError {
            message: "No rows in data".to_string(),
            symbol: None,
            timeframe: None,
        });
    }

    // NOTE(review): the unwraps rely on a non-empty frame having a timestamp at index 0
    // and at row_count() - 1 — confirm against DataFrame.
    let first_secs = data.get_timestamp(0).unwrap();
    let last_secs = data.get_timestamp(data.row_count() - 1).unwrap();

    let to_utc = |secs: i64| DateTime::from_timestamp(secs, 0).unwrap_or_else(Utc::now);
    let start = to_utc(first_secs);
    let end = to_utc(last_secs);

    Ok(TimeRange {
        start,
        end,
        row_count: data.row_count(),
    })
}
/// Summarizes match quality.
///
/// * `avg_confidence` — arithmetic mean of the matches' `confidence`.
/// * `success_rate` — fraction of matches whose `confidence` is strictly above 0.5.
/// * `avg_duration` — arithmetic mean of the matches' `pattern_length`.
///
/// All three are `0.0` for an empty slice, which also avoids dividing by zero.
/// Never returns `Err`.
fn calculate_performance_metrics(
    &self,
    matches: &[PatternMatch],
) -> Result<PerformanceMetrics> {
    if matches.is_empty() {
        return Ok(PerformanceMetrics {
            avg_confidence: 0.0,
            success_rate: 0.0,
            avg_duration: 0.0,
        });
    }

    // Sum straight from the slice; no intermediate vectors are needed.
    let total = matches.len() as f64;
    let confidence_sum: f64 = matches.iter().map(|m| m.confidence).sum();
    let length_sum: f64 = matches.iter().map(|m| m.pattern_length as f64).sum();
    // `count()` yields a usize, so the tally cannot overflow before the slice length does.
    let successes = matches.iter().filter(|m| m.confidence > 0.5).count();

    Ok(PerformanceMetrics {
        avg_confidence: confidence_sum / total,
        success_rate: successes as f64 / total,
        avg_duration: length_sum / total,
    })
}
/// Counts how many matches each pattern name has.
fn calculate_pattern_frequency(&self, matches: &[PatternMatch]) -> HashMap<String, usize> {
    matches
        .iter()
        .fold(HashMap::<String, usize>::new(), |mut counts, found| {
            *counts.entry(found.pattern_name.clone()).or_default() += 1;
            counts
        })
}
/// Buckets matches by the hour, weekday and month of their UTC timestamp.
///
/// Weekday and month keys are the `to_string()` renderings of `weekday()` and `month()`.
/// Never returns `Err`.
fn calculate_time_distribution(&self, matches: &[PatternMatch]) -> Result<TimeDistribution> {
    let mut distribution = TimeDistribution {
        hourly: HashMap::new(),
        daily: HashMap::new(),
        monthly: HashMap::new(),
    };

    for found in matches {
        let when = &found.timestamp;
        *distribution.hourly.entry(when.hour()).or_default() += 1;
        *distribution
            .daily
            .entry(when.weekday().to_string())
            .or_default() += 1;
        *distribution
            .monthly
            .entry(when.month().to_string())
            .or_default() += 1;
    }

    Ok(distribution)
}
/// Classifies a query by keyword.
///
/// The text is lower-cased and searched for each keyword as a plain substring, in the
/// order `find`, `scan`, `analyze`, `alert`; the first keyword present decides the type.
/// Because this is a substring test, a keyword embedded in a longer word also counts.
/// Falls back to `QueryType::Find` when no keyword occurs. Never returns `Err`.
fn determine_query_type(&self, query: &str) -> Result<QueryType> {
    // Order matters: earlier entries take precedence over later ones.
    const KEYWORDS: [(&str, QueryType); 4] = [
        ("find", QueryType::Find),
        ("scan", QueryType::Scan),
        ("analyze", QueryType::Analyze),
        ("alert", QueryType::Alert),
    ];

    let lowered = query.to_lowercase();
    let detected = KEYWORDS
        .iter()
        .find(|(keyword, _)| lowered.contains(*keyword))
        .map(|(_, query_type)| query_type.clone())
        .unwrap_or(QueryType::Find);
    Ok(detected)
}
}
impl Default for QueryExecutor {
fn default() -> Self {
Self::new()
}
}