1use futures::future;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::mpsc::{channel, Receiver, Sender};
10use std::task::{Context, Poll};
11
12use crate::error::Error;
13use crate::store::Store;
14use crate::tuple::Tuple;
15use crate::wildcard;
16
17pub enum Match {
19 Done(Result<Option<Tuple>, Error>),
20 Pending(Receiver<Tuple>),
21}
22
23impl Future for Match {
24 type Output = Option<Tuple>;
25
26 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
27 match &*self {
28 Match::Done(Ok(result)) => Poll::Ready(result.clone()),
29 Match::Done(Err(e)) => {
30 eprintln!("error polling Match: {:?}", e);
31 Poll::from(None)
32 }
33 Match::Pending(ref rx) => {
36 let receiver_poll = rx.try_recv();
37 match receiver_poll {
38 Ok(tup) => Poll::from(Some(tup)),
39 Err(e) => {
40 eprintln!("Match::poll encountered error: {:?}", e);
41 Poll::from(None)
42 }
43 }
44 }
45 }
46 }
47}
48
49pub struct Space<T: Store> {
51 store: T,
52 pending: wildcard::Tree<Sender<Tuple>>,
53}
54
55impl<T> Space<T>
56where
57 T: Store,
58{
59 pub fn new(store: T) -> Space<T> {
60 Space {
61 store,
62 pending: wildcard::Tree::new(),
63 }
64 }
65
66 pub fn tuple_in(&mut self, tup: Tuple) -> Match {
68 trace!("tuple_in");
69 match self.store.inp(&tup) {
70 Ok(None) => {
71 trace!("matched Ok(None)");
72 let (tx, rx) = channel();
73 let resultat = self.pending.insert(tup.clone(), tx);
74 trace!("resultat {:?}", resultat);
75 if let Err(e) = resultat {
76 trace!("return match::Done(Err(...))");
77 Match::Done(Err(Error::with_chain(e, "send failed")))
78 } else {
79 trace!("return Match::Pending(rx)");
80 Match::Pending(rx)
81 }
82 }
83 result => Match::Done(result),
84 }
85 }
86
87 pub fn tuple_rd(&mut self, tup: Tuple) -> Match {
89 trace!("tuple_rd");
90 match self.store.rdp(&tup) {
91 Ok(None) => {
92 let (tx, rx) = channel();
93 if let Err(e) = self.pending.insert(tup.clone(), tx) {
94 Match::Done(Err(Error::with_chain(e, "send failed")))
95 } else {
96 Match::Pending(rx)
97 }
98 }
99 result => Match::Done(result),
100 }
101 }
102
103 pub fn tuple_out(&mut self, tup: Tuple) -> Pin<Box<dyn Future<Output = Result<(), Error>>>> {
106 trace!("tuple_out");
107 if !tup.is_defined() {
108 Box::pin(future::err(Error::from("undefined tuple")))
110 } else if let Some(tx) = self.pending.take(tup.clone()) {
111 let send_attempt = tx
112 .send(tup)
113 .map(|_| ())
114 .map_err(|e| Error::with_chain(e, "send failed"));
115
116 Box::pin(future::ready(send_attempt))
117 } else {
118 let result = self.store.out(tup);
119 Box::pin(future::ready(result))
120 }
121 }
122}