oxirs-arq 0.1.0

Jena-style SPARQL algebra with extension points and query optimization
Documentation

OxiRS ARQ

Version

SPARQL query engine with algebra and optimization

Status: Production Release (v0.1.0) - Released January 7, 2026

Production Release: Production-ready with API stability guarantees. Semantic versioning enforced.

Overview

oxirs-arq is a SPARQL 1.1/1.2 query engine inspired by Apache Jena's ARQ engine. It provides query optimization, extensible algebra operations, and built-in function support.

Features

  • SPARQL 1.1/1.2 Compliance: Implementation of SPARQL standards
  • Query Optimization: Cost-based optimization with statistics
  • Extensible Algebra: Pluggable operators and custom function support
  • Parallel Execution: Multi-threaded query execution
  • Memory Management: Efficient memory usage with streaming evaluation
  • Custom Functions: Integration of domain-specific functions
  • Query Planning: Join reordering and optimization passes
  • Result Streaming: Support for large result sets
  • Federation Support: SERVICE keyword for distributed queries
  • Caching: Query plan and result caching

Installation

Add to your Cargo.toml:

[dependencies]
oxirs-arq = "0.1.0"

Quick Start

Basic Query Execution

use oxirs_arq::{QueryEngine, Query};
use oxirs_core::Dataset;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create dataset
    let dataset = Dataset::from_file("data.ttl")?;
    
    // Create query engine
    let engine = QueryEngine::new();
    
    // Parse and execute query
    let query = Query::parse(r#"
        SELECT ?person ?name ?age WHERE {
            ?person a foaf:Person ;
                   foaf:name ?name ;
                   foaf:age ?age .
            FILTER (?age > 18)
        }
        ORDER BY ?age
    "#)?;
    
    let results = engine.execute(&query, &dataset).await?;
    
    // Process results
    for binding in results {
        println!("Person: {}, Name: {}, Age: {}", 
                 binding.get("person")?,
                 binding.get("name")?,
                 binding.get("age")?);
    }
    
    Ok(())
}

Custom Function Registration

use oxirs_arq::{QueryEngine, Function, FunctionRegistry, Value};

// Define custom function
struct DistanceFunction;

impl Function for DistanceFunction {
    fn name(&self) -> &str { "distance" }
    
    fn arity(&self) -> usize { 4 } // lat1, lon1, lat2, lon2
    
    async fn call(&self, args: &[Value]) -> Result<Value, String> {
        let lat1 = args[0].as_float()?;
        let lon1 = args[1].as_float()?;
        let lat2 = args[2].as_float()?;
        let lon2 = args[3].as_float()?;
        
        let distance = calculate_haversine_distance(lat1, lon1, lat2, lon2);
        Ok(Value::Float(distance))
    }
}

// Register function
let mut engine = QueryEngine::new();
engine.register_function(DistanceFunction);

