use crate::StarResult;
use super::parallel::StarBinding;
/// A pull-based stream of `StarBinding` rows.
///
/// Wraps a boxed iterator of `StarResult<StarBinding>` items and adds a
/// single-item lookahead slot together with a running count of the items
/// that have been handed out.
pub struct StreamingStarResult {
    /// Underlying item source; boxed so that any `Send` iterator can be wrapped.
    inner: Box<dyn Iterator<Item = StarResult<StarBinding>> + Send>,
    /// Number of items (successful rows and errors alike) yielded so far.
    row_count: usize,
    /// Set once the underlying iterator has reported the end of the stream.
    is_exhausted: bool,
    /// Item fetched ahead of time by a lookahead; it is yielded before `inner`
    /// is polled again.
    peeked: Option<StarResult<StarBinding>>,
}
impl StreamingStarResult {
    /// Wraps `iter` in a stream with nothing consumed and nothing peeked yet.
    pub fn new(iter: impl Iterator<Item = StarResult<StarBinding>> + Send + 'static) -> Self {
        Self {
            inner: Box::new(iter),
            row_count: 0,
            is_exhausted: false,
            peeked: None,
        }
    }

    /// Builds a stream that yields every element of `rows`, in order, as `Ok`.
    pub fn from_vec(rows: Vec<StarBinding>) -> Self {
        Self::new(rows.into_iter().map(Ok))
    }

    /// Builds a stream that yields no items at all.
    pub fn empty() -> Self {
        Self::new(std::iter::empty())
    }

    /// Pulls at most `n` items from the stream and returns them in order.
    ///
    /// Fewer than `n` items are returned when the stream ends first. Error
    /// items are returned in place and do not stop the batch.
    pub fn take_n(&mut self, n: usize) -> Vec<StarResult<StarBinding>> {
        // `n` is only an upper bound. Reserving `n` slots up front would panic
        // with a capacity overflow for very large limits (e.g. `usize::MAX`)
        // and waste memory on short streams, so let the vector grow on demand.
        self.by_ref().take(n).collect()
    }

    /// Reports whether another item is available.
    ///
    /// May fetch one item from the underlying iterator and hold it until the
    /// next call to `next`. A held item is not counted by `rows_consumed`
    /// until it is actually yielded.
    pub fn has_next(&mut self) -> bool {
        if self.is_exhausted {
            return false;
        }
        if self.peeked.is_none() {
            self.peeked = self.inner.next();
            // An empty lookahead after polling means the source has ended.
            self.is_exhausted = self.peeked.is_none();
        }
        !self.is_exhausted
    }

    /// Number of items yielded through `next` so far, error items included.
    pub fn rows_consumed(&self) -> usize {
        self.row_count
    }

    /// Drains the stream into a vector of rows.
    ///
    /// # Errors
    ///
    /// Returns the first error item encountered; items after it are not
    /// pulled from the stream.
    pub fn collect_all(self) -> StarResult<Vec<StarBinding>> {
        self.collect()
    }
}
impl Iterator for StreamingStarResult {
    type Item = StarResult<StarBinding>;

