use anyhow::{anyhow, Result};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, info};
use crate::store_integration::{QueryResult, RdfStore, Triple};
use crate::StreamEvent;
/// CQELS (Continuous Query Evaluation over Linked Streams) engine.
///
/// Combines registered RDF streams with a static [`RdfStore`] and evaluates
/// continuous queries over both. All mutable state is behind async
/// [`RwLock`]s so the engine can be shared across tasks via `Arc`.
pub struct CqelsEngine {
    /// Static RDF dataset queried alongside the streams.
    store: Arc<dyn RdfStore>,
    /// Registered streams, keyed by generated stream id.
    streams: Arc<RwLock<HashMap<String, CqelsStream>>>,
    /// Registered continuous queries, keyed by generated query id.
    queries: Arc<RwLock<HashMap<String, CqelsQuery>>>,
    /// Execution plans, keyed by the same query id as `queries`.
    plans: Arc<RwLock<HashMap<String, ExecutionPlan>>>,
    /// Engine-wide tuning knobs (buffer sizes, limits, feature flags).
    config: CqelsConfig,
    /// Running counters exposed via `get_stats`.
    stats: Arc<RwLock<CqelsStats>>,
}
/// Tuning parameters for a [`CqelsEngine`].
#[derive(Debug, Clone)]
pub struct CqelsConfig {
    /// Maximum number of queries that may be registered at once.
    pub max_queries: usize,
    /// Enable incremental (vs. full re-evaluation) query evaluation.
    pub incremental_evaluation: bool,
    /// Enable adaptive plan optimization after plan creation.
    pub adaptive_optimization: bool,
    /// Maximum number of buffered triples kept per stream window.
    pub window_buffer_size: usize,
    /// Buffer size used for join processing.
    pub join_buffer_size: usize,
    /// Use physical (arrival-time) timestamps for stream triples.
    pub physical_timestamps: bool,
}
impl Default for CqelsConfig {
fn default() -> Self {
Self {
max_queries: 100,
incremental_evaluation: true,
adaptive_optimization: true,
window_buffer_size: 10000,
join_buffer_size: 10000,
physical_timestamps: true,
}
}
}
/// A registered RDF stream with its bounded triple buffer.
pub struct CqelsStream {
    /// Generated unique identifier for this stream.
    pub id: String,
    /// The stream URI callers use to address this stream.
    pub uri: String,
    /// FIFO buffer of recent triples; capped at the engine's
    /// `window_buffer_size` (oldest entries are evicted first).
    pub buffer: VecDeque<StreamTriple>,
    /// Optional inferred schema; `None` until one is derived.
    pub schema: Option<StreamSchema>,
    /// Bookkeeping about the stream's lifetime and event rate.
    pub metadata: StreamMetadata,
}
/// A single RDF triple as observed on a stream, with arrival metadata.
#[derive(Debug, Clone)]
pub struct StreamTriple {
    /// The underlying RDF triple payload.
    pub triple: Triple,
    /// Arrival (physical) timestamp assigned at ingestion.
    pub timestamp: DateTime<Utc>,
    /// Per-stream monotonically increasing sequence number (starts at 0).
    pub sequence_id: u64,
    /// Id of the stream this triple arrived on.
    pub source_id: String,
}
/// Lightweight schema description of a stream's content.
#[derive(Debug, Clone)]
pub struct StreamSchema {
    /// Predicates observed on the stream.
    pub predicates: HashSet<String>,
    /// Value type per predicate (keyed by predicate IRI).
    pub value_types: HashMap<String, ValueType>,
}
/// Coarse classification of RDF object values seen on a stream.
#[derive(Debug, Clone, PartialEq)]
pub enum ValueType {
    IRI,
    Literal,
    Integer,
    Float,
    Boolean,
    DateTime,
}
/// Lifetime bookkeeping for a registered stream.
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// When the stream was registered.
    pub created_at: DateTime<Utc>,
    /// Total number of triples ingested on this stream.
    pub event_count: u64,
    /// Average event rate; initialized to 0.0 at registration.
    /// NOTE(review): never updated by the visible code — confirm intent.
    pub avg_event_rate: f64,
    /// Timestamp of the most recently ingested triple, if any.
    pub last_event_time: Option<DateTime<Utc>>,
}
/// A registered continuous query: source text, parsed operators, and state.
#[derive(Debug, Clone)]
pub struct CqelsQuery {
    /// Generated unique identifier.
    pub id: String,
    /// Original CQELS query text as submitted.
    pub query_string: String,
    /// Parsed operator list; the first operator becomes the plan root.
    pub operators: Vec<CqelsOperator>,
    /// Descriptive metadata (name, owner, creation time).
    pub metadata: QueryMetadata,
    /// Current lifecycle state (Idle → Running → Completed / Failed).
    pub state: ExecutionState,
}
/// Logical operators of a CQELS query plan.
///
/// Operators form a tree via the boxed `input`/`left`/`right` children;
/// scans are the leaves.
#[derive(Debug, Clone)]
pub enum CqelsOperator {
    /// Windowed scan over a registered stream.
    StreamScan {
        stream_uri: String,
        window: WindowDefinition,
    },
    /// Pattern scan over the static store (optionally a named graph).
    StaticScan {
        graph_uri: Option<String>,
        pattern: TriplePattern,
    },
    /// Join between two stream-side subplans.
    StreamJoin {
        left: Box<CqelsOperator>,
        right: Box<CqelsOperator>,
        condition: JoinCondition,
    },
    /// Join between a stream subplan and a static-data subplan.
    HybridJoin {
        stream_op: Box<CqelsOperator>,
        static_op: Box<CqelsOperator>,
        condition: JoinCondition,
    },
    /// Filters the input's bindings by a condition.
    Filter {
        input: Box<CqelsOperator>,
        condition: FilterCondition,
    },
    /// Projects the input's bindings onto the listed variables.
    Project {
        input: Box<CqelsOperator>,
        variables: Vec<String>,
    },
    /// Grouped aggregation over the input's bindings.
    Aggregate {
        input: Box<CqelsOperator>,
        functions: Vec<AggregateFunction>,
        group_by: Vec<String>,
    },
}
/// Window specification for a stream scan.
///
/// Which fields are meaningful depends on `window_type`: time-based windows
/// read `time_range` (and `slide` for sliding windows); count-based windows
/// read `triple_count`; `Now` ignores all of them.
#[derive(Debug, Clone)]
pub struct WindowDefinition {
    pub window_type: CqelsWindowType,
    /// Extent of a time-based window.
    pub time_range: Option<Duration>,
    /// Number of triples in a count-based window.
    pub triple_count: Option<usize>,
    /// Slide interval for sliding windows.
    pub slide: Option<Duration>,
}
/// Supported CQELS window kinds.
#[derive(Debug, Clone, PartialEq)]
pub enum CqelsWindowType {
    /// Non-overlapping time-based window.
    TimeTumbling,
    /// Overlapping time-based window advanced by `slide`.
    TimeSliding,
    /// Non-overlapping count-based window.
    CountTumbling,
    /// Overlapping count-based window.
    CountSliding,
    /// The entire current buffer content.
    Now,
}
/// A SPARQL-style triple pattern; each position may be bound or a variable.
#[derive(Debug, Clone)]
pub struct TriplePattern {
    pub subject: PatternNode,
    pub predicate: PatternNode,
    pub object: PatternNode,
}
/// One position of a [`TriplePattern`].
#[derive(Debug, Clone)]
pub enum PatternNode {
    /// Unbound variable (name without the leading `?`/`$` — TODO confirm).
    Variable(String),
    /// Bound IRI.
    IRI(String),
    /// Bound literal value.
    Literal(String),
    /// Blank node label.
    Blank(String),
}
/// Equality join condition between one variable on each side.
#[derive(Debug, Clone)]
pub struct JoinCondition {
    /// Variable name from the left/stream side.
    pub left_var: String,
    /// Variable name from the right/static side.
    pub right_var: String,
    /// Inner/outer semantics of the join.
    pub join_type: JoinType,
}
/// Standard relational join semantics.
#[derive(Debug, Clone, PartialEq)]
pub enum JoinType {
    Inner,
    LeftOuter,
    RightOuter,
    FullOuter,
}
/// Boolean filter expression tree over variable bindings.
///
/// Comparison values are kept as strings; how they are coerced for
/// `LessThan`/`GreaterThan` is decided at evaluation time.
#[derive(Debug, Clone)]
pub enum FilterCondition {
    Equals { var: String, value: String },
    NotEquals { var: String, value: String },
    LessThan { var: String, value: String },
    GreaterThan { var: String, value: String },
    /// Regular-expression match of the variable's value.
    Regex { var: String, pattern: String },
    /// Logical conjunction of two sub-conditions.
    And(Box<FilterCondition>, Box<FilterCondition>),
    /// Logical disjunction of two sub-conditions.
    Or(Box<FilterCondition>, Box<FilterCondition>),
    /// Logical negation of a sub-condition.
    Not(Box<FilterCondition>),
}
/// One aggregate applied over a variable, bound to an output alias.
#[derive(Debug, Clone)]
pub struct AggregateFunction {
    /// Which aggregation to compute.
    pub function: AggregateFunctionType,
    /// Input variable the aggregate reads.
    pub variable: String,
    /// Output variable name the result is bound to.
    pub alias: String,
}
/// Supported aggregation functions.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateFunctionType {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}
/// Descriptive, user-supplied metadata about a registered query.
#[derive(Debug, Clone)]
pub struct QueryMetadata {
    /// Optional human-readable name.
    pub name: Option<String>,
    /// Optional free-text description.
    pub description: Option<String>,
    /// When the query was registered.
    pub created_at: DateTime<Utc>,
    /// Optional owner identifier.
    pub owner: Option<String>,
}
/// Lifecycle state of a registered query.
#[derive(Debug, Clone, PartialEq)]
pub enum ExecutionState {
    /// Registered but not yet started.
    Idle,
    /// Actively running (set by `start_query`).
    Running,
    /// Temporarily suspended.
    Paused,
    /// Stopped normally (set by `stop_query`).
    Completed,
    /// Failed with the contained error message.
    Failed(String),
}
/// Physical execution plan derived from a query's operator list.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    /// Generated plan identifier (distinct from the query id).
    pub id: String,
    /// Root of the operator tree to execute.
    pub root: CqelsOperator,
    /// Per-operator runtime statistics, keyed by operator name.
    pub stats: HashMap<String, OperatorStats>,
    /// Optimization hints attached by the optimizer.
    pub hints: Vec<OptimizationHint>,
}
/// Runtime counters collected for a single plan operator.
#[derive(Debug, Clone, Default)]
pub struct OperatorStats {
    pub input_tuples: u64,
    pub output_tuples: u64,
    pub execution_time_ms: f64,
    pub memory_usage_bytes: usize,
}
/// Hints the optimizer may attach to an [`ExecutionPlan`].
#[derive(Debug, Clone)]
pub enum OptimizationHint {
    /// Evaluate filters as early as possible.
    PushDownFilter,
    /// Materialize join inputs.
    MaterializeJoin,
    /// Use the named index for lookups.
    UseIndex(String),
    /// Execute independent subtrees in parallel.
    ParallelExecution,
}
/// Engine-wide counters, returned by `CqelsEngine::get_stats`.
#[derive(Debug, Clone, Default)]
pub struct CqelsStats {
    /// Queries registered since engine creation.
    pub queries_registered: u64,
    /// Query executions performed.
    pub queries_executed: u64,
    /// Stream triples ingested across all streams.
    pub total_stream_triples: u64,
    /// Static triples processed.
    pub total_static_triples: u64,
    /// Join operations performed.
    pub total_joins_performed: u64,
    /// Average query latency in milliseconds.
    pub avg_query_latency_ms: f64,
    /// Approximate memory footprint in bytes.
    pub memory_usage_bytes: usize,
}
impl CqelsEngine {
    /// Creates a new CQELS engine over the given RDF store and configuration.
    pub fn new(store: Arc<dyn RdfStore>, config: CqelsConfig) -> Self {
        Self {
            store,
            streams: Arc::new(RwLock::new(HashMap::new())),
            queries: Arc::new(RwLock::new(HashMap::new())),
            plans: Arc::new(RwLock::new(HashMap::new())),
            config,
            stats: Arc::new(RwLock::new(CqelsStats::default())),
        }
    }

