1use std::collections::HashMap;
6use std::sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc, Mutex,
9};
10use std::thread;
11
12use super::common::{generate_id, UniqueId};
13use super::sync::{BlockingQueue, Queue};
14
/// Behaviour shared by actor implementations.
///
/// Type parameters:
/// * `Msg` - the message type passed to [`Actor::receive`].
/// * `ContextValue` - the value type stored in the context map given to `receive`.
/// * `HandleType` - the handle type returned by `spawn_with_handle` and the `get_handle*` methods.
/// * `Functor` - the message-handler type accepted by `spawn_with_handle`.
pub trait Actor<Msg, ContextValue, HandleType, Functor>: UniqueId<String> {
    /// Processes one `message`.
    ///
    /// `this` is a separate mutable reference to an actor of the same type that
    /// is made available to the handler (in `ActorAsync` it is a clone of the
    /// running actor), and `context` is the mutable key/value map the handler
    /// may read and update.
    fn receive(
        &mut self,
        this: &mut Self,
        message: Msg,
        context: &mut HashMap<String, ContextValue>,
    );
    /// Creates a new actor driven by `func` and returns a handle to it.
    ///
    /// In `ActorAsync` the new actor is registered as a child of `self` and
    /// started immediately.
    fn spawn_with_handle(&self, func: Functor) -> HandleType;

    /// Returns a handle for this actor.
    fn get_handle(&self) -> HandleType;
    /// Returns the handle of the child identified by `name`, or `None` when no
    /// such child is known.
    fn get_handle_child(&self, name: impl Into<String>) -> Option<HandleType>;
    /// Returns the handle of this actor's parent, or `None` when it has none.
    fn get_handle_parent(&self) -> Option<HandleType>;

