apis/process/
mod.rs

1use {std, either, log, smallvec};
2use crate::{channel, message, session, Message};
3
4use std::convert::TryFrom;
5use std::sync::mpsc;
6use std::time;
7use vec_map::VecMap;
8
9pub mod inner;
10pub mod presult;
11
12pub use self::inner::Inner;
13pub use self::presult::Presult;
14
15////////////////////////////////////////////////////////////////////////////////
16//  structs                                                                   //
17////////////////////////////////////////////////////////////////////////////////
18
/// Process definition.
///
/// Holds the process ID, its loop [`Kind`], and the IDs of the channels the
/// process is attached to. The fields are private: values are created with
/// [`Def::define`], which validates the combination of kind and channels.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Def <CTX : session::Context> {
  /// ID of the process being defined.
  id           : CTX::PID,
  /// Run loop behavior of the process.
  kind         : Kind,
  /// IDs of the channels on which the process holds a sourcepoint (the
  /// sending side used by `Process::send` and `Process::send_to`).
  sourcepoints : Vec <CTX::CID>,
  /// IDs of the channels on which the process holds an endpoint (the
  /// receiving side waited on or polled by the run loops).
  endpoints    : Vec <CTX::CID>
}
27
/// Handle to a process held by the session.
pub struct Handle <CTX : session::Context> {
  /// Receiver for the global process result.
  ///
  /// NOTE(review): presumably the receiving half of the `result_tx` that
  /// `Process::run` sends on when the process has ended -- confirm.
  pub result_rx        : mpsc::Receiver <CTX::GPRES>,
  /// Sender for a continuation closure that consumes the finished process
  /// as a `CTX::GPROC`.
  ///
  /// NOTE(review): presumably the sending half of the `continuation_rx` that
  /// `Process::run_continue` receives from -- confirm.
  pub continuation_tx  : mpsc::Sender <
    Box <dyn FnOnce (CTX::GPROC) -> Option <()> + Send>
  >,
  /// When the session drops, the `finish` method will either join or send
  /// a continuation depending on the contents of this field.
  pub join_or_continue :
    either::Either <
      std::thread::JoinHandle <Option <()>>,
      Option <Box <dyn FnOnce (CTX::GPROC) -> Option <()> + Send>>
    >
}
42
43////////////////////////////////////////////////////////////////////////////////
44//  enums                                                                     //
45////////////////////////////////////////////////////////////////////////////////
46
/// Specifies the loop behavior of a process.
///
/// - `Asynchronous` is a loop that blocks waiting on exactly one channel
///   endpoint.
/// - `Isochronous` is a fixed-timestep loop in which endpoints are polled
///   once per 'tick' and which will attempt to "catch up" if it falls behind.
/// - `Mesochronous` is a rate-limited loop that polls endpoints and loops
///   immediately if enough time has passed and otherwise sleeps for the
///   remaining duration until the next 'tick'.
/// - `Anisochronous` is an un-timed polling loop which always loops immediately
///   and always processes one update per tick.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Kind {
  /// Block waiting on a single endpoint.
  ///
  /// An asynchronous process must hold exactly one endpoint. This is
  /// validated when defining a `Def` struct with the provided kind and
  /// endpoints: `Def::define` reports
  /// `DefineError::AsynchronousZeroEndpoints` or
  /// `DefineError::AsynchronousMultipleEndpoints` otherwise.
  Asynchronous {
    /// Number of received messages between calls to `update`; the run loop
    /// expects this to be at least 1.
    messages_per_update : u32
  },

  /// A fixed-time step polling loop that will try to "catch up" if it falls
  /// behind.
  Isochronous {
    /// Length of one tick in milliseconds; the run loop expects this to be
    /// at least 1.
    tick_ms          : u32,
    /// Number of ticks between calls to `update`; the run loop expects this
    /// to be at least 1.
    ticks_per_update : u32
  },

  /// A rate-limited polling loop.
  Mesochronous {
    /// Minimum length of one tick in milliseconds; the run loop expects this
    /// to be at least 1.
    tick_ms          : u32,
    /// Number of ticks between calls to `update`; the run loop expects this
    /// to be at least 1.
    ticks_per_update : u32
  },

  /// Poll to exhaustion and update immediately.
  ///
  /// This is useful for blocking update functions as in a readline loop or a
  /// rendering loop.
  ///
  /// Note that unlike other polling process kinds (`Isochronous` and
  /// `Mesochronous`), ticks and updates are always one-to-one since an
  /// external blocking mechanism is expected to be used in the `update`
  /// function.
  Anisochronous
}
95
/// Value returned by `Process::handle_message` and `Process::update` to tell
/// the run loop whether the process should keep running.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ControlFlow {
  /// Keep running.
  Continue,
  /// Stop: the run loops respond by sending the `End` event to the process
  /// state machine if the process is still in the `Running` state.
  Break
}
101
/// Error for an invalid `Kind` parameter.
///
/// Each variant names the kind and the parameter that was zero.
///
/// NOTE(review): presumably returned by the `Kind::new_*` constructors, which
/// are not visible here -- confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum KindError {
  AsynchronousZeroMessagesPerUpdate,
  IsochronousZeroTickMs,
  IsochronousZeroTicksPerUpdate,
  MesochronousZeroTickMs,
  MesochronousZeroTicksPerUpdate
}
110
/// Error in `Def`.
///
/// `Def::define` collects every problem it finds and returns them together
/// in a `Vec`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DefineError {
  /// The same channel ID appears more than once in the sourcepoints.
  DuplicateSourcepoint,
  /// The same channel ID appears more than once in the endpoints.
  DuplicateEndpoint,
  /// A channel ID appears as both a sourcepoint and an endpoint (self-loop).
  SourcepointEqEndpoint,
  /// An asynchronous process was defined without any endpoint.
  AsynchronousZeroEndpoints,
  /// An asynchronous process was defined with more than one endpoint.
  AsynchronousMultipleEndpoints
}
120
121////////////////////////////////////////////////////////////////////////////////
122//  traits                                                                    //
123////////////////////////////////////////////////////////////////////////////////
124
125/// Main process trait.
126///
127/// Process run loop will end after either all endpoint channels have returned
128/// `ControlFlow::Break` from `handle_message()` or else if `update()` returns
129/// `ControlFlow::Break`. Note that after the last endpoint channel has closed a
130/// final `update()` will still be processed. When `update()` returns
131/// `ControlFlow::Break`, no further `handle_message()` calls will be made.
132pub trait Process <CTX, RES> where
133  CTX  : session::Context,
134  RES  : Presult <CTX, Self>,
135  Self : TryFrom <CTX::GPROC> + Into <CTX::GPROC>
136{
137  //
138  //  required
139  //
  /// Construct the process from its `Inner` state.
  fn new            (inner : Inner <CTX>)            -> Self;
  /// Borrow the `Inner` state of the process.
  fn inner_ref      (&self)                          -> &Inner <CTX>;
  /// Mutably borrow the `Inner` state of the process.
  fn inner_mut      (&mut self)                      -> &mut Inner <CTX>;
  /// Borrow the local result value of the process.
  fn result_ref     (&self)                          -> &RES;
  /// Mutably borrow the local result value of the process.
  fn result_mut     (&mut self)                      -> &mut RES;
  /// Produce the global result value; `run` sends it on the session result
  /// channel after the run loop has ended.
  fn global_result  (&mut self)                      -> CTX::GPRES;
  /// NOTE(review): presumably extracts the result of this process type from
  /// the map of session results -- confirm against the implementors.
  fn extract_result (session_results : &mut VecMap <CTX::GPRES>)
    -> Result <RES, String>;
  /// Called by the run loop for each received message; see the trait
  /// documentation for the effect of returning `ControlFlow::Break`.
  fn handle_message (&mut self, message : CTX::GMSG) -> ControlFlow;
  /// Called by the run loop at the rate determined by the process kind;
  /// returning `ControlFlow::Break` ends the process.
  fn update         (&mut self)                      -> ControlFlow;

  /// Called by `run` before the run loop starts.
  ///
  /// Does nothing by default, may be overridden.
  fn initialize (&mut self) { }
  /// Called by `run` after the run loop has ended.
  ///
  /// Does nothing by default, may be overridden.
  fn terminate  (&mut self) { }
155
156  //
157  //  provided
158  //
  /// Returns the ID of this process as recorded in its definition.
  #[inline]
  fn id (&self) -> &CTX::PID where CTX : 'static {
    self.def().id()
  }
