rtactor/
mpsc_dispatcher.rs

1//! Dispatcher implementation based on std::sync::mpsc::sync_channel.
2//!
3//! A Builder is used to construct the dispatcher and it allows to use
4//! this class with different way of starting a thread (like with the
5//! thread-priority crate or in the future with RTOS).
6//! ```
7//! let builder = rtactor::mpsc_dispatcher::Builder::new(10);
8//! let mut accessor = builder.to_accessor();
9//! std::thread::spawn(move || builder.build().process());
10//! accessor.stop_dispatcher(std::time::Duration::from_secs(1)).unwrap();
11//! ```
12
13use crate::actor::{self, ActorId, NonBoxedErrorStatus};
14use crate::dispatcher::Dispatcher;
15use crate::reactive::{
16    InstantSource, InternalInstant, MessageAndDstId, ReactiveAddr, TimeoutScheduler,
17};
18use crate::{dispatcher, Addr, Behavior, Instant, Message, ProcessContext};
19
20use std::ops::ControlFlow;
21use std::sync::mpsc;
22use std::time::Duration;
23use std::vec::Vec;
24use std::{thread, time};
25
26/// An object that register `reactive::Behavior` and dispatch messages for them.
27///
28/// The proper way to construct it is to use Builder.
29pub struct MpscDispatcher {
30    disp_actor_id: ActorId,
31    rx: mpsc::Receiver<MessageAndDstId>,
32    pub(crate) tx: mpsc::SyncSender<MessageAndDstId>,
33    reactive_list: Vec<(ActorId, Box<dyn Behavior>)>,
34}
35
36// Allows to build a MpscDispatcher.
37pub struct Builder {
38    disp_actor_id: ActorId,
39    rx: mpsc::Receiver<MessageAndDstId>,
40    tx: mpsc::SyncSender<MessageAndDstId>,
41}
42
43impl Builder {
44    pub fn new(queue_size: usize) -> Builder {
45        let (tx, rx) = std::sync::mpsc::sync_channel::<MessageAndDstId>(queue_size);
46        Builder {
47            disp_actor_id: actor::generate_actor_id(),
48            rx,
49            tx,
50        }
51    }
52
53    pub fn dispatcher_addr(&self) -> Addr {
54        ReactiveAddr::new(self.tx.clone(), self.disp_actor_id).into_addr()
55    }
56
57    fn into_parts(
58        self,
59    ) -> (
60        ActorId,
61        mpsc::Receiver<MessageAndDstId>,
62        mpsc::SyncSender<MessageAndDstId>,
63    ) {
64        (self.disp_actor_id, self.rx, self.tx)
65    }
66
67    pub fn to_accessor(&self) -> dispatcher::SyncAccessor {
68        dispatcher::SyncAccessor::new(&self.dispatcher_addr())
69    }
70
71    pub fn build(self) -> MpscDispatcher {
72        let (disp_actor_id, rx, tx) = self.into_parts();
73        MpscDispatcher {
74            disp_actor_id,
75            rx,
76            tx,
77            reactive_list: Vec::new(),
78        }
79    }
80}
81
82impl MpscDispatcher {
83    /// Process messages until dispatcher::Request::StopDispatcher is received.
84    pub fn process(&mut self) {
85        let instant_source = StdTimeInstantSource();
86
87        let mut timeout_scheduler = TimeoutScheduler::new();
88        let mut context = ProcessContext::new(self, 0, &instant_source, &mut timeout_scheduler);
89
90        loop {
91            let mut message_processed: bool;
92            let mut stop: bool;
93            let mut duration_to_next_timeout = Duration::MAX;
94            loop {
95                // process queued messages
96                loop {
97                    (message_processed, stop) = self.try_process_message(&mut context);
98
99                    if stop || !message_processed {
100                        break;
101                    }
102                }
103                if stop {
104                    break;
105                } else {
106                    // Try to queue a mature timeout.
107                    // It is done one at a time, so the queue is not overfilled.
108                    // The processing of the timeout and message generated by it handling is done
109                    // before looking for new timeout.
110                    // It is also more efficient, dequeuing timeout need to ask now().
111                    match context.try_send_next_pending_timeout() {
112                        ControlFlow::Continue(()) => (),
113                        ControlFlow::Break(duration) => {
114                            duration_to_next_timeout = duration;
115                            break;
116                        }
117                    }
118                }
119            }
120
121            if stop {
122                break;
123            }
124
125            // block until a new message is posted or the next timeout is mature
126            let (_message_processed, stop) =
127                self.block_process_message(&mut context, duration_to_next_timeout);
128            if stop {
129                break;
130            }
131        }
132    }
133
134    /// Get the Addr of reactive owned inside the dispatcher.
135    ///
136    /// It does not check if the id is really inside the dispatcher.
137    fn build_owned_reactive_addr(&self, id: ActorId) -> Addr {
138        ReactiveAddr::new(self.tx.clone(), id).into_addr()
139    }
140
141    /// Extract a Behavior from the dispatcher
142    fn unregister_reactive_by_id(&mut self, id: ActorId) -> Option<Box<dyn Behavior>> {
143        match self.get_behavior_index(id) {
144            Some(index) => Some(self.reactive_list.remove(index).1),
145            None => None,
146        }
147    }
148
149    /// Replace a Behavior inside the dispatcher
150    fn replace_reactive_by_id(
151        &mut self,
152        id: ActorId,
153        mut behavior: Box<dyn Behavior>,
154    ) -> Result<Box<dyn Behavior>, Box<dyn Behavior>> {
155        match self.get_behavior_index(id) {
156            Some(index) => {
157                std::mem::swap(&mut self.reactive_list[index].1, &mut behavior);
158                Ok(behavior)
159            }
160            None => Err(behavior),
161        }
162    }
163
164    fn get_behavior_index(&mut self, id: ActorId) -> Option<usize> {
165        let result = self
166            .reactive_list
167            .binary_search_by_key(&id, |element| element.0);
168
169        result.ok()
170    }
171
172    fn drop_queued_messages(&mut self) {
173        // Drop all message in queue.
174        while let Ok(msg_and_id) = self.rx.try_recv() {
175            if let Message::Request(request) = msg_and_id.message {
176                let _ = request.src.receive_err_response(
177                    request.id,
178                    NonBoxedErrorStatus {
179                        error: crate::Error::ActorDisappeared,
180                        request_data: request.data,
181                    },
182                );
183            }
184        }
185    }
186
187    /// Process a message targeted to the dispatcher itself.
188    fn process_dispatcher_message(
189        &mut self,
190        context: &mut ProcessContext,
191        message: &Message,
192    ) -> bool {
193        match message {
194            Message::Request(request) => {
195                if let Some(disp_request) = request.data.downcast_ref::<dispatcher::Request>() {
196                    match disp_request {
197                        dispatcher::Request::RegisterReactive { behavior } => {
198                            context.send_response(
199                                request,
200                                dispatcher::Response::RegisterReactive(
201                                    if let Some(behavior) = behavior.replace(None) {
202                                        self.register_reactive(behavior)
203                                    } else {
204                                        Addr::INVALID
205                                    },
206                                ),
207                            );
208                            false
209                        }
210                        dispatcher::Request::ExecuteFn {
211                            executable_fn: boxed_fn,
212                        } => {
213                            let response_data =
214                                (boxed_fn.replace(Box::new(|_| Box::new(()))))(self);
215                            context.send_response(request, response_data);
216                            false
217                        }
218
219                        #[allow(deprecated)]
220                        dispatcher::Request::StopReactive { addr: _ } => false,
221                        dispatcher::Request::StopDispatcher {} => {
222                            if true {
223                                self.drop_queued_messages();
224
225                                // Destroy all registered actors.
226                                self.reactive_list.clear();
227                            }
228                            context.send_response(request, dispatcher::Response::StopDispatcher());
229                            true
230                        }
231                    }
232                } else {
233                    panic!("dispatcher take only dispatcher::Request");
234                }
235            }
236            Message::Response(_) => panic!(),
237            Message::Notification(_) => panic!(),
238        }
239    }
240
241    /// Process a single message.
242    ///
243    /// Return if stop is requested.
244    fn process_current_message(
245        &mut self,
246        context: &mut ProcessContext,
247        message_and_id: MessageAndDstId,
248    ) -> bool {
249        if message_and_id.dst_id == self.disp_actor_id {
250            self.process_dispatcher_message(context, &message_and_id.message)
251        } else {
252            context.own_actor_id = message_and_id.dst_id;
253            match self.get_behavior_index(context.own_actor_id) {
254                Some(index) => self.reactive_list[index]
255                    .1
256                    .process_message(context, &message_and_id.message),
257                None => {
258                    if let Message::Request(request) = message_and_id.message {
259                        let _ = request.src.receive_err_response(
260                            request.id,
261                            NonBoxedErrorStatus {
262                                error: crate::Error::ActorDisappeared,
263                                request_data: request.data,
264                            },
265                        );
266                    }
267                }
268            }
269            false
270        }
271    }
272
273    /// Try to extract a message from the queue and process it if needed.
274    ///
275    /// Return if the message was processed and if stop asked.
276    pub(crate) fn try_process_message(&mut self, context: &mut ProcessContext) -> (bool, bool) {
277        match self.rx.try_recv() {
278            Ok(message_and_id) => (true, self.process_current_message(context, message_and_id)),
279            Err(mpsc::TryRecvError::Empty) => (false, false),
280            Err(mpsc::TryRecvError::Disconnected) => (false, true), // TODO respond ActorDisappeared
281        }
282    }
283
284    /// Block on the queue for a message and process it if needed.
285    ///
286    /// Return if the message was processed and if stop asked.
287    pub(crate) fn block_process_message(
288        &mut self,
289        context: &mut ProcessContext,
290        timeout: Duration,
291    ) -> (bool, bool) {
292        match self.rx.recv_timeout(timeout) {
293            Ok(message_and_id) => {
294                let stop = self.process_current_message(context, message_and_id);
295                (true, stop)
296            }
297            Err(mpsc::RecvTimeoutError::Disconnected) => (false, true),
298            Err(mpsc::RecvTimeoutError::Timeout) => (false, false),
299        }
300    }
301}
302
303impl dispatcher::Dispatcher for MpscDispatcher {
304    fn addr(&self) -> actor::Addr {
305        self.build_owned_reactive_addr(self.disp_actor_id)
306    }
307
308    fn register_reactive(&mut self, behavior: Box<dyn Behavior>) -> actor::Addr {
309        let id = actor::generate_actor_id();
310        self.reactive_list.push((id, behavior));
311        self.reactive_list.sort_unstable_by_key(|element| element.0);
312        self.build_owned_reactive_addr(id)
313    }
314
315    fn replace_reactive(
316        &mut self,
317        addr: &actor::Addr,
318        behavior: Box<dyn Behavior>,
319    ) -> Result<Box<dyn Behavior>, Box<dyn Behavior>> {
320        if let actor::AddrKind::Reactive(reactive_addr) = &addr.kind {
321            self.replace_reactive_by_id(reactive_addr.dst_id, behavior)
322        } else {
323            Err(behavior)
324        }
325    }
326
327    fn unregister_reactive(&mut self, addr: &actor::Addr) -> Option<Box<dyn Behavior>> {
328        if let actor::AddrKind::Reactive(reactive_addr) = &addr.kind {
329            self.unregister_reactive_by_id(reactive_addr.dst_id)
330        } else {
331            None
332        }
333    }
334}
335
336impl Drop for MpscDispatcher {
337    fn drop(&mut self) {
338        self.drop_queued_messages();
339    }
340}
341
342////////////////////////////// public fn's /////////////////////////////////////
343
344/// Start a dispatcher in its own std::thread and return an address to it.
345///
346/// Argument:
347///
348/// * `queue_size` : how many messages can be stored before being full
349/// * `setup_func` : a FnOnce called after the dispatcher initialization, it output will be returned by this function
350///
351/// Return a tuple of the address of the dispatcher as an actor, an thread handle of the thread
352/// of the dispatcher and the return value of the `setup_func`.
353pub fn spawn_dispatcher<F, T>(
354    queue_size: usize,
355    setup_func: F,
356) -> (actor::Addr, thread::JoinHandle<()>, T)
357where
358    F: FnOnce(&mut dyn Dispatcher) -> T,
359    F: Send + 'static,
360    T: Send + 'static + Sized,
361{
362    let builder = Builder::new(queue_size);
363    let mut accessor = builder.to_accessor();
364    let handle = thread::spawn(move || builder.build().process());
365
366    let out = accessor.execute_fn(setup_func, Duration::MAX).unwrap();
367
368    (accessor.dispatcher_addr().clone(), handle, out)
369}
370
371/// InstantSource based on `std::time::now()`.
372struct StdTimeInstantSource();
373
374impl InstantSource for StdTimeInstantSource {
375    fn now(&self) -> Instant {
376        InternalInstant::Finite(time::Instant::now()).into_instant()
377    }
378}
379
380////////////////////////////// tests /////////////////////////////////////
381
382#[cfg(test)]
383mod tests {
384    use crate::{actor::AddrKind, dispatcher::Dispatcher};
385
386    use super::*;
387
388    struct TestBehavior();
389
390    impl Behavior for TestBehavior {
391        fn process_message(&mut self, _context: &mut ProcessContext, msg: &Message) {
392            if let Message::Notification(notif) = msg {
393                if let Some(&float) = notif.data.downcast_ref::<f32>() {
394                    assert!(float == 3.4);
395                } else if let Some(&int) = notif.data.downcast_ref::<i32>() {
396                    assert!(int == -567);
397                }
398            }
399        }
400    }
401
402    #[test]
403    fn simple_reactive_register_unregister() {
404        let mut disp = crate::mpsc_dispatcher::Builder::new(10).build();
405
406        let behavior = Box::new(TestBehavior());
407
408        let addr = disp.register_reactive(behavior);
409        match addr.kind {
410            AddrKind::Reactive(reactive_addr) => {
411                assert!(disp
412                    .unregister_reactive_by_id(reactive_addr.dst_id)
413                    .is_some())
414            }
415            _ => panic!(),
416        }
417    }
418
419    #[test]
420    fn simple_send_message() {
421        let mut disp = crate::mpsc_dispatcher::Builder::new(10).build();
422
423        let instant_source = StdTimeInstantSource();
424        let mut timeout_scheduler = TimeoutScheduler::new();
425        let mut context = ProcessContext::new(&disp, 0, &instant_source, &mut timeout_scheduler);
426
427        let behavior = Box::new(TestBehavior());
428
429        let addr = disp.register_reactive(behavior);
430
431        let result = addr.receive_notification(3.4f32);
432        assert!(result.is_ok());
433
434        let result = addr.receive_notification(-567i32);
435        assert!(result.is_ok());
436
437        let (message_processed, stop) = disp.try_process_message(&mut context);
438        assert!(!stop);
439        assert!(message_processed);
440
441        let (message_processed, stop) = disp.try_process_message(&mut context);
442        assert!(!stop);
443        assert!(message_processed);
444
445        let (message_processed, stop) = disp.try_process_message(&mut context);
446        assert!(!stop);
447        assert!(!message_processed);
448    }
449}