use crate::model::Variable;
use crate::query::plan::ExecutionPlan;
use crate::OxirsError;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
/// Compiles query execution plans into WebAssembly module bytes.
///
/// Compiled modules are memoized in a shared, lock-protected cache keyed by a
/// hash of the plan.
pub struct WasmQueryCompiler {
    /// Shared cache of previously compiled modules.
    cache: Arc<RwLock<CompilationCache>>,
    /// Target handed to the module builder during code generation.
    target: WasmTarget,
    /// Optimization level consulted after code generation.
    optimization: OptimizationLevel,
}
/// Compilation target selected for generated modules.
///
/// The variant names indicate the feature set (SIMD, threads, GC) a module
/// may rely on; the code in this file only stores the chosen target on the
/// module builder.
#[derive(Debug, Clone)]
pub enum WasmTarget {
    /// Baseline target (presumably the WebAssembly 1.0 feature set — confirm).
    Wasm1_0,
    /// Target with SIMD.
    WasmSimd,
    /// Target with threads.
    WasmThreads,
    /// Target with both SIMD and threads.
    WasmSimdThreads,
    /// Target with GC.
    WasmGC,
}
/// How aggressively generated modules should be optimized.
///
/// The declaration order defines both the discriminants (`None` is 0) and the
/// derived ordering: `None < Basic < Standard < Aggressive < Size`.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub enum OptimizationLevel {
    /// No optimization; compiled bytes are returned untouched.
    None,
    /// Basic level.
    Basic,
    /// Standard level.
    Standard,
    /// Aggressive level.
    Aggressive,
    /// Size-oriented level (presumably favors smaller output — confirm).
    Size,
}
/// Size-bounded store of compiled modules keyed by plan hash.
struct CompilationCache {
    /// Cached modules, indexed by the hash of the plan they were compiled from.
    modules: HashMap<QueryHash, CachedModule>,
    /// Running total of the byte lengths of the cached modules.
    total_size: usize,
    /// Byte budget; entries are evicted while `total_size` exceeds it.
    max_size: usize,
}
/// 64-bit key under which compiled modules are stored in the cache.
type QueryHash = u64;
/// One cache entry: the compiled bytes plus bookkeeping.
struct CachedModule {
    /// The compiled WASM module.
    wasm_bytes: Vec<u8>,
    /// Metadata recorded when the module was cached (stored, not read here).
    #[allow(dead_code)]
    metadata: ModuleMetadata,
    /// Compile duration recorded for the module (stored, not read here).
    #[allow(dead_code)]
    compile_time: std::time::Duration,
    /// Usage counter; the entry with the lowest value is evicted first.
    usage_count: usize,
}
/// Descriptive metadata recorded alongside a compiled module.
#[derive(Debug, Clone)]
pub struct ModuleMetadata {
    /// Module size; the compiler stores the byte length of the WASM output here.
    pub size: usize,
    /// Number of memory pages (presumably the initial page count — confirm).
    pub memory_pages: u32,
    /// Functions the module exports.
    pub exports: Vec<ExportedFunction>,
    /// Imports the module requires from its host.
    pub imports: Vec<RequiredImport>,
    /// Counters describing optimization work performed on the module.
    pub optimization_stats: OptimizationStats,
}
/// Signature and role of a function exported by a compiled module.
#[derive(Debug, Clone)]
pub struct ExportedFunction {
    /// Export name.
    pub name: String,
    /// Parameter value types, in order.
    pub params: Vec<WasmType>,
    /// Result value types, in order.
    pub returns: Vec<WasmType>,
    /// Role the function plays.
    pub kind: FunctionKind,
}
/// Value types used in function signatures, struct fields, tables and globals.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WasmType {
    /// 32-bit integer.
    I32,
    /// 64-bit integer.
    I64,
    /// 32-bit float.
    F32,
    /// 64-bit float.
    F64,
    /// 128-bit vector.
    V128,
    /// Function reference.
    FuncRef,
    /// External reference.
    ExternRef,
}
/// Role of an exported function within a compiled query module.
#[derive(Debug, Clone, PartialEq)]
pub enum FunctionKind {
    /// Query execution function.
    QueryExec,
    /// Triple matching function.
    TripleMatch,
    /// Join function.
    Join,
    /// Filter function.
    Filter,
    /// Aggregate function.
    Aggregate,
    /// Helper function, carrying a string payload (presumably its name — confirm).
    Helper(String),
}
/// An import a module expects its host to provide.
#[derive(Debug, Clone)]
pub struct RequiredImport {
    /// Name of the module the import is resolved from (e.g. `"host"`).
    pub module: String,
    /// Name of the imported item within that module.
    pub name: String,
    /// Kind and shape of the imported item.
    pub import_type: ImportType,
}
/// Kind and shape of an imported item.
#[derive(Debug, Clone)]
pub enum ImportType {
    /// Imported function with the given signature.
    Function {
        /// Parameter value types, in order.
        params: Vec<WasmType>,
        /// Result value types, in order.
        returns: Vec<WasmType>,
    },
    /// Imported memory.
    Memory {
        /// Minimum size in pages.
        min_pages: u32,
        /// Optional maximum size in pages.
        max_pages: Option<u32>,
    },
    /// Imported table.
    Table {
        /// Type of the table's elements.
        element_type: WasmType,
        /// Minimum number of elements.
        min_size: u32,
        /// Optional maximum number of elements.
        max_size: Option<u32>,
    },
    /// Imported global.
    Global {
        /// Type of the global's value.
        value_type: WasmType,
        /// Whether the global is mutable.
        mutable: bool,
    },
}
/// Counters describing optimization work; every counter defaults to zero.
#[derive(Debug, Clone, Default)]
pub struct OptimizationStats {
    /// Count of eliminated instructions.
    pub instructions_eliminated: usize,
    /// Count of inlined functions.
    pub functions_inlined: usize,
    /// Amount of dead code removed (unit not established here).
    pub dead_code_removed: usize,
    /// Count of optimized loops.
    pub loops_optimized: usize,
    /// Count of memory optimizations.
    pub memory_optimizations: usize,
}
/// Host-side runtime able to validate and instantiate compiled modules.
///
/// Implementors must be `Send + Sync`.
pub trait WasmRuntime: Send + Sync {
    /// Instantiates the module encoded in `wasm_bytes`.
    ///
    /// # Errors
    ///
    /// Returns an [`OxirsError`] when instantiation fails.
    fn instantiate(&self, wasm_bytes: &[u8]) -> Result<Box<dyn WasmInstance>, OxirsError>;

