eyros 4.2.2

multi-dimensional interval database
Documentation
use crate::{Error,Point,Value};
use async_std::{prelude::*,stream::Stream,sync::{Arc,Mutex}};
use crate::unfold::unfold;
use std::marker::Unpin;

type Out<P,V> = Option<Result<(P,V),Error>>;
pub type QStream<P,V> = Box<dyn Stream<Item=Result<(P,V),Error>>+Send+Unpin>;

pin_project_lite::pin_project! {
  pub struct QueryStream<P,V> where P: Point, V: Value {
    index: Arc<Mutex<usize>>,
    #[pin] queries: Arc<Mutex<Vec<QStream<P,V>>>>,
  }
}

impl<P,V> QueryStream<P,V> where P: Point, V: Value {
  pub fn from_queries(queries: Vec<QStream<P,V>>) -> Result<QStream<P,V>,Error> {
    let qs = Self {
      index: Arc::new(Mutex::new(0)),
      queries: Arc::new(Mutex::new(queries)),
    };
    Ok(Box::new(unfold(qs, async move |mut qs| {
      qs.get_next().await.map(|p| (p,qs))
    })))
  }
  async fn get_next(&mut self) -> Out<P,V> {
    let mut qs = self.queries.lock().await;
    while !qs.is_empty() {
      let len = qs.len();
      {
        let mut ix = self.index.lock().await;
        let q = &mut qs[*ix];
        let result = q.next().await;
        if result.is_some() {
          *ix = ((*ix)+1) % len;
          return result;
        }
      }
      {
        let mut ix = self.index.lock().await;
        let _ = qs.remove(*ix);
        if !qs.is_empty() {
          *ix %= qs.len();
        }
      }
    }
    None
  }
}