163
  /// Returns the loop kind of this process as recorded in its definition.
  #[inline]
  fn kind (&self) -> &Kind where CTX : 'static {
    self.def().kind()
  }
168
  /// Returns a copy of the current state ID of the inner state machine
  /// (e.g. `Ready`, `Running` or `Ended` as checked by the run functions).
  #[inline]
  fn state_id (&self) -> inner::StateId {
    self.inner_ref().state().id().clone()
  }
173
  /// Returns the process definition stored in the extended state of the
  /// inner state machine.
  #[inline]
  fn def (&self) -> &Def <CTX> {
    &self.inner_ref().extended_state().def
  }
178
  /// Returns the sourcepoints held by this process, indexed by the `usize`
  /// representation of the channel ID (see `send`).
  #[inline]
  fn sourcepoints (&self) -> &VecMap <Box <dyn channel::Sourcepoint <CTX>>> {
    &self.inner_ref().extended_state().sourcepoints
  }
183
  /// Returns mutable access to the sourcepoints held by this process; `run`
  /// uses this to drop all sourcepoints once the process has terminated.
  #[inline]
  fn sourcepoints_mut (&mut self)
    -> &mut VecMap <Box <dyn channel::Sourcepoint <CTX>>>
  {
    &mut self.inner_mut().extended_state_mut().sourcepoints
  }
190
  /// This method returns a `Ref <Option <...>>` because during the run loop
  /// the endpoints will be unavailable as they are being iterated over.
  /// Endpoints are automatically waited on or polled in the appropriate
  /// `run_*` function. Endpoints will be present for the calls to
  /// `initialize` and `terminate`, before and after the run loop,
  /// respectively.
  #[inline]
  fn endpoints (&self)
    -> std::cell::Ref <Option <VecMap <Box <dyn channel::Endpoint <CTX>>>>>
  {
    self.inner_ref().extended_state().endpoints.borrow()
  }
202
  /// This method returns a `RefMut <Option <...>>` because during the run
  /// loop the endpoints will be unavailable as they are being iterated over.
  /// Endpoints are automatically waited on or polled in the appropriate
  /// `run_*` function. Endpoints will be present for the calls to
  /// `initialize` and `terminate`, before and after the run loop,
  /// respectively.
  #[inline]
  fn endpoints_mut (&mut self) -> std::cell::RefMut
    <Option <VecMap <Box <dyn channel::Endpoint <CTX>>>>>
  {
    self.inner_ref().extended_state().endpoints.borrow_mut()
  }