    /// Calls `func` with the id and a mutable handle of every child.
    fn for_each_child(&self, func: impl FnMut(&String, &mut HandleType));
}
44
/// A handle through which messages of type `Msg` can be sent to an actor.
pub trait Handle<Msg>: UniqueId<String> {
    /// Sends `message` to the actor this handle refers to.
    fn send(&mut self, message: Msg);
}
48
/// Handle to an `ActorAsync`: the actor's id plus a clone of its message queue.
#[derive(Debug, Clone)]
pub struct HandleAsync<Msg>
where
    Msg: Send + 'static,
{
    /// Id of the actor this handle refers to.
    id: String,
    /// Queue that `send` offers messages to.
    queue: BlockingQueue<Msg>,
}
57
58impl<Msg> Handle<Msg> for HandleAsync<Msg>
59where
60 Msg: Send + 'static,
61{
62 fn send(&mut self, message: Msg) {
63 self.queue.offer(message);
64 }
65}
66impl<Msg> UniqueId<String> for HandleAsync<Msg>
67where
68 Msg: Send + 'static,
69{
70 fn get_id(&self) -> String {
71 self.id.clone()
72 }
73}
74
/// Actor that processes messages from a `BlockingQueue` on its own thread.
///
/// Most of the state lives behind `Arc`s, so clones of an actor share the
/// lifecycle flags, children map, context, handler and join-handle slot.
pub struct ActorAsync<Msg, ContextValue>
where
    Msg: Send + 'static,
{
    /// Lifecycle flags as a `(started, alive)` pair.
    started_alive: Arc<Mutex<(AtomicBool, AtomicBool)>>,

    /// Id of this actor, produced by `generate_id` on construction.
    id: String,
    /// Handle of the parent actor, if this actor has one.
    parent_handle: Option<HandleAsync<Msg>>,
    /// Handles of spawned children, keyed by child id.
    children_handle_map: Arc<Mutex<HashMap<String, HandleAsync<Msg>>>>,

    /// Key/value state passed to the handler with every message.
    context: Arc<Mutex<Box<HashMap<String, ContextValue>>>>,
    /// Queue the worker thread takes messages from.
    queue: BlockingQueue<Msg>,
    /// Message handler, called with the actor, the message and the context.
    effect: Arc<
        Mutex<
            dyn FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
                + Send
                + Sync
                + 'static,
        >,
    >,

    /// Slot for the join handle of the worker thread; `None` on construction.
    join_handle: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
}
99impl<Msg, ContextValue> Clone for ActorAsync<Msg, ContextValue>
100where
101 Msg: Clone + Send + 'static,
102{
103 fn clone(&self) -> Self {
104 Self {
105 started_alive: self.started_alive.clone(),
106
107 id: self.id.clone(),
108 parent_handle: self.parent_handle.clone(),
109 children_handle_map: self.children_handle_map.clone(),
110
111 context: self.context.clone(),
112 queue: self.queue.clone(),
113 effect: self.effect.clone(),
114 join_handle: self.join_handle.clone(),
115 }
116 }
117}
118
119impl<Msg, ContextValue> ActorAsync<Msg, ContextValue>
120where
121 Msg: Send + 'static,
122{
123 pub fn new(
124 effect: impl FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
125 + Send
126 + Sync
127 + 'static,
128 ) -> Self {
129 Self::new_with_options(effect, None, BlockingQueue::new())
130 }
131
132 pub fn new_with_options(
133 effect: impl FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
134 + Send
135 + Sync
136 + 'static,
137 parent_handle: Option<HandleAsync<Msg>>,
138 queue: BlockingQueue<Msg>,
139 ) -> Self {
140 Self {
141 queue,
142 parent_handle,
143 id: generate_id(),
144 children_handle_map: Arc::new(Mutex::new(HashMap::new())),
145 context: Arc::new(Mutex::new(Box::new(HashMap::new()))),
146 started_alive: Arc::new(Mutex::new((AtomicBool::new(false), AtomicBool::new(false)))),
147 join_handle: Arc::new(Mutex::new(None)),
148 effect: Arc::new(Mutex::new(effect)),
149 }
150 }
151
152 pub fn is_started(&mut self) -> bool {
153 let started_alive = self.started_alive.lock().unwrap();
154 let &(ref started, _) = &*started_alive;
155 started.load(Ordering::SeqCst)
156 }
157
158 pub fn is_alive(&mut self) -> bool {
159 let started_alive = self.started_alive.lock().unwrap();
160 let &(_, ref alive) = &*started_alive;
161 alive.load(Ordering::SeqCst)
162 }
163
164 pub fn stop(&mut self) {
165 {
166 let started_alive = self.started_alive.lock().unwrap();
167 let &(ref started, ref alive) = &*started_alive;
168
169 if !started.load(Ordering::SeqCst) {
170 return;
171 }
172 if !alive.load(Ordering::SeqCst) {
173 return;
174 }
175 alive.store(false, Ordering::SeqCst);
176 }
177
178 }
186}
187
188impl<Msg, ContextValue> ActorAsync<Msg, ContextValue>
189where
190 Msg: Clone + Send + 'static,
191 ContextValue: Send + 'static,
192{
193 pub fn start(&mut self) {
194 {
195 let started_alive = self.started_alive.lock().unwrap();
196 let &(ref started, ref alive) = &*started_alive;
197
198 if started.load(Ordering::SeqCst) {
199 return;
200 }
201 started.store(true, Ordering::SeqCst);
202 if alive.load(Ordering::SeqCst) {
203 return;
204 }
205 alive.store(true, Ordering::SeqCst);
206 }
207
208 let mut this = self.clone();
209 let mut this_for_receive = self.clone();
210 let this_for_context = self.clone();
211 let started_alive_thread = self.started_alive.clone();
212 self.join_handle = Arc::new(Mutex::new(Some(thread::spawn(move || {
213 while {
214 let started_alive = started_alive_thread.lock().unwrap();
215 let &(_, ref alive) = &*started_alive;
216
217 alive.load(Ordering::SeqCst)
218 } {
219 let v = this.queue.take();
220
221 match v {
222 Some(m) => {
223 let mut context = this_for_context.context.lock().unwrap();
224 this.receive(&mut this_for_receive, m, context.as_mut());
225 }
226 None => {
227 let started_alive = started_alive_thread.lock().unwrap();
228 let &(_, ref alive) = &*started_alive;
229
230 alive.store(false, Ordering::SeqCst);
231 }
232 }
233 }
234
235 this.stop();
236 }))));
237 }
238}
239
240impl<Msg, ContextValue> UniqueId<String> for ActorAsync<Msg, ContextValue>
241where
242 Msg: Send + 'static,
243{
244 fn get_id(&self) -> String {
245 self.id.clone()
246 }
247}
248
249impl<Msg, ContextValue>
250 Actor<
251 Msg,
252 ContextValue,
253 HandleAsync<Msg>,
254 Box<
255 dyn FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
256 + Send
257 + Sync
258 + 'static,
259 >,
260 > for ActorAsync<Msg, ContextValue>
261where
262 Msg: Clone + Send + 'static,
263 ContextValue: Send + 'static,
264{
265 fn receive(
266 &mut self,
267 this: &mut Self,
268 message: Msg,
269 context: &mut HashMap<String, ContextValue>,
270 ) {
271 {
272 self.effect.lock().unwrap()(this, message, context);
273 }
274 }
275 fn spawn_with_handle(
276 &self,
277 func: Box<
278 dyn FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
279 + Send
280 + Sync
281 + 'static,
282 >,
283 ) -> HandleAsync<Msg> {
284 let mut new_one = Self::new(func);
285 new_one.parent_handle = Some(self.get_handle());
286 {
287 self.children_handle_map
288 .lock()
289 .unwrap()
290 .insert(new_one.get_id(), new_one.get_handle());
291 }
292 new_one.start();
293 return new_one.get_handle();
294 }
295 fn get_handle(&self) -> HandleAsync<Msg> {
296 HandleAsync {
297 id: self.id.clone(),
298 queue: self.queue.clone(),
299 }
300 }
301 fn get_handle_child(&self, name: impl Into<String>) -> Option<HandleAsync<Msg>> {
302 match self.children_handle_map.lock().unwrap().get(&name.into()) {
303 Some(v) => Some(v.clone()),
304 None => None,
305 }
306 }
307 fn get_handle_parent(&self) -> Option<HandleAsync<Msg>> {
308 return self.parent_handle.clone();
309 }
310
311 fn for_each_child(&self, mut func: impl FnMut(&String, &mut HandleAsync<Msg>)) {
312 for (id, handle) in self.children_handle_map.lock().unwrap().iter_mut() {
313 func(id, handle);
314 }
315 }
316}
317
// Exercises a root actor that spawns children, forwards `Int` messages to all
// of them and finally shuts the whole tree down.
#[test]
fn test_actor_common() {
    use std::time::Duration;

    use super::common::LinkedListAsync;

    // Message type; it is also used as the context value type so the list of
    // child ids can be kept in the context as `VecStr`.
    #[derive(Clone, Debug)]
    enum Value {
        Int(i32),
        VecStr(Vec<String>),
        Spawn,
        Shutdown,
    }

    // Result collectors; the `*_thread` clones are moved into the handlers.
    let result_i32 = LinkedListAsync::<i32>::new();
    let result_i32_thread = result_i32.clone();
    let result_string = LinkedListAsync::<Vec<String>>::new();
    let result_string_thread = result_string.clone();
    let mut root = ActorAsync::new(
        move |this: &mut ActorAsync<_, _>, msg: Value, context: &mut HashMap<String, Value>| {
            match msg {
                Value::Spawn => {
                    println!("Actor Spawn");
                    let result_i32_thread = result_i32_thread.clone();
                    // The child records `v * 10` for every `Int` and stops on `Shutdown`.
                    let spawned = this.spawn_with_handle(Box::new(
                        move |this: &mut ActorAsync<_, _>, msg: Value, _| {
                            match msg {
                                Value::Int(v) => {
                                    println!("Actor Child Int");
                                    result_i32_thread.push_back(v * 10);
                                }
                                Value::Shutdown => {
                                    println!("Actor Child Shutdown");
                                    this.stop();
                                }
                                _ => {}
                            };
                        },
                    ));
                    // Append the new child's id to the list kept under "children_ids".
                    let list = context.get("children_ids").cloned();
                    let mut list = match list {
                        Some(Value::VecStr(list)) => list,
                        _ => Vec::new(),
                    };
                    list.push(spawned.get_id());
                    context.insert("children_ids".into(), Value::VecStr(list));
                }
                Value::Shutdown => {
                    println!("Actor Shutdown");
                    // Publish the recorded child ids so the test can count them.
                    if let Some(Value::VecStr(ids)) = context.get("children_ids") {
                        result_string_thread.push_back(ids.clone());
                    }

                    this.for_each_child(move |id, handle| {
                        println!("Actor Shutdown id {:?}", id);
                        handle.send(Value::Shutdown);
                    });
                    this.stop();
                }
                Value::Int(v) => {
                    println!("Actor Int");
                    // Forward the value to every child recorded so far.
                    if let Some(Value::VecStr(ids)) = context.get("children_ids") {
                        for id in ids {
                            println!("Actor Int id {:?}", id);
                            if let Some(mut handle) = this.get_handle_child(id) {
                                handle.send(Value::Int(v));
                            }
                        }
                    }
                }
                _ => {}
            }
        },
    );

    let mut root_handle = root.get_handle();
    root.start();

    // Each `Int` is sent after one more `Spawn`, so it reaches 1, 2 and then 3 children.
    root_handle.send(Value::Spawn);
    root_handle.send(Value::Int(10));
    root_handle.send(Value::Spawn);
    root_handle.send(Value::Int(20));
    root_handle.send(Value::Spawn);
    root_handle.send(Value::Int(30));

    root_handle.send(Value::Shutdown);

    // NOTE(review): relies on the actor threads finishing within 10 ms; this is
    // timing-dependent.
    thread::sleep(Duration::from_millis(10));
    assert_eq!(3, result_string.pop_front().unwrap().len());

    // Pop six results (`1..7` iterates six times) and sort them, since the
    // children push from separate threads.
    let mut v = Vec::<Option<i32>>::new();
    for _ in 1..7 {
        let i = result_i32.pop_front();
        println!("Actor {:?}", i);
        v.push(i);
    }
    v.sort();
    assert_eq!(
        [
            Some(100),
            Some(200),
            Some(200),
            Some(300),
            Some(300),
            Some(300)
        ],
        v.as_slice()
    )
}
433
// Exercises an "ask" style exchange: each message carries the container the
// actor should write its answer into.
#[test]
fn test_actor_ask() {
    use std::time::Duration;

    use super::common::LinkedListAsync;

    // Each variant pairs the input value with the reply container.
    #[derive(Clone, Debug)]
    enum Value {
        AskIntByLinkedListAsync((i32, LinkedListAsync<i32>)),
        AskIntByBlockingQueue((i32, BlockingQueue<i32>)),
    }

    let mut root = ActorAsync::new(
        move |_: &mut ActorAsync<_, _>, msg: Value, _: &mut HashMap<String, Value>| match msg {
            Value::AskIntByLinkedListAsync(v) => {
                println!("Actor AskIntByLinkedListAsync");
                v.1.push_back(v.0 * 10);
            }
            Value::AskIntByBlockingQueue(mut v) => {
                println!("Actor AskIntByBlockingQueue");

                // Negative inputs get no reply at all.
                if v.0 < 0 {
                    return;
                }

                v.1.offer(v.0 * 10);
            } },
    );

    let mut root_handle = root.get_handle();
    root.start();

    // Replies through a `LinkedListAsync` are expected in send order.
    let result_i32 = LinkedListAsync::<i32>::new();
    root_handle.send(Value::AskIntByLinkedListAsync((1, result_i32.clone())));
    root_handle.send(Value::AskIntByLinkedListAsync((2, result_i32.clone())));
    root_handle.send(Value::AskIntByLinkedListAsync((3, result_i32.clone())));
    // NOTE(review): relies on the actor handling all three within 5 ms.
    thread::sleep(Duration::from_millis(5));
    let i = result_i32.pop_front();
    assert_eq!(Some(10), i);
    let i = result_i32.pop_front();
    assert_eq!(Some(20), i);
    let i = result_i32.pop_front();
    assert_eq!(Some(30), i);

    // Replies through a `BlockingQueue` whose `timeout` is set to 1 ms.
    let mut result_i32 = BlockingQueue::<i32>::new();
    result_i32.timeout = Some(Duration::from_millis(1));
    root_handle.send(Value::AskIntByBlockingQueue((4, result_i32.clone())));
    root_handle.send(Value::AskIntByBlockingQueue((5, result_i32.clone())));
    root_handle.send(Value::AskIntByBlockingQueue((6, result_i32.clone())));
    thread::sleep(Duration::from_millis(5));
    let i = result_i32.take();
    assert_eq!(Some(40), i);
    let i = result_i32.take();
    assert_eq!(Some(50), i);
    let i = result_i32.take();
    assert_eq!(Some(60), i);

    // The handler does not answer a negative input, so `take` is expected to
    // yield `None` (presumably once the 1 ms timeout elapses - confirm against
    // `BlockingQueue::take`).
    root_handle.send(Value::AskIntByBlockingQueue((-1, result_i32.clone())));
    let i = result_i32.take();
    assert_eq!(None, i);
}