1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use std::error::Error;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::future::FutureExt;
use log::error;
use crate::Addr;
/// Boxed dynamic error (`Send + Sync`) used as the failure type of actor operations.
pub type ActorError = Box<dyn Error + Send + Sync>;
/// Result of an actor operation: a [`Produces<T>`] on success, an [`ActorError`] on failure.
pub type ActorResult<T> = Result<Produces<T>, ActorError>;
/// A value of type `T` that is absent, available immediately, or still to be
/// delivered through a one-shot channel.
///
/// `Produces<T>` implements `Future`; awaiting it yields
/// `Result<T, oneshot::Canceled>`.
#[derive(Debug)]
#[non_exhaustive]
pub enum Produces<T> {
/// No value. Polling this variant resolves to `Err(oneshot::Canceled)`.
None,
/// A value that is already available; polling resolves to `Ok(value)`.
Value(T),
/// A value that arrives later through the receiver. The delivered item is
/// itself a `Produces<T>`, so one deferral may resolve to another.
Deferred(oneshot::Receiver<Produces<T>>),
}
// `Produces<T>` is `Unpin` regardless of `T`: the `Future` impl mutates and moves
// the value through a `Pin<&mut Self>`, which is only permitted for `Unpin` types.
impl<T> Unpin for Produces<T> {}
impl<T> Produces<T> {
pub fn ok(value: T) -> ActorResult<T> {
Ok(Produces::Value(value))
}
}
impl<T> Future for Produces<T> {
    type Output = Result<T, oneshot::Canceled>;

    /// Drives the value to completion.
    ///
    /// * `None` resolves to `Err(oneshot::Canceled)`.
    /// * `Value(v)` resolves to `Ok(v)`.
    /// * `Deferred(rx)` polls the receiver: a delivered `Produces<T>` becomes the
    ///   new state and is examined right away, a cancelled channel resolves to
    ///   the error, and a pending channel is stored back so it can be polled again.
    ///
    /// The state is taken out of `self` on every pass, so once a result has been
    /// returned the future is left as `None`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Produces<T>` is `Unpin`, so plain mutable access is available.
        let state = self.get_mut();

        loop {
            // Take ownership of the current state, leaving `None` behind.
            let mut receiver = match mem::replace(state, Produces::None) {
                Produces::None => return Poll::Ready(Err(oneshot::Canceled)),
                Produces::Value(value) => return Poll::Ready(Ok(value)),
                Produces::Deferred(receiver) => receiver,
            };

            match receiver.poll_unpin(cx) {
                // The sender delivered another `Produces`; go around and inspect it.
                Poll::Ready(Ok(next)) => *state = next,
                Poll::Ready(Err(canceled)) => return Poll::Ready(Err(canceled)),
                Poll::Pending => {
                    // Not ready yet: restore the receiver for the next poll.
                    *state = Produces::Deferred(receiver);
                    return Poll::Pending;
                }
            }
        }
    }
}
/// Behaviour shared by all actors. Implementors must be `Send + 'static`.
///
/// Both methods have default implementations, so an implementor may override
/// either one or neither.
#[async_trait]
pub trait Actor: Send + 'static {
    /// Start-up hook that receives the actor's own address.
    ///
    /// The default implementation ignores the address and succeeds with `()`.
    // NOTE(review): presumably invoked by the runtime once the actor has been
    // spawned — the call site is not visible here; confirm.
    async fn started(&mut self, _addr: Addr<Self>) -> ActorResult<()>
    where
        Self: Sized,
    {
        Ok(Produces::Value(()))
    }

    /// Error hook that receives an [`ActorError`].
    ///
    /// The default implementation logs the error at `error` level and returns
    /// `true`.
    // NOTE(review): the meaning of the returned flag is decided by the caller,
    // which is not visible here — presumably `true` asks for the actor to be
    // stopped; confirm.
    async fn error(&mut self, error: ActorError) -> bool {
        error!("{}", error);
        true
    }
}
/// Conversion of a value into an [`ActorResult`].
pub trait IntoActorResult {
/// The type carried by the resulting `ActorResult`.
type Output;
/// Consumes `self` and returns it as an `ActorResult<Self::Output>`.
fn into_actor_result(self) -> ActorResult<Self::Output>;
}
/// An `ActorResult<T>` is already in the target form, so the conversion is the
/// identity.
impl<T> IntoActorResult for ActorResult<T> {
type Output = T;
/// Returns `self` unchanged.
fn into_actor_result(self) -> ActorResult<T> {
self
}
}
/// The unit value converts into a successful result carrying `()`.
impl IntoActorResult for () {
    type Output = ();

    /// Always returns `Ok(Produces::Value(()))`.
    fn into_actor_result(self) -> ActorResult<()> {
        Ok(Produces::Value(self))
    }
}