use crate::{GraphRAGError, GraphRAGResult, ScoredEntity, SparqlEngineTrait, Triple};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Semaphore;
use tracing::{debug, warn};
/// Tuning knobs for streaming subgraph retrieval over a SPARQL engine.
#[derive(Debug, Clone)]
pub struct StreamingSparqlConfig {
    /// Number of triples requested per SPARQL page (LIMIT).
    pub page_size: usize,
    /// Ceiling on triples emitted by a stream; 0 disables the cap.
    pub max_total_triples: usize,
    /// Bounded capacity of the page channel between producer task and consumer.
    pub channel_capacity: usize,
    /// Size of the fetch semaphore.
    /// NOTE(review): fetches are currently sequential, so this has no effect — confirm intent.
    pub max_concurrency: usize,
    /// When true, drop (subject, predicate, object) duplicates.
    pub deduplicate: bool,
    /// Seeds scoring below this threshold are skipped entirely.
    pub min_seed_score: f64,
    /// Maximum path length used by the expansion query.
    pub expansion_hops: usize,
}
impl Default for StreamingSparqlConfig {
fn default() -> Self {
Self {
page_size: 1_000,
max_total_triples: 50_000,
channel_capacity: 512,
max_concurrency: 4,
deduplicate: true,
min_seed_score: 0.0,
expansion_hops: 2,
}
}
}
/// One page of triples produced while expanding a single seed entity.
#[derive(Debug, Clone)]
pub struct TriplePage {
    /// Triples fetched for this page (post-dedup, post-cap).
    pub triples: Vec<Triple>,
    /// Zero-based page number within this seed's pagination (offset / page_size).
    pub page_index: usize,
    /// Running count of triples emitted so far.
    pub cumulative_count: usize,
    /// True when no further pages will follow for this seed.
    pub is_last_page: bool,
    /// URI of the seed entity this page was expanded from.
    pub seed_uri: String,
}
/// Consumer handle over the pages produced by a background retrieval task.
/// The stream ends when the producer drops its sender.
pub struct SubgraphStream {
    receiver: mpsc::Receiver<GraphRAGResult<TriplePage>>,
}
impl SubgraphStream {
    /// Awaits the next page; returns `None` once the producer has finished.
    pub async fn next_page(&mut self) -> Option<GraphRAGResult<TriplePage>> {
        self.receiver.recv().await
    }