    /// Registers an input stream under `uri` and returns its generated id.
    pub async fn register_stream(&self, uri: String) -> Result<String> {
        let stream_id = uuid::Uuid::new_v4().to_string();
        let stream = CqelsStream {
            id: stream_id.clone(),
            uri,
            // Pre-size the window buffer so steady-state pushes don't reallocate.
            buffer: VecDeque::with_capacity(self.config.window_buffer_size),
            schema: None,
            metadata: StreamMetadata {
                created_at: Utc::now(),
                event_count: 0,
                avg_event_rate: 0.0,
                last_event_time: None,
            },
        };
        self.streams.write().await.insert(stream_id.clone(), stream);
        info!("Registered CQELS stream: {}", stream_id);
        Ok(stream_id)
    }

    /// Parses `query_string`, builds an execution plan, and registers both
    /// under a freshly generated query id.
    ///
    /// # Errors
    /// Fails when the query cannot be parsed or planned, or when
    /// `config.max_queries` queries are already registered.
    pub async fn register_query(&self, query_string: String) -> Result<String> {
        let query_id = uuid::Uuid::new_v4().to_string();
        let operators = self.parse_cqels_query(&query_string)?;
        let query = CqelsQuery {
            id: query_id.clone(),
            query_string,
            operators,
            metadata: QueryMetadata {
                name: None,
                description: None,
                created_at: Utc::now(),
                owner: None,
            },
            state: ExecutionState::Idle,
        };
        let plan = self.create_execution_plan(&query)?;
        // Scope each guard so no more than one lock is held at a time.
        {
            let mut queries = self.queries.write().await;
            if queries.len() >= self.config.max_queries {
                return Err(anyhow!("Maximum number of queries reached"));
            }
            queries.insert(query_id.clone(), query);
        }
        self.plans.write().await.insert(query_id.clone(), plan);
        self.stats.write().await.queries_registered += 1;
        info!("Registered CQELS query: {}", query_id);
        Ok(query_id)
    }

