pub mod compiler;
pub mod ir;
use crate::{StarResult, StarStore, StarTriple};
use compiler::SparqlJitCompiler;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info, instrument};
/// Tuning parameters for [`JitQueryEngine`].
///
/// NOTE(review): `max_cached_plans`, `plan_cache_ttl_secs`, `profiling_sample_rate`
/// and `enable_plan_optimization` are not read anywhere in this module; confirm
/// whether another module consumes them before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JitConfig {
/// Number of recorded executions after which a still-interpreted query counts as
/// hot (compared with `>=` by `QueryPlan::is_hot`). Default: `10`.
pub compilation_threshold: usize,
/// When `true`, hot queries are compiled in a spawned tokio task; when `false`,
/// compilation happens inline inside `JitQueryEngine::execute`. Default: `true`.
pub enable_background_compilation: bool,
/// Intended upper bound on cached plans. Default: `1000`.
/// NOTE(review): not enforced in this module; the caches are unbounded.
pub max_cached_plans: usize,
/// Intended plan-cache time-to-live, presumably in seconds (per the name).
/// Default: `3600`. NOTE(review): not enforced in this module.
pub plan_cache_ttl_secs: u64,
/// Only consulted by `CompilationStrategy::ProfileGuided`, which compiles a hot
/// query only while this is `true`. Default: `true`.
pub enable_hot_path_detection: bool,
/// Default: `0.1`. Presumably a sampling fraction in `[0, 1]` — TODO confirm;
/// not read in this module.
pub profiling_sample_rate: f64,
/// Default: `true`. Not read in this module.
pub enable_plan_optimization: bool,
}
impl Default for JitConfig {
fn default() -> Self {
Self {
compilation_threshold: 10,
enable_background_compilation: true,
max_cached_plans: 1000,
plan_cache_ttl_secs: 3600,
enable_hot_path_detection: true,
profiling_sample_rate: 0.1,
enable_plan_optimization: true,
}
}
}
/// Policy used by `JitQueryEngine::execute` to decide when a query that missed the
/// compiled-query cache should be JIT-compiled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompilationStrategy {
/// Never compile; uncached executions are always interpreted.
AlwaysInterpret,
/// Trigger compilation on every execution that misses the compiled-query cache.
AlwaysCompile,
/// Compile once a still-interpreted query has been executed at least
/// `JitConfig::compilation_threshold` times. This is the engine's default.
Adaptive,
/// Like `Adaptive`, but only while `JitConfig::enable_hot_path_detection` is `true`.
ProfileGuided,
}
/// How a [`QueryPlan`] is (or will be) executed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    /// Run through the interpreter; the mode of every newly created plan.
    Interpreted,
    /// Run as a JIT-compiled kernel.
    Compiled,
    /// NOTE(review): not assigned anywhere in this module.
    Profiling,
}

/// Execution profile for a single query text.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// The original query text.
    pub query: String,
    /// Hash of `query` (see `compute_hash`), used as the cache key.
    pub hash: u64,
    /// Estimated cost; initialized to `0.0` and not updated in this module.
    pub estimated_cost: f64,
    /// Execution mode; new plans start as [`ExecutionMode::Interpreted`].
    pub mode: ExecutionMode,
    /// When the plan was compiled, if it has been.
    pub compiled_at: Option<Instant>,
    /// Number of recorded executions (saturating).
    pub execution_count: usize,
    /// Sum of all recorded execution times (saturating).
    pub total_execution_time: Duration,
    /// `total_execution_time / execution_count`, truncated to whole nanoseconds.
    pub avg_execution_time: Duration,
}

impl QueryPlan {
    /// Creates an empty, interpreted profile for `query`.
    pub fn new(query: String) -> Self {
        let hash = Self::compute_hash(&query);
        Self {
            query,
            hash,
            estimated_cost: 0.0,
            mode: ExecutionMode::Interpreted,
            compiled_at: None,
            execution_count: 0,
            total_execution_time: Duration::ZERO,
            avg_execution_time: Duration::ZERO,
        }
    }

