use {std, either, log, smallvec};
use crate::{channel, message, session, Message};
use std::convert::TryFrom;
use std::sync::mpsc;
use std::time;
use vec_map::VecMap;
pub mod inner;
pub mod presult;
pub use self::inner::Inner;
pub use self::presult::Presult;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Def <CTX : session::Context> {
id : CTX::PID,
kind : Kind,
sourcepoints : Vec <CTX::CID>,
endpoints : Vec <CTX::CID>
}
pub struct Handle <CTX : session::Context> {
pub result_rx : mpsc::Receiver <CTX::GPRES>,
pub continuation_tx : mpsc::Sender <session::Continuation <CTX>>,
pub join_or_continue : either::Either <
std::thread::JoinHandle <Option <()>>, Option <session::Continuation <CTX>>>
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Kind {
Asynchronous {
messages_per_update : u32
},
Isochronous {
tick_ms : u32,
ticks_per_update : u32
},
Mesochronous {
tick_ms : u32,
ticks_per_update : u32
},
Anisochronous
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ControlFlow {
Continue,
Break
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum KindError {
AsynchronousZeroMessagesPerUpdate,
IsochronousZeroTickMs,
IsochronousZeroTicksPerUpdate,
MesochronousZeroTickMs,
MesochronousZeroTicksPerUpdate
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DefineError {
DuplicateSourcepoint,
DuplicateEndpoint,
SourcepointEqEndpoint,
AsynchronousZeroEndpoints,
AsynchronousMultipleEndpoints
}
pub trait Process <CTX, RES> where
CTX : session::Context,
RES : Presult <CTX, Self>,
Self : TryFrom <CTX::GPROC> + Into <CTX::GPROC>
{
fn new (inner : Inner <CTX>) -> Self;
fn inner_ref (&self) -> &Inner <CTX>;
fn inner_mut (&mut self) -> &mut Inner <CTX>;
fn result_ref (&self) -> &RES;
fn result_mut (&mut self) -> &mut RES;
fn global_result (&mut self) -> CTX::GPRES;
fn extract_result (session_results : &mut VecMap <CTX::GPRES>)
-> Result <RES, String>;
fn handle_message (&mut self, message : CTX::GMSG) -> ControlFlow;
fn update (&mut self) -> ControlFlow;
fn initialize (&mut self) { }
fn terminate (&mut self) { }
#[inline]
fn id (&self) -> &CTX::PID where CTX : 'static {
self.def().id()
}
#[inline]
fn kind (&self) -> &Kind where CTX : 'static {
self.def().kind()
}
#[inline]
fn state_id (&self) -> inner::StateId {
self.inner_ref().state().id().clone()
}
#[inline]
fn def (&self) -> &Def <CTX> {
&self.inner_ref().extended_state().def
}
#[inline]
fn sourcepoints (&self) -> &VecMap <Box <dyn channel::Sourcepoint <CTX>>> {
&self.inner_ref().extended_state().sourcepoints
}
#[inline]
fn sourcepoints_mut (&mut self)
-> &mut VecMap <Box <dyn channel::Sourcepoint <CTX>>>
{
&mut self.inner_mut().extended_state_mut().sourcepoints
}
#[inline]
#[expect(mismatched_lifetime_syntaxes)]
fn endpoints (&self)
-> std::cell::Ref <Option <VecMap <Box <dyn channel::Endpoint <CTX>>>>>
{
self.inner_ref().extended_state().endpoints.borrow()
}
#[inline]
#[expect(mismatched_lifetime_syntaxes)]
fn endpoints_mut (&mut self) -> std::cell::RefMut
<Option <VecMap <Box <dyn channel::Endpoint <CTX>>>>>
{
self.inner_ref().extended_state().endpoints.borrow_mut()
}
#[inline]
fn take_endpoints (&self) -> VecMap <Box <dyn channel::Endpoint <CTX>>> {
self.inner_ref().extended_state().endpoints.borrow_mut().take().unwrap()
}
#[inline]
fn put_endpoints (&self,
endpoints : VecMap <Box <dyn channel::Endpoint <CTX>>>
) {
*self.inner_ref().extended_state().endpoints.borrow_mut()
= Some (endpoints);
}
fn send <M : Message <CTX>> (&self, channel_id : CTX::CID, message : M)
-> Result <(), channel::SendError <CTX::GMSG>>
where CTX : 'static {
let message_name = message.name();
log::debug!(
process:?=self.id(), channel:?=channel_id, message=message_name.as_str();
"process sending message");
let cid : usize = channel_id.clone().into();
self.sourcepoints()[cid].send (message.into()).inspect_err (|_|
log::warn!(
process:?=self.id(), channel:?=channel_id, message=message_name.as_str();
"process send error: receiver disconnected"))
}
fn send_to <M : Message <CTX>> (
&self, channel_id : CTX::CID, recipient : CTX::PID, message : M
) -> Result <(), channel::SendError <CTX::GMSG>>
where CTX : 'static
{
let message_name = message.name();
log::debug!(
process:?=self.id(),
channel:?=channel_id,
peer:?=recipient,
message=message_name.as_str();
"process sending message to peer");
let cid : usize = channel_id.clone().into();
self.sourcepoints()[cid].send_to (message.into(), recipient.clone()).inspect_err (
|_| log::warn!(
process:?=self.id(),
channel:?=channel_id,
peer:?=recipient,
message=message_name.as_str();
"process send to peer error: receiver disconnected"))
}
#[inline]
fn run (&mut self) where
Self : Sized + 'static,
CTX : 'static
{
use message::Global;
debug_assert_eq!(self.state_id(), inner::StateId::Ready);
self.initialize();
match *self.kind() {
Kind::Asynchronous {..} => self.run_asynchronous(),
Kind::Isochronous {..} => self.run_isochronous(),
Kind::Mesochronous {..} => self.run_mesochronous(),
Kind::Anisochronous => self.run_anisochronous()
}
debug_assert_eq!(self.state_id(), inner::StateId::Ended);
self.terminate();
self.sourcepoints_mut().clear();
{ let endpoints = self.take_endpoints();
let mut unhandled_count = 0;
for (cid, endpoint) in endpoints.iter() {
#[expect(clippy::cast_possible_truncation)]
let Ok (channel_id) = CTX::CID::try_from (cid as channel::IdReprType)
else { unreachable!() };
while let Ok (message) = endpoint.try_recv() {
log::warn!(
process:?=self.id(),
channel:?=channel_id,
message=format!("{:?}({})", message.id(), message.inner_name()).as_str();
"process unhandled message");
unhandled_count += 1;
}
}
if unhandled_count > 0 {
log::warn!(process:?=self.id(), unhandled_message_count=unhandled_count;
"process ended with unhandled messages");
}
}
debug_assert!(self.sourcepoints().is_empty());
debug_assert!(self.endpoints().is_none());
let gpresult = self.global_result();
let session_handle = &self.inner_ref().as_ref().session_handle;
session_handle.result_tx.send (gpresult).unwrap();
}
#[inline]
fn run_continue (mut self) -> Option <()> where
Self : Sized + 'static,
CTX : 'static
{
self.run();
let continuation : session::Continuation <CTX> = {
let session_handle = &self.inner_ref().as_ref().session_handle;
session_handle.continuation_rx.recv().unwrap()
};
continuation (self.into())
}
fn run_asynchronous (&mut self) where
Self : Sized,
CTX : 'static
{
use message::Global;
self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
let messages_per_update = {
match *self.kind() {
Kind::Asynchronous { messages_per_update } => messages_per_update,
_ => unreachable!(
"run asynchronous: process kind does not match run function")
}
};
let _t_start = time::Instant::now();
log::debug!(process:?=self.id(), kind="asynchronous", messages_per_update;
"process start");
debug_assert!(1 <= messages_per_update);
let mut _message_count = 0;
let mut update_count = 0;
let mut messages_since_update = 0;
let endpoints = self.take_endpoints();
{ let (cid, endpoint) = endpoints.iter().next().unwrap();
#[expect(clippy::cast_possible_truncation)]
let Ok (channel_id) = CTX::CID::try_from (cid as channel::IdReprType)
else { unreachable!() };
'_run_loop: while self.state_id() == inner::StateId::Running {
match endpoint.recv() {
Ok (message) => {
log::debug!(
process:?=self.id(),
channel:?=channel_id,
message=message.inner_name().as_str();
"process received message");
let handle_message_result = self.handle_message (message);
match handle_message_result {
ControlFlow::Continue => {}
ControlFlow::Break => {
if self.state_id() == inner::StateId::Running {
self.inner_mut().handle_event (inner::EventParams::End{}.into())
.unwrap();
}
}
}
_message_count += 1;
messages_since_update += 1;
}
Err (channel::RecvError) => {
log::info!(process:?=self.id(), channel:?=channel_id;
"process receive failed: sender disconnected");
if self.state_id() == inner::StateId::Running {
self.inner_mut().handle_event (inner::EventParams::End{}.into())
.unwrap();
}
}
}
if messages_per_update <= messages_since_update {
log::trace!(process:?=self.id(), update=update_count;
"process update");
let update_result = self.update();
match update_result {
ControlFlow::Continue => {}
ControlFlow::Break => {
if self.state_id() == inner::StateId::Running {
self.inner_mut().handle_event (inner::EventParams::End{}.into())
.unwrap();
}
}
}
update_count += 1;
messages_since_update = 0;
}
} } self.put_endpoints (endpoints);
}
fn run_isochronous (&mut self) where
Self : Sized,
CTX : 'static
{
self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
let t_start = time::Instant::now();
let (tick_ms, ticks_per_update) = {
match *self.kind() {
Kind::Isochronous { tick_ms, ticks_per_update }
=> (tick_ms, ticks_per_update),
_ => unreachable!(
"run synchronous: process kind does not match run function")
}
};
log::debug!(
process:?=self.id(), kind="isochronous", tick_ms, ticks_per_update;
"process start");
debug_assert!(1 <= tick_ms);
debug_assert!(1 <= ticks_per_update);
let tick_dur = time::Duration::from_millis (tick_ms as u64);
let mut t_last = t_start - tick_dur;
let mut t_next = t_start;
let mut ticks_since_update = 0;
let mut tick_count = 0;
let mut message_count = 0;
let mut update_count = 0;
let endpoints = self.take_endpoints();
let mut num_open_channels = endpoints.len();
let mut open_channels = smallvec::SmallVec::<[bool; 8]>::from_vec ({
let mut v = Vec::with_capacity (num_open_channels);
v.resize (num_open_channels, true);
v
});
'_run_loop: while self.state_id() == inner::StateId::Running {
let t_now = time::Instant::now();
if t_next < t_now {
log::trace!(
process:?=self.id(),
tick=tick_count,
since_ns=t_now.duration_since (t_next).as_nanos();
"process tick");
t_last += tick_dur;
t_next += tick_dur;
poll_messages (self,
&endpoints, &mut open_channels, &mut num_open_channels, &mut message_count);
tick_count += 1;
ticks_since_update += 1;
debug_assert!(ticks_since_update <= ticks_per_update);
if ticks_since_update == ticks_per_update {
log::trace!(process:?=self.id(), update=update_count;
"process update");
let update_result = self.update();
match update_result {
ControlFlow::Continue => {}
ControlFlow::Break => {
if self.state_id() == inner::StateId::Running {
self.inner_mut().handle_event (inner::EventParams::End{}.into())
.unwrap();
}
}
}
update_count += 1;
ticks_since_update = 0;
}
} else {
log::warn!(
process:?=self.id(),
tick=tick_count,
until_ns=t_next.duration_since (t_now).as_nanos();
"process tick too early");
}
let t_after = time::Instant::now();
if t_after < t_next {
let t_until = t_next.duration_since (t_after);
std::thread::sleep (time::Duration::from_millis (
1 + t_until.as_secs()*1000 +
t_until.subsec_nanos() as u64/1_000_000))
} else {
log::warn!(
process:?=self.id(),
tick=tick_count,
after_ns=t_after.duration_since (t_next).as_nanos();
"process late tick");
}
} self.put_endpoints (endpoints);
}
fn run_mesochronous (&mut self) where
Self : Sized,
CTX : 'static
{
self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
let t_start = time::Instant::now();
let (tick_ms, ticks_per_update) = {
match *self.kind() {
Kind::Mesochronous { tick_ms, ticks_per_update }
=> (tick_ms, ticks_per_update),
_ => unreachable!(
"run synchronous: process kind does not match run function")
}
};
log::debug!(
process:?=self.id(), kind="mesochronous", tick_ms, ticks_per_update;
"process start");
debug_assert!(1 <= tick_ms);
debug_assert!(1 <= ticks_per_update);
let tick_dur = time::Duration::from_millis (tick_ms as u64);
let mut _t_last = t_start - tick_dur;
let mut t_next = t_start;
let mut ticks_since_update = 0;
let mut tick_count = 0;
let mut message_count = 0;
let mut update_count = 0;
let endpoints = self.take_endpoints();
let mut num_open_channels = endpoints.len();
let mut open_channels = smallvec::SmallVec::<[bool; 8]>::from_vec ({
let mut v = Vec::with_capacity (num_open_channels);
v.resize (num_open_channels, true);
v
});
'_run_loop: while self.state_id() == inner::StateId::Running {
let t_now = time::Instant::now();
if t_next < t_now {
log::trace!(
process:?=self.id(),
tick=tick_count,
since_ns=t_now.duration_since (t_next).as_nanos();
"process tick");
_t_last = t_now;
t_next = t_now + tick_dur;
poll_messages (self,
&endpoints, &mut open_channels, &mut num_open_channels, &mut message_count);
tick_count += 1;
ticks_since_update += 1;
debug_assert!(ticks_since_update <= ticks_per_update);
if ticks_since_update == ticks_per_update {
log::trace!(process:?=self.id(), update=update_count;
"process update");
let update_result = self.update();
match update_result {
ControlFlow::Continue => {}
ControlFlow::Break => {
if self.state_id() == inner::StateId::Running {
self.inner_mut().handle_event (inner::EventParams::End{}.into())
.unwrap();
}
}
}
update_count += 1;
ticks_since_update = 0;
}
} else {
log::warn!(
process:?=self.id(),
tick=tick_count,
until_ns=t_next.duration_since (t_now).as_nanos();
"process tick too early");
}
let t_after = time::Instant::now();
if t_after < t_next {
let t_until = t_next.duration_since (t_after);
std::thread::sleep (time::Duration::from_millis (
1 + t_until.as_secs()*1000 +
t_until.subsec_nanos() as u64/1_000_000))
} else {
log::warn!(
process:?=self.id(),
tick=tick_count,
after_ns=t_after.duration_since (t_next).as_nanos();
"process late tick");
}
} self.put_endpoints (endpoints);
}
fn run_anisochronous (&mut self) where
Self : Sized,
CTX : 'static
{
self.inner_mut().handle_event (inner::EventParams::Run{}.into()).unwrap();
let _t_start = time::Instant::now();
debug_assert_eq!(Kind::Anisochronous, *self.kind());
log::debug!(process:?=self.id(), kind="anisochronous"; "process start");
let mut message_count = 0;
let mut update_count = 0;
let endpoints = self.take_endpoints();
let mut num_open_channels = endpoints.len();
let mut open_channels = smallvec::SmallVec::<[bool; 8]>::from_vec ({
let mut v = Vec::with_capacity (num_open_channels);
v.resize (num_open_channels, true);
v
});
'_run_loop: while self.state_id() == inner::StateId::Running {
poll_messages (self,
&endpoints, &mut open_channels, &mut num_open_channels, &mut message_count);
log::trace!(process:?=self.id(), update=update_count; "process update");
let update_result = self.update();
match update_result {
ControlFlow::Continue => {}
ControlFlow::Break => {
if self.state_id() == inner::StateId::Running {
self.inner_mut().handle_event (inner::EventParams::End{}.into())
.unwrap()
}
}
}
update_count += 1;
} self.put_endpoints (endpoints);
}
}
pub type IdReprType = u16;
pub trait Id <CTX> : Clone + Ord + Into <usize> + TryFrom <IdReprType> +
std::fmt::Debug + strum::IntoEnumIterator + strum::EnumCount
where
CTX : session::Context <PID=Self>
{
fn def (&self) -> Def <CTX>;
fn spawn (inner : Inner <CTX>) -> std::thread::JoinHandle <Option <()>>;
fn gproc (inner : Inner <CTX>) -> CTX::GPROC;
}
pub trait Global <CTX> where
Self : Sized,
CTX : session::Context <GPROC=Self>
{
fn id (&self) -> CTX::PID;
fn run (&mut self);
}
impl <CTX : session::Context> Def <CTX> {
pub fn define (
id : CTX::PID,
kind : Kind,
sourcepoints : Vec <CTX::CID>,
endpoints : Vec <CTX::CID>
) -> Result <Self, Vec <DefineError>> {
let def = Def {
id, kind, sourcepoints, endpoints
};
def.validate_role() ?;
Ok (def)
}
pub const fn id (&self) -> &CTX::PID {
&self.id
}
pub const fn kind (&self) -> &Kind {
&self.kind
}
pub const fn sourcepoints (&self) -> &Vec <CTX::CID> {
&self.sourcepoints
}
pub const fn endpoints (&self) -> &Vec <CTX::CID> {
&self.endpoints
}
fn validate_role (&self) -> Result <(), Vec <DefineError>> {
let mut errors = Vec::new();
let mut producers_dedup = self.sourcepoints.clone();
producers_dedup.as_mut_slice().sort();
producers_dedup.dedup_by (|x,y| x == y);
if producers_dedup.len() < self.sourcepoints.len() {
errors.push (DefineError::DuplicateSourcepoint);
}
let mut consumers_dedup = self.endpoints.clone();
consumers_dedup.as_mut_slice().sort();
consumers_dedup.dedup_by (|x,y| x == y);
if consumers_dedup.len() < self.endpoints.len() {
errors.push (DefineError::DuplicateEndpoint);
}
let mut producers_and_consumers = producers_dedup.clone();
producers_and_consumers.append (&mut consumers_dedup.clone());
producers_and_consumers.as_mut_slice().sort();
producers_and_consumers.dedup_by (|x,y| x == y);
if producers_and_consumers.len()
< producers_dedup.len() + consumers_dedup.len()
{
errors.push (DefineError::SourcepointEqEndpoint);
}
if let Err (mut errs)
= self.kind.validate_role::<CTX> (&self.sourcepoints, &self.endpoints)
{
errors.append (&mut errs);
}
if !errors.is_empty() {
Err (errors)
} else {
Ok (())
}
}
}
impl Kind {
pub fn asynchronous_default() -> Self {
const MESSAGES_PER_UPDATE : u32 = 1;
Kind::new_asynchronous (MESSAGES_PER_UPDATE).unwrap()
}
pub fn isochronous_default() -> Self {
const TICK_MS : u32 = 1000;
const TICKS_PER_UPDATE : u32 = 1;
Kind::new_isochronous (TICK_MS, TICKS_PER_UPDATE).unwrap()
}
pub fn mesochronous_default() -> Self {
const TICK_MS : u32 = 1000;
const TICKS_PER_UPDATE : u32 = 1;
Kind::new_mesochronous (TICK_MS, TICKS_PER_UPDATE).unwrap()
}
pub const fn anisochronous_default() -> Self {
Kind::new_anisochronous()
}
pub fn new_asynchronous (messages_per_update : u32)
-> Result <Self, Vec <KindError>>
{
let mut errors = Vec::new();
if messages_per_update == 0 {
errors.push (KindError::AsynchronousZeroMessagesPerUpdate)
}
if !errors.is_empty() {
Err (errors)
} else {
Ok (Kind::Asynchronous { messages_per_update })
}
}
pub fn new_isochronous (tick_ms : u32, ticks_per_update : u32)
-> Result <Self, Vec <KindError>>
{
let mut errors = Vec::new();
if tick_ms == 0 {
errors.push (KindError::IsochronousZeroTickMs)
}
if ticks_per_update == 0 {
errors.push (KindError::IsochronousZeroTicksPerUpdate)
}
if !errors.is_empty() {
Err (errors)
} else {
Ok (Kind::Isochronous { tick_ms, ticks_per_update })
}
}
pub fn new_mesochronous (tick_ms : u32, ticks_per_update : u32)
-> Result <Self, Vec <KindError>>
{
let mut errors = Vec::new();
if tick_ms == 0 {
errors.push (KindError::MesochronousZeroTickMs)
}
if ticks_per_update == 0 {
errors.push (KindError::MesochronousZeroTicksPerUpdate)
}
if !errors.is_empty() {
Err (errors)
} else {
Ok (Kind::Isochronous { tick_ms, ticks_per_update })
}
}
#[inline]
pub const fn new_anisochronous() -> Self {
Kind::Anisochronous
}
fn validate_role <CTX : session::Context> (&self,
_sourcepoints : &[CTX::CID],
endpoints : &[CTX::CID]
) -> Result <(), Vec <DefineError>> {
let mut errors = Vec::new();
match *self {
Kind::Asynchronous {..} => {
if endpoints.is_empty() {
errors.push (DefineError::AsynchronousZeroEndpoints)
} else if 1 < endpoints.len() {
errors.push (DefineError::AsynchronousMultipleEndpoints)
}
}
Kind::Isochronous {..} |
Kind::Mesochronous {..} |
Kind::Anisochronous => { }
}
if !errors.is_empty() {
Err (errors)
} else {
Ok (())
}
}
}
impl <M> From <Result <(), channel::SendError <M>>> for ControlFlow {
fn from (send_result : Result <(), channel::SendError <M>>) -> Self {
match send_result {
Ok (()) => ControlFlow::Continue,
Err (_) => ControlFlow::Break
}
}
}
pub fn report_sizes <CTX : session::Context + 'static> () {
println!("process report sizes...");
println!(" size of process::Def: {}", size_of::<Def <CTX>>());
Inner::<CTX>::report_sizes();
println!("...process report sizes");
}
#[inline]
fn poll_messages <CTX, P, RES> (
process : &mut P,
endpoints : &VecMap <Box <dyn channel::Endpoint <CTX>>>,
open_channels : &mut smallvec::SmallVec <[bool; 8]>,
num_open_channels : &mut usize,
message_count : &mut usize)
where
CTX : session::Context + 'static,
P : Process <CTX, RES> + Sized,
RES : Presult <CTX, P>
{
use message::Global;
#[inline]
fn channel_close (is_open : &mut bool, num_open : &mut usize) {
debug_assert!(*is_open);
debug_assert!(0 < *num_open);
*is_open = false;
*num_open -= 1;
}
'poll_outer: for (open_index, (cid, endpoint)) in endpoints.iter().enumerate() {
#[expect(clippy::cast_possible_truncation)]
let Ok (channel_id) = CTX::CID::try_from (cid as u16) else { unreachable!() };
let channel_open = &mut open_channels[open_index];
if !*channel_open {
continue 'poll_outer
}
'poll_inner: loop {
match endpoint.try_recv() {
Ok (message) => {
log::debug!(
process:?=process.id(),
channel:?=channel_id,
message=message.inner_name().as_str();
"process received message");
*message_count += 1;
match process.handle_message (message) {
ControlFlow::Continue => {}
ControlFlow::Break => {
channel_close (channel_open, num_open_channels);
if *num_open_channels == 0 {
process.inner_mut().handle_event (
inner::EventParams::End{}.into()
).unwrap();
}
break 'poll_inner
}
}
}
Err (channel::TryRecvError::Empty) => { break 'poll_inner }
Err (channel::TryRecvError::Disconnected) => {
log::info!(process:?=process.id(), channel:?=channel_id;
"process receive failed: sender disconnected");
channel_close (channel_open, num_open_channels);
if *num_open_channels == 0 {
process.inner_mut().handle_event (inner::EventParams::End{}.into())
.unwrap();
}
break 'poll_inner
}
} } } }