    /// Ingests one stream event: extracts its triples and appends them to the
    /// buffer of the stream registered under `stream_uri`. Oldest entries are
    /// evicted once the buffer exceeds `config.window_buffer_size`.
    ///
    /// # Errors
    /// Fails when no stream with `stream_uri` is registered or when triple
    /// extraction fails.
    pub async fn process_event(&self, stream_uri: &str, event: &StreamEvent) -> Result<()> {
        let triples = self.extract_triples_from_event(event)?;
        {
            let mut streams = self.streams.write().await;
            let stream = streams
                .values_mut()
                .find(|s| s.uri == stream_uri)
                .ok_or_else(|| anyhow!("Stream not found: {}", stream_uri))?;
            for triple in &triples {
                // Take the clock once so `timestamp` and `last_event_time` agree.
                let now = Utc::now();
                stream.buffer.push_back(StreamTriple {
                    triple: triple.clone(),
                    timestamp: now,
                    // Sequence ids come from the pre-increment count, so they
                    // start at 0 and are unique per stream.
                    sequence_id: stream.metadata.event_count,
                    source_id: stream.id.clone(),
                });
                stream.metadata.event_count += 1;
                stream.metadata.last_event_time = Some(now);
                if stream.buffer.len() > self.config.window_buffer_size {
                    stream.buffer.pop_front();
                }
            }
        } // release the streams lock before touching stats
        self.stats.write().await.total_stream_triples += triples.len() as u64;
        Ok(())
    }