    /// Checks `wasm_bytes`, returning `Ok(())` when the runtime accepts them.
    ///
    /// # Errors
    ///
    /// Returns an [`OxirsError`] when the bytes are rejected.
    fn validate(&self, wasm_bytes: &[u8]) -> Result<(), OxirsError>;

    /// Reports the features this runtime supports.
    fn capabilities(&self) -> RuntimeCapabilities;
}
/// A live module instance that can execute queries.
///
/// Implementors must be `Send + Sync`.
pub trait WasmInstance: Send + Sync {
    /// Runs the query against `input` and returns its output.
    ///
    /// # Errors
    ///
    /// Returns an [`OxirsError`] when execution fails.
    fn execute_query(&mut self, input: &QueryInput) -> Result<QueryOutput, OxirsError>;

    /// Reports the instance's memory usage (presumably in bytes — confirm).
    fn memory_usage(&self) -> usize;

    /// Resets the instance's state.
    fn reset(&mut self);
}
/// Feature flags and limits reported by a [`WasmRuntime`].
#[derive(Debug, Clone)]
pub struct RuntimeCapabilities {
    /// Whether SIMD is supported.
    pub simd: bool,
    /// Whether threads are supported.
    pub threads: bool,
    /// Whether bulk memory operations are supported.
    pub bulk_memory: bool,
    /// Whether reference types are supported.
    pub reference_types: bool,
    /// Maximum number of memory pages.
    pub max_memory_pages: u32,
}
/// Input handed to [`WasmInstance::execute_query`].
pub struct QueryInput {
    /// Raw input bytes (encoding not established in this file).
    pub data: Vec<u8>,
    /// Raw byte values keyed by string (presumably variable names — confirm).
    pub bindings: HashMap<String, Vec<u8>>,
    /// Resource limits for the execution.
    pub limits: ExecutionLimits,
}
/// Output produced by [`WasmInstance::execute_query`].
pub struct QueryOutput {
    /// Raw result bytes (encoding not established in this file).
    pub results: Vec<u8>,
    /// Statistics gathered during the execution.
    pub stats: ExecutionStats,
}
/// Resource limits attached to a query execution.
#[derive(Debug, Clone)]
pub struct ExecutionLimits {
    /// Timeout in milliseconds.
    pub timeout_ms: u32,
    /// Memory ceiling (presumably in bytes — confirm).
    pub max_memory: usize,
    /// Maximum number of results.
    pub max_results: usize,
}
/// Statistics reported for a query execution.
#[derive(Debug, Clone)]
pub struct ExecutionStats {
    /// Execution time in microseconds.
    pub exec_time_us: u64,
    /// Memory used (presumably in bytes — confirm).
    pub memory_used: usize,
    /// Number of triples scanned.
    pub triples_scanned: usize,
    /// Number of results produced.
    pub results_count: usize,
}
impl WasmQueryCompiler {
    /// Creates a compiler for `target` at the given `optimization` level, with
    /// an empty compilation cache.
    pub fn new(target: WasmTarget, optimization: OptimizationLevel) -> Self {
        Self {
            cache: Arc::new(RwLock::new(CompilationCache::new())),
            target,
            optimization,
        }
    }

