1use crate::actor::{self, ActorId, NonBoxedErrorStatus};
14use crate::dispatcher::Dispatcher;
15use crate::reactive::{
16 InstantSource, InternalInstant, MessageAndDstId, ReactiveAddr, TimeoutScheduler,
17};
18use crate::{dispatcher, Addr, Behavior, Instant, Message, ProcessContext};
19
20use std::ops::ControlFlow;
21use std::sync::mpsc;
22use std::time::Duration;
23use std::vec::Vec;
24use std::{thread, time};
25
26pub struct MpscDispatcher {
30 disp_actor_id: ActorId,
31 rx: mpsc::Receiver<MessageAndDstId>,
32 pub(crate) tx: mpsc::SyncSender<MessageAndDstId>,
33 reactive_list: Vec<(ActorId, Box<dyn Behavior>)>,
34}
35
36pub struct Builder {
38 disp_actor_id: ActorId,
39 rx: mpsc::Receiver<MessageAndDstId>,
40 tx: mpsc::SyncSender<MessageAndDstId>,
41}
42
43impl Builder {
44 pub fn new(queue_size: usize) -> Builder {
45 let (tx, rx) = std::sync::mpsc::sync_channel::<MessageAndDstId>(queue_size);
46 Builder {
47 disp_actor_id: actor::generate_actor_id(),
48 rx,
49 tx,
50 }
51 }
52
53 pub fn dispatcher_addr(&self) -> Addr {
54 ReactiveAddr::new(self.tx.clone(), self.disp_actor_id).into_addr()
55 }
56
57 fn into_parts(
58 self,
59 ) -> (
60 ActorId,
61 mpsc::Receiver<MessageAndDstId>,
62 mpsc::SyncSender<MessageAndDstId>,
63 ) {
64 (self.disp_actor_id, self.rx, self.tx)
65 }
66
67 pub fn to_accessor(&self) -> dispatcher::SyncAccessor {
68 dispatcher::SyncAccessor::new(&self.dispatcher_addr())
69 }
70
71 pub fn build(self) -> MpscDispatcher {
72 let (disp_actor_id, rx, tx) = self.into_parts();
73 MpscDispatcher {
74 disp_actor_id,
75 rx,
76 tx,
77 reactive_list: Vec::new(),
78 }
79 }
80}
81
82impl MpscDispatcher {
83 pub fn process(&mut self) {
85 let instant_source = StdTimeInstantSource();
86
87 let mut timeout_scheduler = TimeoutScheduler::new();
88 let mut context = ProcessContext::new(self, 0, &instant_source, &mut timeout_scheduler);
89
90 loop {
91 let mut message_processed: bool;
92 let mut stop: bool;
93 let mut duration_to_next_timeout = Duration::MAX;
94 loop {
95 loop {
97 (message_processed, stop) = self.try_process_message(&mut context);
98
99 if stop || !message_processed {
100 break;
101 }
102 }
103 if stop {
104 break;
105 } else {
106 match context.try_send_next_pending_timeout() {
112 ControlFlow::Continue(()) => (),
113 ControlFlow::Break(duration) => {
114 duration_to_next_timeout = duration;
115 break;
116 }
117 }
118 }
119 }
120
121 if stop {
122 break;
123 }
124
125 let (_message_processed, stop) =
127 self.block_process_message(&mut context, duration_to_next_timeout);
128 if stop {
129 break;
130 }
131 }
132 }
133
134 fn build_owned_reactive_addr(&self, id: ActorId) -> Addr {
138 ReactiveAddr::new(self.tx.clone(), id).into_addr()
139 }
140
141 fn unregister_reactive_by_id(&mut self, id: ActorId) -> Option<Box<dyn Behavior>> {
143 match self.get_behavior_index(id) {
144 Some(index) => Some(self.reactive_list.remove(index).1),
145 None => None,
146 }
147 }
148
149 fn replace_reactive_by_id(
151 &mut self,
152 id: ActorId,
153 mut behavior: Box<dyn Behavior>,
154 ) -> Result<Box<dyn Behavior>, Box<dyn Behavior>> {
155 match self.get_behavior_index(id) {
156 Some(index) => {
157 std::mem::swap(&mut self.reactive_list[index].1, &mut behavior);
158 Ok(behavior)
159 }
160 None => Err(behavior),
161 }
162 }
163
164 fn get_behavior_index(&mut self, id: ActorId) -> Option<usize> {
165 let result = self
166 .reactive_list
167 .binary_search_by_key(&id, |element| element.0);
168
169 result.ok()
170 }
171
172 fn drop_queued_messages(&mut self) {
173 while let Ok(msg_and_id) = self.rx.try_recv() {
175 if let Message::Request(request) = msg_and_id.message {
176 let _ = request.src.receive_err_response(
177 request.id,
178 NonBoxedErrorStatus {
179 error: crate::Error::ActorDisappeared,
180 request_data: request.data,
181 },
182 );
183 }
184 }
185 }
186
187 fn process_dispatcher_message(
189 &mut self,
190 context: &mut ProcessContext,
191 message: &Message,
192 ) -> bool {
193 match message {
194 Message::Request(request) => {
195 if let Some(disp_request) = request.data.downcast_ref::<dispatcher::Request>() {
196 match disp_request {
197 dispatcher::Request::RegisterReactive { behavior } => {
198 context.send_response(
199 request,
200 dispatcher::Response::RegisterReactive(
201 if let Some(behavior) = behavior.replace(None) {
202 self.register_reactive(behavior)
203 } else {
204 Addr::INVALID
205 },
206 ),
207 );
208 false
209 }
210 dispatcher::Request::ExecuteFn {
211 executable_fn: boxed_fn,
212 } => {
213 let response_data =
214 (boxed_fn.replace(Box::new(|_| Box::new(()))))(self);
215 context.send_response(request, response_data);
216 false
217 }
218
219 #[allow(deprecated)]
220 dispatcher::Request::StopReactive { addr: _ } => false,
221 dispatcher::Request::StopDispatcher {} => {
222 if true {
223 self.drop_queued_messages();
224
225 self.reactive_list.clear();
227 }
228 context.send_response(request, dispatcher::Response::StopDispatcher());
229 true
230 }
231 }
232 } else {
233 panic!("dispatcher take only dispatcher::Request");
234 }
235 }
236 Message::Response(_) => panic!(),
237 Message::Notification(_) => panic!(),
238 }
239 }
240
241 fn process_current_message(
245 &mut self,
246 context: &mut ProcessContext,
247 message_and_id: MessageAndDstId,
248 ) -> bool {
249 if message_and_id.dst_id == self.disp_actor_id {
250 self.process_dispatcher_message(context, &message_and_id.message)
251 } else {
252 context.own_actor_id = message_and_id.dst_id;
253 match self.get_behavior_index(context.own_actor_id) {
254 Some(index) => self.reactive_list[index]
255 .1
256 .process_message(context, &message_and_id.message),
257 None => {
258 if let Message::Request(request) = message_and_id.message {
259 let _ = request.src.receive_err_response(
260 request.id,
261 NonBoxedErrorStatus {
262 error: crate::Error::ActorDisappeared,
263 request_data: request.data,
264 },
265 );
266 }
267 }
268 }
269 false
270 }
271 }
272
273 pub(crate) fn try_process_message(&mut self, context: &mut ProcessContext) -> (bool, bool) {
277 match self.rx.try_recv() {
278 Ok(message_and_id) => (true, self.process_current_message(context, message_and_id)),
279 Err(mpsc::TryRecvError::Empty) => (false, false),
280 Err(mpsc::TryRecvError::Disconnected) => (false, true), }
282 }
283
284 pub(crate) fn block_process_message(
288 &mut self,
289 context: &mut ProcessContext,
290 timeout: Duration,
291 ) -> (bool, bool) {
292 match self.rx.recv_timeout(timeout) {
293 Ok(message_and_id) => {
294 let stop = self.process_current_message(context, message_and_id);
295 (true, stop)
296 }
297 Err(mpsc::RecvTimeoutError::Disconnected) => (false, true),
298 Err(mpsc::RecvTimeoutError::Timeout) => (false, false),
299 }
300 }
301}
302
303impl dispatcher::Dispatcher for MpscDispatcher {
304 fn addr(&self) -> actor::Addr {
305 self.build_owned_reactive_addr(self.disp_actor_id)
306 }
307
308 fn register_reactive(&mut self, behavior: Box<dyn Behavior>) -> actor::Addr {
309 let id = actor::generate_actor_id();
310 self.reactive_list.push((id, behavior));
311 self.reactive_list.sort_unstable_by_key(|element| element.0);
312 self.build_owned_reactive_addr(id)
313 }
314
315 fn replace_reactive(
316 &mut self,
317 addr: &actor::Addr,
318 behavior: Box<dyn Behavior>,
319 ) -> Result<Box<dyn Behavior>, Box<dyn Behavior>> {
320 if let actor::AddrKind::Reactive(reactive_addr) = &addr.kind {
321 self.replace_reactive_by_id(reactive_addr.dst_id, behavior)
322 } else {
323 Err(behavior)
324 }
325 }
326
327 fn unregister_reactive(&mut self, addr: &actor::Addr) -> Option<Box<dyn Behavior>> {
328 if let actor::AddrKind::Reactive(reactive_addr) = &addr.kind {
329 self.unregister_reactive_by_id(reactive_addr.dst_id)
330 } else {
331 None
332 }
333 }
334}
335
336impl Drop for MpscDispatcher {
337 fn drop(&mut self) {
338 self.drop_queued_messages();
339 }
340}
341
342pub fn spawn_dispatcher<F, T>(
354 queue_size: usize,
355 setup_func: F,
356) -> (actor::Addr, thread::JoinHandle<()>, T)
357where
358 F: FnOnce(&mut dyn Dispatcher) -> T,
359 F: Send + 'static,
360 T: Send + 'static + Sized,
361{
362 let builder = Builder::new(queue_size);
363 let mut accessor = builder.to_accessor();
364 let handle = thread::spawn(move || builder.build().process());
365
366 let out = accessor.execute_fn(setup_func, Duration::MAX).unwrap();
367
368 (accessor.dispatcher_addr().clone(), handle, out)
369}
370
371struct StdTimeInstantSource();
373
374impl InstantSource for StdTimeInstantSource {
375 fn now(&self) -> Instant {
376 InternalInstant::Finite(time::Instant::now()).into_instant()
377 }
378}
379
380#[cfg(test)]
383mod tests {
384 use crate::{actor::AddrKind, dispatcher::Dispatcher};
385
386 use super::*;
387
388 struct TestBehavior();
389
390 impl Behavior for TestBehavior {
391 fn process_message(&mut self, _context: &mut ProcessContext, msg: &Message) {
392 if let Message::Notification(notif) = msg {
393 if let Some(&float) = notif.data.downcast_ref::<f32>() {
394 assert!(float == 3.4);
395 } else if let Some(&int) = notif.data.downcast_ref::<i32>() {
396 assert!(int == -567);
397 }
398 }
399 }
400 }
401
402 #[test]
403 fn simple_reactive_register_unregister() {
404 let mut disp = crate::mpsc_dispatcher::Builder::new(10).build();
405
406 let behavior = Box::new(TestBehavior());
407
408 let addr = disp.register_reactive(behavior);
409 match addr.kind {
410 AddrKind::Reactive(reactive_addr) => {
411 assert!(disp
412 .unregister_reactive_by_id(reactive_addr.dst_id)
413 .is_some())
414 }
415 _ => panic!(),
416 }
417 }
418
419 #[test]
420 fn simple_send_message() {
421 let mut disp = crate::mpsc_dispatcher::Builder::new(10).build();
422
423 let instant_source = StdTimeInstantSource();
424 let mut timeout_scheduler = TimeoutScheduler::new();
425 let mut context = ProcessContext::new(&disp, 0, &instant_source, &mut timeout_scheduler);
426
427 let behavior = Box::new(TestBehavior());
428
429 let addr = disp.register_reactive(behavior);
430
431 let result = addr.receive_notification(3.4f32);
432 assert!(result.is_ok());
433
434 let result = addr.receive_notification(-567i32);
435 assert!(result.is_ok());
436
437 let (message_processed, stop) = disp.try_process_message(&mut context);
438 assert!(!stop);
439 assert!(message_processed);
440
441 let (message_processed, stop) = disp.try_process_message(&mut context);
442 assert!(!stop);
443 assert!(message_processed);
444
445 let (message_processed, stop) = disp.try_process_message(&mut context);
446 assert!(!stop);
447 assert!(!message_processed);
448 }
449}