rustupolis/
space.rs

1//! Module Space
2//!
3//! A space combines a store and concurrent matching to allow for searching
4//! tuples containing wildcards.
5
6use futures::future;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::mpsc::{channel, Receiver, Sender};
10use std::task::{Context, Poll};
11
12use crate::error::Error;
13use crate::store::Store;
14use crate::tuple::Tuple;
15use crate::wildcard;
16
17/// Matchings can either be pending or completed.
18pub enum Match {
19    Done(Result<Option<Tuple>, Error>),
20    Pending(Receiver<Tuple>),
21}
22
23impl Future for Match {
24    type Output = Option<Tuple>;
25
26    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
27        match &*self {
28            Match::Done(Ok(result)) => Poll::Ready(result.clone()),
29            Match::Done(Err(e)) => {
30                eprintln!("error polling Match: {:?}", e);
31                Poll::from(None)
32            }
33            // Match::Pending(ref rx) => rx.poll().map_err(|()| "receive failed".into()),
34            // Match::Pending(_) => Poll::Pending,
35            Match::Pending(ref rx) => {
36                let receiver_poll = rx.try_recv();
37                match receiver_poll {
38                    Ok(tup) => Poll::from(Some(tup)),
39                    Err(e) => {
40                        eprintln!("Match::poll encountered error: {:?}", e);
41                        Poll::from(None)
42                    }
43                }
44            }
45        }
46    }
47}
48
49/// Space encapsulates the store and a wildcard tree.
50pub struct Space<T: Store> {
51    store: T,
52    pending: wildcard::Tree<Sender<Tuple>>,
53}
54
55impl<T> Space<T>
56where
57    T: Store,
58{
59    pub fn new(store: T) -> Space<T> {
60        Space {
61            store,
62            pending: wildcard::Tree::new(),
63        }
64    }
65
66    /// Find a matching tuple, retrieve AND remove it from the space.
67    pub fn tuple_in(&mut self, tup: Tuple) -> Match {
68        trace!("tuple_in");
69        match self.store.inp(&tup) {
70            Ok(None) => {
71                trace!("matched Ok(None)");
72                let (tx, rx) = channel();
73                let resultat = self.pending.insert(tup.clone(), tx);
74                trace!("resultat {:?}", resultat);
75                if let Err(e) = resultat {
76                    trace!("return match::Done(Err(...))");
77                    Match::Done(Err(Error::with_chain(e, "send failed")))
78                } else {
79                    trace!("return Match::Pending(rx)");
80                    Match::Pending(rx)
81                }
82            }
83            result => Match::Done(result),
84        }
85    }
86
87    /// Find a matching tuple, retrieve but NOT remove it from the space.
88    pub fn tuple_rd(&mut self, tup: Tuple) -> Match {
89        trace!("tuple_rd");
90        match self.store.rdp(&tup) {
91            Ok(None) => {
92                let (tx, rx) = channel();
93                if let Err(e) = self.pending.insert(tup.clone(), tx) {
94                    Match::Done(Err(Error::with_chain(e, "send failed")))
95                } else {
96                    Match::Pending(rx)
97                }
98            }
99            result => Match::Done(result),
100        }
101    }
102
103    /// Inserts a tuple into the store and returns a match that is
104    /// either still pending or done.
105    pub fn tuple_out(&mut self, tup: Tuple) -> Pin<Box<dyn Future<Output = Result<(), Error>>>> {
106        trace!("tuple_out");
107        if !tup.is_defined() {
108            // Box::new(futures::future::err("undefined tuple".into()))
109            Box::pin(future::err(Error::from("undefined tuple")))
110        } else if let Some(tx) = self.pending.take(tup.clone()) {
111            let send_attempt = tx
112                .send(tup)
113                .map(|_| ())
114                .map_err(|e| Error::with_chain(e, "send failed"));
115
116            Box::pin(future::ready(send_attempt))
117        } else {
118            let result = self.store.out(tup);
119            Box::pin(future::ready(result))
120        }
121    }
122}