apis/session/
mod.rs

1//! Main session datatype.
2
3use {std, either, vec_map};
4use std::convert::TryFrom;
5use macro_machines::def_machine_nodefault;
6use strum::{EnumCount, IntoEnumIterator};
7use crate::{channel, message, process};
8
9mod macro_def;
10
11////////////////////////////////////////////////////////////////////////////////
12//  typedefs
13////////////////////////////////////////////////////////////////////////////////
14
/// Boxed one-shot closure that a session sends to a process over its
/// continuation channel.
///
/// The closure is called with the global process value (`CTX::GPROC`) and
/// returns `Option <()>`.  `Session::finish` sends a closure that ignores its
/// argument and returns `Some (())` to every process that has a join handle.
pub type Continuation <CTX> =
  Box <dyn FnOnce (<CTX as Context>::GPROC) -> Option <()> + Send>;
17
18////////////////////////////////////////////////////////////////////////////////
19//  structs
20////////////////////////////////////////////////////////////////////////////////
21
22//
23//  struct Session
24//
// Session state machine, generated by the `def_machine_nodefault!` macro.
//
// States progress `Ready` -> `Running` -> `Ended`: `Session::start` fires the
// `Run` event and `Session::run_with` fires the `End` event.
def_machine_nodefault! {
  Session <CTX : { Context }> (
    // the session definition this session was built from
    def             : Def <CTX>,
    // handles of the session's processes, keyed by process id
    process_handles : vec_map::VecMap <process::Handle <CTX>>,
    // the process matching `CTX::maybe_main()`, set by `Session::start`
    main_process    : Option <Box <CTX::GPROC>>
  ) @ _session {
    STATES [
      state Ready   ()
      state Running ()
      state Ended   ()
    ]
    EVENTS [
      event Run <Ready>   => <Running> ()
      event End <Running> => <Ended>   ()
    ]
    initial_state:  Ready
    terminal_state: Ended {
      // NOTE(review): judging by the panic message below these hooks run when
      // the machine is dropped -- confirm against the macro documentation
      terminate_success: {
        // send continuations and join spawned process threads
        _session.finish();
      }
      terminate_failure: {
        panic!("session dropped in state: {:?}", _session.state_id());
      }
    }
  }
}
51
/// Session metainformation.
///
/// Holds the session name together with the channel and process definitions
/// of the session.  Values are created through the private `Def::define`,
/// which checks that the roles named by the channel definitions agree with
/// the sourcepoints and endpoints named by the process definitions.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Def <CTX : Context> {
  /// Name of the session context (`Context::name()`).
  name        : &'static str,
  /// Channel definitions keyed by channel id.
  channel_def : vec_map::VecMap <channel::Def <CTX>>,
  /// Process definitions keyed by process id.
  process_def : vec_map::VecMap <process::Def <CTX>>
}
59
/// Handle to the session held by processes.
///
/// `Session::start` creates one handle for every process it constructs; the
/// session keeps the opposite ends of both channels in the corresponding
/// `process::Handle`.
pub struct Handle <CTX : Context> {
  /// Sends the process result to the session, which receives it in
  /// `Session::run_with`.
  pub result_tx       : std::sync::mpsc::Sender <CTX::GPRES>,
  /// Receives the continuation that the session sends in `Session::finish`.
  pub continuation_rx : std::sync::mpsc::Receiver <Continuation <CTX>>
}
65
66////////////////////////////////////////////////////////////////////////////////
67//  enums
68////////////////////////////////////////////////////////////////////////////////
69
/// Error in `Def` definition.
///
/// There needs to be a one-to-one correspondence between the consumers and
/// producers specified in the channel infos and the sourcepoints and
/// endpoints as specified in the process infos.
///
/// Validation reports each kind of mismatch at most once, no matter how many
/// processes are affected.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DefineError {
  /// For some process, the sourcepoints listed in its process def differ from
  /// the set of channels that list the process as a producer.
  ProducerSourcepointMismatch,
  /// For some process, the endpoints listed in its process def differ from
  /// the set of channels that list the process as a consumer.
  ConsumerEndpointMismatch
}
80
81////////////////////////////////////////////////////////////////////////////////
82//  traits
83////////////////////////////////////////////////////////////////////////////////
84
/// Trait specifying types in session context with a method to attempt to create
/// a valid session def struct from those types.
pub trait Context where Self : Clone + PartialEq + Sized + std::fmt::Debug {
  /// The message id type.
  type MID   : message::Id;
  /// The channel id type; `def()` creates one channel def for each value.
  type CID   : channel::Id <Self>;
  /// The process id type; `def()` creates one process def for each value.
  type PID   : process::Id <Self>;
  /// The global message type.
  type GMSG  : message::Global <Self>;
  /// The global process type.
  type GPROC : process::Global <Self>;
  /// The global process result type.
  type GPRES : process::presult::Global <Self>;

