tasks_framework/
actor_runner.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, Barrier};
3use std::marker::Sync;
4use threadpool::ThreadPool;
5
6use tasks::Tasks;
7use runnable::Runnable;
8
9pub struct ActorRunner<T: Runnable + Send + Sync + 'static> {
10    tasks: Arc<Tasks>,
11    shutdown: Arc<AtomicBool>,
12    complete_status: Arc<Barrier>,
13
14    actor: Arc<T>,
15    execution_pool: Arc<ThreadPool>
16}
17
18impl<T: Runnable + Send + Sync + 'static> ActorRunner<T> {
19    pub fn new( actor: T, execution_pool: Arc<ThreadPool> ) -> ActorRunner<T> {
20        ActorRunner {
21            tasks: Arc::new(Tasks::new()),
22            shutdown: Arc::new(AtomicBool::new(false)),
23            complete_status: Arc::new(Barrier::new(2)),
24
25            actor: Arc::new(actor),
26            execution_pool: execution_pool
27        }
28    }
29
30    pub fn schedule(&self) {
31        if self.tasks.add_task() {
32            let tasks_ref = self.tasks.clone();
33            let actor_ref = self.actor.clone();
34            let shutdown_ref = self.shutdown.clone();
35            let complete_status_ref = self.complete_status.clone();
36
37            self.execution_pool.execute(move || {
38                while tasks_ref.fetch_task() {
39                    actor_ref.run();
40
41                    if shutdown_ref.load(Ordering::Relaxed) {
42                        complete_status_ref.wait();
43                    }
44                }
45            })
46        }
47    }
48
49    pub fn complete(&self) {
50        self.shutdown.store(true, Ordering::Relaxed);
51        self.schedule();
52
53        self.complete_status.wait();
54    }
55}
56
57impl<T: Runnable + Send + Sync + 'static> Drop for ActorRunner<T> {
58    fn drop(&mut self) {
59        self.complete();
60    }
61}