#![cfg(feature = "http")]
use oxirouter::core::source::SourceCapabilities;
use oxirouter::{AggregationStrategy, DataSource, Query, Router};
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
/// Canned SPARQL JSON results document used as the mock servers' success body:
/// it declares a single variable `name` and contains exactly one binding whose
/// `name` is the literal `"Test"`.
const SPARQL_ONE_BINDING: &str = r#"{"head":{"vars":["name"]},"results":{"bindings":[{"name":{"type":"literal","value":"Test"}}]}}"#;
/// Starts a throw-away HTTP server on an ephemeral loopback port.
///
/// Every accepted connection is handled on its own thread: the request is
/// drained with `drain_http_request` and then answered with
/// `response_status` / `response_body`. The `Content-Type` is
/// `application/sparql-results+json` when the status starts with `200`, and
/// `text/plain` otherwise.
///
/// The accept loop ends when a connection arrives after the shutdown flag was
/// set or after `max_connections` connections were already handled, or when
/// accepting fails. On exit the loop sets the flag itself, and handlers that
/// observe the flag after draining do not send a response.
///
/// Returns the bound address together with the shared shutdown flag.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
fn make_server(
    response_status: &'static str,
    response_body: &'static str,
    max_connections: usize,
) -> (SocketAddr, Arc<AtomicBool>) {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind server");
    let addr = listener.local_addr().expect("local_addr");
    let shutdown = Arc::new(AtomicBool::new(false));

    // The content type depends only on the fixed status line, so pick it once.
    let content_type = if response_status.starts_with("200") {
        "application/sparql-results+json"
    } else {
        "text/plain"
    };

    let acceptor_flag = Arc::clone(&shutdown);
    std::thread::spawn(move || {
        let mut served = 0usize;
        for incoming in listener.incoming() {
            let stop_requested = acceptor_flag.load(Ordering::SeqCst);
            if stop_requested || served >= max_connections {
                break;
            }
            let mut conn = match incoming {
                Ok(conn) => conn,
                Err(_) => break,
            };
            let worker_flag = Arc::clone(&acceptor_flag);
            std::thread::spawn(move || {
                drain_http_request(&mut conn);
                // Stay silent once the server has been told to stop.
                if worker_flag.load(Ordering::SeqCst) {
                    return;
                }
                let response = format!(
                    "HTTP/1.1 {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                    response_status,
                    content_type,
                    response_body.len(),
                    response_body
                );
                // Best effort: the client may already have gone away.
                let _ = conn.write_all(response.as_bytes());
            });
            served += 1;
        }
        // Tell any handler that is still draining not to respond any more.
        acceptor_flag.store(true, Ordering::SeqCst);
    });

    (addr, shutdown)
}
/// Reads one HTTP request from `stream` and discards it, so that the caller
/// can write a response afterwards.
///
/// The header block is read until the blank line (`\r\n\r\n`) is seen, or
/// until 4096 bytes have arrived without one. If the headers carry a
/// `Content-Length`, that many body bytes are consumed as well (capped at
/// 1 MiB). Leaving a request body unread and then closing the socket can make
/// the operating system reset the connection, which may discard the response
/// before the client has read it.
///
/// Chunked request bodies are not decoded. Every read is bounded by a 500 ms
/// timeout; a timeout, end of stream, or any other I/O error simply ends the
/// drain, since this helper is best effort and reports nothing to the caller.
fn drain_http_request(stream: &mut TcpStream) {
    // Stop looking for the end of the headers after this many bytes.
    const MAX_HEADER_BYTES: usize = 4096;
    // Upper bound on body bytes consumed, whatever the request claims.
    const MAX_BODY_BYTES: usize = 1 << 20;
    const HEADER_END: &[u8] = b"\r\n\r\n";

    stream
        .set_read_timeout(Some(Duration::from_millis(500)))
        .ok();

    let mut buf = [0u8; 4096];
    let mut received: Vec<u8> = Vec::with_capacity(512);

    // Phase 1: read until the header block is complete.
    let header_len = loop {
        match stream.read(&mut buf) {
            Ok(0) => return,
            Ok(n) => {
                // Only the new bytes (plus a 3-byte overlap, in case the
                // terminator straddles two reads) need to be searched.
                let scan_from = received.len().saturating_sub(HEADER_END.len() - 1);
                received.extend_from_slice(&buf[..n]);
                if let Some(pos) = received[scan_from..]
                    .windows(HEADER_END.len())
                    .position(|window| window == HEADER_END)
                {
                    break scan_from + pos + HEADER_END.len();
                }
                if received.len() >= MAX_HEADER_BYTES {
                    return;
                }
            }
            // A signal interrupted the call; the read itself did not fail.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(_) => return,
        }
    };

    // Phase 2: find out how long the body is supposed to be.
    let head = String::from_utf8_lossy(&received[..header_len]);
    let declared_body_len = head
        .split("\r\n")
        .skip(1) // the request line is not a header
        .filter_map(|line| line.split_once(':'))
        .find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length"))
        .and_then(|(_, value)| value.trim().parse::<usize>().ok())
        .unwrap_or(0);

    // Part of the body may already have arrived together with the headers.
    let already_read = received.len() - header_len;
    let mut remaining = declared_body_len
        .min(MAX_BODY_BYTES)
        .saturating_sub(already_read);

    // Phase 3: consume whatever part of the body is still outstanding.
    while remaining > 0 {
        let want = remaining.min(buf.len());
        match stream.read(&mut buf[..want]) {
            Ok(0) => break,
            Ok(n) => remaining -= n,
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(_) => break,
        }
    }
}
/// Builds a `DataSource` named `id` whose endpoint is the `/sparql` path on
/// the server at `addr`, with `SourceCapabilities::full()` applied.
fn make_source(id: &str, addr: SocketAddr) -> DataSource {
    let endpoint = format!("http://{}/sparql", addr);
    let capabilities = SourceCapabilities::full();
    DataSource::new(id, endpoint).with_capabilities(capabilities)
}
/// Parses the fixed `SELECT ?name` query that every test in this file sends.
///
/// # Panics
///
/// Panics if the query text fails to parse.
fn make_query() -> Query {
    let sparql = "SELECT ?name WHERE { ?s <http://xmlns.com/foaf/0.1/name> ?name } LIMIT 10";
    let parsed = Query::parse(sparql);
    parsed.expect("parse query")
}
/// Two healthy mock sources, `AggregationStrategy::First`: the result must
/// report one successful source, no failed source and a single row.
#[test]
fn test_federated_query_first_strategy() {
    let query = make_query();
    let (first_addr, _first_flag) = make_server("200 OK", SPARQL_ONE_BINDING, 3);
    let (second_addr, _second_flag) = make_server("200 OK", SPARQL_ONE_BINDING, 3);

    let mut router = Router::new();
    for (id, addr) in [("src1", first_addr), ("src2", second_addr)] {
        router.add_source(make_source(id, addr));
    }

    let outcome = router.federated_query(&query, AggregationStrategy::First);
    let result = outcome.expect("federated_query should succeed");

    assert_eq!(result.successful_sources, 1, "First strategy: one source");
    assert_eq!(result.failed_sources, 0, "First strategy: no failures");
    assert_eq!(result.row_count, 1, "First strategy: one binding");
}
/// Two healthy mock sources, `AggregationStrategy::Union`: both sources must
/// be counted as successful, none as failed, and the result data must be
/// non-empty.
#[test]
fn test_federated_query_union_strategy() {
    let query = make_query();
    let (first_addr, _first_flag) = make_server("200 OK", SPARQL_ONE_BINDING, 3);
    let (second_addr, _second_flag) = make_server("200 OK", SPARQL_ONE_BINDING, 3);

    let mut router = Router::new();
    for (id, addr) in [("src1", first_addr), ("src2", second_addr)] {
        router.add_source(make_source(id, addr));
    }

    let outcome = router.federated_query(&query, AggregationStrategy::Union);
    let result = outcome.expect("union federated_query should succeed");

    assert_eq!(result.successful_sources, 2, "Union strategy: two sources");
    assert_eq!(result.failed_sources, 0, "Union strategy: no failures");
    let has_data = !result.data.is_empty();
    assert!(has_data, "Union result should have data");
}
/// Both mock sources answer `500 Internal Server Error`: the federated query
/// must return an error.
#[test]
fn test_federated_query_all_sources_fail() {
    let query = make_query();
    let (first_addr, _first_flag) = make_server("500 Internal Server Error", "error", 3);
    let (second_addr, _second_flag) = make_server("500 Internal Server Error", "error", 3);

    let mut router = Router::new();
    for (id, addr) in [("src1", first_addr), ("src2", second_addr)] {
        router.add_source(make_source(id, addr));
    }

    let result = router.federated_query(&query, AggregationStrategy::First);
    let failed = result.is_err();
    assert!(
        failed,
        "Expected error when all sources fail, got: {:?}",
        result
    );
}
/// `route_and_execute` against two healthy mock sources must return two
/// results, each with a non-empty `source_id` and no error.
#[test]
fn test_route_and_execute_returns_per_source() {
    let query = make_query();
    let (first_addr, _first_flag) = make_server("200 OK", SPARQL_ONE_BINDING, 3);
    let (second_addr, _second_flag) = make_server("200 OK", SPARQL_ONE_BINDING, 3);

    let mut router = Router::new();
    for (id, addr) in [("src1", first_addr), ("src2", second_addr)] {
        router.add_source(make_source(id, addr));
    }

    let outcome = router.route_and_execute(&query);
    let results = outcome.expect("route_and_execute should succeed");

    let result_count = results.len();
    assert_eq!(
        result_count, 2,
        "route_and_execute should return one result per source"
    );
    for entry in &results {
        let has_id = !entry.source_id.is_empty();
        assert!(has_id, "Each result must have a source_id");
        let succeeded = entry.error.is_none();
        assert!(
            succeeded,
            "Expected successful result for {}",
            entry.source_id
        );
    }
}
/// A router without any registered source must reject a federated query with
/// the `OxiRouterError::NoSources` variant.
#[test]
fn test_federated_query_no_sources() {
    let query = make_query();
    let empty_router = Router::new();

    let result = empty_router.federated_query(&query, AggregationStrategy::First);
    let rejected = result.is_err();
    assert!(rejected, "Expected NoSources error, got: {:?}", result);

    let err = result.unwrap_err();
    let is_no_sources = matches!(err, oxirouter::OxiRouterError::NoSources { .. });
    assert!(
        is_no_sources,
        "Expected NoSources error variant, got: {:?}",
        err
    );
}