    /// Hashes the query text; this value keys the engine's plan and compiled caches.
    ///
    /// `DefaultHasher::new()` is deterministic within a build, but its algorithm is
    /// unspecified across Rust releases, so these hashes must not be persisted.
    fn compute_hash(query: &str) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        query.hash(&mut hasher);
        hasher.finish()
    }

    /// Records one execution that took `execution_time` and refreshes the average.
    ///
    /// The average is `total / count`, truncated to whole nanoseconds. Counters
    /// saturate instead of overflowing, so this never panics.
    pub fn update_stats(&mut self, execution_time: Duration) {
        self.execution_count = self.execution_count.saturating_add(1);
        self.total_execution_time = self.total_execution_time.saturating_add(execution_time);
        // Divide in u128 nanoseconds. `Duration / (count as u32)` would wrap the
        // count to 0 after u32::MAX executions and panic with a division by zero.
        // `usize -> u128` is lossless, and the count is >= 1 here.
        let avg_nanos = self.total_execution_time.as_nanos() / self.execution_count as u128;
        self.avg_execution_time = Self::duration_from_nanos(avg_nanos);
    }

    /// Returns `true` once the plan has run at least `threshold` times and is
    /// still interpreted.
    pub fn is_hot(&self, threshold: usize) -> bool {
        self.execution_count >= threshold && self.mode == ExecutionMode::Interpreted
    }

    /// Converts a nanosecond count into a `Duration`, saturating at `u64::MAX` seconds.
    fn duration_from_nanos(nanos: u128) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;
        let secs = u64::try_from(nanos / NANOS_PER_SEC).unwrap_or(u64::MAX);
        // The remainder is < 1e9, so it always fits in u32 and never carries.
        let subsec_nanos = (nanos % NANOS_PER_SEC) as u32;
        Duration::new(secs, subsec_nanos)
    }
}
/// Result of JIT-compiling a [`QueryPlan`], stored in the engine's compiled cache.
#[derive(Debug, Clone)]
pub struct CompiledQuery {
/// Snapshot of the plan at compilation time, with `mode` set to `Compiled`.
pub plan: QueryPlan,
/// Compiled artifact. In this module it holds the compiler's kernel id encoded as
/// UTF-8 bytes; `None` makes execution fall back to the interpreter.
pub compiled_code: Option<Arc<Vec<u8>>>,
/// Elapsed time recorded for the compilation.
pub compilation_time: Duration,
/// When the compilation result was produced.
pub compiled_at: Instant,
}
/// Aggregate counters maintained by [`JitQueryEngine`]; all zero by default.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryStats {
/// Number of `execute` calls.
pub total_queries: u64,
/// Executions served by the interpreter.
pub interpreted_count: u64,
/// Executions served from the compiled-query cache.
pub compiled_count: u64,
/// Sum of `compilation_time` over every compiled query inserted into the cache.
pub total_compilation_time: Duration,
/// Executions that found a compiled query in the cache.
pub cache_hits: u64,
/// Executions that did not find a compiled query in the cache.
pub cache_misses: u64,
/// Running mean of the latency recorded for interpreted executions.
pub avg_interpreted_time: Duration,
/// Running mean of the latency recorded for compiled executions.
pub avg_compiled_time: Duration,
}
/// Query engine that executes queries against a `StarStore`, profiles each query
/// text, and JIT-compiles queries selected by its [`CompilationStrategy`].
pub struct JitQueryEngine {
/// Tuning parameters.
config: JitConfig,
/// Per-query execution profiles, keyed by `QueryPlan::compute_hash` of the text.
plan_cache: Arc<RwLock<HashMap<u64, QueryPlan>>>,
/// Compiled queries, keyed by the same hash as `plan_cache`.
compiled_cache: Arc<RwLock<HashMap<u64, CompiledQuery>>>,
/// Aggregate execution statistics.
stats: Arc<RwLock<QueryStats>>,
/// Policy deciding when to compile.
strategy: CompilationStrategy,
/// Shared compiler: write-locked while compiling, read-locked while running kernels.
/// The `Arc`s let background compilation tasks share it with the engine.
jit_compiler: Arc<RwLock<SparqlJitCompiler>>,
}
impl JitQueryEngine {
pub fn new() -> Self {
Self::with_config(JitConfig::default())
}
pub fn with_config(config: JitConfig) -> Self {
let jit_compiler = SparqlJitCompiler::new().unwrap_or_else(|e| {
tracing::warn!("Failed to initialize JIT compiler: {}, using fallback", e);
SparqlJitCompiler::default()
});
Self {
config,
plan_cache: Arc::new(RwLock::new(HashMap::new())),
compiled_cache: Arc::new(RwLock::new(HashMap::new())),
stats: Arc::new(RwLock::new(QueryStats::default())),
strategy: CompilationStrategy::Adaptive,
jit_compiler: Arc::new(RwLock::new(jit_compiler)),
}
}
pub fn set_strategy(&mut self, strategy: CompilationStrategy) {
self.strategy = strategy;
}
pub fn set_compilation_threshold(&mut self, threshold: usize) {
self.config.compilation_threshold = threshold;
}
#[instrument(skip(self, store))]
pub async fn execute(&self, query: &str, store: &StarStore) -> StarResult<Vec<StarTriple>> {
let start = Instant::now();
let hash = QueryPlan::compute_hash(query);
{
let mut stats = self.stats.write().await;
stats.total_queries += 1;
}
let compiled = self.compiled_cache.read().await;
if let Some(compiled_query) = compiled.get(&hash) {
debug!("Cache hit: executing compiled query (hash: {})", hash);
let plan = compiled_query.plan.clone();
drop(compiled);
let result = self.execute_compiled(&plan, store).await?;
let execution_time = start.elapsed();
let mut stats = self.stats.write().await;
stats.compiled_count += 1;
stats.cache_hits += 1;
stats.avg_compiled_time = if stats.compiled_count == 1 {
execution_time
} else {
(stats.avg_compiled_time * (stats.compiled_count - 1) as u32 + execution_time)
/ stats.compiled_count as u32
};
return Ok(result);
}
drop(compiled);
let mut plan_cache = self.plan_cache.write().await;
let plan = plan_cache
.entry(hash)
.or_insert_with(|| QueryPlan::new(query.to_string()));
let execution_time = start.elapsed();
plan.update_stats(execution_time);
let should_compile = match self.strategy {
CompilationStrategy::AlwaysInterpret => false,
CompilationStrategy::AlwaysCompile => true,
CompilationStrategy::Adaptive => plan.is_hot(self.config.compilation_threshold),
CompilationStrategy::ProfileGuided => {
plan.is_hot(self.config.compilation_threshold)
&& self.config.enable_hot_path_detection
}
};
if should_compile {
info!(
"Query is hot (executed {} times), triggering JIT compilation",
plan.execution_count
);
let plan_for_compilation = plan.clone();
drop(plan_cache);
if self.config.enable_background_compilation {
let compiled_cache = self.compiled_cache.clone();
let stats = self.stats.clone();
let jit_compiler = self.jit_compiler.clone();
tokio::spawn(async move {
if let Ok(compiled) =
Self::compile_query_internal(plan_for_compilation, jit_compiler).await
{
let mut cache = compiled_cache.write().await;
let mut stats_guard = stats.write().await;
stats_guard.total_compilation_time += compiled.compilation_time;
cache.insert(hash, compiled);
info!("Background compilation complete for query hash {}", hash);
}
});
} else {
let compiled =
Self::compile_query_internal(plan_for_compilation, self.jit_compiler.clone())
.await?;
let mut cache = self.compiled_cache.write().await;
let mut stats = self.stats.write().await;
stats.total_compilation_time += compiled.compilation_time;
cache.insert(hash, compiled);
}
}
let result = self.execute_interpreted(query, store).await?;
let mut stats = self.stats.write().await;
stats.interpreted_count += 1;
stats.cache_misses += 1;
stats.avg_interpreted_time = if stats.interpreted_count == 1 {
execution_time
} else {
(stats.avg_interpreted_time * (stats.interpreted_count - 1) as u32 + execution_time)
/ stats.interpreted_count as u32
};
Ok(result)
}
async fn execute_interpreted(
&self,
query: &str,
store: &StarStore,
) -> StarResult<Vec<StarTriple>> {
debug!("Executing query in interpreted mode: {}", query);
Ok(store.all_triples())
}
async fn execute_compiled(
&self,
plan: &QueryPlan,
store: &StarStore,
) -> StarResult<Vec<StarTriple>> {
debug!(
"Executing JIT-compiled query (executions: {})",
plan.execution_count
);
let hash = plan.hash;
let compiled = self.compiled_cache.read().await;
if let Some(compiled_query) = compiled.get(&hash) {
if let Some(compiled_code) = &compiled_query.compiled_code {
let kernel_id = String::from_utf8_lossy(compiled_code).to_string();
drop(compiled);
let compiler = self.jit_compiler.read().await;
return compiler.execute_compiled(&kernel_id, store);
}
}
drop(compiled);
debug!("Compiled code not available, falling back to interpreted mode");
self.execute_interpreted(&plan.query, store).await
}
async fn compile_query_internal(
plan: QueryPlan,
jit_compiler: Arc<RwLock<SparqlJitCompiler>>,
) -> StarResult<CompiledQuery> {
let start = Instant::now();
debug!("Compiling query with scirs2_core::jit: {}", plan.query);
let mut compiler = jit_compiler.write().await;
let ir_plan =
compiler
.parse_to_ir(&plan.query)
.map_err(|e| crate::StarError::QueryError {
message: format!("IR parsing failed: {}", e),
query_fragment: Some(plan.query.clone()),
position: None,
suggestion: None,
})?;
let kernel_id =
compiler
.compile_ir(&ir_plan)
.map_err(|e| crate::StarError::QueryError {
message: format!("JIT compilation failed: {}", e),
query_fragment: Some(plan.query.clone()),
position: None,
suggestion: Some("Check query syntax or disable JIT compilation".to_string()),
})?;
info!("Successfully compiled query to kernel: {}", kernel_id);
let compilation_time = start.elapsed();
let compiled_code = Some(Arc::new(kernel_id.into_bytes()));
Ok(CompiledQuery {
plan: QueryPlan {
mode: ExecutionMode::Compiled,
compiled_at: Some(Instant::now()),
..plan
},
compiled_code,
compilation_time,
compiled_at: Instant::now(),
})
}
pub async fn stats(&self) -> QueryStats {
self.stats.read().await.clone()
}
pub async fn clear_caches(&self) {
let mut plan_cache = self.plan_cache.write().await;
let mut compiled_cache = self.compiled_cache.write().await;
plan_cache.clear();
compiled_cache.clear();
info!("Caches cleared");
}
pub async fn cache_stats(&self) -> (usize, usize) {
let plan_cache = self.plan_cache.read().await;
let compiled_cache = self.compiled_cache.read().await;
(plan_cache.len(), compiled_cache.len())
}
pub async fn invalidate_query(&self, query: &str) {
let hash = QueryPlan::compute_hash(query);
let mut plan_cache = self.plan_cache.write().await;
let mut compiled_cache = self.compiled_cache.write().await;
plan_cache.remove(&hash);
compiled_cache.remove(&hash);
debug!("Invalidated query from cache (hash: {})", hash);
}
pub async fn hot_queries(&self, limit: usize) -> Vec<QueryPlan> {
let plan_cache = self.plan_cache.read().await;
let mut plans: Vec<_> = plan_cache.values().cloned().collect();
plans.sort_by_key(|b| std::cmp::Reverse(b.execution_count));
plans.into_iter().take(limit).collect()
}
pub async fn compilation_candidates(&self) -> Vec<QueryPlan> {
let plan_cache = self.plan_cache.read().await;
plan_cache
.values()
.filter(|p| p.is_hot(self.config.compilation_threshold))
.cloned()
.collect()
}
pub async fn precompile(&self, query: &str) -> StarResult<()> {
let hash = QueryPlan::compute_hash(query);
let plan = QueryPlan::new(query.to_string());
let compiled = Self::compile_query_internal(plan, self.jit_compiler.clone()).await?;
let mut cache = self.compiled_cache.write().await;
let mut stats = self.stats.write().await;
stats.total_compilation_time += compiled.compilation_time;
cache.insert(hash, compiled);
info!("Precompiled query (hash: {})", hash);
Ok(())
}
}
impl Default for JitQueryEngine {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Simple triple-pattern query shared by the tests.
    const QUERY: &str = "SELECT * WHERE { ?s ?p ?o }";