    /// Executes the plan registered for `query_id` and returns its result.
    ///
    /// The plan is cloned out of the registry so no registry lock is held
    /// across the (potentially long) asynchronous execution.
    pub async fn execute_query(&self, query_id: &str) -> Result<QueryResult> {
        if !self.queries.read().await.contains_key(query_id) {
            return Err(anyhow!("Query not found: {}", query_id));
        }
        let plan = self
            .plans
            .read()
            .await
            .get(query_id)
            .cloned()
            .ok_or_else(|| anyhow!("Execution plan not found: {}", query_id))?;
        let result = self.execute_plan(&plan).await?;
        self.stats.write().await.queries_executed += 1;
        Ok(result)
    }

    /// Parses a CQELS query string into a list of logical operators.
    ///
    /// NOTE(review): placeholder parser — any query containing `SELECT`
    /// yields a single hard-coded stream scan with a 60s/10s sliding window.
    fn parse_cqels_query(&self, query: &str) -> Result<Vec<CqelsOperator>> {
        let mut operators = Vec::new();
        if query.to_uppercase().contains("SELECT") {
            operators.push(CqelsOperator::StreamScan {
                stream_uri: "http://example.org/stream".to_string(),
                window: WindowDefinition {
                    window_type: CqelsWindowType::TimeSliding,
                    time_range: Some(Duration::from_secs(60)),
                    triple_count: None,
                    slide: Some(Duration::from_secs(10)),
                },
            });
        }
        Ok(operators)
    }

