1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use crate::envelope::{
AsyncNonReturningEnvelope, AsyncReturningEnvelope, Envelope, SyncNonReturningEnvelope,
SyncReturningEnvelope,
};
use crate::{Actor, AsyncHandler, Handler, Message};
use futures::channel::mpsc::UnboundedSender;
use futures::future::Either;
use futures::{Future, TryFutureExt};
/// A handle through which messages are delivered to an actor of type `A`.
///
/// The address owns the sending half of an unbounded channel whose items are
/// boxed [`Envelope`] trait objects targeting `A`.
pub struct Address<A: Actor> {
    /// Sending half of the channel that carries boxed envelopes for `A`.
    pub(crate) sender: UnboundedSender<Box<dyn Envelope<Actor = A>>>,
}

// `Clone` is written by hand on purpose: `#[derive(Clone)]` would add an
// implicit `A: Clone` bound, making the address of a non-`Clone` actor
// impossible to clone even though only the channel sender is duplicated.
impl<A: Actor> Clone for Address<A> {
    /// Returns a new address that shares the same underlying channel sender.
    fn clone(&self) -> Self {
        Address {
            sender: self.sender.clone(),
        }
    }
}
impl<A: Actor> Address<A> {
pub fn do_send<M>(&self, message: M) -> Result<(), Disconnected>
where
M: Message,
A: Handler<M> + Send,
{
let envelope = SyncNonReturningEnvelope::new(message);
self.sender
.unbounded_send(Box::new(envelope))
.map_err(|_| Disconnected)
}
pub fn do_send_async<M>(&self, message: M) -> Result<(), Disconnected>
where
M: Message,
A: AsyncHandler<M> + Send,
{
let envelope = AsyncNonReturningEnvelope::new(message);
self.sender
.unbounded_send(Box::new(envelope))
.map_err(|_| Disconnected)
}
pub fn send<M>(&self, message: M) -> impl Future<Output = Result<M::Result, Disconnected>>
where
M: Message,
A: Handler<M> + Send,
M::Result: Send,
{
let t = SyncReturningEnvelope::new(message);
let envelope: SyncReturningEnvelope<A, M> = t.0;
let rx = t.1;
let res = self
.sender
.unbounded_send(Box::new(envelope))
.map_err(|_| Disconnected);
match res {
Ok(()) => Either::Left(rx.map_err(|_| Disconnected)),
Err(e) => Either::Right(futures::future::err(e)),
}
}
pub fn send_async<M>(&self, message: M) -> impl Future<Output = Result<M::Result, Disconnected>>
where
M: Message,
A: AsyncHandler<M> + Send,
for<'a> A::Responder<'a>: Future<Output = M::Result> + Send,
{
let t = AsyncReturningEnvelope::new(message);
let envelope: AsyncReturningEnvelope<A, M> = t.0;
let rx = t.1;
let res = self
.sender
.unbounded_send(Box::new(envelope))
.map_err(|_| Disconnected);
match res {
Ok(()) => Either::Left(rx.map_err(|_| Disconnected)),
Err(e) => Either::Right(futures::future::err(e)),
}
}
}
/// Error returned by the sending methods of `Address` when a message could not
/// be handed to the channel, or when waiting for a reply failed.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct Disconnected;

impl std::fmt::Display for Disconnected {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("actor address disconnected")
    }
}

// Lets callers use `?` to convert into `Box<dyn Error>` and similar error types.
impl std::error::Error for Disconnected {}