mod builder;
use crate::program_graph::{
Action as PgAction, Clock as PgClock, Location as PgLocation, Var as PgVar, *,
};
use crate::{Time, grammar::*};
pub use builder::*;
use rand::rngs::SmallRng;
use rand::seq::IteratorRandom;
use rand::{RngExt, SeedableRng};
use smallvec::SmallVec;
use std::collections::VecDeque;
use thiserror::Error;
/// Identifier of a program graph inside a channel system.
///
/// Wraps the `u16` used to index the program graphs of the system.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct PgId(u16);

/// Unwraps a [`PgId`] into the raw `u16` it carries.
impl From<PgId> for u16 {
    #[inline]
    fn from(val: PgId) -> Self {
        let PgId(raw) = val;
        raw
    }
}
/// Identifier of a channel inside a channel system.
///
/// Wraps the `u16` used to index the channels (and their message queues) of the system.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct Channel(u16);

/// Unwraps a [`Channel`] into the raw `u16` it carries.
impl From<Channel> for u16 {
    #[inline]
    fn from(val: Channel) -> Self {
        let Channel(raw) = val;
        raw
    }
}
/// A program graph location, tagged with the [`PgId`] of the program graph it belongs to.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct Location(PgId, PgLocation);
/// A program graph action, tagged with the [`PgId`] of the program graph it belongs to.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct Action(PgId, PgAction);
/// A program graph variable, tagged with the [`PgId`] of the program graph it belongs to.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct Var(PgId, PgVar);
/// A program graph clock, tagged with the [`PgId`] of the program graph it belongs to.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct Clock(PgId, PgClock);
/// A [`Clock`] paired with two optional [`Time`] values.
// NOTE(review): presumably the optional lower and upper bounds imposed on the clock;
// the alias is not used in this part of the file, so confirm against its users.
type TimeConstraint = (Clock, Option<Time>, Option<Time>);
/// The kind of operation an action performs on a channel.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Message {
/// Append the values of a message at the back of the channel's queue.
Send,
/// Take the values of a message from the front of the channel's queue.
Receive,
/// Succeeds only while the channel's queue is empty.
ProbeEmptyQueue,
/// Succeeds only while the channel's queue has reached its capacity.
ProbeFullQueue,
}
/// Errors raised while building or running a channel system.
#[derive(Debug, Clone, Copy, Error)]
pub enum CsError {
/// An error reported by the program graph with the given id.
#[error("error from program graph {0:?}")]
ProgramGraph(PgId, #[source] PgError),
/// The given id does not refer to a program graph of the channel system.
#[error("program graph {0:?} does not belong to the channel system")]
MissingPg(PgId),
/// A send was attempted on a channel whose queue is already at capacity.
#[error("channel {0:?} is at full capacity")]
OutOfCapacity(Channel),
/// A full-queue probe was attempted on a channel whose queue is not full.
#[error("the channel still has free space {0:?}")]
NotFull(Channel),
/// A receive was attempted on a channel whose queue is empty.
#[error("channel {0:?} is empty")]
Empty(Channel),
/// An empty-queue probe was attempted on a channel whose queue is not empty.
#[error("channel {0:?} is not empty")]
NotEmpty(Channel),
/// No communication has been defined for the given action.
#[error("communication {0:?} has not been defined")]
NoCommunication(Action),
/// The action does not belong to the given program graph.
#[error("action {0:?} does not belong to program graph {1:?}")]
ActionNotInPg(Action, PgId),
/// The variable does not belong to the given program graph.
#[error("variable {0:?} does not belong to program graph {1:?}")]
VarNotInPg(Var, PgId),
/// The location does not belong to the given program graph.
#[error("location {0:?} does not belong to program graph {1:?}")]
LocationNotInPg(Location, PgId),
/// The clock does not belong to the given program graph.
#[error("clock {0:?} does not belong to program graph {1:?}")]
ClockNotInPg(Clock, PgId),
/// Two program graph ids that were expected to coincide differ.
#[error("program graphs {0:?} and {1:?} do not match")]
DifferentPgs(PgId, PgId),
/// The action is a communication, which the requested operation does not allow.
#[error("action {0:?} is a communication")]
ActionIsCommunication(Action),
/// The given channel does not exist.
#[error("channel {0:?} does not exists")]
MissingChannel(Channel),
/// A probe was attempted on a handshake (zero-capacity) channel.
#[error("cannot probe handshake {0:?}")]
ProbingHandshakeChannel(Channel),
/// A full-queue probe was attempted on a channel without a capacity bound.
#[error("cannot probe for fullness the infinite capacity {0:?}")]
ProbingInfiniteQueue(Channel),
/// A type error.
#[error("type error")]
Type(#[source] TypeError),
}
/// Record of a channel operation performed by a transition of a channel system run.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
/// The program graph that performed the operation.
pub pg_id: PgId,
/// The channel the operation was performed on.
pub channel: Channel,
/// What was done on the channel, with the exchanged values where applicable.
pub event_type: EventType,
}
/// The kind of channel operation recorded by an [`Event`].
#[derive(Debug, Clone, PartialEq)]
pub enum EventType {
/// Values were appended to the channel's queue; carries those values.
Send(SmallVec<[Val; 2]>),
/// Values were taken from the channel's queue; carries those values.
Receive(SmallVec<[Val; 2]>),
/// The channel's queue was successfully probed for emptiness.
ProbeEmptyQueue,
/// The channel's queue was successfully probed for fullness.
ProbeFullQueue,
}
/// Definition of a channel system: a set of program graphs together with the channels
/// they communicate over.
///
/// Runs of the system are created with `new_instance` and borrow this definition.
#[derive(Debug, Clone)]
pub struct ChannelSystem {
// Per channel (indexed by `Channel`): the types of the values making up one message,
// and the capacity. `None` is treated as unbounded and `Some(0)` as a handshake channel.
channels: Vec<(Vec<Type>, Option<usize>)>,
// Flat table of the channel operation bound to each action of each program graph;
// `None` for actions that are not communications.
communications: Vec<Option<(Channel, Message)>>,
// Offset into `communications` at which the entries of each program graph start,
// indexed by `PgId`.
communications_pg_idxs: Vec<usize>,
// Definitions of the program graphs, indexed by `PgId`.
program_graphs: Vec<ProgramGraph>,
}
impl ChannelSystem {
pub fn new_instance<'def>(&'def self) -> ChannelSystemRun<'def> {
ChannelSystemRun {
rng: rand::make_rng(),
time: 0,
program_graphs: Vec::from_iter(
self.program_graphs.iter().map(|pgdef| pgdef.new_instance()),
),
message_queue: Vec::from_iter(self.channels.iter().map(|(types, cap)| {
cap.map_or_else(VecDeque::new, |cap| {
VecDeque::with_capacity(types.len() * cap)
})
})),
def: self,
}
}
/// Looks up the channel operation bound to action `pg_action` of program graph `pg_id`.
///
/// Returns `None` for `EPSILON` and for actions that have no channel operation attached.
/// Panics if `pg_id` or the action index falls outside the lookup tables.
#[inline]
fn communication(&self, pg_id: PgId, pg_action: PgAction) -> Option<(Channel, Message)> {
    // The epsilon action is never a communication and has no table entry.
    if pg_action == EPSILON {
        return None;
    }
    // Entries of a program graph are stored contiguously, starting at its offset.
    let offset = self.communications_pg_idxs[usize::from(pg_id.0)];
    let slot = offset + ActionIdx::from(pg_action) as usize;
    self.communications[slot]
}
/// Returns the channels of the system: for each channel, the types of the values of
/// one message and the optional capacity.
#[inline]
pub fn channels(&self) -> &Vec<(Vec<Type>, Option<usize>)> {
&self.channels
}
}
/// A run (execution state) of a [`ChannelSystem`], borrowing the definition it was created from.
#[derive(Debug, Clone)]
pub struct ChannelSystemRun<'def> {
// Source of randomness for the run.
rng: SmallRng,
// Current time of the run; advanced by `wait`.
time: Time,
// One queue per channel (indexed by `Channel`), storing the values of the queued
// messages one after the other.
message_queue: Vec<VecDeque<Val>>,
// Running instances of the program graphs, indexed by `PgId`.
program_graphs: Vec<ProgramGraphRun<'def>>,
// The definition this run was created from.
def: &'def ChannelSystem,
}
impl<'def> ChannelSystemRun<'def> {
/// Returns the current time of the run.
#[inline]
pub fn time(&self) -> Time {
self.time
}
/// Iterates over the transitions that are currently possible in the channel system.
///
/// Each item is made of the id of a program graph, one of its actions, and the
/// post-locations reported by that program graph for the action (an iterator of
/// iterators of [`Location`]). Actions for which `check_communication` returns an error
/// (for instance a send on a full channel) are left out.
pub fn possible_transitions(
    &self,
) -> impl Iterator<
    Item = (
        PgId,
        Action,
        impl Iterator<Item = impl Iterator<Item = Location> + '_> + '_,
    ),
> + '_ {
    self.program_graphs
        .iter()
        .enumerate()
        .flat_map(move |(idx, pg_run)| {
            let pg_id = PgId(idx as u16);
            pg_run
                .possible_transitions()
                .filter_map(move |(pg_action, pg_post)| {
                    let action = Action(pg_id, pg_action);
                    match self.check_communication(pg_id, action) {
                        // Tag every post-location with the id of its program graph.
                        Ok(()) => Some((
                            pg_id,
                            action,
                            pg_post.map(move |locs| locs.map(move |loc| Location(pg_id, loc))),
                        )),
                        // The channel state does not allow this action right now.
                        Err(_) => None,
                    }
                })
        })
}
/// Executes randomly chosen transitions until one of them produces an [`Event`].
///
/// Program graphs are visited in random order, each at most once per call. For the
/// visited program graph, transitions are picked at random among those whose channel
/// operation (if any) passes `check_message`, and executed one after the other until
/// either one yields an event, which is returned, or no candidate is left, in which
/// case the next program graph is tried. Returns `None` when every program graph has
/// been exhausted without producing an event.
///
/// # Panics
///
/// Panics with "successful transition" if a selected transition is rejected by
/// `transition`.
pub(crate) fn montecarlo_execution(&mut self) -> Option<Event> {
let pgs = self.program_graphs.len();
// Ids of the program graphs still to be tried.
let mut pg_vec = Vec::from_iter((0..pgs as u16).map(PgId));
// Dedicated generators, seeded from the run's own, for use inside the iterator
// closures below: `rand1` picks post-locations, `rand2` picks the transition.
let mut rand1 = SmallRng::from_rng(&mut self.rng);
let mut rand2 = SmallRng::from_rng(&mut self.rng);
while !pg_vec.is_empty() {
// Pick (and remove) a random program graph among the remaining ones.
let pg_id = pg_vec.swap_remove(self.rng.random_range(0..pg_vec.len()));
if self.program_graphs[pg_id.0 as usize].current_states().len() == 1 {
// Single current state: each candidate transition has a single post-location.
while let Some((action, post_state)) = self.program_graphs[pg_id.0 as usize]
.nosync_possible_transitions()
// Keep actions that are not communications, or whose channel operation is
// currently allowed.
.filter(|&(action, _)| {
self.def
.communication(pg_id, action)
.is_none_or(|(channel, message)| self.check_message(channel, message))
})
// Pick one post-location at random; actions without any are dropped.
.filter_map(|(action, post_states)| {
post_states
.choose(&mut rand1)
.map(|loc| (action, Location(pg_id, loc)))
})
.choose(&mut rand2)
{
let event = self
.transition(pg_id, Action(pg_id, action), &[post_state])
.expect("successful transition");
// Transitions that are not communications yield no event: keep going with
// the same program graph.
if event.is_some() {
return event;
}
}
} else {
// Several current states: one post-location has to be picked for each of the
// location sets reported for the action.
while let Some((action, post_states)) = self.program_graphs[pg_id.0 as usize]
.possible_transitions()
// Keep actions that are not communications, or whose channel operation is
// currently allowed.
.filter(|&(action, _)| {
self.def
.communication(pg_id, action)
.is_none_or(|(channel, message)| self.check_message(channel, message))
})
// Pick one location at random from each set; the action is dropped if any
// of the sets is empty.
.filter_map(|(action, post_states)| {
post_states
.map(|locs| locs.choose(&mut rand1).map(|loc| Location(pg_id, loc)))
.collect::<Option<SmallVec<[Location; 4]>>>()
.map(|locs| (action, locs))
})
.choose(&mut rand2)
{
let event = self
.transition(pg_id, Action(pg_id, action), post_states.as_slice())
.expect("successful transition");
// Transitions that are not communications yield no event: keep going with
// the same program graph.
if event.is_some() {
return event;
}
}
}
}
None
}
/// Tells whether `message` can currently be performed on `channel`.
///
/// - `Send` requires the queue to be below capacity (always true without a capacity);
/// - `Receive` requires a non-empty queue;
/// - `ProbeFullQueue` requires a bounded queue that has reached its capacity;
/// - `ProbeEmptyQueue` requires an empty queue.
///
/// Probes on zero-capacity (handshake) channels are never allowed. This matches
/// `check_communication` and `transition`, which reject them with
/// `CsError::ProbingHandshakeChannel`; accepting them here would make
/// `montecarlo_execution` select a transition that is bound to fail.
#[inline]
fn check_message(&self, channel: Channel, message: Message) -> bool {
    let channel_idx = channel.0 as usize;
    let (_, capacity) = self.def.channels[channel_idx];
    // NOTE(review): `len` counts queued values, whereas `new_instance` reserves
    // `types.len() * cap` values per channel; confirm which unit `cap` is meant to be in
    // for messages made of more than one value.
    let len = self.message_queue[channel_idx].len();
    debug_assert!(capacity.is_none_or(|cap| len <= cap));
    match message {
        Message::Send => capacity.is_none_or(|cap| len < cap),
        Message::Receive => len > 0,
        // Handshake channels cannot be probed.
        Message::ProbeEmptyQueue | Message::ProbeFullQueue if matches!(capacity, Some(0)) => {
            false
        }
        Message::ProbeFullQueue => capacity.is_some_and(|cap| len == cap),
        Message::ProbeEmptyQueue => len == 0,
    }
}
/// Checks that `action` of program graph `pg_id` is allowed by the current state of the
/// channels.
///
/// Actions that are not communications are always allowed.
///
/// # Errors
///
/// - `MissingPg` if `pg_id` is not a program graph of the system;
/// - `ActionNotInPg` if `action` belongs to another program graph;
/// - `OutOfCapacity` for a send on a channel at capacity;
/// - `Empty` for a receive on an empty channel;
/// - `ProbingHandshakeChannel` for any probe on a zero-capacity channel;
/// - `ProbingInfiniteQueue` for a full-queue probe on a channel without capacity;
/// - `NotEmpty` / `NotFull` when the probed condition does not hold.
///
/// # Panics
///
/// Panics if a queue holds more values than the capacity of its channel.
fn check_communication(&self, pg_id: PgId, action: Action) -> Result<(), CsError> {
    if pg_id.0 >= self.program_graphs.len() as u16 {
        return Err(CsError::MissingPg(pg_id));
    }
    if action.0 != pg_id {
        return Err(CsError::ActionNotInPg(action, pg_id));
    }
    let Some((channel, message)) = self.def.communication(pg_id, action.1) else {
        // Not a communication: nothing to check against the channels.
        return Ok(());
    };
    let slot = channel.0 as usize;
    let capacity = self.def.channels[slot].1;
    let len = self.message_queue[slot].len();
    assert!(capacity.is_none_or(|cap| len <= cap));
    match message {
        Message::Send => match capacity {
            Some(bound) if len >= bound => Err(CsError::OutOfCapacity(channel)),
            _ => Ok(()),
        },
        Message::Receive => {
            if len == 0 {
                Err(CsError::Empty(channel))
            } else {
                Ok(())
            }
        }
        Message::ProbeEmptyQueue => {
            if matches!(capacity, Some(0)) {
                Err(CsError::ProbingHandshakeChannel(channel))
            } else if len > 0 {
                Err(CsError::NotEmpty(channel))
            } else {
                Ok(())
            }
        }
        Message::ProbeFullQueue => match capacity {
            Some(0) => Err(CsError::ProbingHandshakeChannel(channel)),
            None => Err(CsError::ProbingInfiniteQueue(channel)),
            Some(bound) if len < bound => Err(CsError::NotFull(channel)),
            Some(_) => Ok(()),
        },
    }
}
pub fn transition(
&mut self,
pg_id: PgId,
action: Action,
post: &[Location],
) -> Result<Option<Event>, CsError> {
if pg_id.0 >= self.program_graphs.len() as u16 {
return Err(CsError::MissingPg(pg_id));
} else if action.0 != pg_id {
return Err(CsError::ActionNotInPg(action, pg_id));
} else if let Some(post) = post.iter().find(|l| l.0 != pg_id) {
return Err(CsError::LocationNotInPg(*post, pg_id));
}
if let Some((channel, message)) = self.def.communication(pg_id, action.1) {
let (_, capacity) = self.def.channels[channel.0 as usize];
let event_type = match message {
Message::Send
if capacity
.is_some_and(|cap| self.message_queue[channel.0 as usize].len() >= cap) =>
{
return Err(CsError::OutOfCapacity(channel));
}
Message::Send => {
let vals = self.program_graphs[pg_id.0 as usize]
.send(
action.1,
post.iter()
.map(|loc| loc.1)
.collect::<SmallVec<[PgLocation; 8]>>()
.as_slice(),
&mut self.rng,
)
.map_err(|err| CsError::ProgramGraph(pg_id, err))?;
self.message_queue[channel.0 as usize].extend(vals.iter().copied());
EventType::Send(vals)
}
Message::Receive if self.message_queue[channel.0 as usize].is_empty() => {
return Err(CsError::Empty(channel));
}
Message::Receive => {
let (types, _) = &self.def.channels[channel.0 as usize];
let vals = self.message_queue[channel.0 as usize]
.drain(..types.len())
.collect::<SmallVec<[Val; 2]>>();
self.program_graphs[pg_id.0 as usize]
.receive(
action.1,
post.iter()
.map(|loc| loc.1)
.collect::<SmallVec<[PgLocation; 8]>>()
.as_slice(),
vals.as_slice(),
)
.expect("communication has been verified before");
EventType::Receive(vals)
}
Message::ProbeEmptyQueue | Message::ProbeFullQueue
if matches!(capacity, Some(0)) =>
{
return Err(CsError::ProbingHandshakeChannel(channel));
}
Message::ProbeEmptyQueue if !self.message_queue[channel.0 as usize].is_empty() => {
return Err(CsError::NotEmpty(channel));
}
Message::ProbeEmptyQueue => {
let _ = self.program_graphs[pg_id.0 as usize]
.send(
action.1,
post.iter()
.map(|loc| loc.1)
.collect::<SmallVec<[PgLocation; 8]>>()
.as_slice(),
&mut self.rng,
)
.map_err(|err| CsError::ProgramGraph(pg_id, err))?;
EventType::ProbeEmptyQueue
}
Message::ProbeFullQueue
if capacity
.is_some_and(|cap| self.message_queue[channel.0 as usize].len() < cap) =>
{
return Err(CsError::NotFull(channel));
}
Message::ProbeFullQueue if capacity.is_none() => {
return Err(CsError::ProbingInfiniteQueue(channel));
}
Message::ProbeFullQueue => {
let _ = self.program_graphs[pg_id.0 as usize]
.send(
action.1,
post.iter()
.map(|loc| loc.1)
.collect::<SmallVec<[PgLocation; 8]>>()
.as_slice(),
&mut self.rng,
)
.map_err(|err| CsError::ProgramGraph(pg_id, err))?;
EventType::ProbeFullQueue
}
};
Ok(Some(Event {
pg_id,
channel,
event_type,
}))
} else {
self.program_graphs[pg_id.0 as usize]
.transition(
action.1,
post.iter()
.map(|loc| loc.1)
.collect::<SmallVec<[PgLocation; 8]>>()
.as_slice(),
&mut self.rng,
)
.map_err(|err| CsError::ProgramGraph(pg_id, err))
.map(|()| None)
}
}
/// Lets `delta` time pass in every program graph and advances the time of the run.
///
/// Nothing is changed unless all program graphs can wait for `delta`.
///
/// # Errors
///
/// Returns `CsError::ProgramGraph` with `PgError::Invariant` for the first program graph
/// that cannot wait for `delta`.
///
/// # Panics
///
/// Panics with "wait" if a program graph fails to wait after reporting that it could.
pub fn wait(&mut self, delta: Time) -> Result<(), CsError> {
    let blocked = self
        .program_graphs
        .iter()
        .position(|pg_run| !pg_run.can_wait(delta));
    match blocked {
        Some(idx) => Err(CsError::ProgramGraph(PgId(idx as u16), PgError::Invariant)),
        None => {
            for pg_run in &mut self.program_graphs {
                pg_run.wait(delta).expect("wait");
            }
            self.time += delta;
            Ok(())
        }
    }
}
}
// Unit tests exercising the channel system builder and a minimal send/receive run.
#[cfg(test)]
mod tests {
use super::*;
// A builder can be created.
#[test]
fn builder() {
let _cs: ChannelSystemBuilder = ChannelSystemBuilder::new();
}
// A program graph can be added to the builder.
#[test]
fn new_pg() {
let mut cs = ChannelSystemBuilder::new();
let _ = cs.new_program_graph();
}
// An action can be added to a program graph.
#[test]
fn new_action() -> Result<(), CsError> {
let mut cs = ChannelSystemBuilder::new();
let pg = cs.new_program_graph();
let _action = cs.new_action(pg)?;
Ok(())
}
// Variables initialized with a boolean and with an integer can be added to a program graph.
#[test]
fn new_var() -> Result<(), CsError> {
let mut cs = ChannelSystemBuilder::new();
let pg = cs.new_program_graph();
let _var1 = cs.new_var(pg, Val::from(false))?;
let _var2 = cs.new_var(pg, Val::from(0i64))?;
Ok(())
}
// Effects are accepted only when the expression matches the type of the variable.
#[test]
fn add_effect() -> Result<(), CsError> {
let mut cs = ChannelSystemBuilder::new();
let pg = cs.new_program_graph();
let action = cs.new_action(pg)?;
let var1 = cs.new_var(pg, Val::from(false))?;
let var2 = cs.new_var(pg, Val::from(0i64))?;
// Integer expression assigned to the boolean variable: rejected.
let effect_1 = CsExpression::from(2i64);
cs.add_effect(pg, action, var1, effect_1.clone())
.expect_err("type mismatch");
// Boolean expression: accepted for the boolean variable, rejected for the integer one.
let effect_2 = CsExpression::from(true);
cs.add_effect(pg, action, var1, effect_2.clone())?;
cs.add_effect(pg, action, var2, effect_2)
.expect_err("type mismatch");
cs.add_effect(pg, action, var2, effect_1)?;
Ok(())
}
// The initial location and a further location of a program graph are distinct.
#[test]
fn new_location() -> Result<(), CsError> {
let mut cs = ChannelSystemBuilder::new();
let pg = cs.new_program_graph();
let initial = cs.new_initial_location(pg)?;
let location = cs.new_location(pg)?;
assert_ne!(initial, location);
Ok(())
}
// A transition with a well-typed action can be added between two locations.
#[test]
fn add_transition() -> Result<(), CsError> {
let mut cs = ChannelSystemBuilder::new();
let pg = cs.new_program_graph();
let initial = cs.new_initial_location(pg)?;
let action = cs.new_action(pg)?;
let var1 = cs.new_var(pg, Val::from(false))?;
let var2 = cs.new_var(pg, Val::from(0i64))?;
let effect_1 = CsExpression::from(0i64);
let effect_2 = CsExpression::from(true);
cs.add_effect(pg, action, var1, effect_2)?;
cs.add_effect(pg, action, var2, effect_1)?;
let post = cs.new_location(pg)?;
cs.add_transition(pg, initial, action, post, None)?;
Ok(())
}
// Two program graphs exchange a boolean over a channel of capacity 1.
#[test]
fn add_communication() -> Result<(), CsError> {
let mut cs = ChannelSystemBuilder::new();
let ch = cs.new_channel(vec![Type::Boolean], Some(1));
// Sender: two send actions are declared, one of which labels a transition.
let pg1 = cs.new_program_graph();
let initial1 = cs.new_initial_location(pg1)?;
let post1 = cs.new_location(pg1)?;
let effect = CsExpression::from(true);
let send = cs.new_send(pg1, ch, vec![effect.clone()])?;
let _ = cs.new_send(pg1, ch, vec![effect])?;
cs.add_transition(pg1, initial1, send, post1, None)?;
let var1 = cs.new_var(pg1, Val::from(0i64))?;
let effect = CsExpression::from(0i64);
cs.add_effect(pg1, send, var1, effect)
.expect_err("send is a message so it cannot have effects");
// Receiver: three receive actions are declared, one of which labels a transition.
let pg2 = cs.new_program_graph();
let initial2 = cs.new_initial_location(pg2)?;
let post2 = cs.new_location(pg2)?;
let var2 = cs.new_var(pg2, Val::from(false))?;
let receive = cs.new_receive(pg2, ch, vec![var2])?;
let _ = cs.new_receive(pg2, ch, vec![var2])?;
let _ = cs.new_receive(pg2, ch, vec![var2])?;
cs.add_transition(pg2, initial2, receive, post2, None)?;
let cs_def = cs.build();
let mut cs = cs_def.new_instance();
// Offsets consistent with 2 communications declared in pg1 and 3 in pg2.
assert_eq!(cs.def.communications_pg_idxs, vec![0, 2, 5]);
// The message has to be sent before it can be received.
cs.transition(pg1, send, &[post1])?;
cs.transition(pg2, receive, &[post2])?;
Ok(())
}
}