    /// Builds an execution plan from the query's first operator, optionally
    /// running the optimizer when `adaptive_optimization` is enabled.
    ///
    /// # Errors
    /// Fails when the query has no operators.
    fn create_execution_plan(&self, query: &CqelsQuery) -> Result<ExecutionPlan> {
        let root = query
            .operators
            .first()
            .cloned()
            .ok_or_else(|| anyhow!("No operators in query"))?;
        let plan = ExecutionPlan {
            id: uuid::Uuid::new_v4().to_string(),
            root,
            stats: HashMap::new(),
            hints: Vec::new(),
        };
        if self.config.adaptive_optimization {
            self.optimize_plan(&plan)
        } else {
            Ok(plan)
        }
    }

    /// Returns an optimized copy of `plan`. Currently only attaches a
    /// filter-push-down hint.
    fn optimize_plan(&self, plan: &ExecutionPlan) -> Result<ExecutionPlan> {
        let mut optimized = plan.clone();
        optimized.hints.push(OptimizationHint::PushDownFilter);
        debug!("Optimized execution plan: {}", optimized.id);
        Ok(optimized)
    }

    /// Executes a plan by evaluating its root operator.
    async fn execute_plan(&self, plan: &ExecutionPlan) -> Result<QueryResult> {
        debug!("Executing plan: {}", plan.id);
        self.execute_operator(&plan.root).await
    }

    /// Boxed-future indirection so operator evaluation can recurse
    /// (async fns cannot be directly recursive).
    fn execute_operator<'a>(
        &'a self,
        operator: &'a CqelsOperator,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueryResult>> + 'a>> {
        Box::pin(async move { self.execute_operator_impl(operator).await })
    }

    /// Dispatches one operator to its dedicated evaluation method.
    async fn execute_operator_impl(&self, operator: &CqelsOperator) -> Result<QueryResult> {
        match operator {
            CqelsOperator::StreamScan { stream_uri, window } => {
                self.execute_stream_scan(stream_uri, window).await
            }
            CqelsOperator::StaticScan { graph_uri, pattern } => {
                self.execute_static_scan(graph_uri.as_deref(), pattern).await
            }
            CqelsOperator::StreamJoin {
                left,
                right,
                condition,
            } => self.execute_stream_join(left, right, condition).await,
            CqelsOperator::HybridJoin {
                stream_op,
                static_op,
                condition,
            } => {
                self.execute_hybrid_join(stream_op, static_op, condition)
                    .await
            }
            CqelsOperator::Filter { input, condition } => {
                self.execute_filter(input, condition).await
            }
            CqelsOperator::Project { input, variables } => {
                self.execute_project(input, variables).await
            }
            CqelsOperator::Aggregate {
                input,
                functions,
                group_by,
            } => self.execute_aggregate(input, functions, group_by).await,
        }
    }

    /// Scans one stream's buffer through `window`.
    ///
    /// NOTE(review): the windowed triples are computed but not yet turned
    /// into bindings — the result is currently always empty.
    async fn execute_stream_scan(
        &self,
        stream_uri: &str,
        window: &WindowDefinition,
    ) -> Result<QueryResult> {
        let streams = self.streams.read().await;
        let stream = streams
            .values()
            .find(|s| s.uri == stream_uri)
            .ok_or_else(|| anyhow!("Stream not found: {}", stream_uri))?;
        let triples = self.apply_window(&stream.buffer, window)?;
        debug!("Stream scan returned {} triples", triples.len());
        Ok(QueryResult {
            bindings: Vec::new(),
        })
    }