    /// Drains the whole stream, flattening every page into one vector.
    /// Stops at (and returns) the first error, discarding pages read so far.
    pub async fn collect_all(mut self) -> GraphRAGResult<Vec<Triple>> {
        let mut all = Vec::new();
        while let Some(page) = self.next_page().await {
            all.extend(page?.triples);
        }
        Ok(all)
    }
}
/// Retrieves the neighborhood subgraph of scored seed entities by paging
/// CONSTRUCT queries against a SPARQL engine and streaming results back.
pub struct StreamingSparqlRetriever<S: SparqlEngineTrait> {
    // Shared engine handle; cloned into the background producer task.
    engine: Arc<S>,
    config: StreamingSparqlConfig,
}
impl<S: SparqlEngineTrait + 'static> StreamingSparqlRetriever<S> {
    /// Creates a retriever with an explicit configuration.
    pub fn new(engine: Arc<S>, config: StreamingSparqlConfig) -> Self {
        Self { engine, config }
    }

    /// Creates a retriever using `StreamingSparqlConfig::default()`.
    pub fn with_defaults(engine: Arc<S>) -> Self {
        Self::new(engine, StreamingSparqlConfig::default())
    }

    /// Spawns a background task that pages through the expansion query for each
    /// seed (in order) and streams [`TriplePage`]s to the returned handle.
    ///
    /// Seeds scoring below `min_seed_score` are skipped. When `deduplicate` is
    /// set, (subject, predicate, object) keys are tracked across *all* seeds,
    /// so each triple is emitted at most once per stream. When
    /// `max_total_triples > 0`, the cap applies to the stream as a whole (not
    /// per seed): the final page is truncated so the cumulative count never
    /// exceeds the cap, and remaining seeds are not fetched.
    ///
    /// A fetch error for one seed is reported on the stream and that seed is
    /// abandoned; remaining seeds are still processed.
    pub fn stream_subgraph(&self, seeds: Vec<ScoredEntity>) -> SubgraphStream {
        let (tx, rx) = mpsc::channel(self.config.channel_capacity);
        let engine = Arc::clone(&self.engine);
        let config = self.config.clone();
        tokio::spawn(async move {
            // NOTE(review): pages are fetched sequentially below, so this
            // semaphore never actually limits concurrency; it is kept for a
            // future concurrent-fetch implementation — confirm intent.
            let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
            let filtered_seeds: Vec<ScoredEntity> = seeds
                .into_iter()
                .filter(|s| s.score >= config.min_seed_score)
                .collect();
            // Shared across all seeds: global dedup set and global triple budget.
            let mut seen: HashSet<(String, String, String)> = HashSet::new();
            let mut cumulative = 0usize;
            'seeds: for seed in filtered_seeds {
                let seed_uri = seed.uri.clone();
                let mut offset = 0usize;
                loop {
                    let permit = match semaphore.clone().acquire_owned().await {
                        Ok(p) => p,
                        Err(e) => {
                            // Semaphore closed: unrecoverable; report and stop.
                            let _ = tx
                                .send(Err(GraphRAGError::InternalError(format!(
                                    "Semaphore acquire failed: {e}"
                                ))))
                                .await;
                            return;
                        }
                    };
                    let sparql = build_expansion_query(
                        &seed_uri,
                        config.expansion_hops,
                        config.page_size,
                        offset,
                    );
                    let raw_triples = engine.construct(&sparql).await;
                    drop(permit);
                    let raw_triples = match raw_triples {
                        Ok(t) => t,
                        Err(e) => {
                            warn!("SPARQL page fetch failed for {seed_uri}: {e}");
                            let _ = tx
                                .send(Err(GraphRAGError::SparqlError(format!(
                                    "Page fetch error for {seed_uri}: {e}"
                                ))))
                                .await;
                            // Abandon this seed; later seeds may still succeed.
                            break;
                        }
                    };
                    let page_len = raw_triples.len();
                    debug!(
                        seed = %seed_uri,
                        offset,
                        fetched = page_len,
                        "Fetched SPARQL page"
                    );
                    if page_len == 0 {
                        // Engine is exhausted for this seed; no empty page is sent.
                        break;
                    }
                    let triples: Vec<Triple> = if config.deduplicate {
                        raw_triples
                            .into_iter()
                            .filter(|t| {
                                let key =
                                    (t.subject.clone(), t.predicate.clone(), t.object.clone());
                                // insert() returns false for already-seen keys.
                                seen.insert(key)
                            })
                            .collect()
                    } else {
                        raw_triples
                    };
                    cumulative += triples.len();
                    // Enforce the global cap, truncating the final page so the
                    // stream never yields more than `max_total_triples`.
                    let capped =
                        config.max_total_triples > 0 && cumulative >= config.max_total_triples;
                    let (triples, is_last) = if capped {
                        let excess = cumulative - config.max_total_triples;
                        let mut t = triples;
                        let capped_len = t.len().saturating_sub(excess);
                        t.truncate(capped_len);
                        // Clamp the reported count to the cap.
                        cumulative = config.max_total_triples;
                        (t, true)
                    } else {
                        // A short raw page means the engine has no more results.
                        (triples, page_len < config.page_size)
                    };
                    let page = TriplePage {
                        triples,
                        page_index: offset / config.page_size,
                        cumulative_count: cumulative,
                        is_last_page: is_last,
                        seed_uri: seed_uri.clone(),
                    };
                    if tx.send(Ok(page)).await.is_err() {
                        // Receiver dropped; stop producing entirely.
                        return;
                    }
                    if capped {
                        // Global budget exhausted: stop all remaining seeds too.
                        break 'seeds;
                    }
                    if is_last {
                        break;
                    }
                    offset += config.page_size;
                }
            }
        });
        SubgraphStream { receiver: rx }
    }

    /// Convenience wrapper: streams the subgraph and gathers every triple into
    /// one vector. Returns the first error encountered, if any.
    pub async fn collect_subgraph(&self, seeds: Vec<ScoredEntity>) -> GraphRAGResult<Vec<Triple>> {
        self.stream_subgraph(seeds).collect_all().await
    }
}
/// Builds the paged CONSTRUCT query used to expand one seed entity.
///
/// The query gathers the seed's outgoing edges, incoming edges, and (via the
/// third UNION branch) nodes reachable through a bounded property path. The
/// property path appears only in the WHERE clause: SPARQL 1.1 forbids property
/// paths inside a CONSTRUCT template, so the template materializes the bound
/// `<seed> ?p_mid ?mid` edge instead. (The previous version put the path in
/// the template, producing invalid SPARQL for `hops > 1` and an unbound-`?p`
/// triple for `hops <= 1`.)
fn build_expansion_query(seed_uri: &str, hops: usize, limit: usize, offset: usize) -> String {
    let path = if hops <= 1 {
        // Single hop: plain variable predicate.
        "?p".to_string()
    } else {
        // `(:|!:)` matches any predicate; `{1,h}` bounds the path length.
        // NOTE(review): this form references the default prefix `:` but no
        // PREFIX declaration is emitted — confirm the engine injects one.
        format!("(:|!:){{1,{}}}", hops)
    };
    format!(
        r#"
CONSTRUCT {{
    <{seed}> ?p1 ?o1 .
    ?s1 ?p2 <{seed}> .
    <{seed}> ?p_mid ?mid .
}}
WHERE {{
    {{
        <{seed}> ?p1 ?o1 .
    }} UNION {{
        ?s1 ?p2 <{seed}> .
    }} UNION {{
        <{seed}> ?p_mid ?mid .
        ?mid {path} ?o2 .
    }}
}}
LIMIT {limit}
OFFSET {offset}
"#,
        seed = seed_uri,
        path = path,
        limit = limit,
        offset = offset,
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{GraphRAGResult, ScoreSource, SparqlEngineTrait, Triple};
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// In-memory engine that pages through a fixed triple list by parsing
    /// LIMIT/OFFSET back out of the generated query text, and counts calls.
    struct MockSparql {
        triples: Vec<Triple>,
        page_call_count: Arc<AtomicUsize>,
    }

    impl MockSparql {
        fn new(triples: Vec<Triple>) -> Self {
            Self {
                triples,
                page_call_count: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    #[async_trait]
    impl SparqlEngineTrait for MockSparql {
        async fn select(&self, _query: &str) -> GraphRAGResult<Vec<HashMap<String, String>>> {
            Ok(vec![])
        }

        async fn ask(&self, _query: &str) -> GraphRAGResult<bool> {
            Ok(false)
        }

        async fn construct(&self, query: &str) -> GraphRAGResult<Vec<Triple>> {
            self.page_call_count.fetch_add(1, Ordering::Relaxed);
            // Recover the requested window from the query text.
            let offset: usize = query
                .lines()
                .find(|l| l.trim_start().starts_with("OFFSET"))
                .and_then(|l| l.split_whitespace().nth(1))
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);
            let limit: usize = query
                .lines()
                .find(|l| l.trim_start().starts_with("LIMIT"))
                .and_then(|l| l.split_whitespace().nth(1))
                .and_then(|s| s.parse().ok())
                .unwrap_or(1000);
            let slice: Vec<Triple> = self
                .triples
                .iter()
                .skip(offset)
                .take(limit)
                .cloned()
                .collect();
            Ok(slice)
        }
    }

    /// Seed with a fixed 0.9 score.
    fn make_seed(uri: &str) -> ScoredEntity {
        ScoredEntity {
            uri: uri.to_string(),
            score: 0.9,
            source: ScoreSource::Vector,
            metadata: HashMap::new(),
        }
    }

    /// `n` distinct triples `http://s/{i} -p-> http://o/{i}`.
    fn make_triples(n: usize) -> Vec<Triple> {
        (0..n)
            .map(|i| {
                Triple::new(
                    format!("http://s/{i}"),
                    "http://p/rel",
                    format!("http://o/{i}"),
                )
            })
            .collect()
    }

    #[tokio::test]
    async fn test_stream_collects_all_triples() {
        let triples = make_triples(50);
        let engine = Arc::new(MockSparql::new(triples.clone()));
        let config = StreamingSparqlConfig {
            page_size: 20,
            max_total_triples: 100,
            channel_capacity: 8,
            max_concurrency: 2,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://example.org/seed1")];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert_eq!(collected.len(), 50);
    }

    #[tokio::test]
    async fn test_stream_respects_max_total_triples() {
        let triples = make_triples(3_000);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 1_000,
            max_total_triples: 2_500,
            channel_capacity: 16,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://example.org/seed1")];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert!(
            collected.len() <= 2_500,
            "Expected at most 2500 triples, got {}",
            collected.len()
        );
    }

    #[tokio::test]
    async fn test_stream_deduplicates() {
        let triple = Triple::new("http://s", "http://p", "http://o");
        let triples: Vec<Triple> = vec![triple.clone(); 100];
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 50,
            max_total_triples: 200,
            channel_capacity: 8,
            max_concurrency: 1,
            deduplicate: true,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://example.org/seed1")];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert_eq!(collected.len(), 1);
    }

    #[tokio::test]
    async fn test_stream_empty_seeds() {
        let engine = Arc::new(MockSparql::new(vec![]));
        let retriever = StreamingSparqlRetriever::with_defaults(engine);
        let collected = retriever
            .collect_subgraph(vec![])
            .await
            .expect("should succeed");
        assert!(collected.is_empty());
    }

    #[tokio::test]
    async fn test_stream_filters_low_score_seeds() {
        let triples = make_triples(10);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            min_seed_score: 0.8,
            ..Default::default()
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![ScoredEntity {
            uri: "http://seed".to_string(),
            score: 0.5,
            source: ScoreSource::Vector,
            metadata: HashMap::new(),
        }];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert!(collected.is_empty());
    }

    #[tokio::test]
    async fn test_stream_multiple_seeds() {
        let seed1_triples: Vec<Triple> = (0..5)
            .map(|i| {
                Triple::new(
                    format!("http://s1/{i}"),
                    "http://p",
                    format!("http://o1/{i}"),
                )
            })
            .collect();
        let seed2_triples: Vec<Triple> = (0..5)
            .map(|i| {
                Triple::new(
                    format!("http://s2/{i}"),
                    "http://p",
                    format!("http://o2/{i}"),
                )
            })
            .collect();
        let mut all_triples = seed1_triples;
        all_triples.extend(seed2_triples);
        let engine = Arc::new(MockSparql::new(all_triples));
        let config = StreamingSparqlConfig {
            page_size: 100,
            max_total_triples: 100,
            channel_capacity: 8,
            max_concurrency: 2,
            deduplicate: true,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![
            make_seed("http://example.org/s1"),
            make_seed("http://example.org/s2"),
        ];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert!(!collected.is_empty());
    }

    #[tokio::test]
    async fn test_stream_pagination_calls() {
        let triples = make_triples(250);
        let call_count = Arc::new(AtomicUsize::new(0));
        let engine = MockSparql {
            triples: triples.clone(),
            page_call_count: Arc::clone(&call_count),
        };
        let engine = Arc::new(engine);
        let local_count = Arc::clone(&engine.page_call_count);
        let config = StreamingSparqlConfig {
            page_size: 100,
            max_total_triples: 0,
            channel_capacity: 16,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://example.org/seed")];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert_eq!(collected.len(), 250);
        // Pages of 100, 100, 50: the short final page stops pagination.
        assert_eq!(local_count.load(Ordering::Relaxed), 3);
    }

    #[tokio::test]
    async fn test_next_page_interface() {
        let triples = make_triples(30);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 10,
            max_total_triples: 0,
            channel_capacity: 8,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://example.org/seed")];
        let mut stream = retriever.stream_subgraph(seeds);
        let mut pages_received = 0usize;
        let mut total_triples = 0usize;
        while let Some(page_result) = stream.next_page().await {
            let page = page_result.expect("should succeed");
            total_triples += page.triples.len();
            pages_received += 1;
            assert_eq!(page.page_index, pages_received - 1);
        }
        assert_eq!(total_triples, 30);
        assert_eq!(pages_received, 3);
    }

    #[test]
    fn test_build_expansion_query_single_hop() {
        let q = build_expansion_query("http://example.org/e", 1, 100, 0);
        assert!(q.contains("<http://example.org/e>"));
        assert!(q.contains("LIMIT 100"));
        assert!(q.contains("OFFSET 0"));
    }

    #[test]
    fn test_build_expansion_query_multi_hop() {
        let q = build_expansion_query("http://example.org/e", 3, 500, 1000);
        assert!(q.contains("LIMIT 500"));
        assert!(q.contains("OFFSET 1000"));
        assert!(q.contains("{1,3}"));
    }

    #[test]
    fn test_config_defaults() {
        let cfg = StreamingSparqlConfig::default();
        assert_eq!(cfg.page_size, 1_000);
        assert_eq!(cfg.max_concurrency, 4);
        assert!(cfg.deduplicate);
    }
}
#[cfg(test)]
mod additional_tests {
    use super::*;
    use crate::{GraphRAGResult, ScoreSource, SparqlEngineTrait, Triple};
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// In-memory engine that pages through a fixed triple list by parsing
    /// LIMIT/OFFSET back out of the generated query text, and counts calls.
    struct MockSparql {
        triples: Vec<Triple>,
        page_call_count: Arc<AtomicUsize>,
    }

    impl MockSparql {
        fn new(triples: Vec<Triple>) -> Self {
            Self {
                triples,
                page_call_count: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    #[async_trait]
    impl SparqlEngineTrait for MockSparql {
        async fn select(&self, _query: &str) -> GraphRAGResult<Vec<HashMap<String, String>>> {
            Ok(vec![])
        }

        async fn ask(&self, _query: &str) -> GraphRAGResult<bool> {
            Ok(false)
        }

        async fn construct(&self, query: &str) -> GraphRAGResult<Vec<Triple>> {
            self.page_call_count.fetch_add(1, Ordering::Relaxed);
            // Recover the requested window from the query text.
            let offset: usize = query
                .lines()
                .find(|l| l.trim_start().starts_with("OFFSET"))
                .and_then(|l| l.split_whitespace().nth(1))
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);
            let limit: usize = query
                .lines()
                .find(|l| l.trim_start().starts_with("LIMIT"))
                .and_then(|l| l.split_whitespace().nth(1))
                .and_then(|s| s.parse().ok())
                .unwrap_or(1000);
            Ok(self
                .triples
                .iter()
                .skip(offset)
                .take(limit)
                .cloned()
                .collect())
        }
    }

    /// Seed with an explicit score (unlike the sibling module's fixed 0.9).
    fn make_seed(uri: &str, score: f64) -> ScoredEntity {
        ScoredEntity {
            uri: uri.to_string(),
            score,
            source: ScoreSource::Vector,
            metadata: HashMap::new(),
        }
    }

    /// `n` distinct triples `http://s/{i} -p-> http://o/{i}`.
    fn make_triples(n: usize) -> Vec<Triple> {
        (0..n)
            .map(|i| {
                Triple::new(
                    format!("http://s/{i}"),
                    "http://p/rel",
                    format!("http://o/{i}"),
                )
            })
            .collect()
    }

    #[test]
    fn test_config_channel_capacity_default() {
        let cfg = StreamingSparqlConfig::default();
        assert_eq!(cfg.channel_capacity, 512);
    }

    #[test]
    fn test_config_expansion_hops_default() {
        let cfg = StreamingSparqlConfig::default();
        assert_eq!(cfg.expansion_hops, 2);
    }

    #[test]
    fn test_config_min_seed_score_default() {
        let cfg = StreamingSparqlConfig::default();
        assert!((cfg.min_seed_score - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_config_max_total_triples_default() {
        let cfg = StreamingSparqlConfig::default();
        assert_eq!(cfg.max_total_triples, 50_000);
    }

    #[test]
    fn test_build_expansion_query_hop2() {
        let q = build_expansion_query("http://example.org/e", 2, 200, 400);
        assert!(q.contains("LIMIT 200"));
        assert!(q.contains("OFFSET 400"));
        assert!(q.contains("{1,2}"));
    }

    #[test]
    fn test_build_expansion_query_hop1_uses_p() {
        let q = build_expansion_query("http://example.org/e", 1, 50, 0);
        assert!(q.contains("?p"));
    }

    #[test]
    fn test_build_expansion_query_contains_union() {
        let q = build_expansion_query("http://example.org/e", 2, 100, 0);
        assert!(q.contains("UNION"));
    }

    #[test]
    fn test_build_expansion_query_construct_keyword() {
        let q = build_expansion_query("http://example.org/e", 1, 100, 0);
        assert!(q.contains("CONSTRUCT"));
        assert!(q.contains("WHERE"));
    }

    #[tokio::test]
    async fn test_stream_zero_triples_engine() {
        let engine = Arc::new(MockSparql::new(vec![]));
        let config = StreamingSparqlConfig {
            page_size: 10,
            max_total_triples: 100,
            channel_capacity: 4,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://seed", 0.9)];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert!(collected.is_empty());
    }

    #[tokio::test]
    async fn test_stream_exactly_page_size_triples() {
        let triples = make_triples(10);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 10,
            max_total_triples: 0,
            channel_capacity: 8,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://seed", 0.9)];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert_eq!(collected.len(), 10);
    }

    #[tokio::test]
    async fn test_stream_deduplicate_across_pages() {
        // Layout: 10 unique, then 5 repeats of the first page, then 5 new.
        let base_triples = make_triples(15);
        let mut triples = base_triples[0..10].to_vec();
        triples.extend_from_slice(&base_triples[0..5]);
        triples.extend_from_slice(&base_triples[10..15]);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 10,
            max_total_triples: 0,
            channel_capacity: 8,
            max_concurrency: 1,
            deduplicate: true,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://seed", 0.9)];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert_eq!(collected.len(), 15);
    }

    #[tokio::test]
    async fn test_stream_no_deduplicate_counts_duplicates() {
        let triple = Triple::new("http://s", "http://p", "http://o");
        let triples = vec![triple; 30];
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 15,
            max_total_triples: 0,
            channel_capacity: 8,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://seed", 0.9)];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert_eq!(collected.len(), 30);
    }

    #[tokio::test]
    async fn test_stream_score_exactly_at_threshold() {
        // The filter is inclusive: score == min_seed_score passes.
        let triples = make_triples(5);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            min_seed_score: 0.5,
            ..Default::default()
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://seed", 0.5)];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert_eq!(collected.len(), 5);
    }

    #[tokio::test]
    async fn test_stream_score_just_below_threshold_filtered() {
        let triples = make_triples(5);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            min_seed_score: 0.5,
            ..Default::default()
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://seed", 0.499)];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert!(collected.is_empty());
    }

    #[tokio::test]
    async fn test_stream_pages_have_correct_page_indices() {
        let triples = make_triples(30);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 10,
            max_total_triples: 0,
            channel_capacity: 8,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://seed", 0.9)];
        let mut stream = retriever.stream_subgraph(seeds);
        let mut expected_idx = 0usize;
        while let Some(page_result) = stream.next_page().await {
            let page = page_result.expect("should succeed");
            assert_eq!(page.page_index, expected_idx);
            expected_idx += 1;
        }
        assert_eq!(expected_idx, 3);
    }

    #[tokio::test]
    async fn test_stream_last_page_flag_set() {
        let triples = make_triples(15);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 10,
            max_total_triples: 0,
            channel_capacity: 8,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://seed", 0.9)];
        let mut stream = retriever.stream_subgraph(seeds);
        let mut pages = Vec::new();
        while let Some(page_result) = stream.next_page().await {
            pages.push(page_result.expect("should succeed"));
        }
        assert!(pages.last().map(|p| p.is_last_page).unwrap_or(false));
    }

    #[tokio::test]
    async fn test_stream_cumulative_count_monotone() {
        let triples = make_triples(30);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 10,
            max_total_triples: 0,
            channel_capacity: 8,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://seed", 0.9)];
        let mut stream = retriever.stream_subgraph(seeds);
        let mut prev = 0usize;
        while let Some(page_result) = stream.next_page().await {
            let page = page_result.expect("should succeed");
            assert!(
                page.cumulative_count > prev,
                "cumulative_count should be strictly increasing"
            );
            prev = page.cumulative_count;
        }
    }

    #[tokio::test]
    async fn test_stream_seed_uri_in_page() {
        let triples = make_triples(5);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 10,
            max_total_triples: 0,
            channel_capacity: 4,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://my-seed/42", 0.9)];
        let mut stream = retriever.stream_subgraph(seeds);
        let page = stream
            .next_page()
            .await
            .expect("should succeed")
            .expect("should succeed");
        assert_eq!(page.seed_uri, "http://my-seed/42");
    }

    #[tokio::test]
    async fn test_stream_max_total_ceiling_exact() {
        let triples = make_triples(200);
        let engine = Arc::new(MockSparql::new(triples));
        let config = StreamingSparqlConfig {
            page_size: 50,
            max_total_triples: 100,
            channel_capacity: 8,
            max_concurrency: 1,
            deduplicate: false,
            min_seed_score: 0.0,
            expansion_hops: 1,
        };
        let retriever = StreamingSparqlRetriever::new(engine, config);
        let seeds = vec![make_seed("http://seed", 0.9)];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert!(collected.len() <= 100);
        assert_eq!(collected.len(), 100);
    }

    #[tokio::test]
    async fn test_with_defaults_builds_retriever() {
        let engine = Arc::new(MockSparql::new(make_triples(3)));
        let retriever = StreamingSparqlRetriever::with_defaults(engine);
        let seeds = vec![make_seed("http://seed", 0.9)];
        let collected = retriever
            .collect_subgraph(seeds)
            .await
            .expect("should succeed");
        assert_eq!(collected.len(), 3);
    }
}