    /// Executes `QUERY` against `store` `times` times, failing the test on any error.
    async fn run_query(engine: &JitQueryEngine, store: &StarStore, times: usize) {
        for _ in 0..times {
            engine.execute(QUERY, store).await.unwrap();
        }
    }

    #[test]
    fn test_jit_config_default() {
        let JitConfig {
            compilation_threshold,
            enable_background_compilation,
            max_cached_plans,
            ..
        } = JitConfig::default();
        assert_eq!(compilation_threshold, 10);
        assert!(enable_background_compilation);
        assert_eq!(max_cached_plans, 1000);
    }

    #[test]
    fn test_query_plan_creation() {
        let plan = QueryPlan::new(QUERY.to_string());
        assert_eq!(plan.execution_count, 0);
        assert_eq!(plan.mode, ExecutionMode::Interpreted);
        assert!(plan.compiled_at.is_none());
    }

    #[test]
    fn test_query_plan_stats_update() {
        let mut plan = QueryPlan::new(QUERY.to_string());
        for (sample_ms, expected_count, expected_avg_ms) in [(100, 1, 100), (200, 2, 150)] {
            plan.update_stats(Duration::from_millis(sample_ms));
            assert_eq!(plan.execution_count, expected_count);
            assert_eq!(plan.avg_execution_time, Duration::from_millis(expected_avg_ms));
        }
    }