  //required
  /// Id of the process that the session creates without spawning and runs
  /// directly in `Session::run_with`, if there is one.
  fn maybe_main () -> Option <Self::PID>;
  /// Name of the session context; stored in the `Def` returned by `def()`.
  fn name       () -> &'static str;
  // helper functions for dotfile creation
  /// Field names of each process; the outer vector is indexed by process id.
  fn process_field_names()     -> Vec <Vec <&'static str>>;
  /// Field types of each process; the outer vector is indexed by process id.
  fn process_field_types()     -> Vec <Vec <&'static str>>;
  /// Field default expressions of each process; the outer vector is indexed
  /// by process id.
  fn process_field_defaults()  -> Vec <Vec <&'static str>>;
  /// Result type of each process, indexed by process id; an empty string
  /// omits the result line from the dotfile.
  fn process_result_types()    -> Vec <&'static str>;
  /// Result default expression of each process, indexed by process id.
  fn process_result_defaults() -> Vec <&'static str>;
  /// Local message type of each channel, indexed by channel id.
  fn channel_local_types()     -> Vec <&'static str>;

  // provided
  //
  //  fn def()
  //
  /// Return a session def struct if the defined context is valid.
  ///
  /// Builds one channel def for every channel id and one process def for
  /// every process id and passes them, together with `Self::name()`, to
  /// `Def::define` for validation.
  ///
  /// # Panics
  ///
  /// Panics if two ids convert to the same map key.
  ///
  /// # Errors
  ///
  /// Process sourcepoints do not correspond one-to-one with channel producers:
  ///
  /// ```
  /// extern crate apis;
  ///
  /// apis::def_session! {
  ///   context Mycontext {
  ///     PROCESSES where
  ///       let process    = self,
  ///       let message_in = message_in
  ///     [
  ///       process A () {
  ///         kind { apis::process::Kind::isochronous_default() }
  ///         sourcepoints [X]
  ///         endpoints    []
  ///         handle_message { apis::process::ControlFlow::Break }
  ///         update         { apis::process::ControlFlow::Break }
  ///       }
  ///       process B () {
  ///         kind { apis::process::Kind::isochronous_default() }
  ///         sourcepoints []
  ///         endpoints    [X, Y]
  ///         handle_message { apis::process::ControlFlow::Break }
  ///         update         { apis::process::ControlFlow::Break }
  ///       }
  ///     ]
  ///     CHANNELS  [
  ///       channel X <T> (Simplex) {
  ///         producers [A]
  ///         consumers [B]
  ///       }
  ///       channel Y <T> (Sink) {
  ///         producers [A]
  ///         consumers [B]
  ///       }
  ///     ]
  ///     MESSAGES [
  ///       message T {}
  ///     ]
  ///   }
  /// }
  ///
  /// fn main() {
  ///   use apis::session::Context;
  ///   assert_eq!(
  ///     Mycontext::def(),
  ///     Err (vec![apis::session::DefineError::ProducerSourcepointMismatch]));
  /// }
  /// ```
  ///
  /// Process endpoints do not correspond one-to-one with channel consumers:
  ///
  /// ```
  /// extern crate apis;
  ///
  /// apis::def_session! {
  ///   context Mycontext {
  ///     PROCESSES where
  ///       let process    = self,
  ///       let message_in = message_in
  ///     [
  ///       process A () {
  ///         kind { apis::process::Kind::isochronous_default() }
  ///         sourcepoints [X,Y]
  ///         endpoints    []
  ///         handle_message { apis::process::ControlFlow::Break }
  ///         update         { apis::process::ControlFlow::Break }
  ///       }
  ///       process B () {
  ///         kind { apis::process::Kind::isochronous_default() }
  ///         sourcepoints []
  ///         endpoints    [X]
  ///         handle_message { apis::process::ControlFlow::Break }
  ///         update         { apis::process::ControlFlow::Break }
  ///       }
  ///     ]
  ///     CHANNELS  [
  ///       channel X <T> (Simplex) {
  ///         producers [A]
  ///         consumers [B]
  ///       }
  ///       channel Y <T> (Sink) {
  ///         producers [A]
  ///         consumers [B]
  ///       }
  ///     ]
  ///     MESSAGES [
  ///       message T {}
  ///     ]
  ///   }
  /// }
  ///
  /// fn main() {
  ///   use apis::session::Context;
  ///   assert_eq!(
  ///     Mycontext::def(),
  ///     Err (vec![apis::session::DefineError::ConsumerEndpointMismatch]));
  /// }
  /// ```
  fn def() -> Result <Def <Self>, Vec <DefineError>> {
    // one channel def per channel id, keyed by the id converted to a map key
    let mut channel_def = vec_map::VecMap::new();
    for cid in Self::CID::iter() {
      assert!{
        channel_def.insert (
          cid.clone().into(), channel::Id::def (&cid)
        ).is_none()
      }
    }
    // one process def per process id, keyed the same way
    let mut process_def = vec_map::VecMap::new();
    for pid in Self::PID::iter() {
      assert!{
        process_def.insert (
          pid.clone().into(), process::Id::def (&pid)
        ).is_none()
      }
    }

    // validation of producer/consumer roles happens in `Def::define`
    Def::define (Self::name(), channel_def, process_def)
  }

}
239
240////////////////////////////////////////////////////////////////////////////////
241//  impls
242////////////////////////////////////////////////////////////////////////////////
243
244impl <CTX : Context> Session <CTX> {
245  pub fn def (&self) -> &Def <CTX> {
246    &self.extended_state().def
247  }
248
249  // MachineDotfile::name conflicts
250  #[expect(clippy::same_name_method)]
251  pub fn name (&self) -> &'static str {
252    self.def().name
253  }
254
255  /// Creates a new session and runs to completion.
256  ///
257  /// Transitions from `Ready` to `Running`, starts processes not already
258  /// running (those present in the `process_handles` argument), waits for
259  /// results and finally transitions to `Ended`.
260  pub fn run (&mut self) -> vec_map::VecMap <CTX::GPRES> {
261    let channels = self.as_ref().def.create_channels();
262    self.run_with (channels, vec_map::VecMap::new(), None)
263  }
264
265  /// Run a session with given channels and handles to processes that are
266  /// running in a continuation from a previous session.
267  pub fn run_with (&mut self,
268    channels        : vec_map::VecMap <channel::Channel <CTX>>,
269    process_handles : vec_map::VecMap <process::Handle <CTX>>,
270    main_process    : Option <Box <CTX::GPROC>>
271  ) -> vec_map::VecMap <CTX::GPRES> {
272    use process::Global;
273
274    self.start (process_handles, channels, main_process);
275    if let Some (ref mut main_gproc) = self.as_mut().main_process {
276      main_gproc.run();
277    }
278    let mut results = vec_map::VecMap::with_capacity (CTX::PID::COUNT);
279    for (pid, process_handle) in self.as_mut().process_handles.iter() {
280      assert!{
281        results.insert (pid, process_handle.result_rx.recv().unwrap()).is_none()
282      }
283    }
284    self.handle_event (EventParams::End{}.into()).unwrap();
285    results
286  }
287
  /// Spawn processes.
  ///
  /// For every process in the session definition the handle supplied in
  /// `process_handles` is reused if there is one.  Otherwise the process is
  /// constructed from its definition: it is given the sourcepoints and
  /// endpoints removed from `channels` for its process id and a fresh session
  /// `Handle`.  The process whose id equals `CTX::maybe_main()` is only
  /// constructed and kept as the session's main process; every other process
  /// is spawned with `process::Id::spawn`.  Finally the `Run` event is fired.
  ///
  /// # Panics
  ///
  /// Panics if a handle is already stored for a process id, if the process
  /// state cannot be constructed, or if the `Run` transition is rejected.  In
  /// debug builds it also panics when `main_process` is supplied without a
  /// matching entry in `process_handles`.
  fn start (&mut self,
    mut process_handles : vec_map::VecMap <process::Handle <CTX>>,
    mut channels        : vec_map::VecMap <channel::Channel <CTX>>,
    mut main_process    : Option <Box <CTX::GPROC>>
  ) {
    // a supplied main process must come with its process handle
    if cfg!(debug_assertions) && let Some (gproc) = main_process.as_ref() {
      use process::Global;
      assert!(process_handles.contains_key (gproc.id().into()));
    }

    { // spawn processes not found in input process handles
      let extended_state = self.as_mut();
      for (pid, process_def) in extended_state.def.process_def.iter() {
        // the closure only runs when no handle was supplied for this pid
        let process_handle = process_handles.remove (pid).unwrap_or_else (||{
          // peer channels
          let mut sourcepoints
            : vec_map::VecMap <Box <dyn channel::Sourcepoint <CTX>>>
            = vec_map::VecMap::new();
          let mut endpoints
            : vec_map::VecMap <Box <dyn channel::Endpoint <CTX>>>
            = vec_map::VecMap::new();
          // take this process's ends out of every channel, keyed by channel id
          for (cid, channel) in channels.iter_mut() {
            if let Some (sourcepoint) = channel.sourcepoints.remove (pid) {
              assert!(sourcepoints.insert (cid, sourcepoint).is_none());
            }
            if let Some (endpoint) = channel.endpoints.remove (pid) {
              assert!(endpoints.insert (cid, endpoint).is_none());
            }
          }
          // session control channels
          let (result_tx, result_rx) = std::sync::mpsc::channel::<CTX::GPRES>();
          let (continuation_tx, continuation_rx) =
            std::sync::mpsc::channel::<Continuation <CTX>>();
          // create the process
          let session_handle = Handle::<CTX> { result_tx, continuation_rx };
          let inner = process::Inner::new (process::inner::ExtendedState::new (
            Some (process_def.clone()),
            Some (session_handle),
            Some (sourcepoints),
            Some (std::cell::RefCell::new (Some (endpoints)))
          ).unwrap());
          // if the process is the main process, only create it and don't spawn
          if let Some (main_process_id) = CTX::maybe_main()
            && *inner.as_ref().def.id() == main_process_id
          {
            // this code should only be hit when a main process was not provided as an
            // input since it should be accompanied by a process handle
            debug_assert!(main_process.is_none());
            main_process = Some (Box::new (process::Id::gproc (inner)));
            // no thread to join and no continuation to forward
            return process::Handle {
              result_rx, continuation_tx,
              join_or_continue: either::Either::Right (None)
            }
          }
          // spawn the process
          let join_handle = process::Id::spawn (inner);
          process::Handle {
            result_rx, continuation_tx,
            join_or_continue: either::Either::Left (join_handle)
          }
        });
        // store the process handle
        assert!(extended_state.process_handles.insert (pid, process_handle).is_none());
      }
      // take the main process if one was created
      extended_state.main_process = main_process.take();
    } // end spawn all processes not found in input process handles
    // transition `Ready` -> `Running`
    self.handle_event (EventParams::Run{}.into()).unwrap();

    log::debug!(session=self.name(); "session started");
  }