    /// Scans the static store for a pattern. Placeholder: returns no bindings.
    async fn execute_static_scan(
        &self,
        _graph_uri: Option<&str>,
        _pattern: &TriplePattern,
    ) -> Result<QueryResult> {
        debug!("Executing static scan");
        Ok(QueryResult {
            bindings: Vec::new(),
        })
    }

    /// Joins two stream subplans. Placeholder: child operators are not yet
    /// evaluated; only the join counter is updated.
    async fn execute_stream_join(
        &self,
        _left: &CqelsOperator,
        _right: &CqelsOperator,
        _condition: &JoinCondition,
    ) -> Result<QueryResult> {
        let left_result = QueryResult {
            bindings: Vec::new(),
        };
        let right_result = QueryResult {
            bindings: Vec::new(),
        };
        debug!(
            "Stream join: {} x {} bindings",
            left_result.bindings.len(),
            right_result.bindings.len()
        );
        self.stats.write().await.total_joins_performed += 1;
        Ok(QueryResult {
            bindings: Vec::new(),
        })
    }

    /// Joins a stream subplan with a static subplan. Placeholder.
    async fn execute_hybrid_join(
        &self,
        _stream_op: &CqelsOperator,
        _static_op: &CqelsOperator,
        _condition: &JoinCondition,
    ) -> Result<QueryResult> {
        let stream_result = QueryResult {
            bindings: Vec::new(),
        };
        let static_result = QueryResult {
            bindings: Vec::new(),
        };
        debug!(
            "Hybrid join: {} stream x {} static bindings",
            stream_result.bindings.len(),
            static_result.bindings.len()
        );
        Ok(QueryResult {
            bindings: Vec::new(),
        })
    }

    /// Applies a filter condition. Placeholder: passes the (empty) input through.
    async fn execute_filter(
        &self,
        _input: &CqelsOperator,
        _condition: &FilterCondition,
    ) -> Result<QueryResult> {
        let input_result = QueryResult {
            bindings: Vec::new(),
        };
        debug!("Filter applied to {} bindings", input_result.bindings.len());
        Ok(input_result)
    }

    /// Projects bindings onto variables. Placeholder: passes input through.
    async fn execute_project(
        &self,
        _input: &CqelsOperator,
        _variables: &[String],
    ) -> Result<QueryResult> {
        let input_result = QueryResult {
            bindings: Vec::new(),
        };
        debug!(
            "Project applied to {} bindings",
            input_result.bindings.len()
        );
        Ok(input_result)
    }

    /// Computes grouped aggregates. Placeholder: returns no bindings.
    async fn execute_aggregate(
        &self,
        _input: &CqelsOperator,
        _functions: &[AggregateFunction],
        _group_by: &[String],
    ) -> Result<QueryResult> {
        let input_result = QueryResult {
            bindings: Vec::new(),
        };
        debug!(
            "Aggregate applied to {} bindings",
            input_result.bindings.len()
        );
        Ok(QueryResult {
            bindings: Vec::new(),
        })
    }

    /// Selects the triples from `buffer` that fall inside `window`.
    ///
    /// Returned triples are always in chronological (oldest-first) order,
    /// matching the buffer's insertion order, regardless of window type.
    ///
    /// # Errors
    /// Fails when a time range cannot be converted to a `chrono` duration.
    fn apply_window(
        &self,
        buffer: &VecDeque<StreamTriple>,
        window: &WindowDefinition,
    ) -> Result<Vec<Triple>> {
        let now = Utc::now();
        let mut triples = Vec::new();
        match window.window_type {
            CqelsWindowType::TimeSliding | CqelsWindowType::TimeTumbling => {
                if let Some(time_range) = window.time_range {
                    let cutoff = now - ChronoDuration::from_std(time_range)?;
                    triples.extend(
                        buffer
                            .iter()
                            .filter(|st| st.timestamp > cutoff)
                            .map(|st| st.triple.clone()),
                    );
                }
            }
            CqelsWindowType::CountSliding | CqelsWindowType::CountTumbling => {
                if let Some(count) = window.triple_count {
                    // Keep only the newest `count` triples, but emit them
                    // oldest-first (the previous `.rev().take(count)` version
                    // returned them newest-first, inconsistent with the
                    // time-based windows above).
                    let start = buffer.len().saturating_sub(count);
                    triples.extend(buffer.iter().skip(start).map(|st| st.triple.clone()));
                }
            }
            CqelsWindowType::Now => {
                triples.extend(buffer.iter().map(|st| st.triple.clone()));
            }
        }
        Ok(triples)
    }

