1use {std, either, log, smallvec};
2use crate::{channel, message, session, Message};
3
4use std::convert::TryFrom;
5use std::sync::mpsc;
6use std::time;
7use vec_map::VecMap;
8
9pub mod inner;
10pub mod presult;
11
12pub use self::inner::Inner;
13pub use self::presult::Presult;
14
15#[derive(Clone, Debug, Eq, PartialEq)]
21pub struct Def <CTX : session::Context> {
22 id : CTX::PID,
23 kind : Kind,
24 sourcepoints : Vec <CTX::CID>,
25 endpoints : Vec <CTX::CID>
26}
27
28pub struct Handle <CTX : session::Context> {
30 pub result_rx : mpsc::Receiver <CTX::GPRES>,
31 pub continuation_tx : mpsc::Sender <session::Continuation <CTX>>,
32 pub join_or_continue : either::Either <
35 std::thread::JoinHandle <Option <()>>, Option <session::Continuation <CTX>>>
36}
37
/// Scheduling kind of a process; selects which run loop `Process::run` uses.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Kind {
  /// Blocks on a single endpoint and calls `update` once every
  /// `messages_per_update` received messages.
  Asynchronous { messages_per_update : u32 },

  /// Ticks on a fixed schedule: each tick advances the next deadline by exactly
  /// `tick_ms` milliseconds. Messages are polled every tick and `update` is
  /// called every `ticks_per_update` ticks.
  Isochronous { tick_ms : u32, ticks_per_update : u32 },

  /// Ticks with a period of `tick_ms` milliseconds measured from the time the
  /// previous tick actually started. Messages are polled every tick and
  /// `update` is called every `ticks_per_update` ticks.
  Mesochronous { tick_ms : u32, ticks_per_update : u32 },

  /// Polls messages and calls `update` in a loop with no tick timing.
  Anisochronous,
}
90
/// Result of a process callback (`handle_message` or `update`), telling the
/// run loop whether to carry on.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ControlFlow {
  /// Keep running.
  Continue,
  /// Stop: the run loops react by closing a channel or ending the process.
  Break,
}
96
/// Reasons a `Kind` constructor rejects its parameters.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum KindError {
  /// `Kind::new_asynchronous` was given `messages_per_update == 0`.
  AsynchronousZeroMessagesPerUpdate,
  /// `Kind::new_isochronous` was given `tick_ms == 0`.
  IsochronousZeroTickMs,
  /// `Kind::new_isochronous` was given `ticks_per_update == 0`.
  IsochronousZeroTicksPerUpdate,
  /// `Kind::new_mesochronous` was given `tick_ms == 0`.
  MesochronousZeroTickMs,
  /// `Kind::new_mesochronous` was given `ticks_per_update == 0`.
  MesochronousZeroTicksPerUpdate,
}
105
/// Reasons `Def::define` rejects a process definition.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DefineError {
  /// The sourcepoint list names the same channel more than once.
  DuplicateSourcepoint,
  /// The endpoint list names the same channel more than once.
  DuplicateEndpoint,
  /// A channel appears both as a sourcepoint and as an endpoint.
  SourcepointEqEndpoint,
  /// An asynchronous process was defined without any endpoint.
  AsynchronousZeroEndpoints,
  /// An asynchronous process was defined with more than one endpoint.
  AsynchronousMultipleEndpoints,
}
115
116pub trait Process <CTX, RES> where
128 CTX : session::Context,
129 RES : Presult <CTX, Self>,
130 Self : TryFrom <CTX::GPROC> + Into <CTX::GPROC>
131{
132 fn new (inner : Inner <CTX>) -> Self;
136 fn inner_ref (&self) -> &Inner <CTX>;
137 fn inner_mut (&mut self) -> &mut Inner <CTX>;
138 fn result_ref (&self) -> &RES;
139 fn result_mut (&mut self) -> &mut RES;
140 fn global_result (&mut self) -> CTX::GPRES;
141 fn extract_result (session_results : &mut VecMap <CTX::GPRES>)
142 -> Result <RES, String>;
143 fn handle_message (&mut self, message : CTX::GMSG) -> ControlFlow;
144 fn update (&mut self) -> ControlFlow;
145
146 fn initialize (&mut self) { }
148 fn terminate (&mut self) { }
150
151 #[inline]
155 fn id (&self) -> &CTX::PID where CTX : 'static {
156 self.def().id()
157 }
158
159 #[inline]
160 fn kind (&self) -> &Kind where CTX : 'static {
161 self.def().kind()
162 }
163
164 #[inline]
165 fn state_id (&self) -> inner::StateId {
166 self.inner_ref().state().id().clone()
167 }
168
169 #[inline]
170 fn def (&self) -> &Def <CTX> {
171 &self.inner_ref().extended_state().def
172 }
173
174 #[inline]
175 fn sourcepoints (&self) -> &VecMap <Box <dyn channel::Sourcepoint <CTX>>> {
176 &self.inner_ref().extended_state().sourcepoints
177 }
178
179 #[inline]
180 fn sourcepoints_mut (&mut self)
181 -> &mut VecMap <Box <dyn channel::Sourcepoint <CTX>>>
182 {
183 &mut self.inner_mut().extended_state_mut().sourcepoints
184 }
185
186 #[inline]
192 #[expect(mismatched_lifetime_syntaxes)]
193 fn endpoints (&self)
194 -> std::cell::Ref <Option <VecMap <Box <dyn channel::Endpoint <CTX>>>>>
195 {
196 self.inner_ref().extended_state().endpoints.borrow()
197 }
198
199 #[inline]
205 #[expect(mismatched_lifetime_syntaxes)]
206 fn endpoints_mut (&mut self) -> std::cell::RefMut
207 <Option <VecMap <Box <dyn channel::Endpoint <CTX>>>>>
208 {
209 self.inner_ref().extended_state().endpoints.borrow_mut()
210 }
211
212 #[inline]
221 fn take_endpoints (&self) -> VecMap <Box <dyn channel::Endpoint <CTX>>> {
222 self.inner_ref().extended_state().endpoints.borrow_mut().take().unwrap()
223 }
224
225 #[inline]
229 fn put_endpoints (&self,
230 endpoints : VecMap <Box <dyn channel::Endpoint <CTX>>>
231 ) {
232 *self.inner_ref().extended_state().endpoints.borrow_mut()
233 = Some (endpoints);
234 }
235
236 fn send <M : Message <CTX>> (&self, channel_id : CTX::CID, message : M)
237 -> Result <(), channel::SendError <CTX::GMSG>>
238 where CTX : 'static {
239 let message_name = message.name();
240 log::debug!(
241 process:?=self.id(), channel:?=channel_id, message=message_name.as_str();
242 "process sending message");
243 let cid : usize = channel_id.clone().into();
244 self.sourcepoints()[cid].send (message.into()).inspect_err (|_|
245 log::warn!(
246 process:?=self.id(), channel:?=channel_id, message=message_name.as_str();
247 "process send error: receiver disconnected"))
248 }
249
250 fn send_to <M : Message <CTX>> (
251 &self, channel_id : CTX::CID, recipient : CTX::PID, message : M
252 ) -> Result <(), channel::SendError <CTX::GMSG>>
253 where CTX : 'static
254 {
255 let message_name = message.name();
256 log::debug!(
257 process:?=self.id(),
258 channel:?=channel_id,
259 peer:?=recipient,
260 message=message_name.as_str();
261 "process sending message to peer");
262 let cid : usize = channel_id.clone().into();
263 self.sourcepoints()[cid].send_to (message.into(), recipient.clone()).inspect_err (
264 |_| log::warn!(
265 process:?=self.id(),
266 channel:?=channel_id,
267 peer:?=recipient,
268 message=message_name.as_str();
269 "process send to peer error: receiver disconnected"))
270 }
271
272 #[inline]
274 fn run (&mut self) where
275 Self : Sized + 'static,
276 CTX : 'static
277 {
278 use message::Global;
279 debug_assert_eq!(self.state_id(), inner::StateId::Ready);
280 self.initialize();
281 match *self.kind() {
282 Kind::Asynchronous {..} => self.run_asynchronous(),
283 Kind::Isochronous {..} => self.run_isochronous(),
284 Kind::Mesochronous {..} => self.run_mesochronous(),
285 Kind::Anisochronous => self.run_anisochronous()
286 }
287 debug_assert_eq!(self.state_id(), inner::StateId::Ended);
288 self.terminate();
289 self.sourcepoints_mut().clear();
292 { let endpoints = self.take_endpoints();
294 let mut unhandled_count = 0;
295 for (cid, endpoint) in endpoints.iter() {
296 #[expect(clippy::cast_possible_truncation)]
297 let Ok (channel_id) = CTX::CID::try_from (cid as channel::IdReprType)
299 else { unreachable!() };
300 while let Ok (message) = endpoint.try_recv() {
301 log::warn!(
302 process:?=self.id(),
303 channel:?=channel_id,
304 message=format!("{:?}({})", message.id(), message.inner_name()).as_str();
305 "process unhandled message");
306 unhandled_count += 1;
307 }
308 }
309 if unhandled_count > 0 {
310 log::warn!(process:?=self.id(), unhandled_message_count=unhandled_count;
311 "process ended with unhandled messages");
312 }
313 }
314 debug_assert!(self.sourcepoints().is_empty());
315 debug_assert!(self.endpoints().is_none());
316 let gpresult = self.global_result();
317 let session_handle = &self.inner_ref().as_ref().session_handle;
318 session_handle.result_tx.send (gpresult).unwrap();
319 }
320
321 #[inline]
324 fn run_continue (mut self) -> Option <()> where
325 Self : Sized + 'static,
326 CTX : 'static
327 {
328 self.run();
329 let continuation : session::Continuation <CTX> = {
330 let session_handle = &self.inner_ref().as_ref().session_handle;
331 session_handle.continuation_rx.recv().unwrap()
332 };
333 continuation (self.into())
334 }
335
336 fn run_asynchronous (&mut self) where
340 Self : Sized,
341 CTX : 'static
342 {
343 use message::Global;
344
345 self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
346
347 let messages_per_update = {
348 match *self.kind() {
349 Kind::Asynchronous { messages_per_update } => messages_per_update,
350 _ => unreachable!(
351 "run asynchronous: process kind does not match run function")
352 }
353 };
354 let _t_start = time::Instant::now();
355 log::debug!(process:?=self.id(), kind="asynchronous", messages_per_update;
356 "process start");
357 debug_assert!(1 <= messages_per_update);
358 let mut _message_count = 0;
359 let mut update_count = 0;
360 let mut messages_since_update = 0;
361
362 let endpoints = self.take_endpoints();
363 { let (cid, endpoint) = endpoints.iter().next().unwrap();
365 #[expect(clippy::cast_possible_truncation)]
366 let Ok (channel_id) = CTX::CID::try_from (cid as channel::IdReprType)
368 else { unreachable!() };
369 '_run_loop: while self.state_id() == inner::StateId::Running {
370 match endpoint.recv() {
372 Ok (message) => {
373 log::debug!(
374 process:?=self.id(),
375 channel:?=channel_id,
376 message=message.inner_name().as_str();
377 "process received message");
378 let handle_message_result = self.handle_message (message);
379 match handle_message_result {
380 ControlFlow::Continue => {}
381 ControlFlow::Break => {
382 if self.state_id() == inner::StateId::Running {
383 self.inner_mut().handle_event (inner::EventParams::End{}.into())
384 .unwrap();
385 }
386 }
387 }
388 _message_count += 1;
389 messages_since_update += 1;
390 }
391 Err (channel::RecvError) => {
392 log::info!(process:?=self.id(), channel:?=channel_id;
393 "process receive failed: sender disconnected");
394 if self.state_id() == inner::StateId::Running {
395 self.inner_mut().handle_event (inner::EventParams::End{}.into())
396 .unwrap();
397 }
398 }
399 }
400 if messages_per_update <= messages_since_update {
401 log::trace!(process:?=self.id(), update=update_count;
403 "process update");
404 let update_result = self.update();
405 match update_result {
406 ControlFlow::Continue => {}
407 ControlFlow::Break => {
408 if self.state_id() == inner::StateId::Running {
409 self.inner_mut().handle_event (inner::EventParams::End{}.into())
410 .unwrap();
411 }
412 }
413 }
414 update_count += 1;
415 messages_since_update = 0;
416 }
417 } } self.put_endpoints (endpoints);
420 } fn run_isochronous (&mut self) where
438 Self : Sized,
439 CTX : 'static
440 {
441 self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
442
443 let t_start = time::Instant::now();
444 let (tick_ms, ticks_per_update) = {
445 match *self.kind() {
446 Kind::Isochronous { tick_ms, ticks_per_update }
447 => (tick_ms, ticks_per_update),
448 _ => unreachable!(
449 "run synchronous: process kind does not match run function")
450 }
451 };
452 log::debug!(
453 process:?=self.id(), kind="isochronous", tick_ms, ticks_per_update;
454 "process start");
455 debug_assert!(1 <= tick_ms);
456 debug_assert!(1 <= ticks_per_update);
457 let tick_dur = time::Duration::from_millis (tick_ms as u64);
458 let mut t_last = t_start - tick_dur;
459 let mut t_next = t_start;
460 let mut ticks_since_update = 0;
461 let mut tick_count = 0;
462 let mut message_count = 0;
463 let mut update_count = 0;
464
465 let endpoints = self.take_endpoints();
466 let mut num_open_channels = endpoints.len();
467 let mut open_channels = smallvec::SmallVec::<[bool; 8]>::from_vec ({
468 let mut v = Vec::with_capacity (num_open_channels);
469 v.resize (num_open_channels, true);
470 v
471 });
472 '_run_loop: while self.state_id() == inner::StateId::Running {
473 let t_now = time::Instant::now();
474 if t_next < t_now {
475 log::trace!(
476 process:?=self.id(),
477 tick=tick_count,
478 since_ns=t_now.duration_since (t_next).as_nanos();
479 "process tick");
480 t_last += tick_dur;
481 t_next += tick_dur;
482
483 poll_messages (self,
485 &endpoints, &mut open_channels, &mut num_open_channels, &mut message_count);
486
487 tick_count += 1;
488 ticks_since_update += 1;
489 debug_assert!(ticks_since_update <= ticks_per_update);
490 if ticks_since_update == ticks_per_update {
491 log::trace!(process:?=self.id(), update=update_count;
492 "process update");
493 let update_result = self.update();
494 match update_result {
495 ControlFlow::Continue => {}
496 ControlFlow::Break => {
497 if self.state_id() == inner::StateId::Running {
498 self.inner_mut().handle_event (inner::EventParams::End{}.into())
499 .unwrap();
500 }
501 }
502 }
503 update_count += 1;
504 ticks_since_update = 0;
505 }
506 } else {
507 log::warn!(
508 process:?=self.id(),
509 tick=tick_count,
510 until_ns=t_next.duration_since (t_now).as_nanos();
511 "process tick too early");
512 }
513
514 let t_after = time::Instant::now();
515 if t_after < t_next {
516 let t_until = t_next.duration_since (t_after);
518 std::thread::sleep (time::Duration::from_millis (
519 1 + t_until.as_secs()*1000 +
521 t_until.subsec_nanos() as u64/1_000_000))
522 } else {
523 log::warn!(
524 process:?=self.id(),
525 tick=tick_count,
526 after_ns=t_after.duration_since (t_next).as_nanos();
527 "process late tick");
528 }
529
530 } self.put_endpoints (endpoints);
532 } fn run_mesochronous (&mut self) where
547 Self : Sized,
548 CTX : 'static
549 {
550 self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
551
552 let t_start = time::Instant::now();
553 let (tick_ms, ticks_per_update) = {
554 match *self.kind() {
555 Kind::Mesochronous { tick_ms, ticks_per_update }
556 => (tick_ms, ticks_per_update),
557 _ => unreachable!(
558 "run synchronous: process kind does not match run function")
559 }
560 };
561 log::debug!(
562 process:?=self.id(), kind="mesochronous", tick_ms, ticks_per_update;
563 "process start");
564 debug_assert!(1 <= tick_ms);
565 debug_assert!(1 <= ticks_per_update);
566 let tick_dur = time::Duration::from_millis (tick_ms as u64);
567 let mut _t_last = t_start - tick_dur;
568 let mut t_next = t_start;
569 let mut ticks_since_update = 0;
570 let mut tick_count = 0;
571 let mut message_count = 0;
572 let mut update_count = 0;
573
574 let endpoints = self.take_endpoints();
575 let mut num_open_channels = endpoints.len();
576 let mut open_channels = smallvec::SmallVec::<[bool; 8]>::from_vec ({
577 let mut v = Vec::with_capacity (num_open_channels);
578 v.resize (num_open_channels, true);
579 v
580 });
581 '_run_loop: while self.state_id() == inner::StateId::Running {
582 let t_now = time::Instant::now();
583 if t_next < t_now {
584 log::trace!(
585 process:?=self.id(),
586 tick=tick_count,
587 since_ns=t_now.duration_since (t_next).as_nanos();
588 "process tick");
589 _t_last = t_now;
590 t_next = t_now + tick_dur;
591
592 poll_messages (self,
594 &endpoints, &mut open_channels, &mut num_open_channels, &mut message_count);
595
596 tick_count += 1;
597 ticks_since_update += 1;
598 debug_assert!(ticks_since_update <= ticks_per_update);
599 if ticks_since_update == ticks_per_update {
600 log::trace!(process:?=self.id(), update=update_count;
601 "process update");
602 let update_result = self.update();
603 match update_result {
604 ControlFlow::Continue => {}
605 ControlFlow::Break => {
606 if self.state_id() == inner::StateId::Running {
607 self.inner_mut().handle_event (inner::EventParams::End{}.into())
608 .unwrap();
609 }
610 }
611 }
612 update_count += 1;
613 ticks_since_update = 0;
614 }
615 } else {
616 log::warn!(
617 process:?=self.id(),
618 tick=tick_count,
619 until_ns=t_next.duration_since (t_now).as_nanos();
620 "process tick too early");
621 }
622
623 let t_after = time::Instant::now();
624 if t_after < t_next {
625 let t_until = t_next.duration_since (t_after);
627 std::thread::sleep (time::Duration::from_millis (
628 1 + t_until.as_secs()*1000 +
630 t_until.subsec_nanos() as u64/1_000_000))
631 } else {
632 log::warn!(
633 process:?=self.id(),
634 tick=tick_count,
635 after_ns=t_after.duration_since (t_next).as_nanos();
636 "process late tick");
637 }
638
639 } self.put_endpoints (endpoints);
641 } fn run_anisochronous (&mut self) where
645 Self : Sized,
646 CTX : 'static
647 {
648 self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
649
650 let _t_start = time::Instant::now();
651 debug_assert_eq!(Kind::Anisochronous, *self.kind());
652 log::debug!(process:?=self.id(), kind="anisochronous"; "process start");
653 let mut message_count = 0;
654 let mut update_count = 0;
655
656 let endpoints = self.take_endpoints();
657 let mut num_open_channels = endpoints.len();
658 let mut open_channels = smallvec::SmallVec::<[bool; 8]>::from_vec ({
659 let mut v = Vec::with_capacity (num_open_channels);
660 v.resize (num_open_channels, true);
661 v
662 });
663 '_run_loop: while self.state_id() == inner::StateId::Running {
664 poll_messages (self,
666 &endpoints, &mut open_channels, &mut num_open_channels, &mut message_count);
667 log::trace!(process:?=self.id(), update=update_count; "process update");
669 let update_result = self.update();
670 match update_result {
671 ControlFlow::Continue => {}
672 ControlFlow::Break => {
673 if self.state_id() == inner::StateId::Running {
674 self.inner_mut().handle_event (inner::EventParams::End{}.into())
675 .unwrap()
676 }
677 }
678 }
679 update_count += 1;
680
681 } self.put_endpoints (endpoints);
683 } } pub type IdReprType = u16;
688pub trait Id <CTX> : Clone + Ord + Into <usize> + TryFrom <IdReprType> +
690 std::fmt::Debug + strum::IntoEnumIterator + strum::EnumCount
691where
692 CTX : session::Context <PID=Self>
693{
694 fn def (&self) -> Def <CTX>;
695 fn spawn (inner : Inner <CTX>) -> std::thread::JoinHandle <Option <()>>;
697 fn gproc (inner : Inner <CTX>) -> CTX::GPROC;
699}
700
701pub trait Global <CTX> where
703 Self : Sized,
704 CTX : session::Context <GPROC=Self>
705{
706 fn id (&self) -> CTX::PID;
707 fn run (&mut self);
708 }
710
711impl <CTX : session::Context> Def <CTX> {
716 pub fn define (
813 id : CTX::PID,
814 kind : Kind,
815 sourcepoints : Vec <CTX::CID>,
816 endpoints : Vec <CTX::CID>
817 ) -> Result <Self, Vec <DefineError>> {
818 let def = Def {
819 id, kind, sourcepoints, endpoints
820 };
821 def.validate_role() ?;
822 Ok (def)
823 }
824
825 pub const fn id (&self) -> &CTX::PID {
826 &self.id
827 }
828
829 pub const fn kind (&self) -> &Kind {
830 &self.kind
831 }
832
833 pub const fn sourcepoints (&self) -> &Vec <CTX::CID> {
834 &self.sourcepoints
835 }
836
837 pub const fn endpoints (&self) -> &Vec <CTX::CID> {
838 &self.endpoints
839 }
840
841 fn validate_role (&self) -> Result <(), Vec <DefineError>> {
842 let mut errors = Vec::new();
843
844 let mut producers_dedup = self.sourcepoints.clone();
848 producers_dedup.as_mut_slice().sort();
849 producers_dedup.dedup_by (|x,y| x == y);
850 if producers_dedup.len() < self.sourcepoints.len() {
851 errors.push (DefineError::DuplicateSourcepoint);
852 }
853
854 let mut consumers_dedup = self.endpoints.clone();
856 consumers_dedup.as_mut_slice().sort();
857 consumers_dedup.dedup_by (|x,y| x == y);
858 if consumers_dedup.len() < self.endpoints.len() {
859 errors.push (DefineError::DuplicateEndpoint);
860 }
861
862 let mut producers_and_consumers = producers_dedup.clone();
864 producers_and_consumers.append (&mut consumers_dedup.clone());
865 producers_and_consumers.as_mut_slice().sort();
866 producers_and_consumers.dedup_by (|x,y| x == y);
867 if producers_and_consumers.len()
868 < producers_dedup.len() + consumers_dedup.len()
869 {
870 errors.push (DefineError::SourcepointEqEndpoint);
871 }
872
873 if let Err (mut errs)
875 = self.kind.validate_role::<CTX> (&self.sourcepoints, &self.endpoints)
876 {
877 errors.append (&mut errs);
878 }
879
880 if !errors.is_empty() {
881 Err (errors)
882 } else {
883 Ok (())
884 }
885 }
886}
887
888impl Kind {
889 pub fn asynchronous_default() -> Self {
890 const MESSAGES_PER_UPDATE : u32 = 1;
891 Kind::new_asynchronous (MESSAGES_PER_UPDATE).unwrap()
892 }
893
894 pub fn isochronous_default() -> Self {
895 const TICK_MS : u32 = 1000;
896 const TICKS_PER_UPDATE : u32 = 1;
897 Kind::new_isochronous (TICK_MS, TICKS_PER_UPDATE).unwrap()
898 }
899
900 pub fn mesochronous_default() -> Self {
901 const TICK_MS : u32 = 1000;
902 const TICKS_PER_UPDATE : u32 = 1;
903 Kind::new_mesochronous (TICK_MS, TICKS_PER_UPDATE).unwrap()
904 }
905
906 pub const fn anisochronous_default() -> Self {
907 Kind::new_anisochronous()
908 }
909
910 pub fn new_asynchronous (messages_per_update : u32)
911 -> Result <Self, Vec <KindError>>
912 {
913 let mut errors = Vec::new();
914 if messages_per_update == 0 {
915 errors.push (KindError::AsynchronousZeroMessagesPerUpdate)
916 }
917 if !errors.is_empty() {
918 Err (errors)
919 } else {
920 Ok (Kind::Asynchronous { messages_per_update })
921 }
922 }
923
924 pub fn new_isochronous (tick_ms : u32, ticks_per_update : u32)
925 -> Result <Self, Vec <KindError>>
926 {
927 let mut errors = Vec::new();
928 if tick_ms == 0 {
929 errors.push (KindError::IsochronousZeroTickMs)
930 }
931 if ticks_per_update == 0 {
932 errors.push (KindError::IsochronousZeroTicksPerUpdate)
933 }
934 if !errors.is_empty() {
935 Err (errors)
936 } else {
937 Ok (Kind::Isochronous { tick_ms, ticks_per_update })
938 }
939 }
940
941 pub fn new_mesochronous (tick_ms : u32, ticks_per_update : u32)
942 -> Result <Self, Vec <KindError>>
943 {
944 let mut errors = Vec::new();
945 if tick_ms == 0 {
946 errors.push (KindError::MesochronousZeroTickMs)
947 }
948 if ticks_per_update == 0 {
949 errors.push (KindError::MesochronousZeroTicksPerUpdate)
950 }
951 if !errors.is_empty() {
952 Err (errors)
953 } else {
954 Ok (Kind::Isochronous { tick_ms, ticks_per_update })
955 }
956 }
957
958 #[inline]
959 pub const fn new_anisochronous() -> Self {
960 Kind::Anisochronous
961 }
962
963 fn validate_role <CTX : session::Context> (&self,
964 _sourcepoints : &[CTX::CID],
965 endpoints : &[CTX::CID]
966 ) -> Result <(), Vec <DefineError>> {
967 let mut errors = Vec::new();
968
969 match *self {
970 Kind::Asynchronous {..} => {
971 if endpoints.is_empty() {
973 errors.push (DefineError::AsynchronousZeroEndpoints)
974 } else if 1 < endpoints.len() {
975 errors.push (DefineError::AsynchronousMultipleEndpoints)
976 }
977 }
978 Kind::Isochronous {..} |
979 Kind::Mesochronous {..} |
980 Kind::Anisochronous => { }
981 }
982
983 if !errors.is_empty() {
984 Err (errors)
985 } else {
986 Ok (())
987 }
988 }
989
990} impl <M> From <Result <(), channel::SendError <M>>> for ControlFlow {
993 fn from (send_result : Result <(), channel::SendError <M>>) -> Self {
994 match send_result {
995 Ok (()) => ControlFlow::Continue,
996 Err (_) => ControlFlow::Break
997 }
998 }
999}
1000
1001pub fn report_sizes <CTX : session::Context + 'static> () {
1009 println!("process report sizes...");
1010 println!(" size of process::Def: {}", size_of::<Def <CTX>>());
1011 Inner::<CTX>::report_sizes();
1012 println!("...process report sizes");
1013}
1014
1015#[inline]
1025fn poll_messages <CTX, P, RES> (
1026 process : &mut P,
1027 endpoints : &VecMap <Box <dyn channel::Endpoint <CTX>>>,
1028 open_channels : &mut smallvec::SmallVec <[bool; 8]>,
1029 num_open_channels : &mut usize,
1030 message_count : &mut usize)
1031where
1032 CTX : session::Context + 'static,
1033 P : Process <CTX, RES> + Sized,
1034 RES : Presult <CTX, P>
1035{
1036 use message::Global;
1037 #[inline]
1038 fn channel_close (is_open : &mut bool, num_open : &mut usize) {
1039 debug_assert!(*is_open);
1040 debug_assert!(0 < *num_open);
1041 *is_open = false;
1042 *num_open -= 1;
1043 }
1044
1045 'poll_outer: for (open_index, (cid, endpoint)) in endpoints.iter().enumerate() {
1048 #[expect(clippy::cast_possible_truncation)]
1049 let Ok (channel_id) = CTX::CID::try_from (cid as u16) else { unreachable!() };
1051 let channel_open = &mut open_channels[open_index];
1052 if !*channel_open {
1053 continue 'poll_outer
1054 }
1055 'poll_inner: loop {
1056 match endpoint.try_recv() {
1057 Ok (message) => {
1058 log::debug!(
1059 process:?=process.id(),
1060 channel:?=channel_id,
1061 message=message.inner_name().as_str();
1062 "process received message");
1063 *message_count += 1;
1064 match process.handle_message (message) {
1065 ControlFlow::Continue => {}
1066 ControlFlow::Break => {
1067 channel_close (channel_open, num_open_channels);
1068 if *num_open_channels == 0 {
1070 process.inner_mut().handle_event (
1071 inner::EventParams::End{}.into()
1072 ).unwrap();
1073 }
1074 break 'poll_inner
1075 }
1076 }
1077 }
1078 Err (channel::TryRecvError::Empty) => { break 'poll_inner }
1079 Err (channel::TryRecvError::Disconnected) => {
1080 log::info!(process:?=process.id(), channel:?=channel_id;
1081 "process receive failed: sender disconnected");
1082 channel_close (channel_open, num_open_channels);
1083 if *num_open_channels == 0 {
1084 process.inner_mut().handle_event (inner::EventParams::End{}.into())
1085 .unwrap();
1086 }
1087 break 'poll_inner
1088 }
1089 } } } }