    /// Compiles `plan` to WASM bytes, reusing a cached module when one exists
    /// for the plan's hash.
    ///
    /// # Errors
    ///
    /// Returns an error when the plan contains a node that code generation does
    /// not support, or when the cache lock cannot be acquired to store the
    /// freshly compiled module.
    pub fn compile(&self, plan: &ExecutionPlan) -> Result<Vec<u8>, OxirsError> {
        let hash = self.hash_plan(plan);
        if let Some(module) = self.get_cached(hash) {
            return Ok(module);
        }
        let wasm_bytes = self.generate_wasm(plan)?;
        let optimized = self.optimize_wasm(wasm_bytes)?;
        self.cache_module(hash, optimized.clone())?;
        Ok(optimized)
    }

    /// Drives the module builder: memory, host imports, data structures, the
    /// query function(s) for `plan`, helpers, and finally serialization.
    fn generate_wasm(&self, plan: &ExecutionPlan) -> Result<Vec<u8>, OxirsError> {
        let mut builder = WasmModuleBuilder::new(self.target.clone());
        // One initial page, growable up to 1024 pages.
        builder.add_memory(1, Some(1024))?;
        self.add_imports(&mut builder)?;
        self.add_data_structures(&mut builder)?;
        self.generate_query_function(&mut builder, plan)?;
        self.generate_helpers(&mut builder)?;
        builder.build()
    }

    /// Declares the `host` imports every module expects: `alloc` (i32 -> i32),
    /// `free` (i32 -> nothing) and `log` (i32, i32 -> nothing).
    fn add_imports(&self, builder: &mut WasmModuleBuilder) -> Result<(), OxirsError> {
        builder.add_import(
            "host",
            "alloc",
            ImportType::Function {
                params: vec![WasmType::I32],
                returns: vec![WasmType::I32],
            },
        )?;
        builder.add_import(
            "host",
            "free",
            ImportType::Function {
                params: vec![WasmType::I32],
                returns: vec![],
            },
        )?;
        builder.add_import(
            "host",
            "log",
            ImportType::Function {
                params: vec![WasmType::I32, WasmType::I32],
                returns: vec![],
            },
        )?;
        Ok(())
    }

    /// Declares the `Term`, `Triple` and `Binding` record layouts; every field
    /// is an `i32`.
    fn add_data_structures(&self, builder: &mut WasmModuleBuilder) -> Result<(), OxirsError> {
        builder.add_struct(
            "Term",
            vec![
                ("kind", WasmType::I32),
                ("value_ptr", WasmType::I32),
                ("value_len", WasmType::I32),
                ("datatype_ptr", WasmType::I32),
                ("datatype_len", WasmType::I32),
                ("lang_ptr", WasmType::I32),
                ("lang_len", WasmType::I32),
            ],
        )?;
        builder.add_struct(
            "Triple",
            vec![
                ("subject", WasmType::I32),
                ("predicate", WasmType::I32),
                ("object", WasmType::I32),
            ],
        )?;
        builder.add_struct(
            "Binding",
            vec![
                ("var_ptr", WasmType::I32),
                ("var_len", WasmType::I32),
                ("term", WasmType::I32),
            ],
        )?;
        Ok(())
    }

