1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
use crate::*;
use std::sync::Arc;
use tracing::Instrument;
/// A boxed, sendable, run-once closure that is handed mutable access to the
/// actor state `T`. This is the unit of work carried by the actor's channel.
type InnerInvoke<T> = Box<dyn FnOnce(&mut T) + 'static + Send>;
/// Sending half of the `futures` mpsc channel whose items are
/// [`InnerInvoke<T>`] closures.
type SendInvoke<T> = futures::channel::mpsc::Sender<InnerInvoke<T>>;
/// Handle to an actor whose internal state has type `T`.
///
/// The handle is a reference-counted wrapper around the channel sender used
/// to queue closures for execution against the state; it does not hold the
/// state itself.
pub struct GhostActor<T: 'static + Send>(Arc<SendInvoke<T>>);
impl<T: 'static + Send> GhostActor<T> {
pub fn new(t: T) -> (Self, GhostDriver) {
Self::new_config(GhostConfig::default(), t)
}
pub fn new_config(config: GhostConfig, t: T) -> (Self, GhostDriver) {
let mut t = t;
let (send, recv) = futures::channel::mpsc::channel::<InnerInvoke<T>>(
config.channel_bound,
);
let driver =
GhostDriver(futures::future::FutureExt::boxed(async move {
let mut recv =
futures::stream::StreamExt::ready_chunks(recv, 1024);
while let Some(invokes) =
futures::stream::StreamExt::next(&mut recv).await
{
for invoke in invokes {
invoke(&mut t);
}
}
}));
(Self(Arc::new(send)), driver)
}
pub fn to_boxed(&self) -> BoxGhostActor {
self.__box_clone()
}
pub fn invoke<R, E, F>(&self, invoke: F) -> GhostFuture<R, E>
where
R: 'static + Send,
E: 'static + From<GhostError> + Send,
F: FnOnce(&mut T) -> Result<R, E> + 'static + Send,
{
let mut sender = (*self.0).clone();
resp(
async move {
let strong = Arc::new(tracing::Span::current());
let weak = Arc::downgrade(&strong);
let (o_send, o_recv) = futures::channel::oneshot::channel();
let inner: InnerInvoke<T> = Box::new(move |t: &mut T| {
let strong = match weak.upgrade() {
Some(strong) => strong,
None => {
tracing::warn!("TRACING: Parent context dropped");
Arc::new(tracing::Span::current())
}
};
strong.in_scope(|| {
let r = invoke(t);
let _ = o_send.send(r);
});
});
use futures::sink::SinkExt;
sender.send(inner).await.map_err(GhostError::other)?;
o_recv.await.map_err(GhostError::other)?
}
.instrument(tracing::Span::current()),
)
}
pub fn is_active(&self) -> bool {
!self.0.is_closed()
}
pub fn shutdown(&self) {
(*self.0).clone().close_channel();
}
}
/// Type-erased actor interface for [`GhostActor<T>`]; each method forwards to
/// the concrete handle or to its underlying channel sender.
impl<T: 'static + Send> AsGhostActor for GhostActor<T> {
    /// Runs a type-erased closure against the actor state by delegating to
    /// [`GhostActor::invoke`], returning the boxed result or a `GhostError`.
    fn __invoke(
        &self,
        invoke: RawInvokeClosure,
    ) -> GhostFuture<Box<dyn std::any::Any + 'static + Send>, GhostError> {
        let fut = self.invoke(|t| invoke(t));
        resp(async move { fut.await })
    }

    /// Clones this handle and wraps the clone in a [`BoxGhostActor`].
    fn __box_clone(&self) -> BoxGhostActor {
        BoxGhostActor(Box::new(self.clone()))
    }

    /// Returns `true` when `o` is a `GhostActor<T>` whose sender targets the
    /// same receiver as this one; any other concrete type yields `false`.
    fn __is_same_actor(&self, o: &dyn std::any::Any) -> bool {
        // Method-call syntax reaches the inherent `dyn Any` method without
        // spelling a bare trait-object path (`Any::downcast_ref`), which is
        // deprecated and rejected by newer editions.
        match o.downcast_ref::<GhostActor<T>>() {
            Some(other) => self.0.same_receiver(&other.0),
            None => false,
        }
    }

    /// Feeds the identity of the receiver into `hasher` via `hash_receiver`.
    fn __hash_actor(&self, hasher: &mut dyn std::hash::Hasher) {
        // `&mut dyn Hasher` is itself a sized `Hasher`, so it can be passed by
        // mutable reference directly; no boxing is required.
        let mut hasher = hasher;
        self.0.hash_receiver(&mut hasher);
    }

    /// Forwards to [`GhostActor::is_active`].
    fn __is_active(&self) -> bool {
        GhostActor::is_active(self)
    }

    /// Forwards to [`GhostActor::shutdown`].
    fn __shutdown(&self) {
        GhostActor::shutdown(self);
    }
}
impl<T: 'static + Send> std::clone::Clone for GhostActor<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: 'static + Send> std::cmp::PartialEq for GhostActor<T> {
fn eq(&self, o: &Self) -> bool {
self.0.same_receiver(&o.0)
}
}
/// Marker impl declaring that the `PartialEq` comparison for `GhostActor<T>`
/// is a full equivalence relation.
impl<T: 'static + Send> std::cmp::Eq for GhostActor<T> {}
impl<T: 'static + Send> std::hash::Hash for GhostActor<T> {
fn hash<Hasher: std::hash::Hasher>(&self, state: &mut Hasher) {
self.0.hash_receiver(state);
}
}