1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use std::{pin::Pin, sync::Arc};
use crate::{receiver::UntypedPollerCallback, synchronized_poller_macro};
use futures::{Future, Stream};
use super::SynchronizedConfig;
use crate::{
builder::ReceiverSubscriberBuilder,
error::{Error, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::Request,
AsyncSynchronizedHandler, Bus, Message, Untyped,
};
use tokio::sync::{
mpsc::{self, UnboundedSender},
Mutex,
};
synchronized_poller_macro! {
T,
AsyncSynchronizedHandler,
|mid, msg, bus, ut: Arc<Mutex<T>>, stx: UnboundedSender<_>| {
tokio::spawn(async move {
let resp = ut.lock().await.handle(msg, &bus).await;
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.unwrap();
})
},
|bus, ut: Arc<Mutex<T>>| async move {
ut.lock().await.sync(&bus).await
}
}
pub struct SynchronizedAsync<M, R, E>
where
M: Message,
R: Message,
E: StdSyncSendError,
{
tx: mpsc::UnboundedSender<Request<M>>,
srx: parking_lot::Mutex<Option<mpsc::UnboundedReceiver<Event<R, E>>>>,
}
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for SynchronizedAsync<M, R, E>
where
T: AsyncSynchronizedHandler<M, Response = R, Error = E> + 'static,
R: Message,
M: Message,
E: StdSyncSendError,
{
type Config = SynchronizedConfig;
fn build(_cfg: Self::Config) -> (Self, UntypedPollerCallback) {
let (stx, srx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(synchronized_poller::<T, M, R>(rx, bus, ut, stx))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(
SynchronizedAsync::<M, R, E> {
tx,
srx: parking_lot::Mutex::new(Some(srx)),
},
poller,
)
}
}
impl<M, R, E> SendUntypedReceiver for SynchronizedAsync<M, R, E>
where
M: Message,
R: Message,
E: StdSyncSendError,
{
fn send(&self, m: Action, _bus: &Bus) -> Result<(), Error<Action>> {
match self.tx.send(Request::Action(m)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}
}
impl<M, R, E> SendTypedReceiver<M> for SynchronizedAsync<M, R, E>
where
M: Message,
R: Message,
E: StdSyncSendError,
{
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => {
Err(Error::send_closed(msg))
}
_ => unimplemented!(),
}
}
}
impl<M, R, E> ReciveTypedReceiver<R, E> for SynchronizedAsync<M, R, E>
where
M: Message,
R: Message,
E: StdSyncSendError,
{
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
fn event_stream(&self, _: Bus) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}