360
361  /// Send continuations and wait until terminated threads have joined.
362  fn finish (&mut self) where Self : Sized {
363    for (_, process_handle) in self.as_mut().process_handles.drain() {
364      match process_handle.join_or_continue {
365        either::Either::Left (join_handle) => {
366          // terminate
367          process_handle.continuation_tx.send (
368           Box::new (|_ : CTX::GPROC| Some (()))
369          ).unwrap();
370          join_handle.join().unwrap().unwrap()
371        }
372        either::Either::Right (Some (continuation)) => {
373          process_handle.continuation_tx.send (continuation).unwrap();
374        }
375        either::Either::Right (None) => { /* do nothing */ }
376      }
377    }
378    log::debug!(session=self.name(); "session finished");
379  }
380} // end impl Session
381
382impl <CTX : Context> std::fmt::Debug for Session <CTX> {
383  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
384    write!(f, "{}({:?})", self.name(), self.state_id())
385  }
386}
387
388
389impl <CTX : Context> Def <CTX> {
390  pub fn create_channels (&self) -> vec_map::VecMap <channel::Channel <CTX>> {
391    let mut channels = vec_map::VecMap::new();
392    for (cid, channel_def) in self.channel_def.iter() {
393      debug_assert_eq!(cid, channel_def.id().clone().into());
394      assert!(channels.insert (cid, channel::Id::create (channel_def.clone()))
395        .is_none());
396    }
397    channels
398  }
399
400  /// Generate a graphviz DOT file of the data flow diagram for the defined
401  /// session
402  #[inline]
403  pub fn dotfile (&self) -> String {
404    self.session_dotfile (true)
405  }
406
407  /// Generate a graphviz diagram of the data flow diagram for the defined
408  /// session with default expressions shown
409  #[inline]
410  pub fn dotfile_show_defaults (&self) -> String {
411    self.session_dotfile (false)
412  }
413
414  //
415  //  private functions
416  //
417
418  /// The only method to create a valid session def struct.  Validates the
419  /// channel def and process def definitions for one-to-one correspondence
420  /// of producers and consumers to sourcepoints and endpoints, respectively.
421  ///
422  /// See public trait method `Context::def` for errors.
423  fn define (
424    name        :  &'static str,
425    channel_def : vec_map::VecMap <channel::Def <CTX>>,
426    process_def : vec_map::VecMap <process::Def <CTX>>
427  ) -> Result <Self, Vec <DefineError>> {
428    let def = Def {
429      name,
430      channel_def,
431      process_def
432    };
433    def.validate_roles() ?;
434    Ok (def)
435  }
436
437  fn validate_roles (&self) -> Result <(), Vec <DefineError>> {
438    let mut errors = Vec::new();
439
440    // create empty vec maps representing the sourcepoints and endpoints for
441    // each process
442    let mut sourcepoints_from_channels : vec_map::VecMap <Vec <CTX::CID>> = {
443      let mut v = vec_map::VecMap::new();
444      for pid in CTX::PID::iter() {
445        assert!(v.insert (pid.into(), Vec::new()).is_none());
446      }
447      v
448    };
449    let mut endpoints_from_channels : vec_map::VecMap <Vec <CTX::CID>>
450      = sourcepoints_from_channels.clone();
451
452    // fill the sourcepoint and endpoint vec maps according to the channel def
453    // specifications
454    for (cid, channel_def) in self.channel_def.iter() {
455      #[expect(clippy::cast_possible_truncation)]
456      // NOTE: unwrap requires that err is debug
457      let Ok (channel_id) = CTX::CID::try_from (cid as channel::IdReprType)
458        else { unreachable!() };
459      debug_assert_eq!(channel_id, *channel_def.id());
460      for producer_id in channel_def.producers().iter() {
461        let pid : usize  = producer_id.clone().into();
462        let sourcepoints = &mut sourcepoints_from_channels[pid];
463        sourcepoints.push (channel_id.clone());
464      }
465      for consumer_id in channel_def.consumers().iter() {
466        let pid : usize = consumer_id.clone().into();
467        let endpoints   = &mut endpoints_from_channels[pid];
468        endpoints.push (channel_id.clone());
469      }
470    }
471
472    // compare the resulting sourcepoint and endpoint vec maps against those
473    // defined in the process infos
474    for (pid, process_def) in self.process_def.iter() {
475      debug_assert_eq!(pid, process_def.id().clone().into());
476      let sourcepoints_from_channels = &mut sourcepoints_from_channels[pid];
477      sourcepoints_from_channels.as_mut_slice().sort();
478      let mut sourcepoints = process_def.sourcepoints().clone();
479      sourcepoints.as_mut_slice().sort();
480      if sourcepoints != *sourcepoints_from_channels {
481        errors.push (DefineError::ProducerSourcepointMismatch);
482        break
483      }
484    }
485
486    for (pid, process_def) in self.process_def.iter() {
487      debug_assert_eq!(pid, process_def.id().clone().into());
488      let endpoints_from_channels = &mut endpoints_from_channels[pid];
489      endpoints_from_channels.as_mut_slice().sort();
490      let mut endpoints = process_def.endpoints().clone();
491      endpoints.as_mut_slice().sort();
492      if endpoints != *endpoints_from_channels {
493        errors.push (DefineError::ConsumerEndpointMismatch);
494        break
495      }
496    }
497
498    if !errors.is_empty() {
499      Err (errors)
500    } else {
501      Ok (())
502    }
503  }
504
505  fn session_dotfile (&self, hide_defaults : bool) -> String {
506    /// Escape HTML special characters
507    #[inline]
508    fn escape (s : String) -> String {
509      use marksman_escape::Escape;
510      String::from_utf8 (Escape::new (s.bytes()).collect()).unwrap()
511    }
512    let mut s = String::new();
513
514    // begin graph
515    s.push_str (
516      "digraph {\
517     \n  overlap=scale\
518     \n  rankdir=LR\
519     \n  node [shape=hexagon, fontname=\"Sans Bold\"]\
520     \n  edge [style=dashed, arrowhead=vee, fontname=\"Sans\"]\n");
521
522    // begin subgraph
523    debug_assert_eq!(self.name, CTX::name());
524    let context_str = self.name;
525    s.push_str (format!(
526      "  subgraph cluster_{context_str} {{\n").as_str());
527    s.push_str (format!(
528      "    label=<{context_str}>").as_str());
529    s.push_str ( "\
530     \n    shape=record\
531     \n    style=rounded\
532     \n    fontname=\"Sans Bold Italic\"\n");
533
534    // nodes (processes)
535    let process_field_names     = CTX::process_field_names();
536    let process_field_types     = CTX::process_field_types();
537    let process_field_defaults  = CTX::process_field_defaults();
538    let process_result_types    = CTX::process_result_types();
539    let process_result_defaults = CTX::process_result_defaults();
540    debug_assert_eq!(process_field_names.len(), process_field_types.len());
541    debug_assert_eq!(process_field_types.len(), process_field_defaults.len());
542    debug_assert_eq!(process_field_defaults.len(), process_result_types.len());
543    debug_assert_eq!(process_result_types.len(), process_result_defaults.len());
544    for (pid, process_def) in self.process_def.iter() {
545      let process_id = process_def.id();
546      s.push_str (format!(
547        "    {process_id:?} [label=<<TABLE BORDER=\"0\"><TR><TD><B>{process_id:?}</B></TD></TR>").as_str());
548
549      let process_field_names    = &process_field_names[pid];
550      let process_field_types    = &process_field_types[pid];
551      let process_field_defaults = &process_field_defaults[pid];
552      debug_assert_eq!(process_field_names.len(), process_field_types.len());
553      debug_assert_eq!(process_field_types.len(), process_field_defaults.len());
554
555      let mut mono_font          = false;
556      if !process_field_names.is_empty() {
557        s.push_str ("<TR><TD><FONT FACE=\"Mono\"><BR/>");
558        mono_font = true;
559
560        //
561        //  for each data field, print a line
562        //
563        // TODO: we are manually aligning the columns of the field name, field
564        // type, and default values, is there a better way ? (record node, html
565        // table, format width?)
566        let mut field_string = String::new();
567        let separator = ",<BR ALIGN=\"LEFT\"/>";
568
569        let longest_fieldname = process_field_names.iter()
570          .fold (0, |longest, fieldname| std::cmp::max (longest, fieldname.len()));
571
572        let longest_typename = process_field_types.iter()
573          .fold (0, |longest, typename| std::cmp::max (longest, typename.len()));
574
575        for (i,f) in process_field_names.iter().enumerate() {
576          let spacer1 : String = std::iter::repeat_n (' ', longest_fieldname - f.len())
577            .collect();
578          let spacer2 : String = std::iter::repeat_n (
579            ' ', longest_typename - process_field_types[i].len()
580          ).collect();
581
582          if !hide_defaults {
583            field_string.push_str (escape (format!(
584              "{}{} : {}{} = {}",
585              f, spacer1, process_field_types[i], spacer2, process_field_defaults[i]
586            )).as_str());
587          } else {
588            field_string.push_str (escape (format!(
589              "{}{} : {}", f, spacer1, process_field_types[i]
590            )).as_str());
591          }
592          field_string.push_str (separator.to_string().as_str());
593        }
594
595        let len = field_string.len();
596        field_string.truncate (len - separator.len());
597        s.push_str (field_string.as_str());
598      } // end print line for each field
599
600      let result_type = process_result_types[pid];
601      if !result_type.is_empty() {
602        if !mono_font {
603          s.push_str ("<TR><TD><FONT FACE=\"Mono\"><BR/>");
604          mono_font = true;
605        } else {
606          s.push_str ("<BR ALIGN=\"LEFT\"/></FONT></TD></TR>\
607            <TR><TD><FONT FACE=\"Mono\"><BR/>");
608        }
609        let result_default = process_result_defaults[pid];
610        if !hide_defaults {
611          s.push_str (escape (format!(
612            "-> {result_type} = {result_default}"
613          )).as_str());
614        } else {
615          s.push_str (escape (format!("-> {result_type}")).as_str());
616        }
617      }
618
619      /*
620      if s.chars().last().unwrap() == '>' {
621        let len = s.len();
622        s.truncate (len-5);
623      } else {
624        s.push_str ("</FONT>");
625      }
626      */
627
628      if mono_font {
629        s.push_str ("<BR ALIGN=\"LEFT\"/></FONT></TD></TR>");
630      }
631
632      s.push_str ("</TABLE>>]\n");
633    } // end node for each process
634
635    // channels (edges)
636    let channel_local_types = CTX::channel_local_types();
637    for (cid, channel_def) in self.channel_def.iter() {
638      let channel_id     = channel_def.id();
639      let producers      = channel_def.producers();
640      let consumers      = channel_def.consumers();
641      let kind           = channel_def.kind();
642      let local_type     = channel_local_types[cid];
643      let channel_string = escape (format!("{channel_id:?} <{local_type}>"));
644      match *kind {
645        channel::Kind::Simplex => {
646          debug_assert_eq!(producers.len(), 1);
647          debug_assert_eq!(consumers.len(), 1);
648          s.push_str (format!(
649            "    {:?} -> {:?} [label=<<FONT FACE=\"Sans Italic\">{}</FONT>>]\n",
650            producers[0],
651            consumers[0],
652            channel_string).as_str());
653        }
654        channel::Kind::Source => {
655          debug_assert_eq!(producers.len(), 1);
656          // create a node
657          s.push_str (format!(
658            "    {channel_id:?} [label=<<B>+</B>>,\
659           \n      shape=diamond, style=\"\",\
660           \n      xlabel=<<FONT FACE=\"Sans Italic\">{channel_string}</FONT>>]\n").as_str());
661          // edges
662          s.push_str (format!(
663            "    {:?} -> {:?} []\n", producers[0], channel_id
664          ).as_str());
665          for consumer in consumers.as_slice() {
666            s.push_str (format!(
667              "    {channel_id:?} -> {consumer:?} []\n"
668            ).as_str());
669          }
670        }
671        channel::Kind::Sink => {
672          debug_assert_eq!(consumers.len(), 1);
673          // create a node
674          s.push_str (format!(
675            "    {channel_id:?} [label=<<B>+</B>>,\n      \
676                shape=diamond, style=\"\",\n      \
677                xlabel=<<FONT FACE=\"Sans Italic\">{channel_string}</FONT>>]\n").as_str());
678          // edges
679          s.push_str (format!(
680            "    {:?} -> {:?} []\n", channel_id, consumers[0]
681          ).as_str());
682          for producer in producers.as_slice() {
683            s.push_str (format!(
684              "    {producer:?} -> {channel_id:?} []\n"
685            ).as_str());
686          }
687        }
688      }
689    } // end edge for each channel
690
691    //  end graph
692    s.push_str (
693      "  }\n\
694      }");
695    s
696  } // end fn session_dotfile
697} // end impl Def
698
699impl <CTX : Context> From <Def <CTX>> for Session <CTX> {
700  fn from (def : Def <CTX>) -> Self {
701    Self::new (ExtendedState::new (
702      Some (def),
703      Some (vec_map::VecMap::new()),
704      Some (None)
705    ).unwrap())
706  }
707}
708
709////////////////////////////////////////////////////////////////////////////////
710//  functions                                                                 //
711////////////////////////////////////////////////////////////////////////////////
712
713pub fn report_sizes <CTX : Context> () {
714  println!("session report sizes...");
715  println!("  size of Session: {}", size_of::<Session <CTX>>());
716  println!("  size of session::Def: {}", size_of::<Def <CTX>>());
717  println!("...session report sizes");
718}
719
720////////////////////////////////////////////////////////////////////////////////
721//  test mock                                                                 //
722////////////////////////////////////////////////////////////////////////////////
723
/// Mock session context for tests: four processes (`A` to `D`), three
/// channels (`X`, `Y`, `Z`) and three message types (`T`, `U`, `V`).
#[cfg(any(feature = "test", test))]
pub mod mock {
  use crate::{def_session, process};
  // NOTE(review): the channels below name `A` as producer and `B` as consumer,
  // but every process declares empty sourcepoints and endpoints.  Going by
  // `Def::validate_roles`, `Mycontext::def()` would report both mismatch
  // errors for this context -- presumably the mock is only used for the types
  // the macro generates; confirm against the tests that use it.
  def_session! {
    context Mycontext {
      PROCESSES where
        let process    = self,
        let message_in = message_in
      [
        process A () {
          kind           { process::Kind::isochronous_default() }
          sourcepoints   []
          endpoints      []
          handle_message { process::ControlFlow::Break }
          update         { process::ControlFlow::Break }
        }
        process B () {
          kind           { process::Kind::isochronous_default() }
          sourcepoints   []
          endpoints      []
          handle_message { process::ControlFlow::Break }
          update         { process::ControlFlow::Break }
        }
        process C () {
          kind           { process::Kind::isochronous_default() }
          sourcepoints   []
          endpoints      []
          handle_message { process::ControlFlow::Break }
          update         { process::ControlFlow::Break }
        }
        process D () {
          kind           { process::Kind::isochronous_default() }
          sourcepoints   []
          endpoints      []
          handle_message { process::ControlFlow::Break }
          update         { process::ControlFlow::Break }
        }
      ]
      CHANNELS  [
        channel X <T> (Simplex) {
          producers [A]
          consumers [B]
        }
        channel Y <U> (Source) {
          producers [A]
          consumers [B]
        }
        channel Z <V> (Sink) {
          producers [A]
          consumers [B]
        }
      ]
      MESSAGES [
        message T {}
        message U {}
        message V {}
      ]
    }
  }
}
783}