    /// Emits the function(s) implementing `plan`.
    ///
    /// # Errors
    ///
    /// Only triple scans and hash joins are supported; any other plan node
    /// yields `OxirsError::Query`.
    fn generate_query_function(
        &self,
        builder: &mut WasmModuleBuilder,
        plan: &ExecutionPlan,
    ) -> Result<(), OxirsError> {
        match plan {
            ExecutionPlan::TripleScan { pattern } => self.generate_triple_scan(builder, pattern),
            ExecutionPlan::HashJoin {
                left,
                right,
                join_vars,
            } => self.generate_hash_join(builder, left, right, join_vars),
            _ => Err(OxirsError::Query(
                "Unsupported plan type for WASM".to_string(),
            )),
        }
    }

    /// Registers the `query_exec` function (four `i32` parameters, one `i32`
    /// result) for a triple scan.
    ///
    /// The pattern is currently ignored: the body forwards the first two
    /// parameters to `scan_triples` and then pushes the constant 0.
    fn generate_triple_scan(
        &self,
        builder: &mut WasmModuleBuilder,
        _pattern: &crate::model::pattern::TriplePattern,
    ) -> Result<(), OxirsError> {
        builder.add_function(
            "query_exec",
            vec![
                WasmType::I32,
                WasmType::I32,
                WasmType::I32,
                WasmType::I32,
            ],
            vec![WasmType::I32],
            |fb| {
                fb.local_get(0);
                fb.local_get(1);
                // NOTE(review): `scan_triples` is neither imported nor defined
                // by the code in this impl — confirm where it is meant to come from.
                fb.call("scan_triples");
                fb.i32_const(0);
            },
        )?;
        Ok(())
    }

    /// Emits the functions for both join inputs, then registers a `hash_join`
    /// function (four `i32` parameters, one `i32` result).
    ///
    /// The join variables are currently ignored and the body is a stub that
    /// pushes the constant 0.
    fn generate_hash_join(
        &self,
        builder: &mut WasmModuleBuilder,
        left: &ExecutionPlan,
        right: &ExecutionPlan,
        _join_vars: &[Variable],
    ) -> Result<(), OxirsError> {
        // NOTE(review): when both inputs are triple scans each one registers a
        // function named `query_exec`, so the builder ends up with duplicates.
        self.generate_query_function(builder, left)?;
        self.generate_query_function(builder, right)?;
        builder.add_function(
            "hash_join",
            vec![
                WasmType::I32,
                WasmType::I32,
                WasmType::I32,
                WasmType::I32,
            ],
            vec![WasmType::I32],
            |fb| {
                fb.i32_const(0);
            },
        )?;
        Ok(())
    }

    /// Registers the `str_eq` and `match_term` helper functions.
    fn generate_helpers(&self, builder: &mut WasmModuleBuilder) -> Result<(), OxirsError> {
        builder.add_function(
            "str_eq",
            vec![
                WasmType::I32,
                WasmType::I32,
                WasmType::I32,
                WasmType::I32,
            ],
            vec![WasmType::I32],
            |fb| {
                // Compares parameters 1 and 3 only (presumably the two string
                // lengths — confirm): yields 0 when they differ, 1 otherwise.
                // No byte-wise comparison is emitted.
                fb.local_get(1);
                fb.local_get(3);
                fb.i32_ne();
                fb.if_else(
                    WasmType::I32,
                    |fb| {
                        fb.i32_const(0);
                    },
                    |fb| {
                        fb.i32_const(1);
                    },
                );
            },
        )?;
        builder.add_function(
            "match_term",
            vec![WasmType::I32, WasmType::I32],
            vec![WasmType::I32],
            |fb| {
                // Stub: always yields 1.
                fb.i32_const(1);
            },
        )?;
        Ok(())
    }

    /// Applies post-generation optimization according to the configured level.
    ///
    /// No optimization passes are implemented yet, so every level returns the
    /// input bytes unchanged.
    fn optimize_wasm(&self, wasm_bytes: Vec<u8>) -> Result<Vec<u8>, OxirsError> {
        match self.optimization {
            OptimizationLevel::None => Ok(wasm_bytes),
            _ => Ok(wasm_bytes),
        }
    }

    /// Derives the cache key for `plan` by hashing its `Debug` rendering.
    ///
    /// Two plans whose hashes collide share a cache slot; collisions are not
    /// detected.
    fn hash_plan(&self, plan: &ExecutionPlan) -> QueryHash {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        format!("{plan:?}").hash(&mut hasher);
        hasher.finish()
    }