    #[test]
    fn test_query_plan_is_hot() {
        let mut plan = QueryPlan::new(QUERY.to_string());
        assert!(!plan.is_hot(10));
        (0..10).for_each(|_| plan.update_stats(Duration::from_millis(100)));
        assert!(plan.is_hot(10));
    }

    #[test]
    fn test_jit_engine_creation() {
        let engine = JitQueryEngine::new();
        assert_eq!(engine.strategy, CompilationStrategy::Adaptive);
    }

    #[test]
    fn test_jit_engine_strategy() {
        let mut engine = JitQueryEngine::new();
        engine.set_strategy(CompilationStrategy::AlwaysCompile);
        assert_eq!(engine.strategy, CompilationStrategy::AlwaysCompile);
    }

    #[tokio::test]
    async fn test_jit_engine_execute_interpreted() {
        let (engine, store) = (JitQueryEngine::new(), StarStore::new());
        let result = engine.execute(QUERY, &store).await.unwrap();
        assert!(result.is_empty());
        let stats = engine.stats().await;
        assert_eq!(stats.total_queries, 1);
        assert_eq!(stats.interpreted_count, 1);
    }

    #[tokio::test]
    async fn test_jit_engine_cache_stats() {
        let engine = JitQueryEngine::new();
        assert_eq!(engine.cache_stats().await, (0, 0));
    }