    /// Yields the held lookahead item if there is one, otherwise the next item
    /// from the underlying iterator.
    ///
    /// Every yielded item bumps the consumed-row counter. Once the end of the
    /// stream has been observed, the underlying iterator is never polled again.
    fn next(&mut self) -> Option<Self::Item> {
        if self.is_exhausted {
            return None;
        }
        let produced = self.peeked.take().or_else(|| self.inner.next());
        if produced.is_some() {
            self.row_count += 1;
        } else {
            self.is_exhausted = true;
        }
        produced
    }
}
/// Joins a streamed left input against a fully materialised right input.
///
/// The left side is pulled one row at a time; for each left row the cached
/// right rows are scanned from `right_pos` onwards.
pub struct StreamingJoin {
    /// Left input, consumed lazily.
    left: StreamingStarResult,
    /// Right input, held in memory and rescanned for every left row.
    right_cache: Vec<StarBinding>,
    /// Names of the variables on which rows are compared.
    join_vars: Vec<String>,
    /// Left row currently being matched, if any.
    left_current: Option<StarBinding>,
    /// Index of the next right row to test against `left_current`.
    right_pos: usize,
    /// Set once the left input has reported the end of the stream.
    exhausted: bool,
}
impl StreamingJoin {
    /// Creates a join of the streamed `left` input with the in-memory `right`
    /// rows, comparing rows on `join_vars`.
    pub fn new(left: StreamingStarResult, right: Vec<StarBinding>, join_vars: Vec<String>) -> Self {
        Self {
            left,
            right_cache: right,
            join_vars,
            left_current: None,
            right_pos: 0,
            exhausted: false,
        }
    }

    /// Pulls the next item from the left input.
    ///
    /// Returns `Some(Err(_))` only when the left input produced an error,
    /// which the caller is expected to forward. On a successful row the row
    /// becomes the current left row and the right scan restarts at index 0;
    /// at end of input the join is flagged as exhausted. Both of those cases
    /// return `None`.
    fn advance_left(&mut self) -> Option<StarResult<StarBinding>> {
        match self.left.next() {
            None => {
                self.exhausted = true;
                None
            }
            Some(outcome) => match outcome {
                Ok(row) => {
                    self.right_pos = 0;
                    self.left_current = Some(row);
                    None
                }
                Err(e) => Some(Err(e)),
            },
        }
    }

    /// Returns `true` unless some join variable is bound in both rows to
    /// different values. A variable missing from either row never conflicts.
    fn compatible(left_row: &StarBinding, right_row: &StarBinding, join_vars: &[String]) -> bool {
        for var in join_vars {
            let name = var.as_str();
            if let (Some(left_term), Some(right_term)) = (left_row.get(name), right_row.get(name)) {
                if left_term != right_term {
                    return false;
                }
            }
        }
        true
    }

    /// Combines two rows into one; where both rows bind the same variable the
    /// left row's value is kept.
    fn merge(left_row: &StarBinding, right_row: &StarBinding) -> StarBinding {
        let mut combined = left_row.clone();
        for (name, term) in right_row {
            combined.entry(name.clone()).or_insert_with(|| term.clone());
        }
        combined
    }
}
impl Iterator for StreamingJoin {
    type Item = StarResult<StarBinding>;

    /// Yields the next merged row, or forwards an error from the left input.
    ///
    /// For the current left row the cached right rows are scanned from the
    /// saved position; the first compatible one is merged and returned. When
    /// the scan runs out, the next left row is fetched and the scan restarts.
    /// The iterator ends when the left input ends.
    fn next(&mut self) -> Option<Self::Item> {
        while !self.exhausted {
            if self.left_current.is_none() {
                // A `Some` here is always an error coming from the left input.
                if let Some(failure) = self.advance_left() {
                    return Some(failure);
                }
            }
            let left_row = match &self.left_current {
                Some(row) => row,
                None => {
                    // No left row could be fetched: the left input is finished.
                    self.exhausted = true;
                    break;
                }
            };

            let start = self.right_pos;
            let hit = self
                .right_cache
                .iter()
                .skip(start)
                .position(|right_row| Self::compatible(left_row, right_row, &self.join_vars));

            match hit {
                Some(offset) => {
                    let index = start + offset;
                    // Resume just past the match on the next call.
                    self.right_pos = index + 1;
                    return Some(Ok(Self::merge(left_row, &self.right_cache[index])));
                }
                None => {
                    // Right side fully scanned for this left row; move on.
                    self.right_pos = self.right_cache.len();
                    self.left_current = None;
                }
            }
        }
        None
    }
}
/// Stream adapter that keeps only the rows accepted by a predicate and
/// records how many rows were kept and how many were dropped.
pub struct StreamingFilter<F: Fn(&StarBinding) -> bool + Send> {
    /// Upstream source of rows.
    inner: StreamingStarResult,
    /// Decides whether a row is kept (`true`) or dropped (`false`).
    predicate: F,
    /// Number of rows the predicate has accepted so far.
    rows_passed: usize,
    /// Number of rows the predicate has rejected so far.
    rows_skipped: usize,
}
impl<F: Fn(&StarBinding) -> bool + Send> StreamingFilter<F> {
    /// Wraps `stream` so that only rows for which `predicate` returns `true`
    /// are yielded. Both counters start at zero.
    pub fn new(stream: StreamingStarResult, predicate: F) -> Self {
        Self {
            rows_skipped: 0,
            rows_passed: 0,
            predicate,
            inner: stream,
        }
    }

