fp_rust/
actor.rs

/*!
This module contains the implementation and tests of the `Actor` model.
*/
4
5use std::collections::HashMap;
6use std::sync::{
7    atomic::{AtomicBool, Ordering},
8    Arc, Mutex,
9};
10use std::thread;
11
12use super::common::{generate_id, UniqueId};
13use super::sync::{BlockingQueue, Queue};
14
15/**
16`Actor` defines concepts of `Actor`: Send/Receive Messages, States, Methods.
17
18# Arguments
19
20* `Msg` - The generic type of Message data
21* `ContextValue` - The generic type of ContextValue
22
23# Remarks
24
25It defines simple and practical hehaviors of `Actor` model.
26
27``
28*/
29pub trait Actor<Msg, ContextValue, HandleType, Functor>: UniqueId<String> {
30    fn receive(
31        &mut self,
32        this: &mut Self,
33        message: Msg,
34        context: &mut HashMap<String, ContextValue>,
35    );
36    fn spawn_with_handle(&self, func: Functor) -> HandleType;
37
38    fn get_handle(&self) -> HandleType;
39    fn get_handle_child(&self, name: impl Into<String>) -> Option<HandleType>;
40    fn get_handle_parent(&self) -> Option<HandleType>;
41
42    fn for_each_child(&self, func: impl FnMut(&String, &mut HandleType));
43}
44
45pub trait Handle<Msg>: UniqueId<String> {
46    fn send(&mut self, message: Msg);
47}
48
49#[derive(Debug, Clone)]
50pub struct HandleAsync<Msg>
51where
52    Msg: Send + 'static,
53{
54    id: String,
55    queue: BlockingQueue<Msg>,
56}
57
58impl<Msg> Handle<Msg> for HandleAsync<Msg>
59where
60    Msg: Send + 'static,
61{
62    fn send(&mut self, message: Msg) {
63        self.queue.offer(message);
64    }
65}
66impl<Msg> UniqueId<String> for HandleAsync<Msg>
67where
68    Msg: Send + 'static,
69{
70    fn get_id(&self) -> String {
71        self.id.clone()
72    }
73}
74
75// #[derive(Clone)]
76pub struct ActorAsync<Msg, ContextValue>
77where
78    Msg: Send + 'static,
79{
80    started_alive: Arc<Mutex<(AtomicBool, AtomicBool)>>,
81
82    id: String,
83    parent_handle: Option<HandleAsync<Msg>>,
84    children_handle_map: Arc<Mutex<HashMap<String, HandleAsync<Msg>>>>,
85
86    context: Arc<Mutex<Box<HashMap<String, ContextValue>>>>,
87    queue: BlockingQueue<Msg>,
88    effect: Arc<
89        Mutex<
90            dyn FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
91                + Send
92                + Sync
93                + 'static,
94        >,
95    >,
96
97    join_handle: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
98}
99impl<Msg, ContextValue> Clone for ActorAsync<Msg, ContextValue>
100where
101    Msg: Clone + Send + 'static,
102{
103    fn clone(&self) -> Self {
104        Self {
105            started_alive: self.started_alive.clone(),
106
107            id: self.id.clone(),
108            parent_handle: self.parent_handle.clone(),
109            children_handle_map: self.children_handle_map.clone(),
110
111            context: self.context.clone(),
112            queue: self.queue.clone(),
113            effect: self.effect.clone(),
114            join_handle: self.join_handle.clone(),
115        }
116    }
117}
118
119impl<Msg, ContextValue> ActorAsync<Msg, ContextValue>
120where
121    Msg: Send + 'static,
122{
123    pub fn new(
124        effect: impl FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
125            + Send
126            + Sync
127            + 'static,
128    ) -> Self {
129        Self::new_with_options(effect, None, BlockingQueue::new())
130    }
131
132    pub fn new_with_options(
133        effect: impl FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
134            + Send
135            + Sync
136            + 'static,
137        parent_handle: Option<HandleAsync<Msg>>,
138        queue: BlockingQueue<Msg>,
139    ) -> Self {
140        Self {
141            queue,
142            parent_handle,
143            id: generate_id(),
144            children_handle_map: Arc::new(Mutex::new(HashMap::new())),
145            context: Arc::new(Mutex::new(Box::new(HashMap::new()))),
146            started_alive: Arc::new(Mutex::new((AtomicBool::new(false), AtomicBool::new(false)))),
147            join_handle: Arc::new(Mutex::new(None)),
148            effect: Arc::new(Mutex::new(effect)),
149        }
150    }
151
152    pub fn is_started(&mut self) -> bool {
153        let started_alive = self.started_alive.lock().unwrap();
154        let &(ref started, _) = &*started_alive;
155        started.load(Ordering::SeqCst)
156    }
157
158    pub fn is_alive(&mut self) -> bool {
159        let started_alive = self.started_alive.lock().unwrap();
160        let &(_, ref alive) = &*started_alive;
161        alive.load(Ordering::SeqCst)
162    }
163
164    pub fn stop(&mut self) {
165        {
166            let started_alive = self.started_alive.lock().unwrap();
167            let &(ref started, ref alive) = &*started_alive;
168
169            if !started.load(Ordering::SeqCst) {
170                return;
171            }
172            if !alive.load(Ordering::SeqCst) {
173                return;
174            }
175            alive.store(false, Ordering::SeqCst);
176        }
177
178        // NOTE: Kill thread <- OS depending
179        // let mut join_handle = self.join_handle.lock().unwrap();
180        // join_handle
181        //     .take()
182        //     .expect("Called stop on non-running thread")
183        //     .join()
184        //     .expect("Could not join spawned thread");
185    }
186}
187
188impl<Msg, ContextValue> ActorAsync<Msg, ContextValue>
189where
190    Msg: Clone + Send + 'static,
191    ContextValue: Send + 'static,
192{
193    pub fn start(&mut self) {
194        {
195            let started_alive = self.started_alive.lock().unwrap();
196            let &(ref started, ref alive) = &*started_alive;
197
198            if started.load(Ordering::SeqCst) {
199                return;
200            }
201            started.store(true, Ordering::SeqCst);
202            if alive.load(Ordering::SeqCst) {
203                return;
204            }
205            alive.store(true, Ordering::SeqCst);
206        }
207
208        let mut this = self.clone();
209        let mut this_for_receive = self.clone();
210        let this_for_context = self.clone();
211        let started_alive_thread = self.started_alive.clone();
212        self.join_handle = Arc::new(Mutex::new(Some(thread::spawn(move || {
213            while {
214                let started_alive = started_alive_thread.lock().unwrap();
215                let &(_, ref alive) = &*started_alive;
216
217                alive.load(Ordering::SeqCst)
218            } {
219                let v = this.queue.take();
220
221                match v {
222                    Some(m) => {
223                        let mut context = this_for_context.context.lock().unwrap();
224                        this.receive(&mut this_for_receive, m, context.as_mut());
225                    }
226                    None => {
227                        let started_alive = started_alive_thread.lock().unwrap();
228                        let &(_, ref alive) = &*started_alive;
229
230                        alive.store(false, Ordering::SeqCst);
231                    }
232                }
233            }
234
235            this.stop();
236        }))));
237    }
238}
239
240impl<Msg, ContextValue> UniqueId<String> for ActorAsync<Msg, ContextValue>
241where
242    Msg: Send + 'static,
243{
244    fn get_id(&self) -> String {
245        self.id.clone()
246    }
247}
248
249impl<Msg, ContextValue>
250    Actor<
251        Msg,
252        ContextValue,
253        HandleAsync<Msg>,
254        Box<
255            dyn FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
256                + Send
257                + Sync
258                + 'static,
259        >,
260    > for ActorAsync<Msg, ContextValue>
261where
262    Msg: Clone + Send + 'static,
263    ContextValue: Send + 'static,
264{
265    fn receive(
266        &mut self,
267        this: &mut Self,
268        message: Msg,
269        context: &mut HashMap<String, ContextValue>,
270    ) {
271        {
272            self.effect.lock().unwrap()(this, message, context);
273        }
274    }
275    fn spawn_with_handle(
276        &self,
277        func: Box<
278            dyn FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
279                + Send
280                + Sync
281                + 'static,
282        >,
283    ) -> HandleAsync<Msg> {
284        let mut new_one = Self::new(func);
285        new_one.parent_handle = Some(self.get_handle());
286        {
287            self.children_handle_map
288                .lock()
289                .unwrap()
290                .insert(new_one.get_id(), new_one.get_handle());
291        }
292        new_one.start();
293        return new_one.get_handle();
294    }
295    fn get_handle(&self) -> HandleAsync<Msg> {
296        HandleAsync {
297            id: self.id.clone(),
298            queue: self.queue.clone(),
299        }
300    }
301    fn get_handle_child(&self, name: impl Into<String>) -> Option<HandleAsync<Msg>> {
302        match self.children_handle_map.lock().unwrap().get(&name.into()) {
303            Some(v) => Some(v.clone()),
304            None => None,
305        }
306    }
307    fn get_handle_parent(&self) -> Option<HandleAsync<Msg>> {
308        return self.parent_handle.clone();
309    }
310
311    fn for_each_child(&self, mut func: impl FnMut(&String, &mut HandleAsync<Msg>)) {
312        for (id, handle) in self.children_handle_map.lock().unwrap().iter_mut() {
313            func(id, handle);
314        }
315    }
316}
317
/// Exercises spawning children, forwarding messages to them and shutting the
/// whole tree down.
#[test]
fn test_actor_common() {
    use std::time::Duration;

    use super::common::LinkedListAsync;

    #[derive(Clone, Debug)]
    enum Value {
        // Str(String),
        Int(i32),
        VecStr(Vec<String>),
        Spawn,
        Shutdown,
    }

    // Values pushed by the children (each child pushes `v * 10`).
    let result_i32 = LinkedListAsync::<i32>::new();
    let child_results = result_i32.clone();
    // Snapshot of the children ids, pushed by the root on shutdown.
    let result_string = LinkedListAsync::<Vec<String>>::new();
    let id_snapshots = result_string.clone();

    let mut root = ActorAsync::new(
        move |this: &mut ActorAsync<_, _>, msg: Value, context: &mut HashMap<String, Value>| {
            match msg {
                Value::Spawn => {
                    println!("Actor Spawn");
                    let child_results = child_results.clone();
                    let spawned = this.spawn_with_handle(Box::new(
                        move |this: &mut ActorAsync<_, _>, msg: Value, _| match msg {
                            Value::Int(v) => {
                                println!("Actor Child Int");
                                child_results.push_back(v * 10);
                            }
                            Value::Shutdown => {
                                println!("Actor Child Shutdown");
                                this.stop();
                            }
                            _ => {}
                        },
                    ));

                    // Append the new child's id to the list kept in the context.
                    let mut ids = match context.get("children_ids").cloned() {
                        Some(Value::VecStr(known)) => known,
                        _ => Vec::new(),
                    };
                    ids.push(spawned.get_id());
                    context.insert("children_ids".into(), Value::VecStr(ids));
                }
                Value::Shutdown => {
                    println!("Actor Shutdown");
                    if let Some(Value::VecStr(ids)) = context.get("children_ids") {
                        id_snapshots.push_back(ids.clone());
                    }

                    this.for_each_child(move |id, handle| {
                        println!("Actor Shutdown id {:?}", id);
                        handle.send(Value::Shutdown);
                    });
                    this.stop();
                }
                Value::Int(v) => {
                    println!("Actor Int");
                    if let Some(Value::VecStr(ids)) = context.get("children_ids") {
                        for id in ids {
                            println!("Actor Int id {:?}", id);
                            if let Some(mut handle) = this.get_handle_child(id) {
                                handle.send(Value::Int(v));
                            }
                        }
                    }
                }
                _ => {}
            }
        },
    );

    let mut root_handle = root.get_handle();
    root.start();

    // Round 1: one child gets 10; round 2: two children get 20;
    // round 3: three children get 30.
    for round in 1..=3 {
        root_handle.send(Value::Spawn);
        root_handle.send(Value::Int(round * 10));
    }

    // Send Shutdown
    root_handle.send(Value::Shutdown);

    thread::sleep(Duration::from_millis(10));
    // 3 children Actors
    assert_eq!(3, result_string.pop_front().unwrap().len());

    // Six values are expected in total: 1 + 2 + 3 deliveries.
    let mut v: Vec<Option<i32>> = (0..6)
        .map(|_| {
            let i = result_i32.pop_front();
            println!("Actor {:?}", i);
            i
        })
        .collect();
    v.sort();
    assert_eq!(
        [
            Some(100),
            Some(200),
            Some(200),
            Some(300),
            Some(300),
            Some(300)
        ],
        v.as_slice()
    )
}
433
/// Exercises the "ask" pattern: the message carries the container the actor
/// writes its answer into.
#[test]
fn test_actor_ask() {
    use std::time::Duration;

    use super::common::LinkedListAsync;

    #[derive(Clone, Debug)]
    enum Value {
        AskIntByLinkedListAsync((i32, LinkedListAsync<i32>)),
        AskIntByBlockingQueue((i32, BlockingQueue<i32>)),
    }

    let mut root = ActorAsync::new(
        move |_: &mut ActorAsync<_, _>, msg: Value, _: &mut HashMap<String, Value>| match msg {
            Value::AskIntByLinkedListAsync((question, reply)) => {
                println!("Actor AskIntByLinkedListAsync");
                reply.push_back(question * 10);
            }
            Value::AskIntByBlockingQueue((question, mut reply)) => {
                println!("Actor AskIntByBlockingQueue");

                // NOTE A negative question is never answered, so the asker
                // runs into its timeout; every other question is answered.
                if question >= 0 {
                    reply.offer(question * 10);
                }
            } // _ => {}
        },
    );

    let mut root_handle = root.get_handle();
    root.start();

    // LinkedListAsync<i32>
    let result_i32 = LinkedListAsync::<i32>::new();
    for question in 1..=3 {
        root_handle.send(Value::AskIntByLinkedListAsync((question, result_i32.clone())));
    }
    thread::sleep(Duration::from_millis(5));
    for question in 1..=3 {
        let i = result_i32.pop_front();
        assert_eq!(Some(question * 10), i);
    }

    // BlockingQueue<i32>
    let mut result_i32 = BlockingQueue::<i32>::new();
    result_i32.timeout = Some(Duration::from_millis(1));
    for question in 4..=6 {
        root_handle.send(Value::AskIntByBlockingQueue((question, result_i32.clone())));
    }
    thread::sleep(Duration::from_millis(5));
    for question in 4..=6 {
        let i = result_i32.take();
        assert_eq!(Some(question * 10), i);
    }

    // Timeout case:
    root_handle.send(Value::AskIntByBlockingQueue((-1, result_i32.clone())));
    let i = result_i32.take();
    assert_eq!(None, i);
}