tasks_framework/
single_queue_actor.rs1use lock_free_stack::lock_free_stack::Stack;
2use actor_runner::ActorRunner;
3
4use std::sync::Arc;
5use std::marker::Sync;
6use threadpool::ThreadPool;
7use runnable::Runnable;
8
/// Handler for messages of type `T`.
///
/// The method takes `&self`, so an implementor that needs to mutate state
/// while handling a message has to use interior mutability.
pub trait MessageProcessor<T> {
    /// Handles a single `message`, taking ownership of it.
    fn process_message(&self, message: T);
}
12
13struct ActorImpl<T: 'static, M: MessageProcessor<T> + Send + Sync + 'static> {
14 message_processor: Arc<M>,
15 queue: Arc<Stack<T>>,
16}
17
18impl<T: 'static, M: MessageProcessor<T> + Send + Sync + 'static> Runnable for ActorImpl<T, M> {
19 fn run(&self) {
20 for message in self.queue.remove_all() {
21 self.message_processor.process_message(message);
22 }
23 }
24}
25
26pub struct SingleQueueActor<T: 'static, M: MessageProcessor<T> + Send + Sync + 'static> {
27 queue: Arc<Stack<T>>,
28 actor_runner: ActorRunner<ActorImpl<T, M>>
29}
30
31impl<T: 'static, M: MessageProcessor<T> + Send + Sync + 'static> SingleQueueActor<T, M> {
32 pub fn new( message_processor: Arc<M>, execution_pool: Arc<ThreadPool> ) -> SingleQueueActor<T, M> {
33 let queue_arc = Arc::new(Stack::new());
34 let actor_impl = ActorImpl {
35 message_processor: message_processor,
36 queue: queue_arc.clone()
37 };
38
39 SingleQueueActor {
40 queue: queue_arc,
41 actor_runner: ActorRunner::new(actor_impl, execution_pool)
42 }
43 }
44
45 pub fn get_queue_size(&self) -> usize {
46 self.queue.size()
47 }
48
49 pub fn add_message(&self, message: T) {
50 self.queue.add(message);
51 self.actor_runner.schedule();
52 }
53
54 pub fn complete(&self) {
55 self.actor_runner.complete();
56 }
57}