214
215  /// This method is used within the process `run_*` methods to get the
216  /// endpoints without borrowing the process. Endpoints will then be replaced
217  /// with `None` and unavailable within the run loop.
218  ///
219  /// # Errors
220  ///
221  /// Taking twice is a fatal error.
222  // TODO: error doctest
223  #[inline]
224  fn take_endpoints (&self) -> VecMap <Box <dyn channel::Endpoint <CTX>>> {
225    self.inner_ref().extended_state().endpoints.borrow_mut().take().unwrap()
226  }
227
228  /// # Errors
229  ///
230  /// Error if current endpoints are not `None`.
231  #[inline]
232  fn put_endpoints (&self,
233    endpoints : VecMap <Box <dyn channel::Endpoint <CTX>>>
234  ) {
235    *self.inner_ref().extended_state().endpoints.borrow_mut()
236      = Some (endpoints);
237  }
238
239  fn send <M : Message <CTX>> (&self, channel_id : CTX::CID, message : M)
240    -> Result <(), channel::SendError <CTX::GMSG>>
241  where CTX : 'static {
242    let message_name = message.name();
243    log::debug!(
244      process:?=self.id(), channel:?=channel_id, message=message_name.as_str();
245      "process sending message");
246    let cid : usize = channel_id.clone().into();
247    self.sourcepoints()[cid].send (message.into()).map_err (|send_error|{
248      log::warn!(
249        process:?=self.id(), channel:?=channel_id, message=message_name.as_str();
250        "process send error: receiver disconnected");
251      send_error
252    })
253  }
254
255  fn send_to <M : Message <CTX>> (
256    &self, channel_id : CTX::CID, recipient : CTX::PID, message : M
257  ) -> Result <(), channel::SendError <CTX::GMSG>>
258    where CTX : 'static
259  {
260    let message_name = message.name();
261    log::debug!(
262      process:?=self.id(),
263      channel:?=channel_id,
264      peer:?=recipient,
265      message=message_name.as_str();
266      "process sending message to peer");
267    let cid : usize = channel_id.clone().into();
268    self.sourcepoints()[cid].send_to (message.into(), recipient.clone())
269      .map_err (|send_error|{
270        log::warn!(
271          process:?=self.id(),
272          channel:?=channel_id,
273          peer:?=recipient,
274          message=message_name.as_str();
275          "process send to peer error: receiver disconnected");
276        send_error
277      })
278  }
279
280  /// Run a process to completion and send the result on the result channel.
281  #[inline]
282  fn run (&mut self) where
283    Self : Sized + 'static,
284    CTX  : 'static
285  {
286    use message::Global;
287    debug_assert_eq!(self.state_id(), inner::StateId::Ready);
288    self.initialize();
289    match *self.kind() {
290      Kind::Asynchronous  {..} => self.run_asynchronous (),
291      Kind::Isochronous   {..} => self.run_isochronous  (),
292      Kind::Mesochronous  {..} => self.run_mesochronous (),
293      Kind::Anisochronous {..} => self.run_anisochronous()
294    };
295    debug_assert_eq!(self.state_id(), inner::StateId::Ended);
296    self.terminate();
297    // at this point no further messages will be sent or processed so
298    // sourcepoints and endpoints are dropped
299    self.sourcepoints_mut().clear();
300    { // warn of unhandled messages
301      let endpoints = self.take_endpoints();
302      let mut unhandled_count = 0;
303      for (cid, endpoint) in endpoints.iter() {
304        // NOTE: unwrap requires that err is debug
305        let channel_id = match CTX::CID::try_from (cid as channel::IdReprType) {
306          Ok  (cid) => cid,
307          Err (_)   => unreachable!()
308        };
309        loop {
310          match endpoint.try_recv() {
311            Ok (message) => {
312              log::warn!(
313                process:?=self.id(),
314                channel:?=channel_id,
315                message=format!("{:?}({})", message.id(), message.inner_name())
316                  .as_str();
317                "process unhandled message");
318              unhandled_count += 1;
319            }
320            Err (channel::TryRecvError::Empty) |
321            Err (channel::TryRecvError::Disconnected) => {
322              break
323            }
324          }
325        }
326      }
327      if unhandled_count > 0 {
328        log::warn!(process:?=self.id(), unhandled_message_count=unhandled_count;
329          "process ended with unhandled messages");
330      }
331    }
332    debug_assert!(self.sourcepoints().is_empty());
333    debug_assert!(self.endpoints().is_none());
334    let gpresult = self.global_result();
335    let session_handle = &self.inner_ref().as_ref().session_handle;
336    session_handle.result_tx.send (gpresult).unwrap();
337  }
338
339  /// Run a process to completion, send the result to the session, and proceed
340  /// with the continuation received from the session.
341  #[inline]
342  fn run_continue (mut self) -> Option <()> where
343    Self : Sized + 'static,
344    CTX  : 'static
345  {
346    self.run();
347    let continuation : Box <dyn FnOnce (CTX::GPROC) -> Option <()> + Send> = {
348      let session_handle = &self.inner_ref().as_ref().session_handle;
349      session_handle.continuation_rx.recv().unwrap()
350    };
351    continuation (self.into())
352  }
353
354  /// Asynchronous run loop waits for messages on the single endpoint held by
355  /// this process and calls the process update method for every $n >= 1$
356  /// messages as specified by the process kind.
357  fn run_asynchronous (&mut self) where
358    Self : Sized,
359    CTX  : 'static
360  {
361    use message::Global;
362
363    self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
364
365    let messages_per_update = {
366      match *self.kind() {
367        Kind::Asynchronous { messages_per_update } => messages_per_update,
368        _ => unreachable!(
369          "run asynchronous: process kind does not match run function")
370      }
371    };
372    let _t_start = time::Instant::now();
373    log::debug!(process:?=self.id(), kind="asynchronous", messages_per_update;
374      "process start");
375    debug_assert!(1 <= messages_per_update);
376    let mut _message_count        = 0;
377    let mut update_count          = 0;
378    let mut messages_since_update = 0;
379
380    let endpoints = self.take_endpoints();
381    { // create a scope here so the endpoints can be returned after this borrow
382    let (cid, endpoint) = endpoints.iter().next().unwrap();
383    // NOTE: unwrap requires that err is debug
384    let channel_id = match CTX::CID::try_from (cid as channel::IdReprType) {
385      Ok  (cid) => cid,
386      Err (_)   => unreachable!()
387    };
388    '_run_loop: while self.state_id() == inner::StateId::Running {
389      // wait on message
390      match endpoint.recv() {
391        Ok (message) => {
392          log::debug!(
393            process:?=self.id(),
394            channel:?=channel_id,
395            message=message.inner_name().as_str();
396            "process received message");
397          let handle_message_result = self.handle_message (message);
398          match handle_message_result {
399            ControlFlow::Continue => {}
400            ControlFlow::Break    => {
401              if self.state_id() == inner::StateId::Running {
402                self.inner_mut().handle_event (inner::EventParams::End{}.into())
403                  .unwrap();
404              }
405            }
406          }
407          _message_count         += 1;
408          messages_since_update += 1;
409        }
410        Err (channel::RecvError) => {
411          log::info!(process:?=self.id(), channel:?=channel_id;
412            "process receive failed: sender disconnected");
413          if self.state_id() == inner::StateId::Running {
414            self.inner_mut().handle_event (inner::EventParams::End{}.into())
415              .unwrap();
416          }
417        }
418      }
419      if messages_per_update <= messages_since_update {
420        // update
421        log::trace!(process:?=self.id(), update=update_count;
422          "process update");
423        let update_result = self.update();
424        match update_result {
425          ControlFlow::Continue => {}
426          ControlFlow::Break    => {
427            if self.state_id() == inner::StateId::Running {
428              self.inner_mut().handle_event (inner::EventParams::End{}.into())
429                .unwrap();
430            }
431          }
432        }
433        update_count += 1;
434        messages_since_update = 0;
435      }
436    } // end 'run_loop
437    } // end borrow endpoint
438    self.put_endpoints (endpoints);
439  } // end fn run_asynchronous
440
441  /// This function implements a fixed-timestep update loop.
442  ///
443  /// Time is checked immediately after update and the thread is put to sleep
444  /// for the time remaining until the next update, plus 1 ms since the thread
445  /// usually wakes up slightly before the set time. In practice this means
446  /// that the update time lags behind the target time by about 1ms or so, but
447  /// the time between updates is consistent. If the thread does somehow wake
448  /// up too early, then no update will be done and the thread will sleep or
449  /// else loop immediately depending on the result of a second time query.
450  ///
451  /// After an update, if the next (absolute) tick time has already passed,
452  /// then the thread will not sleep and instead will loop immediately. Note
453  /// that the tick time is measured on an absolute clock, allowing the thread
454  /// to "catch up" in case of a long update by processing the "backlog" of
455  /// ticks as fast as possible.
456  fn run_isochronous (&mut self) where
457    Self : Sized,
458    CTX  : 'static
459  {
460    self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
461
462    let t_start = time::Instant::now();
463    let (tick_ms, ticks_per_update) = {
464      match *self.kind() {
465        Kind::Isochronous { tick_ms, ticks_per_update }
466          => (tick_ms, ticks_per_update),
467        _ => unreachable!(
468          "run synchronous: process kind does not match run function")
469      }
470    };
471    log::debug!(
472      process:?=self.id(), kind="isochronous", tick_ms, ticks_per_update;
473      "process start");
474    debug_assert!(1 <= tick_ms);
475    debug_assert!(1 <= ticks_per_update);
476    let tick_dur = time::Duration::from_millis (tick_ms as u64);
477    let mut t_last             = t_start - tick_dur;
478    let mut t_next             = t_start;
479    let mut ticks_since_update = 0;
480    let mut tick_count         = 0;
481    #[allow(unused_variables)]
482    let mut _message_count      = 0;
483    let mut update_count       = 0;
484
485    let endpoints              = self.take_endpoints();
486    let mut num_open_channels  = endpoints.len();
487    let mut open_channels      = smallvec::SmallVec::<[bool; 8]>::from_vec ({
488      let mut v = Vec::with_capacity (num_open_channels);
489      v.resize (num_open_channels, true);
490      v
491    });
492    '_run_loop: while self.state_id() == inner::StateId::Running {
493      let t_now = time::Instant::now();
494      if t_next < t_now {
495        log::trace!(
496          process:?=self.id(),
497          tick=tick_count,
498          since_ns=t_now.duration_since (t_next).as_nanos();
499          "process tick");
500        t_last += tick_dur;
501        t_next += tick_dur;
502
503        // poll messages
504        poll_messages (self, &endpoints,
505          &mut open_channels, &mut num_open_channels, &mut _message_count);
506
507        tick_count += 1;
508        ticks_since_update += 1;
509        debug_assert!(ticks_since_update <= ticks_per_update);
510        if ticks_since_update == ticks_per_update {
511          log::trace!(process:?=self.id(), update=update_count;
512            "process update");
513          let update_result = self.update();
514          match update_result {
515            ControlFlow::Continue => {}
516            ControlFlow::Break    => {
517              if self.state_id() == inner::StateId::Running {
518                self.inner_mut().handle_event (inner::EventParams::End{}.into())
519                  .unwrap();
520              }
521            }
522          }
523          update_count += 1;
524          ticks_since_update = 0;
525        }
526      } else {
527        log::warn!(
528          process:?=self.id(),
529          tick=tick_count,
530          until_ns=t_next.duration_since (t_now).as_nanos();
531          "process tick too early");
532      }
533
534      let t_after = time::Instant::now();
535      if t_after < t_next {
536        // must be positive
537        let t_until = t_next.duration_since (t_after);
538        std::thread::sleep (time::Duration::from_millis (
539          1 +  // add 1ms to avoid too-early update
540          t_until.as_secs()*1000 +
541          t_until.subsec_nanos() as u64/1_000_000))
542      } else {
543        log::warn!(
544          process:?=self.id(),
545          tick=tick_count,
546          after_ns=t_after.duration_since (t_next).as_nanos();
547          "process late tick");
548      }
549
550    } // end 'run_loop
551    self.put_endpoints (endpoints);
552  } // end fn run_isochronous
553
554  /// This function implements a rate-limited update loop.
555  ///
556  /// Time is checked immediately after update and the thread is put to sleep
557  /// for the time remaining until the next update, plus 1 ms since the thread
558  /// usually wakes up slightly before the set time. In practice this means
559  /// that the update time lags behind the target time by about 1ms or so, but
560  /// the time between updates is consistent. If the thread does somehow wake
561  /// up too early, then no update will be done and the thread will sleep, or
562  /// else loop immediately depending on the result of a second time query.
563  ///
564  /// After a tick, if the next tick time has already passed, then the thread
565  /// will not sleep and instead will loop immediately.
566  fn run_mesochronous (&mut self) where
567    Self : Sized,
568    CTX  : 'static
569  {
570    self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
571
572    let t_start = time::Instant::now();
573    let (tick_ms, ticks_per_update) = {
574      match *self.kind() {
575        Kind::Mesochronous { tick_ms, ticks_per_update }
576          => (tick_ms, ticks_per_update),
577        _ => unreachable!(
578          "run synchronous: process kind does not match run function")
579      }
580    };
581    log::debug!(
582      process:?=self.id(), kind="mesochronous", tick_ms, ticks_per_update;
583      "process start");
584    debug_assert!(1 <= tick_ms);
585    debug_assert!(1 <= ticks_per_update);
586    let tick_dur = time::Duration::from_millis (tick_ms as u64);
587    let mut _t_last            = t_start - tick_dur;
588    let mut t_next             = t_start;
589    let mut ticks_since_update = 0;
590    let mut tick_count         = 0;
591    let mut _message_count     = 0;
592    let mut update_count       = 0;
593
594    let endpoints              = self.take_endpoints();
595    let mut num_open_channels  = endpoints.len();
596    let mut open_channels      = smallvec::SmallVec::<[bool; 8]>::from_vec ({
597      let mut v = Vec::with_capacity (num_open_channels);
598      v.resize (num_open_channels, true);
599      v
600    });
601    '_run_loop: while self.state_id() == inner::StateId::Running {
602      let t_now = time::Instant::now();
603      if t_next < t_now {
604        log::trace!(
605          process:?=self.id(),
606          tick=tick_count,
607          since_ns=t_now.duration_since (t_next).as_nanos();
608          "process tick");
609        _t_last = t_now;
610        t_next  = t_now + tick_dur;
611
612        // poll messages
613        poll_messages (self, &endpoints,
614          &mut open_channels, &mut num_open_channels, &mut _message_count);
615
616        tick_count += 1;
617        ticks_since_update += 1;
618        debug_assert!(ticks_since_update <= ticks_per_update);
619        if ticks_since_update == ticks_per_update {
620          log::trace!(process:?=self.id(), update=update_count;
621            "process update");
622          let update_result = self.update();
623          match update_result {
624            ControlFlow::Continue => {}
625            ControlFlow::Break    => {
626              if self.state_id() == inner::StateId::Running {
627                self.inner_mut().handle_event (inner::EventParams::End{}.into())
628                  .unwrap();
629              }
630            }
631          }
632          update_count += 1;
633          ticks_since_update = 0;
634        }
635      } else {
636        log::warn!(
637          process:?=self.id(),
638          tick=tick_count,
639          until_ns=t_next.duration_since (t_now).as_nanos();
640          "process tick too early");
641      }
642
643      let t_after = time::Instant::now();
644      if t_after < t_next {
645        // must be positive
646        let t_until = t_next.duration_since (t_after);
647        std::thread::sleep (time::Duration::from_millis (
648          1 +  // add 1ms to avoid too-early update
649          t_until.as_secs()*1000 +
650          t_until.subsec_nanos() as u64/1_000_000))
651      } else {
652        log::warn!(
653          process:?=self.id(),
654          tick=tick_count,
655          after_ns=t_after.duration_since (t_next).as_nanos();
656          "process late tick");
657      }
658
659    } // end 'run_loop
660    self.put_endpoints (endpoints);
661  } // end fn run_mesochronous
662
663  /// An un-timed run loop that polls for messages.
664  fn run_anisochronous (&mut self) where
665    Self : Sized,
666    CTX  : 'static
667  {
668    self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
669
670    let _t_start = time::Instant::now();
671    debug_assert_eq!(Kind::Anisochronous, *self.kind());
672    log::debug!(process:?=self.id(), kind="anisochronous"; "process start");
673    let mut _message_count = 0;
674    let mut update_count  = 0;
675
676    let endpoints = self.take_endpoints();
677    let mut num_open_channels = endpoints.len();
678    let mut open_channels     = smallvec::SmallVec::<[bool; 8]>::from_vec ({
679      let mut v = Vec::with_capacity (num_open_channels);
680      v.resize (num_open_channels, true);
681      v
682    });
683    '_run_loop: while self.state_id() == inner::StateId::Running {
684      // poll messages
685      poll_messages (self, &endpoints,
686        &mut open_channels, &mut num_open_channels, &mut _message_count);
687      // update
688      log::trace!(process:?=self.id(), update=update_count; "process update");
689      let update_result = self.update();
690      match update_result {
691        ControlFlow::Continue => {}
692        ControlFlow::Break    => {
693          if self.state_id() == inner::StateId::Running {
694            self.inner_mut().handle_event (inner::EventParams::End{}.into())
695              .unwrap()
696          }
697        }
698      }
699      update_count += 1;
700
701    } // end 'run_loop
702    self.put_endpoints (endpoints);
703  } // end fn run_anisochronous
704
705} // end trait Process
706
/// Integer type from which a process `Id` can be converted (see the
/// `TryFrom <IdReprType>` bound on `Id`).
pub type IdReprType = u16;
/// Unique identifier with a total mapping to process defs.
pub trait Id <CTX> : Clone + Ord + Into <usize> + TryFrom <IdReprType> +
  std::fmt::Debug + strum::IntoEnumIterator + strum::EnumCount
