xan_actor/
actor_system.rs

1use crate::{Actor, ActorError, JobSpec, LifeCycle, Message};
2use std::collections::{HashMap, HashSet};
3
4/// Commands for the ActorSystem to handle various operations
5/// You can send these commands to the ActorSystem's handler channel directly.
6pub enum ActorSystemCmd {
7    Register(
8        String,
9        tokio::sync::mpsc::UnboundedSender<Message>,
10        tokio::sync::mpsc::UnboundedSender<()>,
11        tokio::sync::mpsc::UnboundedSender<()>,
12        LifeCycle,
13        tokio::sync::oneshot::Sender<Result<(), ActorError>>,
14        bool,
15    ),
16    Restart(String),
17    Unregister(String),
18    FilterAddress(String, tokio::sync::oneshot::Sender<Vec<String>>),
19    FindActor(
20        String,
21        tokio::sync::oneshot::Sender<
22            Option<(tokio::sync::mpsc::UnboundedSender<Message>, bool)>, // tx, ready
23        >,
24    ),
25    SetLifeCycle(String, LifeCycle),
26}
27
28#[derive(Clone)]
29/// The ActorSystem is the main entry point for managing actors.
30/// It contains a handler channel to send commands to the actor system.
31/// It's clonable so that it can be shared across different parts of the application.
32pub struct ActorSystem {
33    handler_tx: tokio::sync::mpsc::UnboundedSender<ActorSystemCmd>,
34    pub blocking: bool,
35}
36
37impl Default for ActorSystem {
38    fn default() -> Self {
39        let (handler_tx, handler_rx) = tokio::sync::mpsc::unbounded_channel();
40        let mut me = Self {
41            handler_tx,
42            blocking: true,
43        };
44        me.run(handler_rx);
45        me
46    }
47}
48
49impl ActorSystem {
50    /// Creates a new ActorSystem instance
51    pub fn new(blocking: bool) -> Self {
52        let (handler_tx, handler_rx) = tokio::sync::mpsc::unbounded_channel();
53        let mut me = Self {
54            handler_tx,
55            blocking,
56        };
57        me.run(handler_rx);
58        me
59    }
60
61    /// Returns the handler channel sender for the ActorSystem.
62    /// You can use this to send commands to the ActorSystem directly.
63    pub fn handler_tx(&self) -> tokio::sync::mpsc::UnboundedSender<ActorSystemCmd> {
64        self.handler_tx.clone()
65    }
66
67    /// Filters the addresses of actors based on a regex pattern.
68    pub async fn filter_address(&mut self, address_regex: String) -> Vec<String> {
69        let (tx, rx) = tokio::sync::oneshot::channel();
70        let _ = self
71            .handler_tx
72            .send(ActorSystemCmd::FilterAddress(address_regex, tx));
73        match rx.await {
74            Ok(addresses) => addresses,
75            Err(e) => {
76                error!("Receive address list failed: {:?}", e);
77                Vec::new()
78            }
79        }
80    }
81
82    /// Registers an actor.
83    pub fn restart(&mut self, address_regex: String) {
84        let _ = self.handler_tx.send(ActorSystemCmd::Restart(address_regex));
85    }
86
87    /// Registers an actor.
88    pub fn unregister(&mut self, address_regex: String) {
89        let _ = self
90            .handler_tx
91            .send(ActorSystemCmd::Unregister(address_regex));
92    }
93
94    /// Send a message to a specific actor by its address.
95    /// It doesn't wait for the actor to be ready.
96    pub async fn send<T>(
97        &self,
98        address: String,
99        msg: <T as Actor>::Message,
100    ) -> Result<(), ActorError>
101    where
102        T: Actor,
103    {
104        let (tx, rx) = tokio::sync::oneshot::channel();
105        let _ = self
106            .handler_tx
107            .send(ActorSystemCmd::FindActor(address.clone(), tx));
108        if let Ok(Some((tx, ready))) = rx.await {
109            if ready {
110                let _ = tx.send(Message::new(rmp_serde::to_vec(&msg)?, None))?;
111                Ok(())
112            } else {
113                Err(ActorError::ActorNotReady(address))
114            }
115        } else {
116            Err(ActorError::AddressNotFound(address))
117        }
118    }
119
120    /// Sends a message to all actors that match the given address regex.
121    /// It returns a vector of results, success or error for each actor.
122    /// It doesn't returns results from the actors, only whether the message was sent successfully or not.
123    pub async fn send_broadcast<T>(
124        &self,
125        address_regex: String,
126        msg: <T as Actor>::Message,
127    ) -> Vec<Result<(), ActorError>>
128    where
129        T: Actor,
130    {
131        let (tx, rx) = tokio::sync::oneshot::channel();
132        let _ = self
133            .handler_tx
134            .send(ActorSystemCmd::FilterAddress(address_regex, tx));
135        let addresses = match rx.await {
136            Ok(addresses) => addresses,
137            Err(e) => {
138                error!("Receive address list failed: {:?}", e);
139                return vec![Err(ActorError::from(e))];
140            }
141        };
142        let mut result = Vec::new();
143        for address in addresses {
144            let (tx, rx) = tokio::sync::oneshot::channel();
145            let _ = self
146                .handler_tx
147                .send(ActorSystemCmd::FindActor(address.clone(), tx));
148            if let Ok(Some((tx, ready))) = rx.await {
149                if ready {
150                    match rmp_serde::to_vec(&msg) {
151                        Ok(x) => {
152                            let message = Message::new(x, None);
153                            result.push(
154                                tx.send(message)
155                                    .map(|_| ())
156                                    .map_err(|e| ActorError::UnboundedChannelSend(e)),
157                            );
158                        }
159                        Err(e) => {
160                            result.push(Err(ActorError::from(e)));
161                        }
162                    }
163                } else {
164                    result.push(Err(ActorError::ActorNotReady(address)));
165                }
166            } else {
167                result.push(Err(ActorError::AddressNotFound(address)));
168            }
169        }
170        result
171    }
172
173    /// Sends a message to a specific actor and waits for the result.
174    pub async fn send_and_recv<T>(
175        &self,
176        address: String,
177        msg: <T as Actor>::Message,
178    ) -> Result<<T as Actor>::Result, ActorError>
179    where
180        T: Actor,
181    {
182        let (tx, rx) = tokio::sync::oneshot::channel();
183        let _ = self
184            .handler_tx
185            .send(ActorSystemCmd::FindActor(address.clone(), tx));
186        if let Ok(Some((tx, ready))) = rx.await {
187            if ready {
188                let (result_tx, result_rx) = tokio::sync::oneshot::channel();
189                let _ = tx.send(Message::new(rmp_serde::to_vec(&msg)?, Some(result_tx)))?;
190                Ok(rmp_serde::from_slice::<<T as Actor>::Result>(
191                    &result_rx.await?,
192                )?)
193            } else {
194                Err(ActorError::ActorNotReady(address))
195            }
196        } else {
197            Err(ActorError::AddressNotFound(address))
198        }
199    }
200
201    /// Runs a job with the specified actor and message.
202    /// If you want to subscribe to the results, set `subscribe` to true.
203    /// It returns a receiver that you can use to receive the results.
204    pub async fn run_job<T>(
205        &self,
206        address: String,
207        subscribe: bool,
208        job: JobSpec,
209        msg: <T as Actor>::Message,
210    ) -> Result<
211        Option<
212            tokio::sync::mpsc::UnboundedReceiver<
213                Result<<T as Actor>::Result, rmp_serde::decode::Error>,
214            >,
215        >,
216        ActorError,
217    >
218    where
219        T: Actor,
220    {
221        let (tx, rx) = tokio::sync::oneshot::channel();
222        let msg = match rmp_serde::to_vec(&msg) {
223            Ok(msg) => msg,
224            Err(e) => {
225                error!("Serialize message failed: {:?}", e);
226                return Err(ActorError::from(e));
227            }
228        };
229        let _ = self
230            .handler_tx
231            .send(ActorSystemCmd::FindActor(address.clone(), tx));
232        if let Ok(Some((tx, ready))) = rx.await {
233            if ready {
234                let tx = tx.clone();
235                if subscribe {
236                    let (sub_tx, sub_rx) = tokio::sync::mpsc::unbounded_channel();
237                    let msg = msg.clone();
238                    let _ = tokio::spawn(async move {
239                        let mut i = 0;
240                        if let Some(interval) = job.interval() {
241                            loop {
242                                if job.start_at() <= std::time::SystemTime::now() {
243                                    i += 1;
244                                    let (result_tx, result_rx) = tokio::sync::oneshot::channel();
245                                    if let Err(e) =
246                                        tx.send(Message::new(msg.clone(), Some(result_tx)))
247                                    {
248                                        error!("Send message failed: {:?}", e);
249                                        drop(sub_tx);
250                                        return;
251                                    }
252                                    let result = match result_rx.await {
253                                        Ok(result) => result,
254                                        Err(e) => {
255                                            error!("Receive result failed: {:?}", e);
256                                            drop(sub_tx);
257                                            return;
258                                        }
259                                    };
260                                    let _ =
261                                        sub_tx.send(rmp_serde::from_slice::<<T as Actor>::Result>(
262                                            &result,
263                                        ));
264                                    tokio::time::sleep(interval).await;
265                                    if let Some(max_iter) = job.max_iter() {
266                                        if i >= max_iter {
267                                            drop(sub_tx);
268                                            return;
269                                        }
270                                    }
271                                }
272                            }
273                        } else {
274                            if job.start_at() <= std::time::SystemTime::now() {
275                                let (result_tx, result_rx) = tokio::sync::oneshot::channel();
276                                let msg = match rmp_serde::to_vec(&msg) {
277                                    Ok(msg) => msg,
278                                    Err(e) => {
279                                        error!("Serialize message failed: {:?}", e);
280                                        drop(sub_tx);
281                                        return;
282                                    }
283                                };
284                                if let Err(e) = tx.send(Message::new(msg, Some(result_tx))) {
285                                    error!("Send message failed: {:?}", e);
286                                    return;
287                                }
288                                let result = match result_rx
289                                    .await
290                                    .map(|x| rmp_serde::from_slice::<<T as Actor>::Result>(&x))
291                                {
292                                    Ok(result) => result,
293                                    Err(e) => {
294                                        error!("Receive result failed: {:?}", e);
295                                        drop(sub_tx);
296                                        return;
297                                    }
298                                };
299                                let _ = sub_tx.send(result);
300                            }
301                        }
302                    });
303                    Ok(Some(sub_rx))
304                } else {
305                    let _ = tokio::spawn(async move {
306                        let mut i = 0;
307                        if let Some(interval) = job.interval() {
308                            loop {
309                                if job.start_at() <= std::time::SystemTime::now() {
310                                    i += 1;
311                                    if let Err(e) = tx.send(Message::new(msg.clone(), None)) {
312                                        error!("Send message failed: {:?}", e);
313                                        return;
314                                    }
315                                    tokio::time::sleep(interval).await;
316                                    if let Some(max_iter) = job.max_iter() {
317                                        if i >= max_iter {
318                                            return;
319                                        }
320                                    }
321                                }
322                            }
323                        } else {
324                            if job.start_at() <= std::time::SystemTime::now() {
325                                let _ = tx.send(Message::new(msg.clone(), None));
326                            }
327                        }
328                    });
329                    Ok(None)
330                }
331            } else {
332                Err(ActorError::ActorNotReady(address))
333            }
334        } else {
335            Err(ActorError::AddressNotFound(address))
336        }
337    }
338
339    fn run(
340        &mut self,
341        handler_rx: tokio::sync::mpsc::UnboundedReceiver<ActorSystemCmd>,
342    ) -> tokio::task::JoinHandle<()> {
343        let handle = tokio::task::spawn_blocking(|| {
344            tokio::runtime::Handle::current().block_on(actor_system_loop(handler_rx))
345        });
346        handle
347    }
348}
349
350// {{{ fn actor_system_loop
351async fn actor_system_loop(mut handler_rx: tokio::sync::mpsc::UnboundedReceiver<ActorSystemCmd>) {
352    let mut address_list = HashSet::<String>::new();
353    let mut map = HashMap::<
354        String,
355        (
356            tokio::sync::mpsc::UnboundedSender<Message>,
357            tokio::sync::mpsc::UnboundedSender<()>,
358            tokio::sync::mpsc::UnboundedSender<()>,
359            LifeCycle,
360        ),
361    >::new();
362    while let Some(msg) = handler_rx.recv().await {
363        match msg {
364            ActorSystemCmd::Register(
365                address,
366                tx,
367                restart_tx,
368                kill_tx,
369                life_cycle,
370                result_tx,
371                is_restarted,
372            ) => {
373                debug!("Register actor with address {}", address);
374                if map.contains_key(&address) && !is_restarted {
375                    let _ = result_tx.send(Err(ActorError::AddressAlreadyExist(address)));
376                    continue;
377                }
378                map.insert(address.clone(), (tx, restart_tx, kill_tx, life_cycle));
379                address_list.insert(address);
380                let _ = result_tx.send(Ok(()));
381            }
382            ActorSystemCmd::Restart(address_regex) => {
383                debug!("Restart actor with address {}", address_regex);
384                let addresses = match filter_address(&address_list, &address_regex) {
385                    Ok(addresses) => addresses,
386                    Err(e) => {
387                        error!("Filter address failed: {:?}", e);
388                        continue;
389                    }
390                };
391                for address in addresses {
392                    if let Some((_tx, restart_tx, _kill_tx, _life_cycle)) = map.get(&address) {
393                        let _ = restart_tx.send(());
394                    }
395                }
396            }
397            ActorSystemCmd::Unregister(address_regex) => {
398                debug!("Unregister actor with address {}", address_regex);
399                let addresses = match filter_address(&address_list, &address_regex) {
400                    Ok(addresses) => addresses,
401                    Err(e) => {
402                        error!("Filter address failed: {:?}", e);
403                        continue;
404                    }
405                };
406                for address in addresses {
407                    match map.entry(address.to_string()) {
408                        std::collections::hash_map::Entry::Occupied(mut entry) => {
409                            let _ = entry.get_mut().2.send(());
410                            entry.remove_entry();
411                            address_list.remove(&address);
412                        }
413                        std::collections::hash_map::Entry::Vacant(_) => {
414                            continue;
415                        }
416                    }
417                }
418            }
419            ActorSystemCmd::FilterAddress(address_regex, result_tx) => {
420                debug!("FilterAddress with regex {}", address_regex);
421                let addresses = match filter_address(&address_list, &address_regex) {
422                    Ok(addresses) => addresses,
423                    Err(e) => {
424                        error!("Filter address failed: {:?}", e);
425                        continue;
426                    }
427                };
428                let _ = result_tx.send(addresses);
429            }
430            ActorSystemCmd::FindActor(address, result_tx) => {
431                debug!("FindActor with address {}", address);
432                if let Some((tx, _restart_tx, _kill_tx, life_cycle)) = map.get(&address) {
433                    let _ = result_tx.send(Some((
434                        tx.clone(),
435                        match life_cycle {
436                            LifeCycle::Receiving => true,
437                            _ => false,
438                        },
439                    )));
440                } else {
441                    let _ = result_tx.send(None);
442                }
443            }
444            ActorSystemCmd::SetLifeCycle(address, life_cycle) => {
445                debug!(
446                    "SetLifecycle with address {} into {:?}",
447                    address, life_cycle
448                );
449                if let Some(actor) = map.get_mut(&address) {
450                    actor.3 = life_cycle;
451                };
452            }
453        };
454    }
455}
456// }}}
457
458// {{{ fn filter_address
459fn filter_address(
460    address_list: &HashSet<String>,
461    regex: &str,
462) -> Result<Vec<String>, regex::Error> {
463    let regex = regex::Regex::new(&format!("^{}$", regex.replace("*", "(\\S+)"))).map_err(|e| {
464        error!("Regex error: {:?}", e);
465        e
466    })?;
467    Ok(address_list
468        .iter()
469        .filter(|x| regex.is_match(x))
470        .map(|x| x.to_string())
471        .collect())
472}
473// }}}