apis/process/
mod.rs

1use {std, either, log, smallvec};
2use crate::{channel, message, session, Message};
3
4use std::convert::TryFrom;
5use std::sync::mpsc;
6use std::time;
7use vec_map::VecMap;
8
9pub mod inner;
10pub mod presult;
11
12pub use self::inner::Inner;
13pub use self::presult::Presult;
14
15////////////////////////////////////////////////////////////////////////////////
16//  structs                                                                   //
17////////////////////////////////////////////////////////////////////////////////
18
19/// Process definition.
20#[derive(Clone, Debug, Eq, PartialEq)]
21pub struct Def <CTX : session::Context> {
22  id           : CTX::PID,
23  kind         : Kind,
24  sourcepoints : Vec <CTX::CID>,
25  endpoints    : Vec <CTX::CID>
26}
27
28/// Handle to a process held by the session.
29pub struct Handle <CTX : session::Context> {
30  pub result_rx        : mpsc::Receiver <CTX::GPRES>,
31  pub continuation_tx  : mpsc::Sender <session::Continuation <CTX>>,
32  /// When the session drops, the `finish` method will either join or send
33  /// a continuation depending on the contents of this field.
34  pub join_or_continue : either::Either <
35    std::thread::JoinHandle <Option <()>>, Option <session::Continuation <CTX>>>
36}
37
38////////////////////////////////////////////////////////////////////////////////
39//  enums                                                                     //
40////////////////////////////////////////////////////////////////////////////////
41
42/// Specifies the loop behavior of a process.
43///
44/// - `Asynchronous` is a loop that blocks waiting on exactly one channel
45///   endpoint.
46/// - `Isochronous` is a fixed-timestep loop in which endpoints are polled
47///   once per 'tick' and will attempt to "catch up" if it falls behind.
48/// - `Mesochronous` is a rate-limited loop that polls processes and loops
49///   immediately if enough time has passed and otherwise sleeps for the
50///   remaining duration until the next 'tick'.
51/// - `Anisochronous` is an un-timed polling loop which always loops immediately
52///   and always processes one update per tick.
53#[derive(Clone, Debug, Eq, PartialEq)]
54pub enum Kind {
55  /// Block waiting on one or more endpoints.
56  ///
57  /// Asynchronous processes can only hold multiple endpoints of compatible
58  /// kinds of channels. Currently this is either any number of sink endpoints,
59  /// or else any number and combination of simplex or source endpoints. This is
60  /// validated internally when defining an `Def` struct with the provided
61  /// kind and endpoints.
62  Asynchronous {
63    messages_per_update : u32
64  },
65
66  /// A fixed-time step polling loop that will try to "catch up" if it falls
67  /// behind.
68  Isochronous {
69    tick_ms          : u32,
70    ticks_per_update : u32
71  },
72
73  /// A rate-limited polling loop.
74  Mesochronous {
75    tick_ms          : u32,
76    ticks_per_update : u32
77  },
78
79  /// Poll to exhaustion and update immediately.
80  ///
81  /// This is useful for blocking update functions as in a readline loop or a
82  /// rendering loop.
83  ///
84  /// Note that unlike other polling process kinds (`Isochronous` and
85  /// `Mesochronous`), ticks and updates are always one-to-one since an
86  /// external blocking mechanism is expected to be used in the `update`
87  /// function.
88  Anisochronous
89}
90
91#[derive(Clone, Copy, Debug, Eq, PartialEq)]
92pub enum ControlFlow {
93  Continue,
94  Break
95}
96
97#[derive(Clone, Debug, Eq, PartialEq)]
98pub enum KindError {
99  AsynchronousZeroMessagesPerUpdate,
100  IsochronousZeroTickMs,
101  IsochronousZeroTicksPerUpdate,
102  MesochronousZeroTickMs,
103  MesochronousZeroTicksPerUpdate
104}
105
106/// Error in `Def`.
107#[derive(Clone, Debug, Eq, PartialEq)]
108pub enum DefineError {
109  DuplicateSourcepoint,
110  DuplicateEndpoint,
111  SourcepointEqEndpoint,
112  AsynchronousZeroEndpoints,
113  AsynchronousMultipleEndpoints
114}
115
116////////////////////////////////////////////////////////////////////////////////
117//  traits                                                                    //
118////////////////////////////////////////////////////////////////////////////////
119
120/// Main process trait.
121///
122/// Process run loop will end after either all endpoint channels have returned
123/// `ControlFlow::Break` from `handle_message()` or else if `update()` returns
124/// `ControlFlow::Break`. Note that after the last endpoint channel has closed a
125/// final `update()` will still be processed. When `update()` returns
126/// `ControlFlow::Break`, no further `handle_message()` calls will be made.
127pub trait Process <CTX, RES> where
128  CTX  : session::Context,
129  RES  : Presult <CTX, Self>,
130  Self : TryFrom <CTX::GPROC> + Into <CTX::GPROC>
131{
132  //
133  //  required
134  //
135  fn new            (inner : Inner <CTX>)            -> Self;
136  fn inner_ref      (&self)                          -> &Inner <CTX>;
137  fn inner_mut      (&mut self)                      -> &mut Inner <CTX>;
138  fn result_ref     (&self)                          -> &RES;
139  fn result_mut     (&mut self)                      -> &mut RES;
140  fn global_result  (&mut self)                      -> CTX::GPRES;
141  fn extract_result (session_results : &mut VecMap <CTX::GPRES>)
142    -> Result <RES, String>;
143  fn handle_message (&mut self, message : CTX::GMSG) -> ControlFlow;
144  fn update         (&mut self)                      -> ControlFlow;
145
146  /// Does nothing by default, may be overridden.
147  fn initialize (&mut self) { }
148  /// Does nothing by default, may be overridden.
149  fn terminate  (&mut self) { }
150
151  //
152  //  provided
153  //
154  #[inline]
155  fn id (&self) -> &CTX::PID where CTX : 'static {
156    self.def().id()
157  }
158
159  #[inline]
160  fn kind (&self) -> &Kind where CTX : 'static {
161    self.def().kind()
162  }
163
164  #[inline]
165  fn state_id (&self) -> inner::StateId {
166    self.inner_ref().state().id().clone()
167  }
168
169  #[inline]
170  fn def (&self) -> &Def <CTX> {
171    &self.inner_ref().extended_state().def
172  }
173
174  #[inline]
175  fn sourcepoints (&self) -> &VecMap <Box <dyn channel::Sourcepoint <CTX>>> {
176    &self.inner_ref().extended_state().sourcepoints
177  }
178
179  #[inline]
180  fn sourcepoints_mut (&mut self)
181    -> &mut VecMap <Box <dyn channel::Sourcepoint <CTX>>>
182  {
183    &mut self.inner_mut().extended_state_mut().sourcepoints
184  }
185
186  /// This method returns a `Ref <Option <...>>` because during the run loop
187  /// the endpoints will be unavailable as they are being iterated over.
188  /// Endpoints are automatically waited on or polled in the appropriate
189  /// `run_*` function. Endpoints will be present for the calls to `terminate`
190  /// or `initialize`, either before or after the run loop, respectively.
191  #[inline]
192  #[expect(mismatched_lifetime_syntaxes)]
193  fn endpoints (&self)
194    -> std::cell::Ref <Option <VecMap <Box <dyn channel::Endpoint <CTX>>>>>
195  {
196    self.inner_ref().extended_state().endpoints.borrow()
197  }
198
199  /// This method returns a `Ref <Option <...>>` because during the run loop
200  /// the endpoints will be unavailable as they are being iterated over.
201  /// Endpoints are automatically waited on or polled in the appropriate
202  /// `run_*` function. Endpoints will be present for the calls to `terminate`
203  /// or `initialize`, either before or after the run loop, respectively.
204  #[inline]
205  #[expect(mismatched_lifetime_syntaxes)]
206  fn endpoints_mut (&mut self) -> std::cell::RefMut
207    <Option <VecMap <Box <dyn channel::Endpoint <CTX>>>>>
208  {
209    self.inner_ref().extended_state().endpoints.borrow_mut()
210  }
211
212  /// This method is used within the process `run_*` methods to get the
213  /// endpoints without borrowing the process. Endpoints will then be replaced
214  /// with `None` and unavailable within the run loop.
215  ///
216  /// # Errors
217  ///
218  /// Taking twice is a fatal error.
219  // TODO: error doctest
220  #[inline]
221  fn take_endpoints (&self) -> VecMap <Box <dyn channel::Endpoint <CTX>>> {
222    self.inner_ref().extended_state().endpoints.borrow_mut().take().unwrap()
223  }
224
225  /// # Errors
226  ///
227  /// Error if current endpoints are not `None`.
228  #[inline]
229  fn put_endpoints (&self,
230    endpoints : VecMap <Box <dyn channel::Endpoint <CTX>>>
231  ) {
232    *self.inner_ref().extended_state().endpoints.borrow_mut()
233      = Some (endpoints);
234  }
235
236  fn send <M : Message <CTX>> (&self, channel_id : CTX::CID, message : M)
237    -> Result <(), channel::SendError <CTX::GMSG>>
238  where CTX : 'static {
239    let message_name = message.name();
240    log::debug!(
241      process:?=self.id(), channel:?=channel_id, message=message_name.as_str();
242      "process sending message");
243    let cid : usize = channel_id.clone().into();
244    self.sourcepoints()[cid].send (message.into()).inspect_err (|_|
245      log::warn!(
246        process:?=self.id(), channel:?=channel_id, message=message_name.as_str();
247        "process send error: receiver disconnected"))
248  }
249
250  fn send_to <M : Message <CTX>> (
251    &self, channel_id : CTX::CID, recipient : CTX::PID, message : M
252  ) -> Result <(), channel::SendError <CTX::GMSG>>
253    where CTX : 'static
254  {
255    let message_name = message.name();
256    log::debug!(
257      process:?=self.id(),
258      channel:?=channel_id,
259      peer:?=recipient,
260      message=message_name.as_str();
261      "process sending message to peer");
262    let cid : usize = channel_id.clone().into();
263    self.sourcepoints()[cid].send_to (message.into(), recipient.clone()).inspect_err (
264      |_| log::warn!(
265        process:?=self.id(),
266        channel:?=channel_id,
267        peer:?=recipient,
268        message=message_name.as_str();
269        "process send to peer error: receiver disconnected"))
270  }
271
272  /// Run a process to completion and send the result on the result channel.
273  #[inline]
274  fn run (&mut self) where
275    Self : Sized + 'static,
276    CTX  : 'static
277  {
278    use message::Global;
279    debug_assert_eq!(self.state_id(), inner::StateId::Ready);
280    self.initialize();
281    match *self.kind() {
282      Kind::Asynchronous  {..} => self.run_asynchronous(),
283      Kind::Isochronous   {..} => self.run_isochronous(),
284      Kind::Mesochronous  {..} => self.run_mesochronous(),
285      Kind::Anisochronous      => self.run_anisochronous()
286    }
287    debug_assert_eq!(self.state_id(), inner::StateId::Ended);
288    self.terminate();
289    // at this point no further messages will be sent or processed so
290    // sourcepoints and endpoints are dropped
291    self.sourcepoints_mut().clear();
292    { // warn of unhandled messages
293      let endpoints = self.take_endpoints();
294      let mut unhandled_count = 0;
295      for (cid, endpoint) in endpoints.iter() {
296        #[expect(clippy::cast_possible_truncation)]
297        // NOTE: unwrap requires that err is debug
298        let Ok (channel_id) = CTX::CID::try_from (cid as channel::IdReprType)
299          else { unreachable!() };
300        while let Ok (message) = endpoint.try_recv() {
301          log::warn!(
302            process:?=self.id(),
303            channel:?=channel_id,
304            message=format!("{:?}({})", message.id(), message.inner_name()).as_str();
305            "process unhandled message");
306          unhandled_count += 1;
307        }
308      }
309      if unhandled_count > 0 {
310        log::warn!(process:?=self.id(), unhandled_message_count=unhandled_count;
311          "process ended with unhandled messages");
312      }
313    }
314    debug_assert!(self.sourcepoints().is_empty());
315    debug_assert!(self.endpoints().is_none());
316    let gpresult = self.global_result();
317    let session_handle = &self.inner_ref().as_ref().session_handle;
318    session_handle.result_tx.send (gpresult).unwrap();
319  }
320
321  /// Run a process to completion, send the result to the session, and proceed
322  /// with the continuation received from the session.
323  #[inline]
324  fn run_continue (mut self) -> Option <()> where
325    Self : Sized + 'static,
326    CTX  : 'static
327  {
328    self.run();
329    let continuation : session::Continuation <CTX> = {
330      let session_handle = &self.inner_ref().as_ref().session_handle;
331      session_handle.continuation_rx.recv().unwrap()
332    };
333    continuation (self.into())
334  }
335
336  /// Asynchronous run loop waits for messages on the single endpoint held by
337  /// this process and calls the process update method for every $n >= 1$
338  /// messages as specified by the process kind.
339  fn run_asynchronous (&mut self) where
340    Self : Sized,
341    CTX  : 'static
342  {
343    use message::Global;
344
345    self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
346
347    let messages_per_update = {
348      match *self.kind() {
349        Kind::Asynchronous { messages_per_update } => messages_per_update,
350        _ => unreachable!(
351          "run asynchronous: process kind does not match run function")
352      }
353    };
354    let _t_start = time::Instant::now();
355    log::debug!(process:?=self.id(), kind="asynchronous", messages_per_update;
356      "process start");
357    debug_assert!(1 <= messages_per_update);
358    let mut _message_count        = 0;
359    let mut update_count          = 0;
360    let mut messages_since_update = 0;
361
362    let endpoints = self.take_endpoints();
363    { // create a scope here so the endpoints can be returned after this borrow
364    let (cid, endpoint) = endpoints.iter().next().unwrap();
365    #[expect(clippy::cast_possible_truncation)]
366    // NOTE: unwrap requires that err is debug
367    let Ok (channel_id) = CTX::CID::try_from (cid as channel::IdReprType)
368      else { unreachable!() };
369    '_run_loop: while self.state_id() == inner::StateId::Running {
370      // wait on message
371      match endpoint.recv() {
372        Ok (message) => {
373          log::debug!(
374            process:?=self.id(),
375            channel:?=channel_id,
376            message=message.inner_name().as_str();
377            "process received message");
378          let handle_message_result = self.handle_message (message);
379          match handle_message_result {
380            ControlFlow::Continue => {}
381            ControlFlow::Break    => {
382              if self.state_id() == inner::StateId::Running {
383                self.inner_mut().handle_event (inner::EventParams::End{}.into())
384                  .unwrap();
385              }
386            }
387          }
388          _message_count         += 1;
389          messages_since_update += 1;
390        }
391        Err (channel::RecvError) => {
392          log::info!(process:?=self.id(), channel:?=channel_id;
393            "process receive failed: sender disconnected");
394          if self.state_id() == inner::StateId::Running {
395            self.inner_mut().handle_event (inner::EventParams::End{}.into())
396              .unwrap();
397          }
398        }
399      }
400      if messages_per_update <= messages_since_update {
401        // update
402        log::trace!(process:?=self.id(), update=update_count;
403          "process update");
404        let update_result = self.update();
405        match update_result {
406          ControlFlow::Continue => {}
407          ControlFlow::Break    => {
408            if self.state_id() == inner::StateId::Running {
409              self.inner_mut().handle_event (inner::EventParams::End{}.into())
410                .unwrap();
411            }
412          }
413        }
414        update_count += 1;
415        messages_since_update = 0;
416      }
417    } // end 'run_loop
418    } // end borrow endpoint
419    self.put_endpoints (endpoints);
420  } // end fn run_asynchronous
421
422  /// This function implements a fixed-timestep update loop.
423  ///
424  /// Time is checked immediately after update and the thread is put to sleep
425  /// for the time remaining until the next update, plus 1 ms since the thread
426  /// usually wakes up slightly before the set time. In practice this means
427  /// that the update time lags behind the target time by about 1ms or so, but
428  /// the time between updates is consistent. If the thread does somehow wake
429  /// up too early, then no update will be done and the thread will sleep or
430  /// else loop immediately depending on the result of a second time query.
431  ///
432  /// After an update, if the next (absolute) tick time has already passed,
433  /// then the thread will not sleep and instead will loop immediately. Note
434  /// that the tick time is measured on an absolute clock, allowing the thread
435  /// to "catch up" in case of a long update by processing the "backlog" of
436  /// ticks as fast as possible.
437  fn run_isochronous (&mut self) where
438    Self : Sized,
439    CTX  : 'static
440  {
441    self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
442
443    let t_start = time::Instant::now();
444    let (tick_ms, ticks_per_update) = {
445      match *self.kind() {
446        Kind::Isochronous { tick_ms, ticks_per_update }
447          => (tick_ms, ticks_per_update),
448        _ => unreachable!(
449          "run synchronous: process kind does not match run function")
450      }
451    };
452    log::debug!(
453      process:?=self.id(), kind="isochronous", tick_ms, ticks_per_update;
454      "process start");
455    debug_assert!(1 <= tick_ms);
456    debug_assert!(1 <= ticks_per_update);
457    let tick_dur = time::Duration::from_millis (tick_ms as u64);
458    let mut t_last             = t_start - tick_dur;
459    let mut t_next             = t_start;
460    let mut ticks_since_update = 0;
461    let mut tick_count         = 0;
462    let mut message_count      = 0;
463    let mut update_count       = 0;
464
465    let endpoints              = self.take_endpoints();
466    let mut num_open_channels  = endpoints.len();
467    let mut open_channels      = smallvec::SmallVec::<[bool; 8]>::from_vec ({
468      let mut v = Vec::with_capacity (num_open_channels);
469      v.resize (num_open_channels, true);
470      v
471    });
472    '_run_loop: while self.state_id() == inner::StateId::Running {
473      let t_now = time::Instant::now();
474      if t_next < t_now {
475        log::trace!(
476          process:?=self.id(),
477          tick=tick_count,
478          since_ns=t_now.duration_since (t_next).as_nanos();
479          "process tick");
480        t_last += tick_dur;
481        t_next += tick_dur;
482
483        // poll messages
484        poll_messages (self,
485          &endpoints, &mut open_channels, &mut num_open_channels, &mut message_count);
486
487        tick_count += 1;
488        ticks_since_update += 1;
489        debug_assert!(ticks_since_update <= ticks_per_update);
490        if ticks_since_update == ticks_per_update {
491          log::trace!(process:?=self.id(), update=update_count;
492            "process update");
493          let update_result = self.update();
494          match update_result {
495            ControlFlow::Continue => {}
496            ControlFlow::Break    => {
497              if self.state_id() == inner::StateId::Running {
498                self.inner_mut().handle_event (inner::EventParams::End{}.into())
499                  .unwrap();
500              }
501            }
502          }
503          update_count += 1;
504          ticks_since_update = 0;
505        }
506      } else {
507        log::warn!(
508          process:?=self.id(),
509          tick=tick_count,
510          until_ns=t_next.duration_since (t_now).as_nanos();
511          "process tick too early");
512      }
513
514      let t_after = time::Instant::now();
515      if t_after < t_next {
516        // must be positive
517        let t_until = t_next.duration_since (t_after);
518        std::thread::sleep (time::Duration::from_millis (
519          1 +  // add 1ms to avoid too-early update
520          t_until.as_secs()*1000 +
521          t_until.subsec_nanos() as u64/1_000_000))
522      } else {
523        log::warn!(
524          process:?=self.id(),
525          tick=tick_count,
526          after_ns=t_after.duration_since (t_next).as_nanos();
527          "process late tick");
528      }
529
530    } // end 'run_loop
531    self.put_endpoints (endpoints);
532  } // end fn run_isochronous
533
534  /// This function implements a rate-limited update loop.
535  ///
536  /// Time is checked immediately after update and the thread is put to sleep
537  /// for the time remaining until the next update, plus 1 ms since the thread
538  /// usually wakes up slightly before the set time. In practice this means
539  /// that the update time lags behind the target time by about 1ms or so, but
540  /// the time between updates is consistent. If the thread does somehow wake
541  /// up too early, then no update will be done and the thread will sleep, or
542  /// else loop immediately depending on the result of a second time query.
543  ///
544  /// After a tick, if the next tick time has already passed, then the thread
545  /// will not sleep and instead will loop immediately.
546  fn run_mesochronous (&mut self) where
547    Self : Sized,
548    CTX  : 'static
549  {
550    self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
551
552    let t_start = time::Instant::now();
553    let (tick_ms, ticks_per_update) = {
554      match *self.kind() {
555        Kind::Mesochronous { tick_ms, ticks_per_update }
556          => (tick_ms, ticks_per_update),
557        _ => unreachable!(
558          "run synchronous: process kind does not match run function")
559      }
560    };
561    log::debug!(
562      process:?=self.id(), kind="mesochronous", tick_ms, ticks_per_update;
563      "process start");
564    debug_assert!(1 <= tick_ms);
565    debug_assert!(1 <= ticks_per_update);
566    let tick_dur = time::Duration::from_millis (tick_ms as u64);
567    let mut _t_last            = t_start - tick_dur;
568    let mut t_next             = t_start;
569    let mut ticks_since_update = 0;
570    let mut tick_count         = 0;
571    let mut message_count      = 0;
572    let mut update_count       = 0;
573
574    let endpoints              = self.take_endpoints();
575    let mut num_open_channels  = endpoints.len();
576    let mut open_channels      = smallvec::SmallVec::<[bool; 8]>::from_vec ({
577      let mut v = Vec::with_capacity (num_open_channels);
578      v.resize (num_open_channels, true);
579      v
580    });
581    '_run_loop: while self.state_id() == inner::StateId::Running {
582      let t_now = time::Instant::now();
583      if t_next < t_now {
584        log::trace!(
585          process:?=self.id(),
586          tick=tick_count,
587          since_ns=t_now.duration_since (t_next).as_nanos();
588          "process tick");
589        _t_last = t_now;
590        t_next  = t_now + tick_dur;
591
592        // poll messages
593        poll_messages (self,
594          &endpoints, &mut open_channels, &mut num_open_channels, &mut message_count);
595
596        tick_count += 1;
597        ticks_since_update += 1;
598        debug_assert!(ticks_since_update <= ticks_per_update);
599        if ticks_since_update == ticks_per_update {
600          log::trace!(process:?=self.id(), update=update_count;
601            "process update");
602          let update_result = self.update();
603          match update_result {
604            ControlFlow::Continue => {}
605            ControlFlow::Break    => {
606              if self.state_id() == inner::StateId::Running {
607                self.inner_mut().handle_event (inner::EventParams::End{}.into())
608                  .unwrap();
609              }
610            }
611          }
612          update_count += 1;
613          ticks_since_update = 0;
614        }
615      } else {
616        log::warn!(
617          process:?=self.id(),
618          tick=tick_count,
619          until_ns=t_next.duration_since (t_now).as_nanos();
620          "process tick too early");
621      }
622
623      let t_after = time::Instant::now();
624      if t_after < t_next {
625        // must be positive
626        let t_until = t_next.duration_since (t_after);
627        std::thread::sleep (time::Duration::from_millis (
628          1 +  // add 1ms to avoid too-early update
629          t_until.as_secs()*1000 +
630          t_until.subsec_nanos() as u64/1_000_000))
631      } else {
632        log::warn!(
633          process:?=self.id(),
634          tick=tick_count,
635          after_ns=t_after.duration_since (t_next).as_nanos();
636          "process late tick");
637      }
638
639    } // end 'run_loop
640    self.put_endpoints (endpoints);
641  } // end fn run_mesochronous
642
643  /// An un-timed run loop that polls for messages.
644  fn run_anisochronous (&mut self) where
645    Self : Sized,
646    CTX  : 'static
647  {
648    self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
649
650    let _t_start = time::Instant::now();
651    debug_assert_eq!(Kind::Anisochronous, *self.kind());
652    log::debug!(process:?=self.id(), kind="anisochronous"; "process start");
653    let mut message_count = 0;
654    let mut update_count  = 0;
655
656    let endpoints = self.take_endpoints();
657    let mut num_open_channels = endpoints.len();
658    let mut open_channels     = smallvec::SmallVec::<[bool; 8]>::from_vec ({
659      let mut v = Vec::with_capacity (num_open_channels);
660      v.resize (num_open_channels, true);
661      v
662    });
663    '_run_loop: while self.state_id() == inner::StateId::Running {
664      // poll messages
665      poll_messages (self,
666        &endpoints, &mut open_channels, &mut num_open_channels, &mut message_count);
667      // update
668      log::trace!(process:?=self.id(), update=update_count; "process update");
669      let update_result = self.update();
670      match update_result {
671        ControlFlow::Continue => {}
672        ControlFlow::Break    => {
673          if self.state_id() == inner::StateId::Running {
674            self.inner_mut().handle_event (inner::EventParams::End{}.into())
675              .unwrap()
676          }
677        }
678      }
679      update_count += 1;
680
681    } // end 'run_loop
682    self.put_endpoints (endpoints);
683  } // end fn run_anisochronous
684
685} // end trait Process
686
687pub type IdReprType = u16;
688/// Unique identifier with a total mapping to process defs.
689pub trait Id <CTX> : Clone + Ord + Into <usize> + TryFrom <IdReprType> +
690  std::fmt::Debug + strum::IntoEnumIterator + strum::EnumCount
691where
692  CTX : session::Context <PID=Self>
693{
694  fn def (&self) -> Def <CTX>;
695  /// Must initialize the concrete process type start running the initial closure.
696  fn spawn (inner : Inner <CTX>) -> std::thread::JoinHandle <Option <()>>;
697  /// Initialize the concrete proces type and return in a `CTX::GPROC`.
698  fn gproc (inner : Inner <CTX>) -> CTX::GPROC;
699}
700
701/// The global process type.
702pub trait Global <CTX> where
703  Self : Sized,
704  CTX  : session::Context <GPROC=Self>
705{
706  fn id (&self) -> CTX::PID;
707  fn run (&mut self);
708  //fn run_continue (mut self) -> Option <()>;
709}
710
711////////////////////////////////////////////////////////////////////////////////
712//  impls                                                                     //
713////////////////////////////////////////////////////////////////////////////////
714
715impl <CTX : session::Context> Def <CTX> {
716  /// The only method to create a valid process def struct. Checks for
717  /// duplicate sourcepoints or endpoints, self-loops, and restrictions on
718  /// process kind (asynchronous processes are incompatible with certain
719  /// combinations of backends).
720  ///
721  /// # Errors
722  ///
723  /// Duplicate sourcepoint:
724  ///
725  /// ```
726  /// # extern crate apis;
727  /// # use apis::{channel,message,process};
728  /// # use apis::session::mock::*;
729  /// # fn main() {
730  /// let result = process::Def::<Mycontext>::define (
731  ///   ProcessId::A,
732  ///   process::Kind::isochronous_default(),
733  ///   vec![ChannelId::X, ChannelId::Z, ChannelId::X],
734  ///   vec![ChannelId::Y]);
735  /// assert_eq!(
736  ///   result, Err (vec![process::DefineError::DuplicateSourcepoint]));
737  /// # }
738  /// ```
739  ///
740  /// Duplicate endpoint:
741  ///
742  /// ```
743  /// # extern crate apis;
744  /// # use apis::{channel,message,process};
745  /// # use apis::session::mock::*;
746  /// # fn main() {
747  /// let result = process::Def::<Mycontext>::define (
748  ///   ProcessId::A,
749  ///   process::Kind::isochronous_default(),
750  ///   vec![ChannelId::X, ChannelId::Z],
751  ///   vec![ChannelId::Y, ChannelId::Y]);
752  /// assert_eq!(
753  ///   result, Err (vec![process::DefineError::DuplicateEndpoint]));
754  /// # }
755  /// ```
756  ///
757  /// Self-loop:
758  ///
759  /// ```
760  /// # extern crate apis;
761  /// # use apis::{channel,message,process};
762  /// # use apis::session::mock::*;
763  /// # fn main() {
764  /// let result = process::Def::<Mycontext>::define (
765  ///   ProcessId::A,
766  ///   process::Kind::isochronous_default(),
767  ///   vec![ChannelId::X, ChannelId::Z],
768  ///   vec![ChannelId::Y, ChannelId::Z]);
769  /// assert_eq!(
770  ///   result, Err (vec![process::DefineError::SourcepointEqEndpoint]));
771  /// # }
772  /// ```
773  ///
774  /// Asynchronous process zero endpoints:
775  ///
776  /// ```
777  /// # extern crate apis;
778  /// # use apis::{channel,message,process};
779  /// # use apis::session::mock::*;
780  /// # use channel::Id;
781  /// # fn main() {
782  /// let result = process::Def::<Mycontext>::define (
783  ///   ProcessId::A,
784  ///   process::Kind::asynchronous_default(),
785  ///   vec![ChannelId::Z],
786  ///   vec![]);
787  /// assert_eq!(
788  ///   result,
789  ///   Err (vec![process::DefineError::AsynchronousZeroEndpoints]));
790  /// # }
791  /// ```
792  ///
793  /// Asynchronous process multiple endpoints:
794  ///
795  /// ```
796  /// # extern crate apis;
797  /// # use apis::{channel,message,process};
798  /// # use apis::session::mock::*;
799  /// # use channel::Id;
800  /// # fn main() {
801  /// let result = process::Def::<Mycontext>::define (
802  ///   ProcessId::A,
803  ///   process::Kind::asynchronous_default(),
804  ///   vec![ChannelId::Z],
805  ///   vec![ChannelId::X, ChannelId::Y]);
806  /// assert_eq!(
807  ///   result,
808  ///   Err (vec![process::DefineError::AsynchronousMultipleEndpoints]));
809  /// # }
810  /// ```
811  ///
812  pub fn define (
813    id           : CTX::PID,
814    kind         : Kind,
815    sourcepoints : Vec <CTX::CID>,
816    endpoints    : Vec <CTX::CID>
817  ) -> Result <Self, Vec <DefineError>> {
818    let def = Def {
819      id, kind, sourcepoints, endpoints
820    };
821    def.validate_role() ?;
822    Ok (def)
823  }
824
825  pub const fn id (&self) -> &CTX::PID {
826    &self.id
827  }
828
829  pub const fn kind (&self) -> &Kind {
830    &self.kind
831  }
832
833  pub const fn sourcepoints (&self) -> &Vec <CTX::CID> {
834    &self.sourcepoints
835  }
836
837  pub const fn endpoints (&self) -> &Vec <CTX::CID> {
838    &self.endpoints
839  }
840
841  fn validate_role (&self) -> Result <(), Vec <DefineError>> {
842    let mut errors = Vec::new();
843
844    // we will not check that a process has zero sourcepoints or endpoints
845
846    // duplicate sourcepoints
847    let mut producers_dedup = self.sourcepoints.clone();
848    producers_dedup.as_mut_slice().sort();
849    producers_dedup.dedup_by (|x,y| x == y);
850    if producers_dedup.len() < self.sourcepoints.len() {
851      errors.push (DefineError::DuplicateSourcepoint);
852    }
853
854    // duplicate endpoints
855    let mut consumers_dedup = self.endpoints.clone();
856    consumers_dedup.as_mut_slice().sort();
857    consumers_dedup.dedup_by (|x,y| x == y);
858    if consumers_dedup.len() < self.endpoints.len() {
859      errors.push (DefineError::DuplicateEndpoint);
860    }
861
862    // self-loops
863    let mut producers_and_consumers = producers_dedup.clone();
864    producers_and_consumers.append (&mut consumers_dedup.clone());
865    producers_and_consumers.as_mut_slice().sort();
866    producers_and_consumers.dedup_by (|x,y| x == y);
867    if producers_and_consumers.len()
868      < producers_dedup.len() + consumers_dedup.len()
869    {
870      errors.push (DefineError::SourcepointEqEndpoint);
871    }
872
873    // validate process kind
874    if let Err (mut errs)
875      = self.kind.validate_role::<CTX> (&self.sourcepoints, &self.endpoints)
876    {
877      errors.append (&mut errs);
878    }
879
880    if !errors.is_empty() {
881      Err (errors)
882    } else {
883      Ok (())
884    }
885  }
886}
887
888impl Kind {
889  pub fn asynchronous_default() -> Self {
890    const MESSAGES_PER_UPDATE : u32 = 1;
891    Kind::new_asynchronous (MESSAGES_PER_UPDATE).unwrap()
892  }
893
894  pub fn isochronous_default() -> Self {
895    const TICK_MS          : u32 = 1000;
896    const TICKS_PER_UPDATE : u32 = 1;
897    Kind::new_isochronous (TICK_MS, TICKS_PER_UPDATE).unwrap()
898  }
899
900  pub fn mesochronous_default() -> Self {
901    const TICK_MS          : u32 = 1000;
902    const TICKS_PER_UPDATE : u32 = 1;
903    Kind::new_mesochronous (TICK_MS, TICKS_PER_UPDATE).unwrap()
904  }
905
906  pub const fn anisochronous_default() -> Self {
907    Kind::new_anisochronous()
908  }
909
910  pub fn new_asynchronous (messages_per_update : u32)
911    -> Result <Self, Vec <KindError>>
912  {
913    let mut errors = Vec::new();
914    if messages_per_update == 0 {
915      errors.push (KindError::AsynchronousZeroMessagesPerUpdate)
916    }
917    if !errors.is_empty() {
918      Err (errors)
919    } else {
920      Ok (Kind::Asynchronous { messages_per_update })
921    }
922  }
923
924  pub fn new_isochronous (tick_ms : u32, ticks_per_update : u32)
925    -> Result <Self, Vec <KindError>>
926  {
927    let mut errors = Vec::new();
928    if tick_ms == 0 {
929      errors.push (KindError::IsochronousZeroTickMs)
930    }
931    if ticks_per_update == 0 {
932      errors.push (KindError::IsochronousZeroTicksPerUpdate)
933    }
934    if !errors.is_empty() {
935      Err (errors)
936    } else {
937      Ok (Kind::Isochronous { tick_ms, ticks_per_update })
938    }
939  }
940
941  pub fn new_mesochronous (tick_ms : u32, ticks_per_update : u32)
942    -> Result <Self, Vec <KindError>>
943  {
944    let mut errors = Vec::new();
945    if tick_ms == 0 {
946      errors.push (KindError::MesochronousZeroTickMs)
947    }
948    if ticks_per_update == 0 {
949      errors.push (KindError::MesochronousZeroTicksPerUpdate)
950    }
951    if !errors.is_empty() {
952      Err (errors)
953    } else {
954      Ok (Kind::Isochronous { tick_ms, ticks_per_update })
955    }
956  }
957
958  #[inline]
959  pub const fn new_anisochronous() -> Self {
960    Kind::Anisochronous
961  }
962
963  fn validate_role <CTX : session::Context> (&self,
964    _sourcepoints : &[CTX::CID],
965    endpoints     : &[CTX::CID]
966  ) -> Result <(), Vec <DefineError>> {
967    let mut errors = Vec::new();
968
969    match *self {
970      Kind::Asynchronous {..} => {
971        // asynchronous processes must have exactly one endpoint
972        if endpoints.is_empty() {
973          errors.push (DefineError::AsynchronousZeroEndpoints)
974        } else if 1 < endpoints.len() {
975          errors.push (DefineError::AsynchronousMultipleEndpoints)
976        }
977      }
978      Kind::Isochronous   {..} |
979      Kind::Mesochronous  {..} |
980      Kind::Anisochronous      => { /* no restrictions */ }
981    }
982
983    if !errors.is_empty() {
984      Err (errors)
985    } else {
986      Ok (())
987    }
988  }
989
990} // end impl Kind
991
992impl <M> From <Result <(), channel::SendError <M>>> for ControlFlow {
993  fn from (send_result : Result <(), channel::SendError <M>>) -> Self {
994    match send_result {
995      Ok  (()) => ControlFlow::Continue,
996      Err (_)  => ControlFlow::Break
997    }
998  }
999}
1000
1001////////////////////////////////////////////////////////////////////////////////
1002//  functions                                                                 //
1003////////////////////////////////////////////////////////////////////////////////
1004
1005//
1006//  public
1007//
1008pub fn report_sizes <CTX : session::Context + 'static> () {
1009  println!("process report sizes...");
1010  println!("  size of process::Def: {}", size_of::<Def <CTX>>());
1011  Inner::<CTX>::report_sizes();
1012  println!("...process report sizes");
1013}
1014
1015//
1016//  private
1017//
1018
1019//
1020//  fn poll_messages
1021//
1022/// Message polling loop for `Isochronous`, `Mesochronous`, and `Anisochronous`
1023/// processes.
1024#[inline]
1025fn poll_messages <CTX, P, RES> (
1026  process           : &mut P,
1027  endpoints         : &VecMap <Box <dyn channel::Endpoint <CTX>>>,
1028  open_channels     : &mut smallvec::SmallVec <[bool; 8]>,
1029  num_open_channels : &mut usize,
1030  message_count     : &mut usize)
1031where
1032  CTX : session::Context + 'static,
1033  P   : Process <CTX, RES> + Sized,
1034  RES : Presult <CTX, P>
1035{
1036  use message::Global;
1037  #[inline]
1038  fn channel_close (is_open : &mut bool, num_open : &mut usize) {
1039    debug_assert!(*is_open);
1040    debug_assert!(0 < *num_open);
1041    *is_open = false;
1042    *num_open -= 1;
1043  }
1044
1045  // for each open channel (outer loop), poll for messages with try_recv (inner loop)
1046  // until "empty" or "disconnected" is encountered
1047  'poll_outer: for (open_index, (cid, endpoint)) in endpoints.iter().enumerate() {
1048    #[expect(clippy::cast_possible_truncation)]
1049    // NOTE: unwrap requires that err is debug
1050    let Ok (channel_id) = CTX::CID::try_from (cid as u16) else { unreachable!() };
1051    let channel_open = &mut open_channels[open_index];
1052    if !*channel_open {
1053      continue 'poll_outer
1054    }
1055    'poll_inner: loop {
1056      match endpoint.try_recv() {
1057        Ok (message) => {
1058          log::debug!(
1059            process:?=process.id(),
1060            channel:?=channel_id,
1061            message=message.inner_name().as_str();
1062            "process received message");
1063          *message_count += 1;
1064          match process.handle_message (message) {
1065            ControlFlow::Continue => {}
1066            ControlFlow::Break    => {
1067              channel_close (channel_open, num_open_channels);
1068              // only transition to "ended" if this is the last channel to close
1069              if *num_open_channels == 0 {
1070                process.inner_mut().handle_event (
1071                  inner::EventParams::End{}.into()
1072                ).unwrap();
1073              }
1074              break 'poll_inner
1075            }
1076          }
1077        }
1078        Err (channel::TryRecvError::Empty) => { break 'poll_inner }
1079        Err (channel::TryRecvError::Disconnected) => {
1080          log::info!(process:?=process.id(), channel:?=channel_id;
1081            "process receive failed: sender disconnected");
1082          channel_close (channel_open, num_open_channels);
1083          if *num_open_channels == 0 {
1084            process.inner_mut().handle_event (inner::EventParams::End{}.into())
1085              .unwrap();
1086          }
1087          break 'poll_inner
1088        }
1089      } // end match try_recv
1090    } // end 'poll_inner
1091  } // end 'poll_outer
1092} // end fn poll_messages