tasks_framework/
actor_runner.rs1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, Barrier};
3use std::marker::Sync;
4use threadpool::ThreadPool;
5
6use tasks::Tasks;
7use runnable::Runnable;
8
9pub struct ActorRunner<T: Runnable + Send + Sync + 'static> {
10 tasks: Arc<Tasks>,
11 shutdown: Arc<AtomicBool>,
12 complete_status: Arc<Barrier>,
13
14 actor: Arc<T>,
15 execution_pool: Arc<ThreadPool>
16}
17
18impl<T: Runnable + Send + Sync + 'static> ActorRunner<T> {
19 pub fn new( actor: T, execution_pool: Arc<ThreadPool> ) -> ActorRunner<T> {
20 ActorRunner {
21 tasks: Arc::new(Tasks::new()),
22 shutdown: Arc::new(AtomicBool::new(false)),
23 complete_status: Arc::new(Barrier::new(2)),
24
25 actor: Arc::new(actor),
26 execution_pool: execution_pool
27 }
28 }
29
30 pub fn schedule(&self) {
31 if self.tasks.add_task() {
32 let tasks_ref = self.tasks.clone();
33 let actor_ref = self.actor.clone();
34 let shutdown_ref = self.shutdown.clone();
35 let complete_status_ref = self.complete_status.clone();
36
37 self.execution_pool.execute(move || {
38 while tasks_ref.fetch_task() {
39 actor_ref.run();
40
41 if shutdown_ref.load(Ordering::Relaxed) {
42 complete_status_ref.wait();
43 }
44 }
45 })
46 }
47 }
48
49 pub fn complete(&self) {
50 self.shutdown.store(true, Ordering::Relaxed);
51 self.schedule();
52
53 self.complete_status.wait();
54 }
55}
56
57impl<T: Runnable + Send + Sync + 'static> Drop for ActorRunner<T> {
58 fn drop(&mut self) {
59 self.complete();
60 }
61}