1use std::collections::HashMap;
15use std::error::Error;
16use std::fmt::Debug;
17use std::sync::{Arc};
18use tokio::sync::{mpsc, oneshot};
19use tokio::runtime::Runtime;
20use futures::lock::Mutex;
21use std::future::Future;
22use std::pin::Pin;
23use tokio::sync::oneshot::Sender;
24
25pub type BoxDynError = Box<dyn Error + Send + Sync>;
26pub type CallbackFuture<O> = Pin<Box<dyn Future<Output = O> + Send>>;
27pub type Callback<T> = Box<dyn (Fn(T) -> CallbackFuture<Result<(), BoxDynError>>) + Send + Sync>;
28
29
30
31#[derive(Debug)]
32pub struct Actor<Message, State, Response> {
33 tx: Mutex<Option<mpsc::Sender<(Message, i32)>>>,
34 rt: Mutex<Option<Arc<Runtime>>>,
35 join_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
36 state: Option<Arc<Mutex<State>>>,
37 self_ref: Mutex<Option<Arc<Actor<Message, State, Response>>>>,
38 message_id: Mutex<i32>,
39 call_back: Mutex<HashMap<i32, Callback<Result<Response, BoxDynError>>>>,
40 promise: Mutex<HashMap<i32, Sender<Result<Response, BoxDynError>>>>,
41 name: String,
42
43}
44#[derive(Debug)]
45pub struct Context<Message, State, Response> {
46 pub mgs: Message,
47 pub state: Arc<Mutex<State>>,
48 pub self_ref: Arc<Actor<Message, State, Response>>,
49}
50
51
52impl<Message: Debug + Send + Sync + 'static, State: Debug + Send + Sync + 'static, Response: Debug + Send + Sync + 'static> Actor<Message, State, Response> {
53 pub async fn new<F>(name: String, handler: F, state: State, buffer: usize) -> Arc<Self>
54 where
55 F: Fn(Context<Message, State, Response>) -> CallbackFuture<Result<Response, BoxDynError>>,
56 F: Send + Sync + 'static,
57 {
58 let state_arc = Arc::new(Mutex::new(state));
59 let state_clone = state_arc.clone();
60 let (tx, mut rx) = mpsc::channel(buffer);
61 let rt = tokio::runtime::Builder::new_multi_thread()
62 .worker_threads(1)
63 .enable_all()
64 .build()
65 .unwrap();
66
67 let actor = Actor {
68 tx: Mutex::new(Some(tx)),
69 rt: Mutex::new(None),
70 join_handle: Mutex::new(None),
71 state: Some(state_clone),
72 self_ref: Mutex::new(None),
73 message_id: Mutex::new(0),
74 call_back: Mutex::new(HashMap::new()),
75 promise: Mutex::new(HashMap::new()),
76 name: name,
77 };
78
79 let ret = Arc::new(actor);
80 let ret_clone = ret.clone();
81 let ret_clone2 = ret.clone();
82 let ret_clone3 = ret.clone();
83 let mut me = ret.self_ref.lock().await;
84 *me = Some(ret.clone());
85
86
87 let _join_handle = rt.spawn(async move {
88 let me = ret_clone2.clone();
89 while let Some(message) = rx.recv().await {
90 let msg = message.0;
91 let message_id = message.1;
92 let state_clone = state_arc.clone();
93 {
94 let state_lock = state_clone.lock().await;
95 log::debug!("<{}> Got message: {:?} Current state: {:?}", me.name, msg, state_lock);
96
97 }
98 let state = state_arc.clone();
99 let context = Context {
100 mgs: msg,
101 state: state,
102 self_ref: me.clone(),
103 };
104 let r = handler(context);
105 {
106 let result = r.await;
107 log::trace!("<{}> Work result: {:?}", me.name, result);
108 if result.is_err() {
109 log::error!("<{}> Work error: {:?}", me.name, result);
110 }
111 let mut call_back_lock = ret_clone3.call_back.lock().await;
112 let call_back = call_back_lock.remove(&message_id);
113 match call_back {
114 None => {
115 log::trace!("<{}> No callback for message_id: {}", me.name, message_id);
116
117 let mut promise_lock = ret_clone3.promise.lock().await;
118 let promise = promise_lock.remove(&message_id);
119 match promise {
120 None => {
121 log::trace!("<{}> No promise for message_id: {}", me.name, message_id);
122 }
123 Some(promise) => {
124 log::trace!("<{}> Promise result: {:?}", me.name, result);
125 let _ = promise.send(result);
126 }
127 }
128
129 }
130 Some(call_back) => {
131 log::trace!("<{}> Callback result: {:?}", me.name, result);
132 let _ = call_back(result).await;
133 }
134 }
135 }
136 let state_clone = state_arc.clone();
137 let state_lock = state_clone.lock().await;
138 log::trace!("<{}> After work on message new state: {:?}", me.name, state_lock);
139 }
140 });
141 let mut rt_mutex = ret.rt.lock().await;
142 *rt_mutex = Some(Arc::new(rt));
143 log::info!("<{}> Actor started", ret_clone.name);
144
145 ret_clone
146 }
147
148 pub async fn callback<F>(&self, msg: Message, handler: F) -> Result<(), BoxDynError>
149 where
150 F: Fn(Result<Response, BoxDynError>) -> CallbackFuture<Result<(), BoxDynError>>,
151 F: Send + Sync + 'static,
152 {
153
154
155 log::trace!("<{}> Ask message: {:?}", self.name, msg);
156 let tx_lock = self.tx.lock().await;
157 let tx = tx_lock.as_ref();
158 match tx {
159 None => {
160 return Err(std::io::Error::new(std::io::ErrorKind::Other, "Err").into());
161 }
162 Some(tx) => {
163 let mut message_id_lock = self.message_id.lock().await;
164 *message_id_lock += 1;
165 let mut call_back_lock = self.call_back.lock().await;
166 call_back_lock.insert(*message_id_lock, Box::new(handler));
167 tx.send((msg, *message_id_lock)).await?;
168 }
169 }
170 Ok(())
171 }
172
173 pub async fn ask(&self, mgs: Message) -> Result<Response, BoxDynError>
174 {
175
176 log::trace!("<{}> Result message: {:?}", self.name, mgs);
177 let tx_lock = self.tx.lock().await;
178 let tx = tx_lock.as_ref();
179 match tx {
180 None => {
181 return Err(std::io::Error::new(std::io::ErrorKind::Other, "Err").into());
182 }
183 Some(tx) => {
184 let (sender, receiver) = oneshot::channel();
185 {
186 let mut message_id_lock = self.message_id.lock().await;
187 *message_id_lock += 1;
188 let mut promise_lock = self.promise.lock().await;
189 promise_lock.insert(*message_id_lock, sender);
190 tx.send((mgs, *message_id_lock)).await?;
191 }
192 let result:Result<Response, BoxDynError> = receiver.await?;
193 result
194 }
195 }
196 }
197
198
199
200 pub async fn send(&self, msg: Message) -> Result<(), BoxDynError> {
201 log::trace!("<{}> Push message: {:?}", self.name, msg);
202 let tx_lock = self.tx.lock().await;
203 let tx = tx_lock.as_ref();
204 match tx {
205 None => {
206 return Err(std::io::Error::new(std::io::ErrorKind::Other, "Err").into());
207 }
208 Some(tx) => {
209 tx.send((msg, 0)).await?;
210 }
211 }
212 Ok(())
213 }
214
215 pub async fn state(&self) -> Result<Arc<Mutex<State>>, BoxDynError> {
216 let state_opt = self.state.clone();
217 log::trace!("<{}> State: {:?}", self.name, state_opt);
218 match state_opt {
219 None => {
220 return Err(std::io::Error::new(std::io::ErrorKind::Other, "No state").into());
221 }
222 Some(state) => {
223 Ok(state)
224 }
225 }
226 }
227
228 pub async fn stop(&self){
229 let mut tx_lock = self.tx.lock().await;
230 *tx_lock = None;
231 let mut rt_lock = self.rt.lock().await;
232 let worker = rt_lock.take().unwrap();
233 std::mem::forget(worker);
234 *rt_lock = None;
235 let mut me_lock = self.self_ref.lock().await;
236 *me_lock = None;
237 let mut join_handle_lock = self.join_handle.lock().await;
239 *join_handle_lock = None;
240 log::debug!("<{}> Stop worker", self.name);
241
242 }
243}
244
245
246