// Use in query
let query = Query::parse(r#"
    SELECT ?place ?dist WHERE {
        ?place geo:lat ?lat ; geo:lon ?lon .
        BIND(distance(?lat, ?lon, 40.7128, -74.0060) AS ?dist)
        FILTER(?dist < 100)
    }
"#)?;

Query Optimization

use oxirs_arq::{QueryEngine, Optimizer, OptimizationLevel};

let engine = QueryEngine::builder()
    .optimization_level(OptimizationLevel::Aggressive)
    .enable_statistics(true)
    .enable_caching(true)
    .parallel_execution(true)
    .build();

// Enable specific optimizations
let optimizer = Optimizer::new()
    .enable_join_reordering(true)
    .enable_filter_pushdown(true)
    .enable_projection_pushdown(true)
    .enable_constant_folding(true)
    .enable_dead_code_elimination(true);

engine.set_optimizer(optimizer);

Query Algebra

Algebra Operations

The engine supports a rich set of algebraic operations:

use oxirs_arq::algebra::{Algebra, BGP, Filter, Join, LeftJoin, Union, Extend};

// Build query algebra programmatically
let bgp1 = BGP::new(vec![
    triple_pattern!(?person, rdf:type, foaf:Person),
    triple_pattern!(?person, foaf:name, ?name),
]);

let bgp2 = BGP::new(vec![
    triple_pattern!(?person, foaf:age, ?age),
]);

let join = Join::new(bgp1, bgp2);
let filter = Filter::new(join, expression!(?age > 18));

let algebra = Algebra::from(filter);

Custom Operators

use oxirs_arq::{Operator, ExecutionContext, BindingSet};

struct SampleOperator {
    input: Box<dyn Operator>,
    sample_rate: f64,
}

#[async_trait]
impl Operator for SampleOperator {
    async fn execute(&self, context: &ExecutionContext) -> Result<BindingSet, Error> {
        let input_results = self.input.execute(context).await?;
        let mut rng = rand::thread_rng();
        
        input_results
            .filter(|_| rng.gen::<f64>() < self.sample_rate)
            .collect()
    }
    
    fn cardinality_estimate(&self) -> usize {
        (self.input.cardinality_estimate() as f64 * self.sample_rate) as usize
    }
}

Advanced Features

Parallel Execution

use oxirs_arq::{QueryEngine, ParallelConfig};

let engine = QueryEngine::builder()
    .parallel_config(ParallelConfig {
        max_threads: 8,
        work_stealing: true,
        chunk_size: 1000,
        parallel_threshold: 10000,
    })
    .build();

// Queries are automatically parallelized based on algebra structure
let query = Query::parse(r#"
    SELECT ?s1 ?s2 WHERE {
        {
            SELECT ?s1 WHERE { ?s1 ?p1 ?o1 }
        }
        UNION
        {
            SELECT ?s2 WHERE { ?s2 ?p2 ?o2 }
        }
    }
"#)?;

Result Streaming

use oxirs_arq::{QueryEngine, StreamingConfig};
use futures::stream::StreamExt;

let engine = QueryEngine::builder()
    .streaming_config(StreamingConfig {
        buffer_size: 1000,
        batch_size: 100,
    })
    .build();

let query = Query::parse("SELECT * WHERE { ?s ?p ?o }")?;
let mut stream = engine.execute_streaming(&query, &dataset).await?;

while let Some(batch) = stream.next().await {
    let bindings = batch?;
    // Process batch of results
    for binding in bindings {
        println!("{:?}", binding);
    }
}

Federation

use oxirs_arq::{QueryEngine, FederationConfig, RemoteEndpoint};

let engine = QueryEngine::builder()
    .federation_config(FederationConfig {
        timeout: Duration::from_secs(30),
        max_concurrent_services: 5,
        cache_remote_results: true,
    })
    .remote_endpoint("dbpedia", RemoteEndpoint::new("https://dbpedia.org/sparql"))
    .remote_endpoint("wikidata", RemoteEndpoint::new("https://query.wikidata.org/sparql"))
    .build();

let query = Query::parse(r#"
    SELECT ?person ?name ?birthPlace WHERE {
        ?person foaf:name ?name .
        SERVICE <https://dbpedia.org/sparql> {
            ?person dbo:birthPlace ?birthPlace .
        }
    }
"#)?;

Built-in Functions

String Functions

SELECT ?person ?upperName WHERE {
    ?person foaf:name ?name .
    BIND(UCASE(?name) AS ?upperName)
    FILTER(CONTAINS(?name, "John"))
}

Math Functions

SELECT ?value ?rounded ?sqrt WHERE {
    ?item ex:value ?value .
    BIND(ROUND(?value) AS ?rounded)
    BIND(SQRT(?value) AS ?sqrt)
    FILTER(?value > 0)
}

Date/Time Functions

SELECT ?event ?year ?monthName WHERE {
    ?event ex:date ?date .
    BIND(YEAR(?date) AS ?year)
    BIND(MONTH(?date) AS ?month)
    BIND(IF(?month = 1, "January", 
         IF(?month = 2, "February", ...)) AS ?monthName)
}

Aggregate Functions

SELECT ?category (COUNT(?item) AS ?count) (AVG(?price) AS ?avgPrice) WHERE {
    ?item ex:category ?category ;
          ex:price ?price .
}
GROUP BY ?category
HAVING (?count > 5)
ORDER BY DESC(?avgPrice)

Performance

Benchmarks

Query Type QPS Latency (p95) Memory
Simple BGP 25,000 5ms 15MB
Complex Join 5,000 25ms 45MB
Aggregation 3,000 35ms 35MB
Federation 1,200 150ms 25MB

Optimization Statistics

use oxirs_arq::{QueryEngine, Statistics, StatisticsCollector};

let mut engine = QueryEngine::new();
engine.enable_statistics(true);

// Execute queries to collect statistics
for query in queries {
    engine.execute(&query, &dataset).await?;
}

// View optimization statistics
let stats = engine.statistics();
println!("Query plans optimized: {}", stats.optimized_plans);
println!("Average speedup: {:.2}x", stats.average_speedup);
println!("Cache hit rate: {:.1}%", stats.cache_hit_rate * 100.0);

Configuration

Engine Configuration

query_engine:
  optimization:
    level: "aggressive"
    enable_statistics: true
    enable_caching: true
    cache_size: 1000
    
  execution:
    parallel: true
    max_threads: 8
    streaming: true
    batch_size: 1000
    
  federation:
    timeout: "30s"
    max_concurrent: 5
    cache_results: true
    
  functions:
    custom_namespaces:
      - "http://example.org/functions/"
    enable_extensions: true

Runtime Configuration

use oxirs_arq::{QueryEngine, RuntimeConfig};

let config = RuntimeConfig::builder()
    .query_timeout(Duration::from_secs(300))
    .memory_limit(1024 * 1024 * 1024) // 1GB
    .result_limit(1_000_000)
    .parallel_threshold(10_000)
    .build();

let engine = QueryEngine::with_config(config);

Error Handling

use oxirs_arq::{QueryError, ExecutionError};

match engine.execute(&query, &dataset).await {
    Ok(results) => {
        // Handle successful execution
    }
    Err(QueryError::ParseError(msg)) => {
        eprintln!("Query parse error: {}", msg);
    }
    Err(QueryError::ExecutionError(ExecutionError::Timeout)) => {
        eprintln!("Query execution timed out");
    }
    Err(QueryError::ExecutionError(ExecutionError::MemoryLimit)) => {
        eprintln!("Query exceeded memory limit");
    }
    Err(e) => {
        eprintln!("Unexpected error: {}", e);
    }
}

Related Crates

Development

Running Tests

cd engine/oxirs-arq
cargo test

Benchmarks

cargo bench

Query Plan Visualization

cargo run --example visualize-plan -- query.sparql

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass
  5. Submit a pull request

License

Licensed under either of:

at your option.

Status

🚀 Production Release (v0.1.0) - January 7, 2026

Current implementation status:

  • ✅ Full SPARQL 1.1/1.2 parsing and execution across persisted datasets
  • ✅ Federation (SERVICE) with retries, SERVICE SILENT, and result merging
  • ✅ Parallel execution framework instrumented with SciRS2 metrics
  • ✅ Custom function framework with dynamic registration
  • 🚧 Adaptive cardinality estimation (in progress)

APIs follow semantic versioning. See CHANGELOG.md for details.