use futures::future;
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::task::{Context, Poll};
use crate::error::Error;
use crate::store::Store;
use crate::tuple::Tuple;
use crate::wildcard;
pub enum Match {
Done(Result<Option<Tuple>, Error>),
Pending(Receiver<Tuple>),
}
impl Future for Match {
type Output = Option<Tuple>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
match &*self {
Match::Done(Ok(result)) => Poll::Ready(result.clone()),
Match::Done(Err(e)) => {
eprintln!("error polling Match: {:?}", e);
Poll::from(None)
}
Match::Pending(ref rx) => {
let receiver_poll = rx.try_recv();
match receiver_poll {
Ok(tup) => Poll::from(Some(tup)),
Err(e) => {
eprintln!("Match::poll encountered error: {:?}", e);
Poll::from(None)
}
}
}
}
}
}
pub struct Space<T: Store> {
store: T,
pending: wildcard::Tree<Sender<Tuple>>,
}
impl<T> Space<T>
where
T: Store,
{
pub fn new(store: T) -> Space<T> {
Space {
store,
pending: wildcard::Tree::new(),
}
}
pub fn tuple_in(&mut self, tup: Tuple) -> Match {
trace!("tuple_in");
match self.store.inp(&tup) {
Ok(None) => {
trace!("matched Ok(None)");
let (tx, rx) = channel();
let resultat = self.pending.insert(tup.clone(), tx);
trace!("resultat {:?}", resultat);
if let Err(e) = resultat {
trace!("return match::Done(Err(...))");
Match::Done(Err(Error::with_chain(e, "send failed")))
} else {
trace!("return Match::Pending(rx)");
Match::Pending(rx)
}
}
result => Match::Done(result),
}
}
pub fn tuple_rd(&mut self, tup: Tuple) -> Match {
trace!("tuple_rd");
match self.store.rdp(&tup) {
Ok(None) => {
let (tx, rx) = channel();
if let Err(e) = self.pending.insert(tup.clone(), tx) {
Match::Done(Err(Error::with_chain(e, "send failed")))
} else {
Match::Pending(rx)
}
}
result => Match::Done(result),
}
}
pub fn tuple_out(&mut self, tup: Tuple) -> Pin<Box<dyn Future<Output = Result<(), Error>>>> {
trace!("tuple_out");
if !tup.is_defined() {
Box::pin(future::err(Error::from("undefined tuple")))
} else if let Some(tx) = self.pending.take(tup.clone()) {
let send_attempt = tx
.send(tup)
.map(|_| ())
.map_err(|e| Error::with_chain(e, "send failed"));
Box::pin(future::ready(send_attempt))
} else {
let result = self.store.out(tup);
Box::pin(future::ready(result))
}
}
}