    /// Number of rows accepted by the predicate so far.
    pub fn rows_passed(&self) -> usize {
        self.rows_passed
    }

    /// Number of rows rejected by the predicate so far.
    pub fn rows_skipped(&self) -> usize {
        self.rows_skipped
    }
}
impl<F: Fn(&StarBinding) -> bool + Send> Iterator for StreamingFilter<F> {
    type Item = StarResult<StarBinding>;

    /// Returns the next upstream row accepted by the predicate.
    ///
    /// Rejected rows are counted and skipped. Upstream errors are forwarded
    /// unchanged, without consulting the predicate and without touching either
    /// counter.
    fn next(&mut self) -> Option<Self::Item> {
        for item in self.inner.by_ref() {
            match item {
                Err(e) => return Some(Err(e)),
                Ok(row) if (self.predicate)(&row) => {
                    self.rows_passed += 1;
                    return Some(Ok(row));
                }
                Ok(_) => self.rows_skipped += 1,
            }
        }
        None
    }
}
/// Stream adapter that narrows every row to a fixed list of variables.
pub struct StreamingProject {
    /// Upstream source of rows.
    inner: StreamingStarResult,
    /// Names of the variables retained in each output row.
    variables: Vec<String>,
}
impl StreamingProject {
    /// Wraps `stream` so that each yielded row keeps only the bindings named
    /// in `variables`.
    pub fn new(stream: StreamingStarResult, variables: Vec<String>) -> Self {
        let inner = stream;
        Self { inner, variables }
    }
}
impl Iterator for StreamingProject {
type Item = StarResult<StarBinding>;
fn next(&mut self) -> Option<Self::Item> {
match self.inner.next()? {
Ok(row) => {
let projected: StarBinding = self
.variables
.iter()
.filter_map(|v| row.get(v.as_str()).map(|t| (v.clone(), t.clone())))
.collect();
Some(Ok(projected))
}
Err(e) => Some(Err(e)),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StarTerm;

    /// Parses `value` into an IRI term; the fixtures only use valid IRIs.
    fn iri(value: &str) -> StarTerm {
        StarTerm::iri(value).expect("valid IRI")
    }

    /// Builds a binding from `(variable, IRI)` pairs.
    fn make_binding(pairs: &[(&str, &str)]) -> StarBinding {
        let mut binding = StarBinding::new();
        for &(name, value) in pairs {
            binding.insert(name.to_string(), iri(value));
        }
        binding
    }

    /// Builds `count` bindings of the form `var -> http://ex.org/<index>`.
    fn numbered_bindings(var: &str, count: usize) -> Vec<StarBinding> {
        (0..count)
            .map(|i| {
                let value = format!("http://ex.org/{i}");
                make_binding(&[(var, value.as_str())])
            })
            .collect()
    }

    #[test]
    fn test_empty_stream() {
        let mut stream = StreamingStarResult::empty();
        assert!(!stream.has_next());
        assert_eq!(stream.rows_consumed(), 0);
        assert!(stream.take_n(10).is_empty());
    }

    #[test]
    fn test_from_vec() {
        let mut stream = StreamingStarResult::from_vec(numbered_bindings("x", 5));
        assert!(stream.has_next());

        let first_batch = stream.take_n(3);
        assert_eq!(first_batch.len(), 3);
        assert_eq!(stream.rows_consumed(), 3);

        // Asking for more than remains returns only what is left.
        let remainder = stream.take_n(100);
        assert_eq!(remainder.len(), 2);
        assert_eq!(stream.rows_consumed(), 5);
        assert!(!stream.has_next());
    }

    #[test]
    fn test_collect_all() {
        let stream = StreamingStarResult::from_vec(vec![StarBinding::new(); 10]);
        let collected = stream.collect_all().expect("no error");
        assert_eq!(collected.len(), 10);
    }

    #[test]
    fn test_streaming_join_basic() {
        let left_stream = StreamingStarResult::from_vec(vec![
            make_binding(&[("x", "http://ex.org/alice"), ("y", "http://ex.org/bob")]),
            make_binding(&[("x", "http://ex.org/charlie"), ("y", "http://ex.org/dave")]),
        ]);
        let right_rows = vec![
            make_binding(&[("y", "http://ex.org/bob"), ("z", "http://ex.org/lit1")]),
            make_binding(&[("y", "http://ex.org/eve"), ("z", "http://ex.org/lit2")]),
        ];

        let join = StreamingJoin::new(left_stream, right_rows, vec!["y".to_owned()]);
        let joined: Vec<StarBinding> = join.map(|item| item.expect("no error")).collect();

        // Only alice/bob on the left shares its `y` value with a right row.
        assert_eq!(joined.len(), 1);
        assert_eq!(joined[0].get("x").expect("x"), &iri("http://ex.org/alice"));
        assert_eq!(joined[0].get("z").expect("z"), &iri("http://ex.org/lit1"));
    }

    #[test]
    fn test_streaming_join_empty_right() {
        let left_stream =
            StreamingStarResult::from_vec(vec![make_binding(&[("x", "http://ex.org/a")])]);
        let mut join = StreamingJoin::new(left_stream, Vec::new(), vec!["x".to_owned()]);
        assert!(join.next().is_none());
    }

    #[test]
    fn test_streaming_filter() {
        let stream = StreamingStarResult::from_vec(numbered_bindings("i", 10));
        let mut filter = StreamingFilter::new(stream, |row| row.contains_key("i"));

        let all: Vec<_> = filter.by_ref().collect();
        assert_eq!(all.len(), 10);
        assert_eq!(filter.rows_passed(), 10);
        assert_eq!(filter.rows_skipped(), 0);
    }

    #[test]
    fn test_streaming_filter_with_exclusions() {
        let target = iri("http://ex.org/keep");
        let rows = vec![
            make_binding(&[("x", "http://ex.org/keep")]),
            make_binding(&[("x", "http://ex.org/skip")]),
        ];
        let stream = StreamingStarResult::from_vec(rows);
        let mut filter = StreamingFilter::new(stream, move |row| {
            row.get("x").map_or(false, |term| *term == target)
        });

        let results: Vec<_> = filter.by_ref().collect();
        assert_eq!(results.len(), 1);
        assert_eq!(filter.rows_passed(), 1);
        assert_eq!(filter.rows_skipped(), 1);
    }

    #[test]
    fn test_streaming_project() {
        let rows = vec![make_binding(&[
            ("x", "http://ex.org/x"),
            ("y", "http://ex.org/y"),
        ])];
        let stream = StreamingStarResult::from_vec(rows);
        let mut proj = StreamingProject::new(stream, vec!["x".to_owned()]);

        let row = proj.next().expect("item").expect("no error");
        assert!(row.contains_key("x"), "x should be in projected row");
        assert!(!row.contains_key("y"), "y should be projected away");
    }
}