1use crate::{Actor, ActorError, JobSpec, LifeCycle, Message};
2use std::collections::{HashMap, HashSet};
3
4pub enum ActorSystemCmd {
7 Register(
8 String,
9 tokio::sync::mpsc::UnboundedSender<Message>,
10 tokio::sync::mpsc::UnboundedSender<()>,
11 tokio::sync::mpsc::UnboundedSender<()>,
12 LifeCycle,
13 tokio::sync::oneshot::Sender<Result<(), ActorError>>,
14 bool,
15 ),
16 Restart(String),
17 Unregister(String),
18 FilterAddress(String, tokio::sync::oneshot::Sender<Vec<String>>),
19 FindActor(
20 String,
21 tokio::sync::oneshot::Sender<
22 Option<(tokio::sync::mpsc::UnboundedSender<Message>, bool)>, >,
24 ),
25 SetLifeCycle(String, LifeCycle),
26}
27
28#[derive(Clone)]
29pub struct ActorSystem {
33 handler_tx: tokio::sync::mpsc::UnboundedSender<ActorSystemCmd>,
34 pub blocking: bool,
35}
36
37impl Default for ActorSystem {
38 fn default() -> Self {
39 let (handler_tx, handler_rx) = tokio::sync::mpsc::unbounded_channel();
40 let mut me = Self {
41 handler_tx,
42 blocking: true,
43 };
44 me.run(handler_rx);
45 me
46 }
47}
48
49impl ActorSystem {
50 pub fn new(blocking: bool) -> Self {
52 let (handler_tx, handler_rx) = tokio::sync::mpsc::unbounded_channel();
53 let mut me = Self {
54 handler_tx,
55 blocking,
56 };
57 me.run(handler_rx);
58 me
59 }
60
61 pub fn handler_tx(&self) -> tokio::sync::mpsc::UnboundedSender<ActorSystemCmd> {
64 self.handler_tx.clone()
65 }
66
67 pub async fn filter_address(&mut self, address_regex: String) -> Vec<String> {
69 let (tx, rx) = tokio::sync::oneshot::channel();
70 let _ = self
71 .handler_tx
72 .send(ActorSystemCmd::FilterAddress(address_regex, tx));
73 match rx.await {
74 Ok(addresses) => addresses,
75 Err(e) => {
76 error!("Receive address list failed: {:?}", e);
77 Vec::new()
78 }
79 }
80 }
81
82 pub fn restart(&mut self, address_regex: String) {
84 let _ = self.handler_tx.send(ActorSystemCmd::Restart(address_regex));
85 }
86
87 pub fn unregister(&mut self, address_regex: String) {
89 let _ = self
90 .handler_tx
91 .send(ActorSystemCmd::Unregister(address_regex));
92 }
93
94 pub async fn send<T>(
97 &self,
98 address: String,
99 msg: <T as Actor>::Message,
100 ) -> Result<(), ActorError>
101 where
102 T: Actor,
103 {
104 let (tx, rx) = tokio::sync::oneshot::channel();
105 let _ = self
106 .handler_tx
107 .send(ActorSystemCmd::FindActor(address.clone(), tx));
108 if let Ok(Some((tx, ready))) = rx.await {
109 if ready {
110 let _ = tx.send(Message::new(rmp_serde::to_vec(&msg)?, None))?;
111 Ok(())
112 } else {
113 Err(ActorError::ActorNotReady(address))
114 }
115 } else {
116 Err(ActorError::AddressNotFound(address))
117 }
118 }
119
120 pub async fn send_broadcast<T>(
124 &self,
125 address_regex: String,
126 msg: <T as Actor>::Message,
127 ) -> Vec<Result<(), ActorError>>
128 where
129 T: Actor,
130 {
131 let (tx, rx) = tokio::sync::oneshot::channel();
132 let _ = self
133 .handler_tx
134 .send(ActorSystemCmd::FilterAddress(address_regex, tx));
135 let addresses = match rx.await {
136 Ok(addresses) => addresses,
137 Err(e) => {
138 error!("Receive address list failed: {:?}", e);
139 return vec![Err(ActorError::from(e))];
140 }
141 };
142 let mut result = Vec::new();
143 for address in addresses {
144 let (tx, rx) = tokio::sync::oneshot::channel();
145 let _ = self
146 .handler_tx
147 .send(ActorSystemCmd::FindActor(address.clone(), tx));
148 if let Ok(Some((tx, ready))) = rx.await {
149 if ready {
150 match rmp_serde::to_vec(&msg) {
151 Ok(x) => {
152 let message = Message::new(x, None);
153 result.push(
154 tx.send(message)
155 .map(|_| ())
156 .map_err(|e| ActorError::UnboundedChannelSend(e)),
157 );
158 }
159 Err(e) => {
160 result.push(Err(ActorError::from(e)));
161 }
162 }
163 } else {
164 result.push(Err(ActorError::ActorNotReady(address)));
165 }
166 } else {
167 result.push(Err(ActorError::AddressNotFound(address)));
168 }
169 }
170 result
171 }
172
173 pub async fn send_and_recv<T>(
175 &self,
176 address: String,
177 msg: <T as Actor>::Message,
178 ) -> Result<<T as Actor>::Result, ActorError>
179 where
180 T: Actor,
181 {
182 let (tx, rx) = tokio::sync::oneshot::channel();
183 let _ = self
184 .handler_tx
185 .send(ActorSystemCmd::FindActor(address.clone(), tx));
186 if let Ok(Some((tx, ready))) = rx.await {
187 if ready {
188 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
189 let _ = tx.send(Message::new(rmp_serde::to_vec(&msg)?, Some(result_tx)))?;
190 Ok(rmp_serde::from_slice::<<T as Actor>::Result>(
191 &result_rx.await?,
192 )?)
193 } else {
194 Err(ActorError::ActorNotReady(address))
195 }
196 } else {
197 Err(ActorError::AddressNotFound(address))
198 }
199 }
200
201 pub async fn run_job<T>(
205 &self,
206 address: String,
207 subscribe: bool,
208 job: JobSpec,
209 msg: <T as Actor>::Message,
210 ) -> Result<
211 Option<
212 tokio::sync::mpsc::UnboundedReceiver<
213 Result<<T as Actor>::Result, rmp_serde::decode::Error>,
214 >,
215 >,
216 ActorError,
217 >
218 where
219 T: Actor,
220 {
221 let (tx, rx) = tokio::sync::oneshot::channel();
222 let msg = match rmp_serde::to_vec(&msg) {
223 Ok(msg) => msg,
224 Err(e) => {
225 error!("Serialize message failed: {:?}", e);
226 return Err(ActorError::from(e));
227 }
228 };
229 let _ = self
230 .handler_tx
231 .send(ActorSystemCmd::FindActor(address.clone(), tx));
232 if let Ok(Some((tx, ready))) = rx.await {
233 if ready {
234 let tx = tx.clone();
235 if subscribe {
236 let (sub_tx, sub_rx) = tokio::sync::mpsc::unbounded_channel();
237 let msg = msg.clone();
238 let _ = tokio::spawn(async move {
239 let mut i = 0;
240 if let Some(interval) = job.interval() {
241 loop {
242 if job.start_at() <= std::time::SystemTime::now() {
243 i += 1;
244 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
245 if let Err(e) =
246 tx.send(Message::new(msg.clone(), Some(result_tx)))
247 {
248 error!("Send message failed: {:?}", e);
249 drop(sub_tx);
250 return;
251 }
252 let result = match result_rx.await {
253 Ok(result) => result,
254 Err(e) => {
255 error!("Receive result failed: {:?}", e);
256 drop(sub_tx);
257 return;
258 }
259 };
260 let _ =
261 sub_tx.send(rmp_serde::from_slice::<<T as Actor>::Result>(
262 &result,
263 ));
264 tokio::time::sleep(interval).await;
265 if let Some(max_iter) = job.max_iter() {
266 if i >= max_iter {
267 drop(sub_tx);
268 return;
269 }
270 }
271 }
272 }
273 } else {
274 if job.start_at() <= std::time::SystemTime::now() {
275 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
276 let msg = match rmp_serde::to_vec(&msg) {
277 Ok(msg) => msg,
278 Err(e) => {
279 error!("Serialize message failed: {:?}", e);
280 drop(sub_tx);
281 return;
282 }
283 };
284 if let Err(e) = tx.send(Message::new(msg, Some(result_tx))) {
285 error!("Send message failed: {:?}", e);
286 return;
287 }
288 let result = match result_rx
289 .await
290 .map(|x| rmp_serde::from_slice::<<T as Actor>::Result>(&x))
291 {
292 Ok(result) => result,
293 Err(e) => {
294 error!("Receive result failed: {:?}", e);
295 drop(sub_tx);
296 return;
297 }
298 };
299 let _ = sub_tx.send(result);
300 }
301 }
302 });
303 Ok(Some(sub_rx))
304 } else {
305 let _ = tokio::spawn(async move {
306 let mut i = 0;
307 if let Some(interval) = job.interval() {
308 loop {
309 if job.start_at() <= std::time::SystemTime::now() {
310 i += 1;
311 if let Err(e) = tx.send(Message::new(msg.clone(), None)) {
312 error!("Send message failed: {:?}", e);
313 return;
314 }
315 tokio::time::sleep(interval).await;
316 if let Some(max_iter) = job.max_iter() {
317 if i >= max_iter {
318 return;
319 }
320 }
321 }
322 }
323 } else {
324 if job.start_at() <= std::time::SystemTime::now() {
325 let _ = tx.send(Message::new(msg.clone(), None));
326 }
327 }
328 });
329 Ok(None)
330 }
331 } else {
332 Err(ActorError::ActorNotReady(address))
333 }
334 } else {
335 Err(ActorError::AddressNotFound(address))
336 }
337 }
338
339 fn run(
340 &mut self,
341 handler_rx: tokio::sync::mpsc::UnboundedReceiver<ActorSystemCmd>,
342 ) -> tokio::task::JoinHandle<()> {
343 let handle = tokio::task::spawn_blocking(|| {
344 tokio::runtime::Handle::current().block_on(actor_system_loop(handler_rx))
345 });
346 handle
347 }
348}
349
350async fn actor_system_loop(mut handler_rx: tokio::sync::mpsc::UnboundedReceiver<ActorSystemCmd>) {
352 let mut address_list = HashSet::<String>::new();
353 let mut map = HashMap::<
354 String,
355 (
356 tokio::sync::mpsc::UnboundedSender<Message>,
357 tokio::sync::mpsc::UnboundedSender<()>,
358 tokio::sync::mpsc::UnboundedSender<()>,
359 LifeCycle,
360 ),
361 >::new();
362 while let Some(msg) = handler_rx.recv().await {
363 match msg {
364 ActorSystemCmd::Register(
365 address,
366 tx,
367 restart_tx,
368 kill_tx,
369 life_cycle,
370 result_tx,
371 is_restarted,
372 ) => {
373 debug!("Register actor with address {}", address);
374 if map.contains_key(&address) && !is_restarted {
375 let _ = result_tx.send(Err(ActorError::AddressAlreadyExist(address)));
376 continue;
377 }
378 map.insert(address.clone(), (tx, restart_tx, kill_tx, life_cycle));
379 address_list.insert(address);
380 let _ = result_tx.send(Ok(()));
381 }
382 ActorSystemCmd::Restart(address_regex) => {
383 debug!("Restart actor with address {}", address_regex);
384 let addresses = match filter_address(&address_list, &address_regex) {
385 Ok(addresses) => addresses,
386 Err(e) => {
387 error!("Filter address failed: {:?}", e);
388 continue;
389 }
390 };
391 for address in addresses {
392 if let Some((_tx, restart_tx, _kill_tx, _life_cycle)) = map.get(&address) {
393 let _ = restart_tx.send(());
394 }
395 }
396 }
397 ActorSystemCmd::Unregister(address_regex) => {
398 debug!("Unregister actor with address {}", address_regex);
399 let addresses = match filter_address(&address_list, &address_regex) {
400 Ok(addresses) => addresses,
401 Err(e) => {
402 error!("Filter address failed: {:?}", e);
403 continue;
404 }
405 };
406 for address in addresses {
407 match map.entry(address.to_string()) {
408 std::collections::hash_map::Entry::Occupied(mut entry) => {
409 let _ = entry.get_mut().2.send(());
410 entry.remove_entry();
411 address_list.remove(&address);
412 }
413 std::collections::hash_map::Entry::Vacant(_) => {
414 continue;
415 }
416 }
417 }
418 }
419 ActorSystemCmd::FilterAddress(address_regex, result_tx) => {
420 debug!("FilterAddress with regex {}", address_regex);
421 let addresses = match filter_address(&address_list, &address_regex) {
422 Ok(addresses) => addresses,
423 Err(e) => {
424 error!("Filter address failed: {:?}", e);
425 continue;
426 }
427 };
428 let _ = result_tx.send(addresses);
429 }
430 ActorSystemCmd::FindActor(address, result_tx) => {
431 debug!("FindActor with address {}", address);
432 if let Some((tx, _restart_tx, _kill_tx, life_cycle)) = map.get(&address) {
433 let _ = result_tx.send(Some((
434 tx.clone(),
435 match life_cycle {
436 LifeCycle::Receiving => true,
437 _ => false,
438 },
439 )));
440 } else {
441 let _ = result_tx.send(None);
442 }
443 }
444 ActorSystemCmd::SetLifeCycle(address, life_cycle) => {
445 debug!(
446 "SetLifecycle with address {} into {:?}",
447 address, life_cycle
448 );
449 if let Some(actor) = map.get_mut(&address) {
450 actor.3 = life_cycle;
451 };
452 }
453 };
454 }
455}
456fn filter_address(
460 address_list: &HashSet<String>,
461 regex: &str,
462) -> Result<Vec<String>, regex::Error> {
463 let regex = regex::Regex::new(&format!("^{}$", regex.replace("*", "(\\S+)"))).map_err(|e| {
464 error!("Regex error: {:?}", e);
465 e
466 })?;
467 Ok(address_list
468 .iter()
469 .filter(|x| regex.is_match(x))
470 .map(|x| x.to_string())
471 .collect())
472}
473