1use {std, either, log, smallvec};
2use crate::{channel, message, session, Message};
3
4use std::convert::TryFrom;
5use std::sync::mpsc;
6use std::time;
7use vec_map::VecMap;
8
9pub mod inner;
10pub mod presult;
11
12pub use self::inner::Inner;
13pub use self::presult::Presult;
14
15#[derive(Clone, Debug, Eq, PartialEq)]
21pub struct Def <CTX : session::Context> {
22 id : CTX::PID,
23 kind : Kind,
24 sourcepoints : Vec <CTX::CID>,
25 endpoints : Vec <CTX::CID>
26}
27
28pub struct Handle <CTX : session::Context> {
30 pub result_rx : mpsc::Receiver <CTX::GPRES>,
31 pub continuation_tx : mpsc::Sender <
32 Box <dyn FnOnce (CTX::GPROC) -> Option <()> + Send>
33 >,
34 pub join_or_continue :
37 either::Either <
38 std::thread::JoinHandle <Option <()>>,
39 Option <Box <dyn FnOnce (CTX::GPROC) -> Option <()> + Send>>
40 >
41}
42
43#[derive(Clone, Debug, Eq, PartialEq)]
59pub enum Kind {
60 Asynchronous {
68 messages_per_update : u32
69 },
70
71 Isochronous {
74 tick_ms : u32,
75 ticks_per_update : u32
76 },
77
78 Mesochronous {
80 tick_ms : u32,
81 ticks_per_update : u32
82 },
83
84 Anisochronous
94}
95
96#[derive(Clone, Copy, Debug, Eq, PartialEq)]
97pub enum ControlFlow {
98 Continue,
99 Break
100}
101
102#[derive(Clone, Debug, Eq, PartialEq)]
103pub enum KindError {
104 AsynchronousZeroMessagesPerUpdate,
105 IsochronousZeroTickMs,
106 IsochronousZeroTicksPerUpdate,
107 MesochronousZeroTickMs,
108 MesochronousZeroTicksPerUpdate
109}
110
111#[derive(Clone, Debug, Eq, PartialEq)]
113pub enum DefineError {
114 DuplicateSourcepoint,
115 DuplicateEndpoint,
116 SourcepointEqEndpoint,
117 AsynchronousZeroEndpoints,
118 AsynchronousMultipleEndpoints
119}
120
121pub trait Process <CTX, RES> where
133 CTX : session::Context,
134 RES : Presult <CTX, Self>,
135 Self : TryFrom <CTX::GPROC> + Into <CTX::GPROC>
136{
137 fn new (inner : Inner <CTX>) -> Self;
141 fn inner_ref (&self) -> &Inner <CTX>;
142 fn inner_mut (&mut self) -> &mut Inner <CTX>;
143 fn result_ref (&self) -> &RES;
144 fn result_mut (&mut self) -> &mut RES;
145 fn global_result (&mut self) -> CTX::GPRES;
146 fn extract_result (session_results : &mut VecMap <CTX::GPRES>)
147 -> Result <RES, String>;
148 fn handle_message (&mut self, message : CTX::GMSG) -> ControlFlow;
149 fn update (&mut self) -> ControlFlow;
150
151 fn initialize (&mut self) { }
153 fn terminate (&mut self) { }
155
156 #[inline]
160 fn id (&self) -> &CTX::PID where CTX : 'static {
161 self.def().id()
162 }
163
164 #[inline]
165 fn kind (&self) -> &Kind where CTX : 'static {
166 self.def().kind()
167 }
168
169 #[inline]
170 fn state_id (&self) -> inner::StateId {
171 self.inner_ref().state().id().clone()
172 }
173
174 #[inline]
175 fn def (&self) -> &Def <CTX> {
176 &self.inner_ref().extended_state().def
177 }
178
179 #[inline]
180 fn sourcepoints (&self) -> &VecMap <Box <dyn channel::Sourcepoint <CTX>>> {
181 &self.inner_ref().extended_state().sourcepoints
182 }
183
184 #[inline]
185 fn sourcepoints_mut (&mut self)
186 -> &mut VecMap <Box <dyn channel::Sourcepoint <CTX>>>
187 {
188 &mut self.inner_mut().extended_state_mut().sourcepoints
189 }
190
191 #[inline]
197 fn endpoints (&self)
198 -> std::cell::Ref <Option <VecMap <Box <dyn channel::Endpoint <CTX>>>>>
199 {
200 self.inner_ref().extended_state().endpoints.borrow()
201 }
202
203 #[inline]
209 fn endpoints_mut (&mut self) -> std::cell::RefMut
210 <Option <VecMap <Box <dyn channel::Endpoint <CTX>>>>>
211 {
212 self.inner_ref().extended_state().endpoints.borrow_mut()
213 }
214
215 #[inline]
224 fn take_endpoints (&self) -> VecMap <Box <dyn channel::Endpoint <CTX>>> {
225 self.inner_ref().extended_state().endpoints.borrow_mut().take().unwrap()
226 }
227
228 #[inline]
232 fn put_endpoints (&self,
233 endpoints : VecMap <Box <dyn channel::Endpoint <CTX>>>
234 ) {
235 *self.inner_ref().extended_state().endpoints.borrow_mut()
236 = Some (endpoints);
237 }
238
239 fn send <M : Message <CTX>> (&self, channel_id : CTX::CID, message : M)
240 -> Result <(), channel::SendError <CTX::GMSG>>
241 where CTX : 'static {
242 let message_name = message.name();
243 log::debug!(
244 process:?=self.id(), channel:?=channel_id, message=message_name.as_str();
245 "process sending message");
246 let cid : usize = channel_id.clone().into();
247 self.sourcepoints()[cid].send (message.into()).map_err (|send_error|{
248 log::warn!(
249 process:?=self.id(), channel:?=channel_id, message=message_name.as_str();
250 "process send error: receiver disconnected");
251 send_error
252 })
253 }
254
255 fn send_to <M : Message <CTX>> (
256 &self, channel_id : CTX::CID, recipient : CTX::PID, message : M
257 ) -> Result <(), channel::SendError <CTX::GMSG>>
258 where CTX : 'static
259 {
260 let message_name = message.name();
261 log::debug!(
262 process:?=self.id(),
263 channel:?=channel_id,
264 peer:?=recipient,
265 message=message_name.as_str();
266 "process sending message to peer");
267 let cid : usize = channel_id.clone().into();
268 self.sourcepoints()[cid].send_to (message.into(), recipient.clone())
269 .map_err (|send_error|{
270 log::warn!(
271 process:?=self.id(),
272 channel:?=channel_id,
273 peer:?=recipient,
274 message=message_name.as_str();
275 "process send to peer error: receiver disconnected");
276 send_error
277 })
278 }
279
280 #[inline]
282 fn run (&mut self) where
283 Self : Sized + 'static,
284 CTX : 'static
285 {
286 use message::Global;
287 debug_assert_eq!(self.state_id(), inner::StateId::Ready);
288 self.initialize();
289 match *self.kind() {
290 Kind::Asynchronous {..} => self.run_asynchronous (),
291 Kind::Isochronous {..} => self.run_isochronous (),
292 Kind::Mesochronous {..} => self.run_mesochronous (),
293 Kind::Anisochronous {..} => self.run_anisochronous()
294 };
295 debug_assert_eq!(self.state_id(), inner::StateId::Ended);
296 self.terminate();
297 self.sourcepoints_mut().clear();
300 { let endpoints = self.take_endpoints();
302 let mut unhandled_count = 0;
303 for (cid, endpoint) in endpoints.iter() {
304 let channel_id = match CTX::CID::try_from (cid as channel::IdReprType) {
306 Ok (cid) => cid,
307 Err (_) => unreachable!()
308 };
309 loop {
310 match endpoint.try_recv() {
311 Ok (message) => {
312 log::warn!(
313 process:?=self.id(),
314 channel:?=channel_id,
315 message=format!("{:?}({})", message.id(), message.inner_name())
316 .as_str();
317 "process unhandled message");
318 unhandled_count += 1;
319 }
320 Err (channel::TryRecvError::Empty) |
321 Err (channel::TryRecvError::Disconnected) => {
322 break
323 }
324 }
325 }
326 }
327 if unhandled_count > 0 {
328 log::warn!(process:?=self.id(), unhandled_message_count=unhandled_count;
329 "process ended with unhandled messages");
330 }
331 }
332 debug_assert!(self.sourcepoints().is_empty());
333 debug_assert!(self.endpoints().is_none());
334 let gpresult = self.global_result();
335 let session_handle = &self.inner_ref().as_ref().session_handle;
336 session_handle.result_tx.send (gpresult).unwrap();
337 }
338
339 #[inline]
342 fn run_continue (mut self) -> Option <()> where
343 Self : Sized + 'static,
344 CTX : 'static
345 {
346 self.run();
347 let continuation : Box <dyn FnOnce (CTX::GPROC) -> Option <()> + Send> = {
348 let session_handle = &self.inner_ref().as_ref().session_handle;
349 session_handle.continuation_rx.recv().unwrap()
350 };
351 continuation (self.into())
352 }
353
354 fn run_asynchronous (&mut self) where
358 Self : Sized,
359 CTX : 'static
360 {
361 use message::Global;
362
363 self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
364
365 let messages_per_update = {
366 match *self.kind() {
367 Kind::Asynchronous { messages_per_update } => messages_per_update,
368 _ => unreachable!(
369 "run asynchronous: process kind does not match run function")
370 }
371 };
372 let _t_start = time::Instant::now();
373 log::debug!(process:?=self.id(), kind="asynchronous", messages_per_update;
374 "process start");
375 debug_assert!(1 <= messages_per_update);
376 let mut _message_count = 0;
377 let mut update_count = 0;
378 let mut messages_since_update = 0;
379
380 let endpoints = self.take_endpoints();
381 { let (cid, endpoint) = endpoints.iter().next().unwrap();
383 let channel_id = match CTX::CID::try_from (cid as channel::IdReprType) {
385 Ok (cid) => cid,
386 Err (_) => unreachable!()
387 };
388 '_run_loop: while self.state_id() == inner::StateId::Running {
389 match endpoint.recv() {
391 Ok (message) => {
392 log::debug!(
393 process:?=self.id(),
394 channel:?=channel_id,
395 message=message.inner_name().as_str();
396 "process received message");
397 let handle_message_result = self.handle_message (message);
398 match handle_message_result {
399 ControlFlow::Continue => {}
400 ControlFlow::Break => {
401 if self.state_id() == inner::StateId::Running {
402 self.inner_mut().handle_event (inner::EventParams::End{}.into())
403 .unwrap();
404 }
405 }
406 }
407 _message_count += 1;
408 messages_since_update += 1;
409 }
410 Err (channel::RecvError) => {
411 log::info!(process:?=self.id(), channel:?=channel_id;
412 "process receive failed: sender disconnected");
413 if self.state_id() == inner::StateId::Running {
414 self.inner_mut().handle_event (inner::EventParams::End{}.into())
415 .unwrap();
416 }
417 }
418 }
419 if messages_per_update <= messages_since_update {
420 log::trace!(process:?=self.id(), update=update_count;
422 "process update");
423 let update_result = self.update();
424 match update_result {
425 ControlFlow::Continue => {}
426 ControlFlow::Break => {
427 if self.state_id() == inner::StateId::Running {
428 self.inner_mut().handle_event (inner::EventParams::End{}.into())
429 .unwrap();
430 }
431 }
432 }
433 update_count += 1;
434 messages_since_update = 0;
435 }
436 } } self.put_endpoints (endpoints);
439 } fn run_isochronous (&mut self) where
457 Self : Sized,
458 CTX : 'static
459 {
460 self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
461
462 let t_start = time::Instant::now();
463 let (tick_ms, ticks_per_update) = {
464 match *self.kind() {
465 Kind::Isochronous { tick_ms, ticks_per_update }
466 => (tick_ms, ticks_per_update),
467 _ => unreachable!(
468 "run synchronous: process kind does not match run function")
469 }
470 };
471 log::debug!(
472 process:?=self.id(), kind="isochronous", tick_ms, ticks_per_update;
473 "process start");
474 debug_assert!(1 <= tick_ms);
475 debug_assert!(1 <= ticks_per_update);
476 let tick_dur = time::Duration::from_millis (tick_ms as u64);
477 let mut t_last = t_start - tick_dur;
478 let mut t_next = t_start;
479 let mut ticks_since_update = 0;
480 let mut tick_count = 0;
481 #[allow(unused_variables)]
482 let mut _message_count = 0;
483 let mut update_count = 0;
484
485 let endpoints = self.take_endpoints();
486 let mut num_open_channels = endpoints.len();
487 let mut open_channels = smallvec::SmallVec::<[bool; 8]>::from_vec ({
488 let mut v = Vec::with_capacity (num_open_channels);
489 v.resize (num_open_channels, true);
490 v
491 });
492 '_run_loop: while self.state_id() == inner::StateId::Running {
493 let t_now = time::Instant::now();
494 if t_next < t_now {
495 log::trace!(
496 process:?=self.id(),
497 tick=tick_count,
498 since_ns=t_now.duration_since (t_next).as_nanos();
499 "process tick");
500 t_last += tick_dur;
501 t_next += tick_dur;
502
503 poll_messages (self, &endpoints,
505 &mut open_channels, &mut num_open_channels, &mut _message_count);
506
507 tick_count += 1;
508 ticks_since_update += 1;
509 debug_assert!(ticks_since_update <= ticks_per_update);
510 if ticks_since_update == ticks_per_update {
511 log::trace!(process:?=self.id(), update=update_count;
512 "process update");
513 let update_result = self.update();
514 match update_result {
515 ControlFlow::Continue => {}
516 ControlFlow::Break => {
517 if self.state_id() == inner::StateId::Running {
518 self.inner_mut().handle_event (inner::EventParams::End{}.into())
519 .unwrap();
520 }
521 }
522 }
523 update_count += 1;
524 ticks_since_update = 0;
525 }
526 } else {
527 log::warn!(
528 process:?=self.id(),
529 tick=tick_count,
530 until_ns=t_next.duration_since (t_now).as_nanos();
531 "process tick too early");
532 }
533
534 let t_after = time::Instant::now();
535 if t_after < t_next {
536 let t_until = t_next.duration_since (t_after);
538 std::thread::sleep (time::Duration::from_millis (
539 1 + t_until.as_secs()*1000 +
541 t_until.subsec_nanos() as u64/1_000_000))
542 } else {
543 log::warn!(
544 process:?=self.id(),
545 tick=tick_count,
546 after_ns=t_after.duration_since (t_next).as_nanos();
547 "process late tick");
548 }
549
550 } self.put_endpoints (endpoints);
552 } fn run_mesochronous (&mut self) where
567 Self : Sized,
568 CTX : 'static
569 {
570 self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
571
572 let t_start = time::Instant::now();
573 let (tick_ms, ticks_per_update) = {
574 match *self.kind() {
575 Kind::Mesochronous { tick_ms, ticks_per_update }
576 => (tick_ms, ticks_per_update),
577 _ => unreachable!(
578 "run synchronous: process kind does not match run function")
579 }
580 };
581 log::debug!(
582 process:?=self.id(), kind="mesochronous", tick_ms, ticks_per_update;
583 "process start");
584 debug_assert!(1 <= tick_ms);
585 debug_assert!(1 <= ticks_per_update);
586 let tick_dur = time::Duration::from_millis (tick_ms as u64);
587 let mut _t_last = t_start - tick_dur;
588 let mut t_next = t_start;
589 let mut ticks_since_update = 0;
590 let mut tick_count = 0;
591 let mut _message_count = 0;
592 let mut update_count = 0;
593
594 let endpoints = self.take_endpoints();
595 let mut num_open_channels = endpoints.len();
596 let mut open_channels = smallvec::SmallVec::<[bool; 8]>::from_vec ({
597 let mut v = Vec::with_capacity (num_open_channels);
598 v.resize (num_open_channels, true);
599 v
600 });
601 '_run_loop: while self.state_id() == inner::StateId::Running {
602 let t_now = time::Instant::now();
603 if t_next < t_now {
604 log::trace!(
605 process:?=self.id(),
606 tick=tick_count,
607 since_ns=t_now.duration_since (t_next).as_nanos();
608 "process tick");
609 _t_last = t_now;
610 t_next = t_now + tick_dur;
611
612 poll_messages (self, &endpoints,
614 &mut open_channels, &mut num_open_channels, &mut _message_count);
615
616 tick_count += 1;
617 ticks_since_update += 1;
618 debug_assert!(ticks_since_update <= ticks_per_update);
619 if ticks_since_update == ticks_per_update {
620 log::trace!(process:?=self.id(), update=update_count;
621 "process update");
622 let update_result = self.update();
623 match update_result {
624 ControlFlow::Continue => {}
625 ControlFlow::Break => {
626 if self.state_id() == inner::StateId::Running {
627 self.inner_mut().handle_event (inner::EventParams::End{}.into())
628 .unwrap();
629 }
630 }
631 }
632 update_count += 1;
633 ticks_since_update = 0;
634 }
635 } else {
636 log::warn!(
637 process:?=self.id(),
638 tick=tick_count,
639 until_ns=t_next.duration_since (t_now).as_nanos();
640 "process tick too early");
641 }
642
643 let t_after = time::Instant::now();
644 if t_after < t_next {
645 let t_until = t_next.duration_since (t_after);
647 std::thread::sleep (time::Duration::from_millis (
648 1 + t_until.as_secs()*1000 +
650 t_until.subsec_nanos() as u64/1_000_000))
651 } else {
652 log::warn!(
653 process:?=self.id(),
654 tick=tick_count,
655 after_ns=t_after.duration_since (t_next).as_nanos();
656 "process late tick");
657 }
658
659 } self.put_endpoints (endpoints);
661 } fn run_anisochronous (&mut self) where
665 Self : Sized,
666 CTX : 'static
667 {
668 self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
669
670 let _t_start = time::Instant::now();
671 debug_assert_eq!(Kind::Anisochronous, *self.kind());
672 log::debug!(process:?=self.id(), kind="anisochronous"; "process start");
673 let mut _message_count = 0;
674 let mut update_count = 0;
675
676 let endpoints = self.take_endpoints();
677 let mut num_open_channels = endpoints.len();
678 let mut open_channels = smallvec::SmallVec::<[bool; 8]>::from_vec ({
679 let mut v = Vec::with_capacity (num_open_channels);
680 v.resize (num_open_channels, true);
681 v
682 });
683 '_run_loop: while self.state_id() == inner::StateId::Running {
684 poll_messages (self, &endpoints,
686 &mut open_channels, &mut num_open_channels, &mut _message_count);
687 log::trace!(process:?=self.id(), update=update_count; "process update");
689 let update_result = self.update();
690 match update_result {
691 ControlFlow::Continue => {}
692 ControlFlow::Break => {
693 if self.state_id() == inner::StateId::Running {
694 self.inner_mut().handle_event (inner::EventParams::End{}.into())
695 .unwrap()
696 }
697 }
698 }
699 update_count += 1;
700
701 } self.put_endpoints (endpoints);
703 } } pub type IdReprType = u16;
708pub trait Id <CTX> : Clone + Ord + Into <usize> + TryFrom <IdReprType> +
710 std::fmt::Debug + strum::IntoEnumIterator + strum::EnumCount
711where
712 CTX : session::Context <PID=Self>
713{
714 fn def (&self) -> Def <CTX>;
715 fn spawn (inner : Inner <CTX>) -> std::thread::JoinHandle <Option <()>>;
718 fn gproc (inner : Inner <CTX>) -> CTX::GPROC;
720}
721
722pub trait Global <CTX> where
724 Self : Sized,
725 CTX : session::Context <GPROC=Self>
726{
727 fn id (&self) -> CTX::PID;
728 fn run (&mut self);
729 }
731
732impl <CTX : session::Context> Def <CTX> {
737 pub fn define (
834 id : CTX::PID,
835 kind : Kind,
836 sourcepoints : Vec <CTX::CID>,
837 endpoints : Vec <CTX::CID>
838 ) -> Result <Self, Vec <DefineError>> {
839 let def = Def {
840 id, kind, sourcepoints, endpoints
841 };
842 def.validate_role() ?;
843 Ok (def)
844 }
845
846 pub fn id (&self) -> &CTX::PID {
847 &self.id
848 }
849
850 pub fn kind (&self) -> &Kind {
851 &self.kind
852 }
853
854 pub fn sourcepoints (&self) -> &Vec <CTX::CID> {
855 &self.sourcepoints
856 }
857
858 pub fn endpoints (&self) -> &Vec <CTX::CID> {
859 &self.endpoints
860 }
861
862 fn validate_role (&self) -> Result <(), Vec <DefineError>> {
863 let mut errors = Vec::new();
864
865 let mut producers_dedup = self.sourcepoints.clone();
869 producers_dedup.as_mut_slice().sort();
870 producers_dedup.dedup_by (|x,y| x == y);
871 if producers_dedup.len() < self.sourcepoints.len() {
872 errors.push (DefineError::DuplicateSourcepoint);
873 }
874
875 let mut consumers_dedup = self.endpoints.clone();
877 consumers_dedup.as_mut_slice().sort();
878 consumers_dedup.dedup_by (|x,y| x == y);
879 if consumers_dedup.len() < self.endpoints.len() {
880 errors.push (DefineError::DuplicateEndpoint);
881 }
882
883 let mut producers_and_consumers = producers_dedup.clone();
885 producers_and_consumers.append (&mut consumers_dedup.clone());
886 producers_and_consumers.as_mut_slice().sort();
887 producers_and_consumers.dedup_by (|x,y| x == y);
888 if producers_and_consumers.len()
889 < producers_dedup.len() + consumers_dedup.len()
890 {
891 errors.push (DefineError::SourcepointEqEndpoint);
892 }
893
894 if let Err (mut errs)
896 = self.kind.validate_role::<CTX> (&self.sourcepoints, &self.endpoints)
897 {
898 errors.append (&mut errs);
899 }
900
901 if !errors.is_empty() {
902 Err (errors)
903 } else {
904 Ok (())
905 }
906 }
907}
908
909impl Kind {
910 pub fn asynchronous_default() -> Self {
911 const MESSAGES_PER_UPDATE : u32 = 1;
912 Kind::new_asynchronous (MESSAGES_PER_UPDATE).unwrap()
913 }
914
915 pub fn isochronous_default() -> Self {
916 const TICK_MS : u32 = 1000;
917 const TICKS_PER_UPDATE : u32 = 1;
918 Kind::new_isochronous (TICK_MS, TICKS_PER_UPDATE).unwrap()
919 }
920
921 pub fn mesochronous_default() -> Self {
922 const TICK_MS : u32 = 1000;
923 const TICKS_PER_UPDATE : u32 = 1;
924 Kind::new_mesochronous (TICK_MS, TICKS_PER_UPDATE).unwrap()
925 }
926
927 pub fn anisochronous_default() -> Self {
928 Kind::new_anisochronous()
929 }
930
931 pub fn new_asynchronous (messages_per_update : u32)
932 -> Result <Self, Vec <KindError>>
933 {
934 let mut errors = Vec::new();
935 if messages_per_update == 0 {
936 errors.push (KindError::AsynchronousZeroMessagesPerUpdate)
937 }
938 if !errors.is_empty() {
939 Err (errors)
940 } else {
941 Ok (Kind::Asynchronous { messages_per_update })
942 }
943 }
944
945 pub fn new_isochronous (tick_ms : u32, ticks_per_update : u32)
946 -> Result <Self, Vec <KindError>>
947 {
948 let mut errors = Vec::new();
949 if tick_ms == 0 {
950 errors.push (KindError::IsochronousZeroTickMs)
951 }
952 if ticks_per_update == 0 {
953 errors.push (KindError::IsochronousZeroTicksPerUpdate)
954 }
955 if !errors.is_empty() {
956 Err (errors)
957 } else {
958 Ok (Kind::Isochronous { tick_ms, ticks_per_update })
959 }
960 }
961
962 pub fn new_mesochronous (tick_ms : u32, ticks_per_update : u32)
963 -> Result <Self, Vec <KindError>>
964 {
965 let mut errors = Vec::new();
966 if tick_ms == 0 {
967 errors.push (KindError::MesochronousZeroTickMs)
968 }
969 if ticks_per_update == 0 {
970 errors.push (KindError::MesochronousZeroTicksPerUpdate)
971 }
972 if !errors.is_empty() {
973 Err (errors)
974 } else {
975 Ok (Kind::Isochronous { tick_ms, ticks_per_update })
976 }
977 }
978
979 #[inline]
980 pub fn new_anisochronous() -> Self {
981 Kind::Anisochronous
982 }
983
984 fn validate_role <CTX : session::Context> (&self,
985 _sourcepoints : &Vec <CTX::CID>,
986 endpoints : &Vec <CTX::CID>
987 ) -> Result <(), Vec <DefineError>> {
988 let mut errors = Vec::new();
989
990 match *self {
991 Kind::Asynchronous {..} => {
992 if endpoints.len() == 0 {
994 errors.push (DefineError::AsynchronousZeroEndpoints)
995 } else if 1 < endpoints.len() {
996 errors.push (DefineError::AsynchronousMultipleEndpoints)
997 }
998 }
999 Kind::Isochronous {..} => { }
1000 Kind::Mesochronous {..} => { }
1001 Kind::Anisochronous {..} => { }
1002 }
1003
1004 if !errors.is_empty() {
1005 Err (errors)
1006 } else {
1007 Ok (())
1008 }
1009 }
1010
1011} impl <M> From <Result <(), channel::SendError <M>>> for ControlFlow {
1014 fn from (send_result : Result <(), channel::SendError <M>>) -> Self {
1015 match send_result {
1016 Ok (_) => ControlFlow::Continue,
1017 Err (_) => ControlFlow::Break
1018 }
1019 }
1020}
1021
1022pub fn report_sizes <CTX : session::Context> () where
1030 CTX : 'static
1031{
1032 println!("process report sizes...");
1033 println!(" size of process::Def: {}", std::mem::size_of::<Def <CTX>>());
1034 Inner::<CTX>::report_sizes();
1035 println!("...process report sizes");
1036}
1037
1038#[inline]
1048fn poll_messages <CTX, P, RES> (
1049 process : &mut P,
1050 endpoints : &VecMap <Box <dyn channel::Endpoint <CTX>>>,
1051 open_channels : &mut smallvec::SmallVec <[bool; 8]>,
1052 num_open_channels : &mut usize,
1053 message_count : &mut usize)
1054where
1055 CTX : session::Context + 'static,
1056 P : Process <CTX, RES> + Sized,
1057 RES : Presult <CTX, P>
1058{
1059 use message::Global;
1060 #[inline]
1061 fn channel_close (is_open : &mut bool, num_open : &mut usize) {
1062 debug_assert!(*is_open);
1063 debug_assert!(0 < *num_open);
1064 *is_open = false;
1065 *num_open -= 1;
1066 }
1067
1068 let mut open_index = 0;
1071 'poll_outer: for (cid, endpoint) in endpoints.iter() {
1072 let channel_id = match CTX::CID::try_from (cid as u16) {
1074 Ok (cid) => cid,
1075 Err (_) => unreachable!()
1076 };
1077 let channel_open = &mut open_channels[open_index];
1078 open_index += 1;
1079 if !*channel_open {
1080 continue 'poll_outer
1081 }
1082 'poll_inner: loop {
1083 match endpoint.try_recv() {
1084 Ok (message) => {
1085 log::debug!(
1086 process:?=process.id(),
1087 channel:?=channel_id,
1088 message=message.inner_name().as_str();
1089 "process received message");
1090 *message_count += 1;
1091 match process.handle_message (message) {
1092 ControlFlow::Continue => {}
1093 ControlFlow::Break => {
1094 channel_close (channel_open, num_open_channels);
1095 if *num_open_channels == 0 {
1097 process.inner_mut().handle_event (
1098 inner::EventParams::End{}.into()
1099 ).unwrap();
1100 }
1101 break 'poll_inner
1102 }
1103 }
1104 }
1105 Err (channel::TryRecvError::Empty) => { break 'poll_inner }
1106 Err (channel::TryRecvError::Disconnected) => {
1107 log::info!(process:?=process.id(), channel:?=channel_id;
1108 "process receive failed: sender disconnected");
1109 channel_close (channel_open, num_open_channels);
1110 if *num_open_channels == 0 {
1111 process.inner_mut().handle_event (inner::EventParams::End{}.into())
1112 .unwrap();
1113 }
1114 break 'poll_inner
1115 }
1116 } } } }