#![cfg(feature = "http")]
use oxirouter::federation::{ExecutionConfig, Executor};
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
const SPARQL_EMPTY_JSON: &str = r#"{"head":{"vars":["s"]},"results":{"bindings":[]}}"#;
fn make_multi_connection_server(
delay_ms: u64,
max_connections: usize,
response_body: &str,
) -> (SocketAddr, Arc<AtomicBool>) {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind multi server");
let addr = listener.local_addr().expect("local_addr");
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = Arc::clone(&shutdown);
let body = response_body.to_owned();
std::thread::spawn(move || {
let mut handled = 0usize;
for stream in listener.incoming() {
if shutdown_clone.load(Ordering::SeqCst) || handled >= max_connections {
break;
}
let body_clone = body.clone();
let shutdown2 = Arc::clone(&shutdown_clone);
if let Ok(mut s) = stream {
std::thread::spawn(move || {
drain_http_request(&mut s);
if delay_ms > 0 {
std::thread::sleep(Duration::from_millis(delay_ms));
}
if !shutdown2.load(Ordering::SeqCst) {
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/sparql-results+json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
body_clone.len(),
body_clone
);
let _ = s.write_all(response.as_bytes());
}
});
handled += 1;
} else {
break;
}
}
shutdown_clone.store(true, Ordering::SeqCst);
});
(addr, shutdown)
}
fn drain_http_request(stream: &mut TcpStream) {
stream
.set_read_timeout(Some(Duration::from_millis(500)))
.ok();
let mut buf = [0u8; 4096];
let mut received = Vec::with_capacity(512);
loop {
match stream.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
received.extend_from_slice(&buf[..n]);
if received.windows(4).any(|w| w == b"\r\n\r\n") {
break;
}
if received.len() >= 4096 {
break;
}
}
Err(_) => break,
}
}
}
fn make_source(id: &str, addr: SocketAddr) -> oxirouter::DataSource {
use oxirouter::core::source::SourceCapabilities;
oxirouter::DataSource::new(id, format!("http://{}/sparql", addr))
.with_capabilities(SourceCapabilities::full())
}
fn make_simple_query() -> oxirouter::Query {
oxirouter::Query::parse("SELECT ?s WHERE { ?s ?p ?o } LIMIT 1").expect("parse query")
}
fn make_ranking(source_ids: &[&str]) -> oxirouter::core::source::SourceRanking {
use oxirouter::core::source::{SelectionReason, SourceRanking, SourceSelection};
let mut ranking = SourceRanking::new();
for id in source_ids {
ranking.add(SourceSelection {
source_id: (*id).to_string(),
confidence: 1.0,
estimated_latency_ms: 100,
reason: SelectionReason::Fallback,
});
}
ranking
}
#[test]
fn test_parallel_two_sources_concurrent() {
let (addr1, _shutdown1) = make_multi_connection_server(100, 1, SPARQL_EMPTY_JSON);
let (addr2, _shutdown2) = make_multi_connection_server(100, 1, SPARQL_EMPTY_JSON);
let source1 = make_source("src1", addr1);
let source2 = make_source("src2", addr2);
let config = ExecutionConfig {
timeout_ms: 2_000,
max_retries: 0,
parallel: true,
max_concurrency: 4,
..ExecutionConfig::default()
};
let executor = Executor::with_config(config);
let query = make_simple_query();
let ranking = make_ranking(&["src1", "src2"]);
let sources: Vec<&oxirouter::DataSource> = vec![&source1, &source2];
let t0 = Instant::now();
let results = executor
.execute(&query, &sources, &ranking)
.expect("execute");
let elapsed = t0.elapsed();
assert_eq!(results.len(), 2, "expected 2 results");
assert!(
elapsed < Duration::from_millis(1800),
"parallel execution took too long: {:?}",
elapsed
);
if elapsed > Duration::from_millis(250) {
return;
}
assert!(
elapsed < Duration::from_millis(160),
"parallel execution was not concurrent: {:?} >= 160ms",
elapsed
);
}
#[test]
fn test_parallel_per_source_timeout() {
let (fast_addr, _shutdown_fast) = make_multi_connection_server(50, 1, SPARQL_EMPTY_JSON);
let (slow_addr, _shutdown_slow) = make_multi_connection_server(5_000, 1, SPARQL_EMPTY_JSON);
let source_fast = make_source("fast", fast_addr);
let source_slow = make_source("slow", slow_addr);
let config = ExecutionConfig {
timeout_ms: 200, max_retries: 0, parallel: true,
max_concurrency: 4,
..ExecutionConfig::default()
};
let executor = Executor::with_config(config);
let query = make_simple_query();
let ranking = make_ranking(&["fast", "slow"]);
let sources: Vec<&oxirouter::DataSource> = vec![&source_fast, &source_slow];
let t0 = Instant::now();
let results = executor
.execute(&query, &sources, &ranking)
.expect("execute");
let elapsed = t0.elapsed();
assert_eq!(results.len(), 2, "expected 2 results");
let slow_result = results
.iter()
.find(|r| r.source_id == "slow")
.expect("slow result missing");
assert!(
slow_result.error.is_some(),
"slow source should have timed out, got: {:?}",
slow_result
);
assert!(
elapsed < Duration::from_millis(1500),
"per-source timeout not enforced: {:?}",
elapsed
);
}
#[test]
fn test_parallel_source_order_preserved() {
let delays = [150u64, 50, 100];
let mut addrs = Vec::new();
let mut shutdowns = Vec::new();
for &d in &delays {
let (addr, shutdown) = make_multi_connection_server(d, 1, SPARQL_EMPTY_JSON);
addrs.push(addr);
shutdowns.push(shutdown);
}
let source0 = make_source("s0", addrs[0]);
let source1 = make_source("s1", addrs[1]);
let source2 = make_source("s2", addrs[2]);
let config = ExecutionConfig {
timeout_ms: 2_000,
max_retries: 0,
parallel: true,
max_concurrency: 4,
..ExecutionConfig::default()
};
let executor = Executor::with_config(config);
let query = make_simple_query();
let ranking = make_ranking(&["s0", "s1", "s2"]);
let sources: Vec<&oxirouter::DataSource> = vec![&source0, &source1, &source2];
let results = executor
.execute(&query, &sources, &ranking)
.expect("execute");
assert_eq!(results.len(), 3, "expected 3 results");
assert_eq!(results[0].source_id, "s0", "result[0] source mismatch");
assert_eq!(results[1].source_id, "s1", "result[1] source mismatch");
assert_eq!(results[2].source_id, "s2", "result[2] source mismatch");
}
#[test]
fn test_parallel_all_timeout() {
let (addr1, _s1) = make_multi_connection_server(5_000, 1, SPARQL_EMPTY_JSON);
let (addr2, _s2) = make_multi_connection_server(5_000, 1, SPARQL_EMPTY_JSON);
let source1 = make_source("t1", addr1);
let source2 = make_source("t2", addr2);
let config = ExecutionConfig {
timeout_ms: 200, max_retries: 0,
parallel: true,
max_concurrency: 4,
..ExecutionConfig::default()
};
let executor = Executor::with_config(config);
let query = make_simple_query();
let ranking = make_ranking(&["t1", "t2"]);
let sources: Vec<&oxirouter::DataSource> = vec![&source1, &source2];
let t0 = Instant::now();
let results = executor
.execute(&query, &sources, &ranking)
.expect("execute");
let elapsed = t0.elapsed();
assert_eq!(results.len(), 2, "expected 2 results");
for result in &results {
assert!(
result.error.is_some(),
"expected timeout error for {}, got success",
result.source_id
);
}
assert!(
elapsed < Duration::from_millis(1500),
"all-timeout test took too long: {:?}",
elapsed
);
}