    /// Returns a copy of the cached bytes for `hash`, if present, and records
    /// the hit on the entry's usage counter.
    ///
    /// Returns `None` on a miss or when the cache lock is poisoned.
    fn get_cached(&self, hash: QueryHash) -> Option<Vec<u8>> {
        // The write lock is needed because a hit bumps `usage_count`, which the
        // cache consults to pick its least-used eviction victim. Without the
        // bump every entry stays at zero and eviction order is arbitrary.
        let mut cache = self.cache.write().ok()?;
        let entry = cache.modules.get_mut(&hash)?;
        entry.usage_count = entry.usage_count.saturating_add(1);
        Some(entry.wasm_bytes.clone())
    }

    /// Stores `wasm_bytes` in the cache under `hash`.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Query` when the cache lock is poisoned.
    fn cache_module(&self, hash: QueryHash, wasm_bytes: Vec<u8>) -> Result<(), OxirsError> {
        let mut cache = self
            .cache
            .write()
            .map_err(|_| OxirsError::Query("Failed to acquire cache lock".to_string()))?;
        let metadata = ModuleMetadata {
            size: wasm_bytes.len(),
            memory_pages: 1,
            exports: vec![],
            imports: vec![],
            optimization_stats: OptimizationStats::default(),
        };
        cache.add(
            hash,
            CachedModule {
                wasm_bytes,
                metadata,
                // NOTE(review): fixed placeholder, not a measured compile time.
                compile_time: std::time::Duration::from_millis(10),
                usage_count: 0,
            },
        );
        Ok(())
    }
}
/// Accumulates the pieces of a module before it is serialized.
struct WasmModuleBuilder {
    /// Compilation target (stored, not read by the visible code).
    #[allow(dead_code)]
    target: WasmTarget,
    /// Imports recorded so far, in insertion order.
    imports: Vec<RequiredImport>,
    /// Functions recorded so far, in insertion order.
    functions: Vec<FunctionDef>,
    /// Memory definition, if one has been set.
    memory: Option<MemoryDef>,
    /// Struct layouts recorded so far, in insertion order.
    structs: Vec<StructDef>,
}
/// A function recorded by the module builder.
///
/// The fields are stored but not read by the visible code, hence the
/// `dead_code` allowances.
struct FunctionDef {
    /// Function name.
    #[allow(dead_code)]
    name: String,
    /// Parameter value types, in order.
    #[allow(dead_code)]
    params: Vec<WasmType>,
    /// Result value types, in order.
    #[allow(dead_code)]
    returns: Vec<WasmType>,
    /// Instruction sequence making up the body.
    #[allow(dead_code)]
    body: Vec<WasmInstruction>,
}
/// Memory limits recorded by the module builder.
///
/// The fields are stored but not read by the visible code, hence the
/// `dead_code` allowances.
struct MemoryDef {
    /// Minimum size in pages.
    #[allow(dead_code)]
    min_pages: u32,
    /// Optional maximum size in pages.
    #[allow(dead_code)]
    max_pages: Option<u32>,
}
/// A named record layout recorded by the module builder.
///
/// The fields are stored but not read by the visible code, hence the
/// `dead_code` allowances.
struct StructDef {
    /// Struct name.
    #[allow(dead_code)]
    name: String,
    /// Field names paired with their value types, in declaration order.
    #[allow(dead_code)]
    fields: Vec<(String, WasmType)>,
}
/// Instruction set used for recorded function bodies.
///
/// Not every variant is constructed by the visible code, hence the blanket
/// `dead_code` allowance.
#[allow(dead_code)]
enum WasmInstruction {
    /// Read the local with the given index.
    LocalGet(u32),
    /// Write the local with the given index.
    LocalSet(u32),
    /// Push an `i32` constant.
    I32Const(i32),
    /// `i32` addition.
    I32Add,
    /// `i32` subtraction.
    I32Sub,
    /// `i32` equality comparison.
    I32Eq,
    /// `i32` inequality comparison.
    I32Ne,
    /// Call the function with the given name.
    Call(String),
    /// Conditional with only a then-branch.
    If(Vec<WasmInstruction>),
    /// Conditional with a result type, a then-branch and an else-branch.
    IfElse(WasmType, Vec<WasmInstruction>, Vec<WasmInstruction>),
    /// Return from the current function.
    Return,
}
/// Records the instructions of a single function body, in emission order.
struct FunctionBuilder {
    /// Instructions emitted so far.
    instructions: Vec<WasmInstruction>,
}
impl FunctionBuilder {
fn new() -> Self {
Self {
instructions: Vec::new(),
}
}
fn local_get(&mut self, idx: u32) {
self.instructions.push(WasmInstruction::LocalGet(idx));
}
#[allow(dead_code)]
fn local_set(&mut self, idx: u32) {
self.instructions.push(WasmInstruction::LocalSet(idx));
}
fn i32_const(&mut self, val: i32) {
self.instructions.push(WasmInstruction::I32Const(val));
}
#[allow(dead_code)]
fn i32_add(&mut self) {
self.instructions.push(WasmInstruction::I32Add);
}
fn i32_ne(&mut self) {
self.instructions.push(WasmInstruction::I32Ne);
}
fn call(&mut self, func: &str) {
self.instructions
.push(WasmInstruction::Call(func.to_string()));
}
fn if_else<F1, F2>(&mut self, result_type: WasmType, then_fn: F1, else_fn: F2)
where
F1: FnOnce(&mut Self),
F2: FnOnce(&mut Self),
{
let mut then_builder = FunctionBuilder::new();
then_fn(&mut then_builder);
let mut else_builder = FunctionBuilder::new();
else_fn(&mut else_builder);
self.instructions.push(WasmInstruction::IfElse(
result_type,
then_builder.instructions,
else_builder.instructions,
));
}
}
impl WasmModuleBuilder {
    /// Creates an empty builder for `target`.
    fn new(target: WasmTarget) -> Self {
        Self {
            target,
            imports: Vec::new(),
            functions: Vec::new(),
            memory: None,
            structs: Vec::new(),
        }
    }