    #[tokio::test]
    async fn test_jit_engine_clear_caches() {
        let (engine, store) = (JitQueryEngine::new(), StarStore::new());
        run_query(&engine, &store, 1).await;
        assert_eq!(engine.cache_stats().await.0, 1);
        engine.clear_caches().await;
        assert_eq!(engine.cache_stats().await, (0, 0));
    }

    #[tokio::test]
    async fn test_jit_engine_hot_queries() {
        let (engine, store) = (JitQueryEngine::new(), StarStore::new());
        run_query(&engine, &store, 15).await;
        let hot = engine.hot_queries(5).await;
        assert_eq!(hot.len(), 1);
        assert_eq!(hot[0].execution_count, 15);
    }

    #[tokio::test]
    async fn test_jit_engine_compilation_candidates() {
        let (engine, store) = (JitQueryEngine::new(), StarStore::new());
        run_query(&engine, &store, 9).await;
        assert!(engine.compilation_candidates().await.is_empty());
        run_query(&engine, &store, 1).await;
        assert_eq!(engine.compilation_candidates().await.len(), 1);
    }

    #[tokio::test]
    async fn test_jit_engine_invalidate_query() {
        let (engine, store) = (JitQueryEngine::new(), StarStore::new());
        run_query(&engine, &store, 1).await;
        assert_eq!(engine.cache_stats().await.0, 1);
        engine.invalidate_query(QUERY).await;
        assert_eq!(engine.cache_stats().await.0, 0);
    }
}