tasks_framework/
single_queue_actor.rs

1use lock_free_stack::lock_free_stack::Stack;
2use actor_runner::ActorRunner;
3
4use std::sync::Arc;
5use std::marker::Sync;
6use threadpool::ThreadPool;
7use runnable::Runnable;
8
9pub trait MessageProcessor<T> {
10    fn process_message(&self, message: T);
11}
12
13struct ActorImpl<T: 'static, M: MessageProcessor<T> + Send + Sync + 'static> {
14    message_processor: Arc<M>,
15    queue: Arc<Stack<T>>,
16}
17
18impl<T: 'static, M: MessageProcessor<T> + Send + Sync + 'static> Runnable for ActorImpl<T, M> {
19    fn run(&self) {
20        for message in self.queue.remove_all() {
21            self.message_processor.process_message(message);
22        }
23    }
24}
25
26pub struct SingleQueueActor<T: 'static, M: MessageProcessor<T> + Send + Sync + 'static> {
27    queue: Arc<Stack<T>>,
28    actor_runner: ActorRunner<ActorImpl<T, M>>
29}
30
31impl<T: 'static, M: MessageProcessor<T> + Send + Sync + 'static> SingleQueueActor<T, M> {
32    pub fn new( message_processor: Arc<M>, execution_pool: Arc<ThreadPool> ) -> SingleQueueActor<T, M> {
33        let queue_arc = Arc::new(Stack::new());
34        let actor_impl = ActorImpl {
35            message_processor: message_processor,
36            queue: queue_arc.clone()
37        };
38
39        SingleQueueActor {
40            queue: queue_arc,
41            actor_runner: ActorRunner::new(actor_impl, execution_pool)
42        }
43    }
44
45    pub fn get_queue_size(&self) -> usize {
46        self.queue.size()
47    }
48
49    pub fn add_message(&self, message: T) {
50        self.queue.add(message);
51        self.actor_runner.schedule();
52    }
53
54    pub fn complete(&self) {
55        self.actor_runner.complete();
56    }
57}