1use {std, either, vec_map};
4use std::convert::TryFrom;
5use macro_machines::def_machine_nodefault;
6use strum::{EnumCount, IntoEnumIterator};
7use crate::{channel, message, process};
8
9mod macro_def;
10
/// Boxed one-shot closure that takes a global process value (`CTX::GPROC`) by
/// value and returns `Option <()>`.
///
/// The `Send` bound allows the closure to be handed to another thread; in this
/// module continuations travel over an `std::sync::mpsc` channel (see
/// `Handle::continuation_rx`).
pub type Continuation <CTX> =
  Box <dyn FnOnce (<CTX as Context>::GPROC) -> Option <()> + Send>;
17
// Session state machine generated by `def_machine_nodefault!`.
//
// Extended state carried by the machine:
// - `def`: the session definition
// - `process_handles`: session-side handles of the processes, keyed by
//   process id
// - `main_process`: the process that `run_with` runs on the calling thread,
//   if there is one
//
// Lifecycle: `Ready` --Run--> `Running` --End--> `Ended`.
def_machine_nodefault! {
  Session <CTX : { Context }> (
    def : Def <CTX>,
    process_handles : vec_map::VecMap <process::Handle <CTX>>,
    main_process : Option <Box <CTX::GPROC>>
  ) @ _session {
    STATES [
      state Ready ()
      state Running ()
      state Ended ()
    ]
    EVENTS [
      event Run <Ready> => <Running> ()
      event End <Running> => <Ended> ()
    ]
    initial_state: Ready
    terminal_state: Ended {
      // Terminated in `Ended`: `finish` drains the process handles.
      terminate_success: {
        _session.finish();
      }
      // NOTE(review): presumably executed when the machine is dropped in a
      // state other than `Ended` -- confirm against the macro_machines docs.
      terminate_failure: {
        panic!("session dropped in state: {:?}", _session.state_id());
      }
    }
  }
}
51
/// Static definition of a session: its name together with the definitions of
/// all of its channels and processes.
///
/// Within this module values are built through `Def::define`, which checks
/// that channel and process definitions agree before returning the value.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Def <CTX : Context> {
  /// Session name; `Context::def` fills this from `Context::name()`.
  name : &'static str,
  /// Channel definitions keyed by the channel id converted to `usize`.
  channel_def : vec_map::VecMap <channel::Def <CTX>>,
  /// Process definitions keyed by the process id converted to `usize`.
  process_def : vec_map::VecMap <process::Def <CTX>>
}
59
/// Process-side endpoints of the two channels connecting a process with its
/// session.
///
/// `Session::start` creates one of these per process it constructs and keeps
/// the opposite ends (`result_rx`, `continuation_tx`) in the corresponding
/// `process::Handle`.
pub struct Handle <CTX : Context> {
  /// Sender for the process result; the session receives it in `run_with`.
  pub result_tx : std::sync::mpsc::Sender <CTX::GPRES>,
  /// Receiver for the continuation that the session sends in `finish`.
  pub continuation_rx : std::sync::mpsc::Receiver <Continuation <CTX>>
}
65
/// Errors reported when a session definition fails validation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DefineError {
  /// The sourcepoints declared by some process differ from the set of
  /// channels that list that process as a producer.
  ProducerSourcepointMismatch,
  /// The endpoints declared by some process differ from the set of channels
  /// that list that process as a consumer.
  ConsumerEndpointMismatch
}
80
81pub trait Context where Self : Clone + PartialEq + Sized + std::fmt::Debug {
88 type MID : message::Id;
89 type CID : channel::Id <Self>;
90 type PID : process::Id <Self>;
91 type GMSG : message::Global <Self>;
93 type GPROC : process::Global <Self>;
95 type GPRES : process::presult::Global <Self>;
97
98 fn maybe_main () -> Option <Self::PID>;
100 fn name () -> &'static str;
101 fn process_field_names() -> Vec <Vec <&'static str>>;
103 fn process_field_types() -> Vec <Vec <&'static str>>;
104 fn process_field_defaults() -> Vec <Vec <&'static str>>;
105 fn process_result_types() -> Vec <&'static str>;
106 fn process_result_defaults() -> Vec <&'static str>;
107 fn channel_local_types() -> Vec <&'static str>;
108
109 fn def() -> Result <Def <Self>, Vec <DefineError>> {
218 let mut channel_def = vec_map::VecMap::new();
219 for cid in Self::CID::iter() {
220 assert!{
221 channel_def.insert (
222 cid.clone().into(), channel::Id::def (&cid)
223 ).is_none()
224 }
225 }
226 let mut process_def = vec_map::VecMap::new();
227 for pid in Self::PID::iter() {
228 assert!{
229 process_def.insert (
230 pid.clone().into(), process::Id::def (&pid)
231 ).is_none()
232 }
233 }
234
235 Def::define (Self::name(), channel_def, process_def)
236 }
237
238}
239
240impl <CTX : Context> Session <CTX> {
245 pub fn def (&self) -> &Def <CTX> {
246 &self.extended_state().def
247 }
248
249 #[expect(clippy::same_name_method)]
251 pub fn name (&self) -> &'static str {
252 self.def().name
253 }
254
255 pub fn run (&mut self) -> vec_map::VecMap <CTX::GPRES> {
261 let channels = self.as_ref().def.create_channels();
262 self.run_with (channels, vec_map::VecMap::new(), None)
263 }
264
265 pub fn run_with (&mut self,
268 channels : vec_map::VecMap <channel::Channel <CTX>>,
269 process_handles : vec_map::VecMap <process::Handle <CTX>>,
270 main_process : Option <Box <CTX::GPROC>>
271 ) -> vec_map::VecMap <CTX::GPRES> {
272 use process::Global;
273
274 self.start (process_handles, channels, main_process);
275 if let Some (ref mut main_gproc) = self.as_mut().main_process {
276 main_gproc.run();
277 }
278 let mut results = vec_map::VecMap::with_capacity (CTX::PID::COUNT);
279 for (pid, process_handle) in self.as_mut().process_handles.iter() {
280 assert!{
281 results.insert (pid, process_handle.result_rx.recv().unwrap()).is_none()
282 }
283 }
284 self.handle_event (EventParams::End{}.into()).unwrap();
285 results
286 }
287
/// Ensures there is a process handle for every process in the session
/// definition, then fires the `Run` event.
///
/// For each defined process, a handle supplied in `process_handles` is reused;
/// otherwise the process is built from its definition, given the sourcepoints
/// and endpoints removed from `channels` for its id, and either spawned or --
/// when its id equals `CTX::maybe_main()` -- kept as the main process for the
/// caller to run.
///
/// # Panics
///
/// Panics if a handle is inserted twice for the same process id or if the
/// `Run` event is rejected. In debug builds it also panics when a main process
/// is supplied without a matching entry in `process_handles`.
fn start (&mut self,
  mut process_handles : vec_map::VecMap <process::Handle <CTX>>,
  mut channels : vec_map::VecMap <channel::Channel <CTX>>,
  mut main_process : Option <Box <CTX::GPROC>>
) {
  // a supplied main process must come with its own handle
  if cfg!(debug_assertions) && let Some (gproc) = main_process.as_ref() {
    use process::Global;
    assert!(process_handles.contains_key (gproc.id().into()));
  }

  { let extended_state = self.as_mut();
    for (pid, process_def) in extended_state.def.process_def.iter() {
      // reuse a supplied handle, or build the process and its handle now
      let process_handle = process_handles.remove (pid).unwrap_or_else (||{
        // take this process's ends out of every channel
        let mut sourcepoints
          : vec_map::VecMap <Box <dyn channel::Sourcepoint <CTX>>>
          = vec_map::VecMap::new();
        let mut endpoints
          : vec_map::VecMap <Box <dyn channel::Endpoint <CTX>>>
          = vec_map::VecMap::new();
        for (cid, channel) in channels.iter_mut() {
          if let Some (sourcepoint) = channel.sourcepoints.remove (pid) {
            assert!(sourcepoints.insert (cid, sourcepoint).is_none());
          }
          if let Some (endpoint) = channel.endpoints.remove (pid) {
            assert!(endpoints.insert (cid, endpoint).is_none());
          }
        }
        // the process gets `result_tx`/`continuation_rx`; the session keeps
        // the opposite ends in the process handle
        let (result_tx, result_rx) = std::sync::mpsc::channel::<CTX::GPRES>();
        let (continuation_tx, continuation_rx) =
          std::sync::mpsc::channel::<Continuation <CTX>>();
        let session_handle = Handle::<CTX> { result_tx, continuation_rx };
        let inner = process::Inner::new (process::inner::ExtendedState::new (
          Some (process_def.clone()),
          Some (session_handle),
          Some (sourcepoints),
          Some (std::cell::RefCell::new (Some (endpoints)))
        ).unwrap());
        // the main process is not spawned: it is stored for the caller to run
        if let Some (main_process_id) = CTX::maybe_main()
          && *inner.as_ref().def.id() == main_process_id
        {
          debug_assert!(main_process.is_none());
          main_process = Some (Box::new (process::Id::gproc (inner)));
          // this `return` leaves the closure, not `start`
          return process::Handle {
            result_rx, continuation_tx,
            join_or_continue: either::Either::Right (None)
          }
        }
        // every other process is spawned; keep its join handle
        let join_handle = process::Id::spawn (inner);
        process::Handle {
          result_rx, continuation_tx,
          join_or_continue: either::Either::Left (join_handle)
        }
      });
      assert!(extended_state.process_handles.insert (pid, process_handle).is_none());
    }
    extended_state.main_process = main_process.take();
  } self.handle_event (EventParams::Run{}.into()).unwrap();

  log::debug!(session=self.name(); "session started");
}
360
361 fn finish (&mut self) where Self : Sized {
363 for (_, process_handle) in self.as_mut().process_handles.drain() {
364 match process_handle.join_or_continue {
365 either::Either::Left (join_handle) => {
366 process_handle.continuation_tx.send (
368 Box::new (|_ : CTX::GPROC| Some (()))
369 ).unwrap();
370 join_handle.join().unwrap().unwrap()
371 }
372 either::Either::Right (Some (continuation)) => {
373 process_handle.continuation_tx.send (continuation).unwrap();
374 }
375 either::Either::Right (None) => { }
376 }
377 }
378 log::debug!(session=self.name(); "session finished");
379 }
380} impl <CTX : Context> std::fmt::Debug for Session <CTX> {
383 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
384 write!(f, "{}({:?})", self.name(), self.state_id())
385 }
386}
387
388
389impl <CTX : Context> Def <CTX> {
390 pub fn create_channels (&self) -> vec_map::VecMap <channel::Channel <CTX>> {
391 let mut channels = vec_map::VecMap::new();
392 for (cid, channel_def) in self.channel_def.iter() {
393 debug_assert_eq!(cid, channel_def.id().clone().into());
394 assert!(channels.insert (cid, channel::Id::create (channel_def.clone()))
395 .is_none());
396 }
397 channels
398 }
399
400 #[inline]
403 pub fn dotfile (&self) -> String {
404 self.session_dotfile (true)
405 }
406
407 #[inline]
410 pub fn dotfile_show_defaults (&self) -> String {
411 self.session_dotfile (false)
412 }
413
414 fn define (
424 name : &'static str,
425 channel_def : vec_map::VecMap <channel::Def <CTX>>,
426 process_def : vec_map::VecMap <process::Def <CTX>>
427 ) -> Result <Self, Vec <DefineError>> {
428 let def = Def {
429 name,
430 channel_def,
431 process_def
432 };
433 def.validate_roles() ?;
434 Ok (def)
435 }
436
437 fn validate_roles (&self) -> Result <(), Vec <DefineError>> {
438 let mut errors = Vec::new();
439
440 let mut sourcepoints_from_channels : vec_map::VecMap <Vec <CTX::CID>> = {
443 let mut v = vec_map::VecMap::new();
444 for pid in CTX::PID::iter() {
445 assert!(v.insert (pid.into(), Vec::new()).is_none());
446 }
447 v
448 };
449 let mut endpoints_from_channels : vec_map::VecMap <Vec <CTX::CID>>
450 = sourcepoints_from_channels.clone();
451
452 for (cid, channel_def) in self.channel_def.iter() {
455 #[expect(clippy::cast_possible_truncation)]
456 let Ok (channel_id) = CTX::CID::try_from (cid as channel::IdReprType)
458 else { unreachable!() };
459 debug_assert_eq!(channel_id, *channel_def.id());
460 for producer_id in channel_def.producers().iter() {
461 let pid : usize = producer_id.clone().into();
462 let sourcepoints = &mut sourcepoints_from_channels[pid];
463 sourcepoints.push (channel_id.clone());
464 }
465 for consumer_id in channel_def.consumers().iter() {
466 let pid : usize = consumer_id.clone().into();
467 let endpoints = &mut endpoints_from_channels[pid];
468 endpoints.push (channel_id.clone());
469 }
470 }
471
472 for (pid, process_def) in self.process_def.iter() {
475 debug_assert_eq!(pid, process_def.id().clone().into());
476 let sourcepoints_from_channels = &mut sourcepoints_from_channels[pid];
477 sourcepoints_from_channels.as_mut_slice().sort();
478 let mut sourcepoints = process_def.sourcepoints().clone();
479 sourcepoints.as_mut_slice().sort();
480 if sourcepoints != *sourcepoints_from_channels {
481 errors.push (DefineError::ProducerSourcepointMismatch);
482 break
483 }
484 }
485
486 for (pid, process_def) in self.process_def.iter() {
487 debug_assert_eq!(pid, process_def.id().clone().into());
488 let endpoints_from_channels = &mut endpoints_from_channels[pid];
489 endpoints_from_channels.as_mut_slice().sort();
490 let mut endpoints = process_def.endpoints().clone();
491 endpoints.as_mut_slice().sort();
492 if endpoints != *endpoints_from_channels {
493 errors.push (DefineError::ConsumerEndpointMismatch);
494 break
495 }
496 }
497
498 if !errors.is_empty() {
499 Err (errors)
500 } else {
501 Ok (())
502 }
503 }
504
/// Renders the session definition as Graphviz DOT text.
///
/// The output is a `digraph` containing one cluster named after the session.
/// Each process becomes a node whose label is an HTML-like table listing its
/// fields and result type; each channel becomes one or more edges. When
/// `hide_defaults` is `false`, field and result default values are included.
///
/// # Panics
///
/// Panics (by indexing) if the per-process or per-channel metadata returned
/// by the `Context` functions is shorter than the ids used to index it.
fn session_dotfile (&self, hide_defaults : bool) -> String {
  // Escapes `s` with `marksman_escape::Escape` before it is embedded in a
  // label. NOTE(review): presumably HTML escaping -- confirm.
  #[inline]
  fn escape (s : String) -> String {
    use marksman_escape::Escape;
    String::from_utf8 (Escape::new (s.bytes()).collect()).unwrap()
  }
  let mut s = String::new();

  // graph header and default node/edge attributes
  s.push_str (
    "digraph {\
    \n overlap=scale\
    \n rankdir=LR\
    \n node [shape=hexagon, fontname=\"Sans Bold\"]\
    \n edge [style=dashed, arrowhead=vee, fontname=\"Sans\"]\n");

  // open the cluster for this session
  debug_assert_eq!(self.name, CTX::name());
  let context_str = self.name;
  s.push_str (format!(
    " subgraph cluster_{context_str} {{\n").as_str());
  s.push_str (format!(
    " label=<{context_str}>").as_str());
  s.push_str ( "\
    \n shape=record\
    \n style=rounded\
    \n fontname=\"Sans Bold Italic\"\n");

  // per-process metadata; all five vectors are indexed by process id
  let process_field_names = CTX::process_field_names();
  let process_field_types = CTX::process_field_types();
  let process_field_defaults = CTX::process_field_defaults();
  let process_result_types = CTX::process_result_types();
  let process_result_defaults = CTX::process_result_defaults();
  debug_assert_eq!(process_field_names.len(), process_field_types.len());
  debug_assert_eq!(process_field_types.len(), process_field_defaults.len());
  debug_assert_eq!(process_field_defaults.len(), process_result_types.len());
  debug_assert_eq!(process_result_types.len(), process_result_defaults.len());
  // one node per process
  for (pid, process_def) in self.process_def.iter() {
    let process_id = process_def.id();
    s.push_str (format!(
      " {process_id:?} [label=<<TABLE BORDER=\"0\"><TR><TD><B>{process_id:?}</B></TD></TR>").as_str());

    let process_field_names = &process_field_names[pid];
    let process_field_types = &process_field_types[pid];
    let process_field_defaults = &process_field_defaults[pid];
    debug_assert_eq!(process_field_names.len(), process_field_types.len());
    debug_assert_eq!(process_field_types.len(), process_field_defaults.len());

    // tracks whether a monospace table row is currently open
    let mut mono_font = false;
    if !process_field_names.is_empty() {
      s.push_str ("<TR><TD><FONT FACE=\"Mono\"><BR/>");
      mono_font = true;

      let mut field_string = String::new();
      let separator = ",<BR ALIGN=\"LEFT\"/>";

      // widths used to pad names and types into aligned columns
      let longest_fieldname = process_field_names.iter()
        .fold (0, |longest, fieldname| std::cmp::max (longest, fieldname.len()));

      let longest_typename = process_field_types.iter()
        .fold (0, |longest, typename| std::cmp::max (longest, typename.len()));

      for (i,f) in process_field_names.iter().enumerate() {
        let spacer1 : String = std::iter::repeat_n (' ', longest_fieldname - f.len())
          .collect();
        let spacer2 : String = std::iter::repeat_n (
          ' ', longest_typename - process_field_types[i].len()
        ).collect();

        if !hide_defaults {
          field_string.push_str (escape (format!(
            "{}{} : {}{} = {}",
            f, spacer1, process_field_types[i], spacer2, process_field_defaults[i]
          )).as_str());
        } else {
          field_string.push_str (escape (format!(
            "{}{} : {}", f, spacer1, process_field_types[i]
          )).as_str());
        }
        field_string.push_str (separator.to_string().as_str());
      }

      // drop the separator appended after the last field
      let len = field_string.len();
      field_string.truncate (len - separator.len());
      s.push_str (field_string.as_str());
    } let result_type = process_result_types[pid];
    if !result_type.is_empty() {
      if !mono_font {
        s.push_str ("<TR><TD><FONT FACE=\"Mono\"><BR/>");
        mono_font = true;
      } else {
        // close the fields row and open a new row for the result
        s.push_str ("<BR ALIGN=\"LEFT\"/></FONT></TD></TR>\
          <TR><TD><FONT FACE=\"Mono\"><BR/>");
      }
      let result_default = process_result_defaults[pid];
      if !hide_defaults {
        s.push_str (escape (format!(
          "-> {result_type} = {result_default}"
        )).as_str());
      } else {
        s.push_str (escape (format!("-> {result_type}")).as_str());
      }
    }

    if mono_font {
      s.push_str ("<BR ALIGN=\"LEFT\"/></FONT></TD></TR>");
    }

    s.push_str ("</TABLE>>]\n");
  } let channel_local_types = CTX::channel_local_types();
  // one edge (or diamond node plus edges) per channel
  for (cid, channel_def) in self.channel_def.iter() {
    let channel_id = channel_def.id();
    let producers = channel_def.producers();
    let consumers = channel_def.consumers();
    let kind = channel_def.kind();
    let local_type = channel_local_types[cid];
    let channel_string = escape (format!("{channel_id:?} <{local_type}>"));
    match *kind {
      // single labelled edge from the producer to the consumer
      channel::Kind::Simplex => {
        debug_assert_eq!(producers.len(), 1);
        debug_assert_eq!(consumers.len(), 1);
        s.push_str (format!(
          " {:?} -> {:?} [label=<<FONT FACE=\"Sans Italic\">{}</FONT>>]\n",
          producers[0],
          consumers[0],
          channel_string).as_str());
      }
      // diamond node fed by the producer, with one edge to each consumer
      channel::Kind::Source => {
        debug_assert_eq!(producers.len(), 1);
        s.push_str (format!(
          " {channel_id:?} [label=<<B>+</B>>,\
          \n shape=diamond, style=\"\",\
          \n xlabel=<<FONT FACE=\"Sans Italic\">{channel_string}</FONT>>]\n").as_str());
        s.push_str (format!(
          " {:?} -> {:?} []\n", producers[0], channel_id
        ).as_str());
        for consumer in consumers.as_slice() {
          s.push_str (format!(
            " {channel_id:?} -> {consumer:?} []\n"
          ).as_str());
        }
      }
      // diamond node feeding the consumer, with one edge from each producer
      channel::Kind::Sink => {
        debug_assert_eq!(consumers.len(), 1);
        s.push_str (format!(
          " {channel_id:?} [label=<<B>+</B>>,\n \
          shape=diamond, style=\"\",\n \
          xlabel=<<FONT FACE=\"Sans Italic\">{channel_string}</FONT>>]\n").as_str());
        s.push_str (format!(
          " {:?} -> {:?} []\n", channel_id, consumers[0]
        ).as_str());
        for producer in producers.as_slice() {
          s.push_str (format!(
            " {producer:?} -> {channel_id:?} []\n"
          ).as_str());
        }
      }
    }
  } s.push_str (
    " }\n\
    }");
  s
696 } } impl <CTX : Context> From <Def <CTX>> for Session <CTX> {
700 fn from (def : Def <CTX>) -> Self {
701 Self::new (ExtendedState::new (
702 Some (def),
703 Some (vec_map::VecMap::new()),
704 Some (None)
705 ).unwrap())
706 }
707}
708
709pub fn report_sizes <CTX : Context> () {
714 println!("session report sizes...");
715 println!(" size of Session: {}", size_of::<Session <CTX>>());
716 println!(" size of session::Def: {}", size_of::<Def <CTX>>());
717 println!("...session report sizes");
718}
719
/// Mock session context, compiled only for tests or with the `test` feature.
///
/// Declares the context `Mycontext` with four processes (`A` to `D`), three
/// channels (`X`: Simplex, `Y`: Source, `Z`: Sink, each with producer `A` and
/// consumer `B`) and three messages (`T`, `U`, `V`).
// NOTE(review): every process declares empty `sourcepoints`/`endpoints` while
// the channels name `A` as producer and `B` as consumer; given the role
// validation in this module, building this definition presumably fails --
// confirm whether that is intentional.
#[cfg(any(feature = "test", test))]
pub mod mock {
  use crate::{def_session, process};
  def_session! {
    context Mycontext {
      PROCESSES where
        let process = self,
        let message_in = message_in
      [
        process A () {
          kind { process::Kind::isochronous_default() }
          sourcepoints []
          endpoints []
          handle_message { process::ControlFlow::Break }
          update { process::ControlFlow::Break }
        }
        process B () {
          kind { process::Kind::isochronous_default() }
          sourcepoints []
          endpoints []
          handle_message { process::ControlFlow::Break }
          update { process::ControlFlow::Break }
        }
        process C () {
          kind { process::Kind::isochronous_default() }
          sourcepoints []
          endpoints []
          handle_message { process::ControlFlow::Break }
          update { process::ControlFlow::Break }
        }
        process D () {
          kind { process::Kind::isochronous_default() }
          sourcepoints []
          endpoints []
          handle_message { process::ControlFlow::Break }
          update { process::ControlFlow::Break }
        }
      ]
      CHANNELS [
        channel X <T> (Simplex) {
          producers [A]
          consumers [B]
        }
        channel Y <U> (Source) {
          producers [A]
          consumers [B]
        }
        channel Z <V> (Sink) {
          producers [A]
          consumers [B]
        }
      ]
      MESSAGES [
        message T {}
        message U {}
        message V {}
      ]
    }
  }
}