    /// Records an import of `name` from `module` with the given type.
    ///
    /// Currently always succeeds.
    fn add_import(
        &mut self,
        module: &str,
        name: &str,
        import_type: ImportType,
    ) -> Result<(), OxirsError> {
        self.imports.push(RequiredImport {
            module: module.to_string(),
            name: name.to_string(),
            import_type,
        });
        Ok(())
    }

    /// Sets the module's memory limits, replacing any earlier definition.
    ///
    /// Currently always succeeds.
    fn add_memory(&mut self, min: u32, max: Option<u32>) -> Result<(), OxirsError> {
        self.memory = Some(MemoryDef {
            min_pages: min,
            max_pages: max,
        });
        Ok(())
    }

    /// Records a named struct layout with the given `(field name, type)` pairs.
    ///
    /// Currently always succeeds.
    fn add_struct(&mut self, name: &str, fields: Vec<(&str, WasmType)>) -> Result<(), OxirsError> {
        self.structs.push(StructDef {
            name: name.to_string(),
            fields: fields
                .into_iter()
                .map(|(n, t)| (n.to_string(), t))
                .collect(),
        });
        Ok(())
    }

    /// Records a function whose body is produced by running `body_fn` against
    /// a fresh [`FunctionBuilder`].
    ///
    /// Currently always succeeds.
    fn add_function<F>(
        &mut self,
        name: &str,
        params: Vec<WasmType>,
        returns: Vec<WasmType>,
        body_fn: F,
    ) -> Result<(), OxirsError>
    where
        F: FnOnce(&mut FunctionBuilder),
    {
        let mut builder = FunctionBuilder::new();
        body_fn(&mut builder);
        self.functions.push(FunctionDef {
            name: name.to_string(),
            params,
            returns,
            body: builder.instructions,
        });
        Ok(())
    }

    /// Consumes the builder and returns the serialized module.
    ///
    /// Section encoding is not implemented yet: the recorded imports, memory,
    /// structs and functions are discarded and only the module preamble is
    /// emitted. The preamble is the 4-byte magic `\0asm` followed by the 4-byte
    /// little-endian version 1; both are required, so the 8-byte result is the
    /// smallest byte sequence that is a valid (empty) module.
    fn build(self) -> Result<Vec<u8>, OxirsError> {
        const WASM_MAGIC: [u8; 4] = [0x00, 0x61, 0x73, 0x6d];
        const WASM_VERSION: [u8; 4] = [0x01, 0x00, 0x00, 0x00];

        let mut module = Vec::with_capacity(WASM_MAGIC.len() + WASM_VERSION.len());
        module.extend_from_slice(&WASM_MAGIC);
        module.extend_from_slice(&WASM_VERSION);
        Ok(module)
    }
}
impl CompilationCache {
    /// Creates an empty cache with a 100 MiB size budget.
    fn new() -> Self {
        Self {
            modules: HashMap::new(),
            total_size: 0,
            // Budget for the summed `wasm_bytes` lengths: 100 MiB.
            max_size: 100 * 1024 * 1024,
        }
    }