where
  CTX : session::Context <PID=Self>
{
  /// Returns the process definition for this ID.
  fn def (&self) -> Def <CTX>;
  /// Must initialize the concrete process type and start running the initial
  /// closure.
  fn spawn (inner : Inner <CTX>) -> std::thread::JoinHandle <Option <()>>;
  /// Initialize the concrete process type and return it in a `CTX::GPROC`.
  fn gproc (inner : Inner <CTX>) -> CTX::GPROC;
}
721
/// The global process type.
///
/// Implemented by the `CTX::GPROC` type of a session context.
pub trait Global <CTX> where
  Self : Sized,
  CTX  : session::Context <GPROC=Self>
{
  /// Returns the process ID.
  ///
  /// NOTE(review): presumably the ID of the concrete process wrapped by this
  /// value -- confirm against the implementors.
  fn id (&self) -> CTX::PID;
  /// Runs the process.
  ///
  /// NOTE(review): presumably dispatches to `Process::run` of the concrete
  /// process -- confirm against the implementors.
  fn run (&mut self);
  //fn run_continue (mut self) -> Option <()>;
}
731
732////////////////////////////////////////////////////////////////////////////////
733//  impls                                                                     //
734////////////////////////////////////////////////////////////////////////////////
735
736impl <CTX : session::Context> Def <CTX> {
  /// The only method to create a valid process def struct. Checks for
  /// duplicate sourcepoints or endpoints, self-loops, and restrictions on
  /// process kind (for example, an asynchronous process must hold exactly
  /// one endpoint).
741  ///
742  /// # Errors
743  ///
744  /// Duplicate sourcepoint:
745  ///
746  /// ```
747  /// # extern crate apis;
748  /// # use apis::{channel,message,process};
749  /// # use apis::session::mock::*;
750  /// # fn main() {
751  /// let result = process::Def::<Mycontext>::define (
752  ///   ProcessId::A,
753  ///   process::Kind::isochronous_default(),
754  ///   vec![ChannelId::X, ChannelId::Z, ChannelId::X],
755  ///   vec![ChannelId::Y]);
756  /// assert_eq!(
757  ///   result, Err (vec![process::DefineError::DuplicateSourcepoint]));
758  /// # }
759  /// ```
760  ///
761  /// Duplicate endpoint:
762  ///
763  /// ```
764  /// # extern crate apis;
765  /// # use apis::{channel,message,process};
766  /// # use apis::session::mock::*;
767  /// # fn main() {
768  /// let result = process::Def::<Mycontext>::define (
769  ///   ProcessId::A,
770  ///   process::Kind::isochronous_default(),
771  ///   vec![ChannelId::X, ChannelId::Z],
772  ///   vec![ChannelId::Y, ChannelId::Y]);
773  /// assert_eq!(
774  ///   result, Err (vec![process::DefineError::DuplicateEndpoint]));
775  /// # }
776  /// ```
777  ///
778  /// Self-loop:
779  ///
780  /// ```
781  /// # extern crate apis;
782  /// # use apis::{channel,message,process};
783  /// # use apis::session::mock::*;
784  /// # fn main() {
785  /// let result = process::Def::<Mycontext>::define (
786  ///   ProcessId::A,
787  ///   process::Kind::isochronous_default(),
788  ///   vec![ChannelId::X, ChannelId::Z],
789  ///   vec![ChannelId::Y, ChannelId::Z]);
790  /// assert_eq!(
791  ///   result, Err (vec![process::DefineError::SourcepointEqEndpoint]));
792  /// # }
793  /// ```
794  ///
795  /// Asynchronous process zero endpoints:
796  ///
797  /// ```
798  /// # extern crate apis;
799  /// # use apis::{channel,message,process};
800  /// # use apis::session::mock::*;
801  /// # use channel::Id;
802  /// # fn main() {
803  /// let result = process::Def::<Mycontext>::define (
804  ///   ProcessId::A,
805  ///   process::Kind::asynchronous_default(),
806  ///   vec![ChannelId::Z],
807  ///   vec![]);
808  /// assert_eq!(
809  ///   result,
810  ///   Err (vec![process::DefineError::AsynchronousZeroEndpoints]));
811  /// # }
812  /// ```
813  ///
814  /// Asynchronous process multiple endpoints:
815  ///
816  /// ```
817  /// # extern crate apis;
818  /// # use apis::{channel,message,process};
819  /// # use apis::session::mock::*;
820  /// # use channel::Id;
821  /// # fn main() {
822  /// let result = process::Def::<Mycontext>::define (
823  ///   ProcessId::A,
824  ///   process::Kind::asynchronous_default(),
825  ///   vec![ChannelId::Z],
826  ///   vec![ChannelId::X, ChannelId::Y]);
827  /// assert_eq!(
828  ///   result,
829  ///   Err (vec![process::DefineError::AsynchronousMultipleEndpoints]));
830  /// # }
831  /// ```
832  ///
833  pub fn define (
834    id           : CTX::PID,
835    kind         : Kind,
836    sourcepoints : Vec <CTX::CID>,
837    endpoints    : Vec <CTX::CID>
838  ) -> Result <Self, Vec <DefineError>> {
839    let def = Def {
840      id, kind, sourcepoints, endpoints
841    };
842    def.validate_role() ?;
843    Ok (def)
844  }
845
  /// Returns the ID of the defined process.
  pub fn id (&self) -> &CTX::PID {
    &self.id
  }
849
  /// Returns the loop kind of the defined process.
  pub fn kind (&self) -> &Kind {
    &self.kind
  }
853
  /// Returns the IDs of the channels on which the process holds a
  /// sourcepoint.
  pub fn sourcepoints (&self) -> &Vec <CTX::CID> {
    &self.sourcepoints
  }
857
858  pub fn endpoints (&self) -> &Vec <CTX::CID> {
859    &self.endpoints
860  }
861
862  fn validate_role (&self) -> Result <(), Vec <DefineError>> {
863    let mut errors = Vec::new();
864
865    // we will not check that a process has zero sourcepoints or endpoints
866
867    // duplicate sourcepoints
868    let mut producers_dedup = self.sourcepoints.clone();
869    producers_dedup.as_mut_slice().sort();
870    producers_dedup.dedup_by (|x,y| x == y);
871    if producers_dedup.len() < self.sourcepoints.len() {
872      errors.push (DefineError::DuplicateSourcepoint);
873    }
874
875    // duplicate endpoints
876    let mut consumers_dedup = self.endpoints.clone();
877    consumers_dedup.as_mut_slice().sort();
878    consumers_dedup.dedup_by (|x,y| x == y);
879    if consumers_dedup.len() < self.endpoints.len() {
880      errors.push (DefineError::DuplicateEndpoint);
881    }
882
883    // self-loops
884    let mut producers_and_consumers = producers_dedup.clone();
885    producers_and_consumers.append (&mut consumers_dedup.clone());
886    producers_and_consumers.as_mut_slice().sort();
887    producers_and_consumers.dedup_by (|x,y| x == y);
888    if producers_and_consumers.len()
889      < producers_dedup.len() + consumers_dedup.len()
890    {
891      errors.push (DefineError::SourcepointEqEndpoint);
892    }
893
894    // validate process kind
895    if let Err (mut errs)
896      = self.kind.validate_role::<CTX> (&self.sourcepoints, &self.endpoints)
897    {
898      errors.append (&mut errs);
899    }
900
901    if !errors.is_empty() {
902      Err (errors)
903    } else {
904      Ok (())
905    }
906  }
907}
908
909impl Kind {
910  pub fn asynchronous_default() -> Self {
911    const MESSAGES_PER_UPDATE : u32 = 1;
912    Kind::new_asynchronous (MESSAGES_PER_UPDATE).unwrap()
913  }
914
915  pub fn isochronous_default() -> Self {
916    const TICK_MS          : u32 = 1000;
917    const TICKS_PER_UPDATE : u32 = 1;
918    Kind::new_isochronous (TICK_MS, TICKS_PER_UPDATE).unwrap()
919  }
920
921  pub fn mesochronous_default() -> Self {
922    const TICK_MS          : u32 = 1000;
923    const TICKS_PER_UPDATE : u32 = 1;
924    Kind::new_mesochronous (TICK_MS, TICKS_PER_UPDATE).unwrap()
925  }
926
927  pub fn anisochronous_default() -> Self {
928    Kind::new_anisochronous()
929  }
930
931  pub fn new_asynchronous (messages_per_update : u32)
932    -> Result <Self, Vec <KindError>>
933  {
934    let mut errors = Vec::new();
935    if messages_per_update == 0 {
936      errors.push (KindError::AsynchronousZeroMessagesPerUpdate)
937    }
938    if !errors.is_empty() {
939      Err (errors)
940    } else {
941      Ok (Kind::Asynchronous { messages_per_update })
942    }
943  }
944
945  pub fn new_isochronous (tick_ms : u32, ticks_per_update : u32)
946    -> Result <Self, Vec <KindError>>
947  {
948    let mut errors = Vec::new();
949    if tick_ms == 0 {
950      errors.push (KindError::IsochronousZeroTickMs)
951    }
952    if ticks_per_update == 0 {
953      errors.push (KindError::IsochronousZeroTicksPerUpdate)
954    }
955    if !errors.is_empty() {
956      Err (errors)
957    } else {
958      Ok (Kind::Isochronous { tick_ms, ticks_per_update })
959    }
960  }
961
962  pub fn new_mesochronous (tick_ms : u32, ticks_per_update : u32)
963    -> Result <Self, Vec <KindError>>
964  {
965    let mut errors = Vec::new();
966    if tick_ms == 0 {
967      errors.push (KindError::MesochronousZeroTickMs)
968    }
969    if ticks_per_update == 0 {
970      errors.push (KindError::MesochronousZeroTicksPerUpdate)
971    }
972    if !errors.is_empty() {
973      Err (errors)
974    } else {
975      Ok (Kind::Isochronous { tick_ms, ticks_per_update })
976    }
977  }
978
979  #[inline]
980  pub fn new_anisochronous() -> Self {
981    Kind::Anisochronous
982  }
983
984  fn validate_role <CTX : session::Context> (&self,
985    _sourcepoints : &Vec <CTX::CID>,
986    endpoints     : &Vec <CTX::CID>
987  ) -> Result <(), Vec <DefineError>> {
988    let mut errors = Vec::new();
989
990    match *self {
991      Kind::Asynchronous {..} => {
992        // asynchronous processes must have exactly one endpoint
993        if endpoints.len() == 0 {
994          errors.push (DefineError::AsynchronousZeroEndpoints)
995        } else if 1 < endpoints.len() {
996          errors.push (DefineError::AsynchronousMultipleEndpoints)
997        }
998      }
999      Kind::Isochronous   {..} => { /* no restrictions */ }
1000      Kind::Mesochronous  {..} => { /* no restrictions */ }
1001      Kind::Anisochronous {..} => { /* no restrictions */ }
1002    }
1003
1004    if !errors.is_empty() {
1005      Err (errors)
1006    } else {
1007      Ok (())
1008    }
1009  }
1010
1011} // end impl Kind
1012
1013impl <M> From <Result <(), channel::SendError <M>>> for ControlFlow {
1014  fn from (send_result : Result <(), channel::SendError <M>>) -> Self {
1015    match send_result {
1016      Ok  (_) => ControlFlow::Continue,
1017      Err (_) => ControlFlow::Break
1018    }
1019  }
1020}
1021
1022////////////////////////////////////////////////////////////////////////////////
1023//  functions                                                                 //
1024////////////////////////////////////////////////////////////////////////////////
1025
1026//
1027//  public
1028//
1029pub fn report_sizes <CTX : session::Context> () where
1030  CTX : 'static
1031{
1032  println!("process report sizes...");
1033  println!("  size of process::Def: {}", std::mem::size_of::<Def <CTX>>());
1034  Inner::<CTX>::report_sizes();
1035  println!("...process report sizes");
1036}
1037
1038//
1039//  private
1040//
1041
1042//
1043//  fn poll_messages
1044//
1045/// Message polling loop for `Isochronous`, `Mesochronous`, and `Anisochronous`
1046/// processes.
1047#[inline]
1048fn poll_messages <CTX, P, RES> (
1049  process           : &mut P,
1050  endpoints         : &VecMap <Box <dyn channel::Endpoint <CTX>>>,
1051  open_channels     : &mut smallvec::SmallVec <[bool; 8]>,
1052  num_open_channels : &mut usize,
1053  message_count     : &mut usize)
1054where
1055  CTX : session::Context + 'static,
1056  P   : Process <CTX, RES> + Sized,
1057  RES : Presult <CTX, P>
1058{
1059  use message::Global;
1060  #[inline]
1061  fn channel_close (is_open : &mut bool, num_open : &mut usize) {
1062    debug_assert!(*is_open);
1063    debug_assert!(0 < *num_open);
1064    *is_open = false;
1065    *num_open -= 1;
1066  }
1067
1068  // for each open channel (outer loop), poll for messages with try_recv (inner
1069  // loop) until "empty" or "disconnected" is encountered
1070  let mut open_index = 0;
1071  'poll_outer: for (cid, endpoint) in endpoints.iter() {
1072    // NOTE: unwrap requires that err is debug
1073    let channel_id = match CTX::CID::try_from (cid as u16) {
1074      Ok  (cid) => cid,
1075      Err (_)   => unreachable!()
1076    };
1077    let channel_open = &mut open_channels[open_index];
1078    open_index += 1;
1079    if !*channel_open {
1080      continue 'poll_outer
1081    }
1082    'poll_inner: loop {
1083      match endpoint.try_recv() {
1084        Ok (message) => {
1085          log::debug!(
1086            process:?=process.id(),
1087            channel:?=channel_id,
1088            message=message.inner_name().as_str();
1089            "process received message");
1090          *message_count += 1;
1091          match process.handle_message (message) {
1092            ControlFlow::Continue => {}
1093            ControlFlow::Break    => {
1094              channel_close (channel_open, num_open_channels);
1095              // only transition to "ended" if this is the last channel to close
1096              if *num_open_channels == 0 {
1097                process.inner_mut().handle_event (
1098                  inner::EventParams::End{}.into()
1099                ).unwrap();
1100              }
1101              break 'poll_inner
1102            }
1103          }
1104        }
1105        Err (channel::TryRecvError::Empty) => { break 'poll_inner }
1106        Err (channel::TryRecvError::Disconnected) => {
1107          log::info!(process:?=process.id(), channel:?=channel_id;
1108            "process receive failed: sender disconnected");
1109          channel_close (channel_open, num_open_channels);
1110          if *num_open_channels == 0 {
1111            process.inner_mut().handle_event (inner::EventParams::End{}.into())
1112              .unwrap();
1113          }
1114          break 'poll_inner
1115        }
1116      } // end match try_recv
1117    } // end 'poll_inner
1118  } // end 'poll_outer
1119} // end fn poll_messages