    /// Converts a stream event into RDF triples.
    ///
    /// Only `TripleAdded` and `QuadAdded` produce triples; every other event
    /// variant is deliberately ignored and yields an empty vector.
    fn extract_triples_from_event(&self, event: &StreamEvent) -> Result<Vec<Triple>> {
        let mut triples = Vec::new();
        match event {
            StreamEvent::TripleAdded {
                subject,
                predicate,
                object,
                graph,
                ..
            } => {
                triples.push(Triple {
                    subject: subject.clone(),
                    predicate: predicate.clone(),
                    object: object.clone(),
                    graph: graph.clone(),
                });
            }
            StreamEvent::QuadAdded {
                subject,
                predicate,
                object,
                graph,
                ..
            } => {
                triples.push(Triple {
                    subject: subject.clone(),
                    predicate: predicate.clone(),
                    object: object.clone(),
                    // Quads always carry a graph, so wrap it in Some.
                    graph: Some(graph.clone()),
                });
            }
            _ => {}
        }
        Ok(triples)
    }

    /// Returns a snapshot of the engine-wide counters.
    pub async fn get_stats(&self) -> CqelsStats {
        self.stats.read().await.clone()
    }

    /// Marks the query as `Running`.
    ///
    /// # Errors
    /// Fails when no query with `query_id` is registered.
    pub async fn start_query(&self, query_id: &str) -> Result<()> {
        let mut queries = self.queries.write().await;
        if let Some(query) = queries.get_mut(query_id) {
            query.state = ExecutionState::Running;
            info!("Started CQELS query: {}", query_id);
            Ok(())
        } else {
            Err(anyhow!("Query not found: {}", query_id))
        }
    }

    /// Marks the query as `Completed`.
    ///
    /// # Errors
    /// Fails when no query with `query_id` is registered.
    pub async fn stop_query(&self, query_id: &str) -> Result<()> {
        let mut queries = self.queries.write().await;
        if let Some(query) = queries.get_mut(query_id) {
            query.state = ExecutionState::Completed;
            info!("Stopped CQELS query: {}", query_id);
            Ok(())
        } else {
            Err(anyhow!("Query not found: {}", query_id))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_cqels_config_defaults() {
        let cfg = CqelsConfig::default();
        assert!(cfg.incremental_evaluation);
        assert!(cfg.adaptive_optimization);
        assert_eq!(cfg.max_queries, 100);
    }

    #[tokio::test]
    async fn test_window_definition() {
        // A 60-second sliding window that advances every 10 seconds.
        let win = WindowDefinition {
            window_type: CqelsWindowType::TimeSliding,
            time_range: Some(Duration::from_secs(60)),
            triple_count: None,
            slide: Some(Duration::from_secs(10)),
        };
        assert!(win.time_range.is_some());
        assert_eq!(win.window_type, CqelsWindowType::TimeSliding);
    }

    #[tokio::test]
    async fn test_execution_state() {
        // States compare by variant.
        assert_eq!(ExecutionState::Idle, ExecutionState::Idle);
        assert_eq!(ExecutionState::Running, ExecutionState::Running);
    }

    #[tokio::test]
    async fn test_cqels_stats() {
        let snapshot = CqelsStats {
            queries_registered: 10,
            queries_executed: 50,
            total_stream_triples: 10000,
            total_static_triples: 5000,
            total_joins_performed: 25,
            avg_query_latency_ms: 12.5,
            memory_usage_bytes: 1024 * 1024,
        };
        assert_eq!(snapshot.total_stream_triples, 10000);
        assert_eq!(snapshot.queries_registered, 10);
    }
}