    /// Inserts `module` under `hash`, then evicts least-used entries until the
    /// cache fits its size budget again.
    ///
    /// An existing entry with the same hash is replaced and its size is
    /// released, so `total_size` always equals the summed size of the entries
    /// actually held. The entry just inserted competes for eviction like any
    /// other, so a module larger than the whole budget is not retained.
    fn add(&mut self, hash: QueryHash, module: CachedModule) {
        self.total_size += module.wasm_bytes.len();
        if let Some(replaced) = self.modules.insert(hash, module) {
            // Re-inserting a hash overwrites the old entry; give its bytes back
            // to the budget or the running total drifts upward.
            self.total_size = self.total_size.saturating_sub(replaced.wasm_bytes.len());
        }

        while self.total_size > self.max_size {
            let victim = self
                .modules
                .iter()
                .min_by_key(|(_, cached)| cached.usage_count)
                .map(|(key, _)| *key);
            // Nothing left to evict.
            let Some(victim) = victim else {
                break;
            };
            if let Some(removed) = self.modules.remove(&victim) {
                self.total_size = self.total_size.saturating_sub(removed.wasm_bytes.len());
            }
        }
    }
}
/// Wraps a [`WasmQueryCompiler`] and hands out compiled modules as a stream of
/// byte chunks.
pub struct StreamingWasmCompiler {
    /// Compiler that produces the complete module.
    base_compiler: WasmQueryCompiler,
    /// Maximum number of bytes per emitted chunk.
    chunk_size: usize,
}
impl StreamingWasmCompiler {
    /// Creates a streaming compiler for `target` at the given `optimization`
    /// level, emitting chunks of at most 1 MiB.
    pub fn new(target: WasmTarget, optimization: OptimizationLevel) -> Self {
        let base_compiler = WasmQueryCompiler::new(target, optimization);
        Self {
            base_compiler,
            // 1 MiB per chunk.
            chunk_size: 1024 * 1024,
        }
    }

    /// Compiles `plan` and returns the module as a stream of byte chunks.
    ///
    /// The whole module is compiled up front; the stream only splits the
    /// finished bytes into pieces of at most `chunk_size` bytes, each wrapped
    /// in `Ok`.
    ///
    /// # Errors
    ///
    /// Returns the underlying compiler's error when compilation fails.
    pub async fn compile_streaming(
        &self,
        plan: &ExecutionPlan,
    ) -> Result<impl futures::Stream<Item = Result<Vec<u8>, OxirsError>> + use<>, OxirsError> {
        use futures::stream;

        let module = self.base_compiler.compile(plan)?;
        let pieces: Vec<Result<Vec<u8>, OxirsError>> = module
            .chunks(self.chunk_size)
            .map(|piece| Ok(piece.to_vec()))
            .collect();
        Ok(stream::iter(pieces))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed compiler starts with an empty module cache.
    #[test]
    fn test_wasm_compiler_creation() {
        let compiler = WasmQueryCompiler::new(WasmTarget::Wasm1_0, OptimizationLevel::Standard);
        let guard = compiler.cache.read().expect("lock should not be poisoned");
        assert!(guard.modules.is_empty());
    }

    /// The builder accepts memory, an import and a function, and produces
    /// non-empty output.
    #[test]
    fn test_module_builder() {
        let mut module = WasmModuleBuilder::new(WasmTarget::Wasm1_0);

        module
            .add_memory(1, Some(16))
            .expect("add memory should succeed");

        let log_signature = ImportType::Function {
            params: vec![WasmType::I32],
            returns: vec![],
        };
        module
            .add_import("host", "log", log_signature)
            .expect("operation should succeed");

        module
            .add_function("test", vec![], vec![WasmType::I32], |body| {
                body.i32_const(42);
            })
            .expect("operation should succeed");

        let bytes = module.build().expect("operation should succeed");
        assert!(!bytes.is_empty());
    }

    /// Discriminants start at zero and the derived ordering follows the
    /// declaration order.
    #[test]
    fn test_optimization_levels() {
        assert_eq!(OptimizationLevel::None as u8, 0);
        assert!(OptimizationLevel::Aggressive > OptimizationLevel::Standard);
    }
}