#![allow(clippy::needless_range_loop)]
#![allow(dead_code)]
use std::collections::{HashMap, HashSet, VecDeque};
#[derive(Debug, Clone)]
pub struct Place {
pub name: String,
pub tokens: usize,
}
impl Place {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
tokens: 0,
}
}
pub fn with_tokens(name: impl Into<String>, tokens: usize) -> Self {
Self {
name: name.into(),
tokens,
}
}
}
#[derive(Clone)]
pub struct Transition {
pub name: String,
pub guard: Option<fn(&[usize]) -> bool>,
}
impl std::fmt::Debug for Transition {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Transition")
.field("name", &self.name)
.field("guard", &self.guard.map(|_| "<fn>"))
.finish()
}
}
impl Transition {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
guard: None,
}
}
pub fn with_guard(name: impl Into<String>, guard: fn(&[usize]) -> bool) -> Self {
Self {
name: name.into(),
guard: Some(guard),
}
}
}
pub type Weight = usize;
#[derive(Debug, Clone)]
pub struct PetriNet {
pub places: Vec<Place>,
pub transitions: Vec<Transition>,
pub arcs_in: Vec<(usize, usize, Weight)>,
pub arcs_out: Vec<(usize, usize, Weight)>,
}
impl PetriNet {
pub fn new() -> Self {
Self {
places: Vec::new(),
transitions: Vec::new(),
arcs_in: Vec::new(),
arcs_out: Vec::new(),
}
}
pub fn add_place(&mut self, place: Place) -> usize {
let idx = self.places.len();
self.places.push(place);
idx
}
pub fn add_transition(&mut self, transition: Transition) -> usize {
let idx = self.transitions.len();
self.transitions.push(transition);
idx
}
pub fn add_arc_in(&mut self, t: usize, p: usize, weight: Weight) {
self.arcs_in.push((t, p, weight));
}
pub fn add_arc_out(&mut self, t: usize, p: usize, weight: Weight) {
self.arcs_out.push((t, p, weight));
}
pub fn marking(&self) -> Vec<usize> {
self.places.iter().map(|p| p.tokens).collect()
}
pub fn set_marking(&mut self, marking: &[usize]) {
for (p, &tok) in self.places.iter_mut().zip(marking.iter()) {
p.tokens = tok;
}
}
pub fn enabled_transitions(&self) -> Vec<usize> {
let mut enabled = Vec::new();
for (t_idx, transition) in self.transitions.iter().enumerate() {
if self.is_enabled(t_idx, transition) {
enabled.push(t_idx);
}
}
enabled
}
fn is_enabled(&self, t_idx: usize, transition: &Transition) -> bool {
let mut input_tokens: Vec<usize> = Vec::new();
for &(t, p, w) in &self.arcs_in {
if t == t_idx {
if self.places[p].tokens < w {
return false;
}
input_tokens.push(self.places[p].tokens);
}
}
if let Some(guard) = transition.guard
&& !guard(&input_tokens)
{
return false;
}
true
}
pub fn fire(&mut self, t_idx: usize) -> Result<(), String> {
if t_idx >= self.transitions.len() {
return Err(format!("transition index {} out of range", t_idx));
}
let transition = self.transitions[t_idx].clone();
if !self.is_enabled(t_idx, &transition) {
return Err(format!(
"transition {} is not enabled",
self.transitions[t_idx].name
));
}
for &(t, p, w) in &self.arcs_in {
if t == t_idx {
self.places[p].tokens -= w;
}
}
for &(t, p, w) in &self.arcs_out {
if t == t_idx {
self.places[p].tokens += w;
}
}
Ok(())
}
pub fn is_deadlocked(&self) -> bool {
self.enabled_transitions().is_empty()
}
pub fn marking_sum(&self) -> usize {
self.places.iter().map(|p| p.tokens).sum()
}
pub fn reachability_bfs(&self) -> HashSet<Vec<usize>> {
let mut visited: HashSet<Vec<usize>> = HashSet::new();
let mut queue: VecDeque<Vec<usize>> = VecDeque::new();
let initial = self.marking();
queue.push_back(initial.clone());
visited.insert(initial);
let mut sim = self.clone();
while let Some(marking) = queue.pop_front() {
sim.set_marking(&marking);
for t_idx in sim.enabled_transitions() {
let mut next_sim = sim.clone();
let _ = next_sim.fire(t_idx);
let next_marking = next_sim.marking();
if !visited.contains(&next_marking) {
visited.insert(next_marking.clone());
queue.push_back(next_marking);
}
}
}
visited
}
}
impl Default for PetriNet {
fn default() -> Self {
Self::new()
}
}
pub const OMEGA: usize = usize::MAX;
#[derive(Debug, Clone)]
pub struct CoverabilityNode {
pub marking: Vec<usize>,
pub parent: Option<usize>,
pub fired_transition: Option<usize>,
pub children: Vec<usize>,
}
#[derive(Debug)]
pub struct CoverabilityTree {
pub nodes: Vec<CoverabilityNode>,
}
impl CoverabilityTree {
pub fn build(net: &PetriNet) -> Self {
let mut tree = CoverabilityTree { nodes: Vec::new() };
let root_marking = net.marking();
let root = CoverabilityNode {
marking: root_marking,
parent: None,
fired_transition: None,
children: Vec::new(),
};
tree.nodes.push(root);
let mut frontier: VecDeque<usize> = VecDeque::new();
frontier.push_back(0);
let mut sim = net.clone();
while let Some(node_idx) = frontier.pop_front() {
let current_marking = tree.nodes[node_idx].marking.clone();
sim.set_marking(¤t_marking);
for t_idx in sim.enabled_transitions() {
let mut next_sim = sim.clone();
let tmp_marking: Vec<usize> = current_marking
.iter()
.map(|&x| if x == OMEGA { 1_000_000 } else { x })
.collect();
next_sim.set_marking(&tmp_marking);
if next_sim.fire(t_idx).is_err() {
continue;
}
let mut new_marking = next_sim.marking();
let mut anc_idx = Some(node_idx);
while let Some(ai) = anc_idx {
let anc_marking = &tree.nodes[ai].marking;
if anc_marking
.iter()
.zip(new_marking.iter())
.all(|(&a, &n)| a == OMEGA || a <= n)
&& anc_marking
.iter()
.zip(new_marking.iter())
.any(|(&a, &n)| a != OMEGA && a < n)
{
for (m, a) in new_marking.iter_mut().zip(anc_marking.iter()) {
if *a != OMEGA && *a < *m {
*m = OMEGA;
}
}
}
anc_idx = tree.nodes[ai].parent;
}
let already_visited = tree.nodes.iter().any(|n| n.marking == new_marking);
let child_idx = tree.nodes.len();
tree.nodes.push(CoverabilityNode {
marking: new_marking,
parent: Some(node_idx),
fired_transition: Some(t_idx),
children: Vec::new(),
});
tree.nodes[node_idx].children.push(child_idx);
if !already_visited {
frontier.push_back(child_idx);
}
}
}
tree
}
pub fn is_bounded(&self) -> bool {
self.nodes
.iter()
.all(|n| n.marking.iter().all(|&t| t != OMEGA))
}
pub fn node_count(&self) -> usize {
self.nodes.len()
}
}
#[derive(Debug, Clone)]
pub struct StochasticPetriNet {
pub net: PetriNet,
pub rates: Vec<f64>,
}
impl StochasticPetriNet {
pub fn new(net: PetriNet, rates: Vec<f64>) -> Self {
assert_eq!(
net.transitions.len(),
rates.len(),
"one rate per transition required"
);
Self { net, rates }
}
pub fn simulate_until(&mut self, t_end: f64) -> Vec<(f64, usize)> {
let mut events = Vec::new();
let mut t = 0.0_f64;
use rand::RngExt as _;
let mut rng = rand::rng();
loop {
let enabled = self.net.enabled_transitions();
if enabled.is_empty() {
break;
}
let total_rate: f64 = enabled.iter().map(|&i| self.rates[i]).sum();
if total_rate == 0.0 {
break;
}
let u: f64 = rng.random_range(0.0_f64..1.0_f64);
let dt = -u.ln() / total_rate;
t += dt;
if t > t_end {
break;
}
let selector: f64 = rng.random_range(0.0_f64..total_rate);
let mut cumulative = 0.0;
let mut chosen = enabled[0];
for &ti in &enabled {
cumulative += self.rates[ti];
if selector < cumulative {
chosen = ti;
break;
}
}
let _ = self.net.fire(chosen);
events.push((t, chosen));
}
events
}
pub fn steady_state_estimate(&self, t_end: f64, n_samples: usize) -> Vec<f64> {
let n_places = self.net.places.len();
let mut totals = vec![0.0_f64; n_places];
for _ in 0..n_samples {
let mut sim = self.clone();
let events = sim.simulate_until(t_end);
let _ = events; let marking = sim.net.marking();
for (i, &tok) in marking.iter().enumerate() {
totals[i] += tok as f64;
}
}
totals.iter().map(|&s| s / n_samples as f64).collect()
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct ColoredToken {
pub color: Vec<f64>,
}
impl ColoredToken {
pub fn new(color: Vec<f64>) -> Self {
Self { color }
}
}
#[derive(Debug, Clone)]
pub struct ColoredPlace {
pub name: String,
pub tokens: Vec<ColoredToken>,
}
impl ColoredPlace {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
tokens: Vec::new(),
}
}
pub fn add_token(&mut self, token: ColoredToken) {
self.tokens.push(token);
}
pub fn take_token(&mut self) -> Option<ColoredToken> {
if self.tokens.is_empty() {
None
} else {
Some(self.tokens.remove(0))
}
}
pub fn token_count(&self) -> usize {
self.tokens.len()
}
}
#[derive(Debug, Clone)]
pub struct ColoredTransition {
pub name: String,
pub color_guard: Option<fn(&ColoredToken) -> bool>,
}
impl ColoredTransition {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
color_guard: None,
}
}
pub fn with_guard(name: impl Into<String>, guard: fn(&ColoredToken) -> bool) -> Self {
Self {
name: name.into(),
color_guard: Some(guard),
}
}
}
#[derive(Debug, Clone)]
pub struct ColoredPetriNet {
pub places: Vec<ColoredPlace>,
pub transitions: Vec<ColoredTransition>,
pub arcs_in: Vec<(usize, usize)>,
pub arcs_out: Vec<(usize, usize)>,
}
impl ColoredPetriNet {
pub fn new() -> Self {
Self {
places: Vec::new(),
transitions: Vec::new(),
arcs_in: Vec::new(),
arcs_out: Vec::new(),
}
}
pub fn add_place(&mut self, place: ColoredPlace) -> usize {
let idx = self.places.len();
self.places.push(place);
idx
}
pub fn add_transition(&mut self, transition: ColoredTransition) -> usize {
let idx = self.transitions.len();
self.transitions.push(transition);
idx
}
pub fn add_arc_in(&mut self, t: usize, p: usize) {
self.arcs_in.push((t, p));
}
pub fn add_arc_out(&mut self, t: usize, p: usize) {
self.arcs_out.push((t, p));
}
pub fn enabled_transitions(&self) -> Vec<usize> {
let mut enabled = Vec::new();
'outer: for (t_idx, trans) in self.transitions.iter().enumerate() {
for &(t, p) in &self.arcs_in {
if t == t_idx {
if self.places[p].tokens.is_empty() {
continue 'outer;
}
if let Some(guard) = trans.color_guard
&& !guard(&self.places[p].tokens[0])
{
continue 'outer;
}
}
}
enabled.push(t_idx);
}
enabled
}
pub fn fire(&mut self, t_idx: usize) -> Result<(), String> {
if t_idx >= self.transitions.len() {
return Err(format!("transition index {} out of range", t_idx));
}
let mut consumed: Vec<(usize, ColoredToken)> = Vec::new();
for &(t, p) in &self.arcs_in {
if t == t_idx {
let tok = self.places[p]
.take_token()
.ok_or_else(|| format!("place {} has no tokens", p))?;
consumed.push((p, tok));
}
}
let token_to_pass = consumed
.first()
.map(|(_, tok)| tok.clone())
.unwrap_or_else(|| ColoredToken::new(vec![0.0]));
for &(t, p) in &self.arcs_out {
if t == t_idx {
self.places[p].add_token(token_to_pass.clone());
}
}
Ok(())
}
pub fn total_tokens(&self) -> usize {
self.places.iter().map(|p| p.token_count()).sum()
}
}
impl Default for ColoredPetriNet {
fn default() -> Self {
Self::new()
}
}
fn xml_attr_value<'a>(tag: &'a str, attr: &str) -> Option<&'a str> {
let needle = format!("{}=\"", attr);
tag.find(needle.as_str()).and_then(|start| {
let value_start = start + needle.len();
tag[value_start..]
.find('"')
.map(|end| &tag[value_start..value_start + end])
})
}
fn xml_text_content(src: &str) -> Option<&str> {
let open = src.find("<text>")?;
let content_start = open + "<text>".len();
let close = src[content_start..].find("</text>")?;
Some(src[content_start..content_start + close].trim())
}
pub fn parse_pnml(input: &str) -> PetriNet {
let mut net = PetriNet::new();
let mut place_id_map: HashMap<String, usize> = HashMap::new();
let mut trans_id_map: HashMap<String, usize> = HashMap::new();
#[derive(Clone)]
enum Pending {
Place {
id: String,
name: String,
tokens: usize,
},
Trans {
id: String,
name: String,
},
Arc {
source: String,
target: String,
weight: usize,
},
}
#[derive(Clone, Copy, PartialEq)]
enum Context {
None,
InitMarking,
Inscription,
}
let mut stack: Vec<Pending> = Vec::new();
let mut context = Context::None;
let mut pos = 0;
while pos < input.len() {
let Some(rel_open) = input[pos..].find('<') else {
break;
};
let tag_open = pos + rel_open;
if input[tag_open..].starts_with("<!--") {
pos = input[tag_open..]
.find("-->")
.map(|e| tag_open + e + 3)
.unwrap_or(input.len());
continue;
}
if input[tag_open..].starts_with("<?") {
pos = input[tag_open..]
.find("?>")
.map(|e| tag_open + e + 2)
.unwrap_or(input.len());
continue;
}
let Some(rel_close) = input[tag_open..].find('>') else {
break;
};
let tag_close = tag_open + rel_close + 1;
let raw_tag = &input[tag_open..tag_close];
pos = tag_close;
let is_closing = raw_tag.starts_with("</");
let is_self_closing = !is_closing
&& (raw_tag.ends_with("/>") || raw_tag.trim_end_matches('>').trim().ends_with('/'));
if is_closing {
let rest = &raw_tag[2..];
let name_len = rest
.find(|c: char| c == '>' || c.is_whitespace())
.unwrap_or(rest.len());
let closing = rest[..name_len].trim_end_matches('>');
match closing {
"initialMarking" => {
context = Context::None;
}
"inscription" => {
context = Context::None;
}
"place" => {
if let Some(pos_idx) = stack
.iter()
.rposition(|p| matches!(p, Pending::Place { .. }))
{
let pending = stack.remove(pos_idx);
if let Pending::Place { id, name, tokens } = pending {
let display = if name.is_empty() { &id } else { &name };
let idx = net.add_place(Place::with_tokens(display, tokens));
place_id_map.insert(id, idx);
}
}
}
"transition" => {
if let Some(pos_idx) = stack
.iter()
.rposition(|p| matches!(p, Pending::Trans { .. }))
{
let pending = stack.remove(pos_idx);
if let Pending::Trans { id, name } = pending {
let display = if name.is_empty() { &id } else { &name };
let idx = net.add_transition(Transition::new(display));
trans_id_map.insert(id, idx);
}
}
}
"arc" => {
if let Some(pos_idx) =
stack.iter().rposition(|p| matches!(p, Pending::Arc { .. }))
{
let pending = stack.remove(pos_idx);
if let Pending::Arc {
source,
target,
weight,
} = pending
{
add_pnml_arc(
&mut net,
&source,
&target,
weight,
&place_id_map,
&trans_id_map,
);
}
}
}
_ => {}
}
continue;
}
let interior = raw_tag
.trim_start_matches('<')
.trim_end_matches('>')
.trim_end_matches('/')
.trim();
let name_len = interior
.find(|c: char| c.is_whitespace())
.unwrap_or(interior.len());
let elem_name = &interior[..name_len];
match elem_name {
"place" => {
let id = xml_attr_value(raw_tag, "id").unwrap_or("").to_owned();
let name = xml_attr_value(raw_tag, "name").unwrap_or("").to_owned();
if is_self_closing {
let display = if name.is_empty() { &id } else { &name };
let idx = net.add_place(Place::new(display));
place_id_map.insert(id, idx);
} else {
stack.push(Pending::Place {
id,
name,
tokens: 0,
});
}
}
"transition" => {
let id = xml_attr_value(raw_tag, "id").unwrap_or("").to_owned();
let name = xml_attr_value(raw_tag, "name").unwrap_or("").to_owned();
if is_self_closing {
let display = if name.is_empty() { &id } else { &name };
let idx = net.add_transition(Transition::new(display));
trans_id_map.insert(id, idx);
} else {
stack.push(Pending::Trans { id, name });
}
}
"arc" => {
let source = xml_attr_value(raw_tag, "source").unwrap_or("").to_owned();
let target = xml_attr_value(raw_tag, "target").unwrap_or("").to_owned();
let weight: usize = xml_attr_value(raw_tag, "weight")
.and_then(|w| w.parse().ok())
.unwrap_or(1);
if is_self_closing {
add_pnml_arc(
&mut net,
&source,
&target,
weight,
&place_id_map,
&trans_id_map,
);
} else {
stack.push(Pending::Arc {
source,
target,
weight,
});
}
}
"initialMarking" if !is_self_closing => {
context = Context::InitMarking;
}
"inscription" if !is_self_closing => {
context = Context::Inscription;
}
"text" if !is_self_closing => {
if let Some(end_rel) = input[pos..].find("</text>") {
let text_val = input[pos..pos + end_rel].trim();
pos += end_rel + "</text>".len();
match context {
Context::InitMarking => {
if let (Ok(tok), Some(Pending::Place { tokens, .. })) = (
text_val.parse::<usize>(),
stack
.iter_mut()
.rfind(|p| matches!(p, Pending::Place { .. })),
) {
*tokens = tok;
}
}
Context::Inscription => {
if let (Ok(w), Some(Pending::Arc { weight, .. })) = (
text_val.parse::<usize>(),
stack.iter_mut().rfind(|p| matches!(p, Pending::Arc { .. })),
) {
*weight = w;
}
}
Context::None => {}
}
}
}
_ => {} }
}
net
}
fn add_pnml_arc(
net: &mut PetriNet,
src_id: &str,
tgt_id: &str,
weight: Weight,
place_id_map: &HashMap<String, usize>,
trans_id_map: &HashMap<String, usize>,
) {
match (place_id_map.get(src_id), trans_id_map.get(tgt_id)) {
(Some(&p_idx), Some(&t_idx)) => net.add_arc_in(t_idx, p_idx, weight),
_ => {
if let (Some(&t_idx), Some(&p_idx)) =
(trans_id_map.get(src_id), place_id_map.get(tgt_id))
{
net.add_arc_out(t_idx, p_idx, weight);
}
}
}
}
pub fn producer_consumer_net() -> PetriNet {
let mut net = PetriNet::new();
let buffer = net.add_place(Place::with_tokens("buffer", 0));
let ready = net.add_place(Place::with_tokens("consumer_ready", 1));
let produce = net.add_transition(Transition::new("produce"));
let consume = net.add_transition(Transition::new("consume"));
net.add_arc_in(consume, buffer, 1);
net.add_arc_in(consume, ready, 1);
net.add_arc_out(produce, buffer, 1);
net.add_arc_out(consume, ready, 1);
net
}
pub fn mutex_net() -> PetriNet {
let mut net = PetriNet::new();
let resource = net.add_place(Place::with_tokens("resource", 1));
let p1_idle = net.add_place(Place::with_tokens("p1_idle", 1));
let p2_idle = net.add_place(Place::with_tokens("p2_idle", 1));
let p1_cs = net.add_place(Place::with_tokens("p1_cs", 0));
let p2_cs = net.add_place(Place::with_tokens("p2_cs", 0));
let enter1 = net.add_transition(Transition::new("enter1"));
let exit1 = net.add_transition(Transition::new("exit1"));
let enter2 = net.add_transition(Transition::new("enter2"));
let exit2 = net.add_transition(Transition::new("exit2"));
net.add_arc_in(enter1, resource, 1);
net.add_arc_in(enter1, p1_idle, 1);
net.add_arc_out(enter1, p1_cs, 1);
net.add_arc_in(exit1, p1_cs, 1);
net.add_arc_out(exit1, resource, 1);
net.add_arc_out(exit1, p1_idle, 1);
net.add_arc_in(enter2, resource, 1);
net.add_arc_in(enter2, p2_idle, 1);
net.add_arc_out(enter2, p2_cs, 1);
net.add_arc_in(exit2, p2_cs, 1);
net.add_arc_out(exit2, resource, 1);
net.add_arc_out(exit2, p2_idle, 1);
net
}
pub fn incidence_matrix(net: &PetriNet) -> Vec<Vec<i64>> {
let nt = net.transitions.len();
let np = net.places.len();
let mut matrix = vec![vec![0i64; np]; nt];
for &(t, p, w) in &net.arcs_out {
matrix[t][p] += w as i64;
}
for &(t, p, w) in &net.arcs_in {
matrix[t][p] -= w as i64;
}
matrix
}
pub fn state_equation(net: &PetriNet, sigma: &[i64]) -> Vec<i64> {
let matrix = incidence_matrix(net);
let marking: Vec<i64> = net.places.iter().map(|p| p.tokens as i64).collect();
let np = net.places.len();
let mut result = marking;
for (t_idx, &fires) in sigma.iter().enumerate() {
if t_idx < matrix.len() {
for p in 0..np {
result[p] += matrix[t_idx][p] * fires;
}
}
}
result
}
pub fn is_trap(net: &PetriNet, place_set: &[usize]) -> bool {
let set: HashSet<usize> = place_set.iter().copied().collect();
for &(t, p_in, _) in &net.arcs_in {
if set.contains(&p_in) {
let produces_into_set = net
.arcs_out
.iter()
.any(|&(tout, pout, _)| tout == t && set.contains(&pout));
if !produces_into_set {
return false;
}
}
}
true
}
pub fn is_siphon(net: &PetriNet, place_set: &[usize]) -> bool {
let set: HashSet<usize> = place_set.iter().copied().collect();
for &(t, p_out, _) in &net.arcs_out {
if set.contains(&p_out) {
let takes_from_set = net
.arcs_in
.iter()
.any(|&(tin, pin, _)| tin == t && set.contains(&pin));
if !takes_from_set {
return false;
}
}
}
true
}
pub fn find_t_invariant(net: &PetriNet, max_count: usize) -> Option<Vec<i64>> {
let matrix = incidence_matrix(net);
let nt = net.transitions.len();
let np = net.places.len();
fn rec(
matrix: &[Vec<i64>],
nt: usize,
np: usize,
max_count: usize,
idx: usize,
current: &mut Vec<i64>,
) -> Option<Vec<i64>> {
if idx == nt {
let mut is_trivial = true;
for &c in current.iter() {
if c != 0 {
is_trivial = false;
break;
}
}
if is_trivial {
return None;
}
for p in 0..np {
let mut sum = 0i64;
for t in 0..nt {
sum += matrix[t][p] * current[t];
}
if sum != 0 {
return None;
}
}
return Some(current.clone());
}
for v in 0..=(max_count as i64) {
current[idx] = v;
if let Some(res) = rec(matrix, nt, np, max_count, idx + 1, current) {
return Some(res);
}
}
None
}
let mut current = vec![0i64; nt];
rec(&matrix, nt, np, max_count, 0, &mut current)
}
fn gcd(mut a: i64, mut b: i64) -> i64 {
a = a.abs();
b = b.abs();
while b != 0 {
let r = a % b;
a = b;
b = r;
}
a
}
fn lcm_checked(a: i64, b: i64) -> Option<i64> {
if a == 0 || b == 0 {
return Some(0);
}
let g = gcd(a, b);
(a / g).checked_mul(b)
}
fn integer_row_reduce(mat: &mut [Vec<i64>], n_c: usize) {
let n_rows = mat.len();
if n_rows == 0 || n_c == 0 {
return;
}
let n_cols = mat[0].len();
let mut pivot_row = 0usize;
for col in 0..n_c {
let Some(found) = (pivot_row..n_rows).find(|&r| mat[r][col] != 0) else {
continue;
};
mat.swap(pivot_row, found);
let pivot_val = mat[pivot_row][col];
for row in 0..n_rows {
if row == pivot_row {
continue;
}
let target = mat[row][col];
if target == 0 {
continue;
}
let g = gcd(pivot_val.abs(), target.abs());
let scale_row = pivot_val / g; let scale_pivot = target / g;
let updated: Vec<i64> = (0..n_cols)
.map(|c| {
mat[row][c]
.saturating_mul(scale_row)
.saturating_sub(mat[pivot_row][c].saturating_mul(scale_pivot))
})
.collect();
mat[row] = updated;
let row_gcd = mat[row].iter().fold(0i64, |acc, &v| gcd(acc, v.abs()));
if row_gcd > 1 {
for c in 0..n_cols {
mat[row][c] /= row_gcd;
}
}
}
pivot_row += 1;
}
}
pub fn t_invariants(net: &PetriNet) -> Vec<Vec<i64>> {
let n_t = net.transitions.len();
let n_p = net.places.len();
if n_t == 0 || n_p == 0 {
return Vec::new();
}
let mut mat: Vec<Vec<i64>> = (0..n_t)
.map(|t| {
let mut row: Vec<i64> = (0..n_p)
.map(|p| {
let post: i64 = net
.arcs_out
.iter()
.filter(|&&(ti, pi, _)| ti == t && pi == p)
.map(|&(_, _, w)| w as i64)
.sum();
let pre: i64 = net
.arcs_in
.iter()
.filter(|&&(ti, pi, _)| ti == t && pi == p)
.map(|&(_, _, w)| w as i64)
.sum();
post - pre
})
.collect();
for j in 0..n_t {
row.push(if j == t { 1 } else { 0 });
}
row
})
.collect();
integer_row_reduce(&mut mat, n_p);
let mut invariants: Vec<Vec<i64>> = Vec::new();
for row in &mat {
let ct_all_zero = row[..n_p].iter().all(|&x| x == 0);
if !ct_all_zero {
continue;
}
let v: Vec<i64> = row[n_p..].to_vec();
let has_positive = v.iter().any(|&x| x > 0);
let has_negative = v.iter().any(|&x| x < 0);
let candidate: Vec<i64> = if has_positive && !has_negative {
v
} else if has_negative && !has_positive {
v.iter().map(|&x| -x).collect()
} else {
continue;
};
let g = candidate.iter().fold(0i64, |acc, &x| gcd(acc, x.abs()));
let normalised: Vec<i64> = if g > 1 {
candidate.iter().map(|&x| x / g).collect()
} else {
candidate
};
invariants.push(normalised);
}
invariants
}
pub fn replay_sequence(net: &mut PetriNet, sequence: &[usize]) -> Result<Vec<usize>, usize> {
for (step, &t_idx) in sequence.iter().enumerate() {
if net.fire(t_idx).is_err() {
return Err(step);
}
}
Ok(net.marking())
}
#[derive(Debug, Clone)]
pub struct PetriNetMetrics {
pub n_places: usize,
pub n_transitions: usize,
pub n_arcs: usize,
pub avg_out_degree: f64,
pub has_source_transition: bool,
pub has_sink_transition: bool,
}
pub fn compute_metrics(net: &PetriNet) -> PetriNetMetrics {
let n_transitions = net.transitions.len();
let total_out: usize = net
.transitions
.iter()
.enumerate()
.map(|(t, _)| net.arcs_out.iter().filter(|&&(ti, _, _)| ti == t).count())
.sum();
let avg_out_degree = if n_transitions == 0 {
0.0
} else {
total_out as f64 / n_transitions as f64
};
let has_source_transition =
(0..n_transitions).any(|t| !net.arcs_in.iter().any(|&(ti, _, _)| ti == t));
let has_sink_transition =
(0..n_transitions).any(|t| !net.arcs_out.iter().any(|&(ti, _, _)| ti == t));
PetriNetMetrics {
n_places: net.places.len(),
n_transitions,
n_arcs: net.arcs_in.len() + net.arcs_out.len(),
avg_out_degree,
has_source_transition,
has_sink_transition,
}
}
pub fn disjoint_union(a: &PetriNet, b: &PetriNet) -> PetriNet {
let mut result = a.clone();
let place_offset = a.places.len();
let trans_offset = a.transitions.len();
for p in &b.places {
result.places.push(p.clone());
}
for t in &b.transitions {
result.transitions.push(t.clone());
}
for &(t, p, w) in &b.arcs_in {
result.arcs_in.push((t + trans_offset, p + place_offset, w));
}
for &(t, p, w) in &b.arcs_out {
result
.arcs_out
.push((t + trans_offset, p + place_offset, w));
}
result
}
pub fn is_live_approx(net: &PetriNet, t_idx: usize) -> bool {
let reachable = net.reachability_bfs();
let mut sim = net.clone();
for marking in &reachable {
sim.set_marking(marking);
if sim.enabled_transitions().contains(&t_idx) {
return true;
}
}
false
}
pub fn workflow_with_resource(capacity: usize) -> PetriNet {
let mut net = PetriNet::new();
let start = net.add_place(Place::with_tokens("start", 1));
let working = net.add_place(Place::with_tokens("working", 0));
let done = net.add_place(Place::with_tokens("done", 0));
let resource = net.add_place(Place::with_tokens("resource", capacity));
let begin = net.add_transition(Transition::new("begin"));
let finish = net.add_transition(Transition::new("finish"));
net.add_arc_in(begin, start, 1);
net.add_arc_in(begin, resource, 1);
net.add_arc_out(begin, working, 1);
net.add_arc_in(finish, working, 1);
net.add_arc_out(finish, done, 1);
net.add_arc_out(finish, resource, 1);
net
}
pub type SparseMarking = HashMap<String, usize>;
pub fn dense_to_sparse(net: &PetriNet, marking: &[usize]) -> SparseMarking {
net.places
.iter()
.zip(marking.iter())
.map(|(p, &tok)| (p.name.clone(), tok))
.collect()
}
pub fn sparse_to_dense(net: &PetriNet, sparse: &SparseMarking) -> Vec<usize> {
net.places
.iter()
.map(|p| *sparse.get(&p.name).unwrap_or(&0))
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
fn simple_net() -> PetriNet {
let mut net = PetriNet::new();
let p0 = net.add_place(Place::with_tokens("p0", 1));
let p1 = net.add_place(Place::with_tokens("p1", 0));
let t0 = net.add_transition(Transition::new("t0"));
net.add_arc_in(t0, p0, 1);
net.add_arc_out(t0, p1, 1);
net
}
#[test]
fn test_place_tokens() {
let p = Place::with_tokens("p", 3);
assert_eq!(p.tokens, 3);
}
#[test]
fn test_transition_name() {
let t = Transition::new("t1");
assert_eq!(t.name, "t1");
}
#[test]
fn test_add_place_transition() {
let mut net = PetriNet::new();
let pi = net.add_place(Place::new("p"));
let ti = net.add_transition(Transition::new("t"));
assert_eq!(pi, 0);
assert_eq!(ti, 0);
}
#[test]
fn test_fire_basic() {
let mut net = simple_net();
assert_eq!(net.marking(), vec![1, 0]);
net.fire(0).unwrap();
assert_eq!(net.marking(), vec![0, 1]);
}
#[test]
fn test_fire_not_enabled() {
let mut net = simple_net();
net.fire(0).unwrap(); assert!(net.fire(0).is_err());
}
#[test]
fn test_enabled_transitions_empty() {
let mut net = simple_net();
net.fire(0).unwrap();
assert!(net.enabled_transitions().is_empty());
}
#[test]
fn test_is_deadlocked() {
let mut net = simple_net();
net.fire(0).unwrap();
assert!(net.is_deadlocked());
}
#[test]
fn test_marking_sum() {
let net = simple_net();
assert_eq!(net.marking_sum(), 1);
}
#[test]
fn test_reachability_bfs() {
let net = simple_net();
let reachable = net.reachability_bfs();
assert!(reachable.contains(&vec![1, 0]));
assert!(reachable.contains(&vec![0, 1]));
assert_eq!(reachable.len(), 2);
}
#[test]
fn test_mutex_net_reachability() {
let net = mutex_net();
let reachable = net.reachability_bfs();
for marking in &reachable {
assert!(
!(marking[3] >= 1 && marking[4] >= 1),
"mutual exclusion violated"
);
}
}
#[test]
fn test_producer_consumer_enabled() {
let mut net = producer_consumer_net();
let enabled = net.enabled_transitions();
assert_eq!(enabled.len(), 1); net.fire(enabled[0]).unwrap(); assert_eq!(net.marking_sum(), 2); }
#[test]
fn test_coverability_tree_bounded() {
let net = simple_net();
let tree = CoverabilityTree::build(&net);
assert!(tree.is_bounded());
}
#[test]
fn test_coverability_tree_unbounded() {
let mut net = PetriNet::new();
let p0 = net.add_place(Place::with_tokens("p0", 1));
let t0 = net.add_transition(Transition::new("t0"));
net.add_arc_in(t0, p0, 1);
net.add_arc_out(t0, p0, 2);
let tree = CoverabilityTree::build(&net);
assert!(!tree.is_bounded());
}
#[test]
fn test_stochastic_simulate() {
let mut net = PetriNet::new();
let p0 = net.add_place(Place::with_tokens("p0", 3));
let p1 = net.add_place(Place::with_tokens("p1", 0));
let t0 = net.add_transition(Transition::new("t0"));
net.add_arc_in(t0, p0, 1);
net.add_arc_out(t0, p1, 1);
let mut spn = StochasticPetriNet::new(net, vec![1.0]);
let events = spn.simulate_until(10.0);
assert_eq!(events.len(), 3);
}
#[test]
fn test_stochastic_steady_state() {
let net = simple_net();
let spn = StochasticPetriNet::new(net, vec![1.0]);
let ss = spn.steady_state_estimate(100.0, 5);
assert!((ss.iter().sum::<f64>() - 1.0).abs() < 1e-9);
}
#[test]
fn test_colored_place_tokens() {
let mut p = ColoredPlace::new("p");
p.add_token(ColoredToken::new(vec![1.0, 2.0]));
assert_eq!(p.token_count(), 1);
let tok = p.take_token().unwrap();
assert_eq!(tok.color, vec![1.0, 2.0]);
assert_eq!(p.token_count(), 0);
}
#[test]
fn test_colored_petri_net_fire() {
let mut cpn = ColoredPetriNet::new();
let p0 = cpn.add_place(ColoredPlace::new("p0"));
let p1 = cpn.add_place(ColoredPlace::new("p1"));
let t0 = cpn.add_transition(ColoredTransition::new("t0"));
cpn.add_arc_in(t0, p0);
cpn.add_arc_out(t0, p1);
cpn.places[p0].add_token(ColoredToken::new(vec![1.0]));
cpn.fire(t0).unwrap();
assert_eq!(cpn.places[p0].token_count(), 0);
assert_eq!(cpn.places[p1].token_count(), 1);
}
#[test]
fn test_parse_pnml_empty() {
let net = parse_pnml("<pnml/>");
assert!(net.places.is_empty());
}
#[test]
fn test_parse_pnml_places_and_transitions() {
let xml = r#"<pnml>
<net>
<place id="p1" name="Place1"><initialMarking><text>2</text></initialMarking></place>
<place id="p2" name="Place2"><initialMarking><text>0</text></initialMarking></place>
<transition id="t1" name="Trans1"/>
<arc id="a1" source="p1" target="t1"/>
<arc id="a2" source="t1" target="p2"/>
</net>
</pnml>"#;
let net = parse_pnml(xml);
assert_eq!(net.places.len(), 2, "should have 2 places");
assert_eq!(net.transitions.len(), 1, "should have 1 transition");
assert_eq!(net.arcs_in.len(), 1, "should have 1 input arc");
assert_eq!(net.arcs_out.len(), 1, "should have 1 output arc");
assert_eq!(net.places[0].tokens, 2, "Place1 should have 2 tokens");
assert_eq!(net.places[1].tokens, 0, "Place2 should have 0 tokens");
}
#[test]
fn test_parse_pnml_fire_sequence() {
let xml = r#"<pnml>
<net>
<place id="p0" name="p0"><initialMarking><text>1</text></initialMarking></place>
<place id="p1" name="p1"><initialMarking><text>0</text></initialMarking></place>
<transition id="t0" name="t0"/>
<arc id="a0" source="p0" target="t0"/>
<arc id="a1" source="t0" target="p1"/>
</net>
</pnml>"#;
let mut net = parse_pnml(xml);
assert_eq!(net.marking(), vec![1, 0]);
net.fire(0).expect("t0 should be enabled");
assert_eq!(net.marking(), vec![0, 1]);
}
#[test]
fn test_parse_pnml_arc_weight() {
let xml = r#"<pnml>
<net>
<place id="p0" name="p0"><initialMarking><text>3</text></initialMarking></place>
<place id="p1" name="p1"><initialMarking><text>0</text></initialMarking></place>
<transition id="t0" name="t0"/>
<arc id="a0" source="p0" target="t0"><inscription><text>2</text></inscription></arc>
<arc id="a1" source="t0" target="p1"><inscription><text>1</text></inscription></arc>
</net>
</pnml>"#;
let mut net = parse_pnml(xml);
assert_eq!(net.marking(), vec![3, 0]);
net.fire(0).expect("t0 should be enabled (3 >= 2)");
assert_eq!(net.marking(), vec![1, 1]);
}
#[test]
fn test_incidence_matrix() {
let net = simple_net();
let m = incidence_matrix(&net);
assert_eq!(m[0][0], -1);
assert_eq!(m[0][1], 1);
}
#[test]
fn test_state_equation() {
let net = simple_net();
let result = state_equation(&net, &[1]);
assert_eq!(result, vec![0, 1]);
}
#[test]
fn test_is_trap() {
let simple = simple_net();
assert!(is_trap(&simple, &[1]));
}
#[test]
fn test_is_siphon() {
let simple = simple_net();
assert!(is_siphon(&simple, &[0]));
}
#[test]
fn test_find_t_invariant() {
let mut net = PetriNet::new();
let p0 = net.add_place(Place::with_tokens("p0", 1));
let p1 = net.add_place(Place::with_tokens("p1", 0));
let t0 = net.add_transition(Transition::new("t0"));
let t1 = net.add_transition(Transition::new("t1"));
net.add_arc_in(t0, p0, 1);
net.add_arc_out(t0, p1, 1);
net.add_arc_in(t1, p1, 1);
net.add_arc_out(t1, p0, 1);
let inv = find_t_invariant(&net, 2);
assert!(inv.is_some());
}
#[test]
fn test_t_invariants_loop_net() {
let mut net = PetriNet::new();
let p0 = net.add_place(Place::with_tokens("p0", 1));
let p1 = net.add_place(Place::with_tokens("p1", 0));
let t0 = net.add_transition(Transition::new("t0"));
let t1 = net.add_transition(Transition::new("t1"));
net.add_arc_in(t0, p0, 1);
net.add_arc_out(t0, p1, 1);
net.add_arc_in(t1, p1, 1);
net.add_arc_out(t1, p0, 1);
let invs = t_invariants(&net);
assert!(
!invs.is_empty(),
"loop net must have at least one T-invariant"
);
assert!(
invs.iter().any(|v| v == &[1i64, 1]),
"expected [1,1] in invariants, got {:?}",
invs
);
}
#[test]
fn test_t_invariants_producer_consumer() {
let net = producer_consumer_net();
let invs = t_invariants(&net);
assert!(
!invs.is_empty(),
"producer-consumer net should have T-invariants"
);
let mat = incidence_matrix(&net);
for inv in &invs {
let n_p = net.places.len();
for p in 0..n_p {
let sum: i64 = mat.iter().zip(inv.iter()).map(|(row, &x)| row[p] * x).sum();
assert_eq!(
sum, 0,
"T-invariant {:?} must satisfy C*x=0, failed at place {}",
inv, p
);
}
}
}
#[test]
fn test_t_invariants_empty_net() {
let net = PetriNet::new();
let invs = t_invariants(&net);
assert!(invs.is_empty());
}
#[test]
fn test_t_invariants_no_cycle() {
let net = simple_net();
let invs = t_invariants(&net);
for inv in &invs {
let mat = incidence_matrix(&net);
let n_p = net.places.len();
for p in 0..n_p {
let sum: i64 = mat.iter().zip(inv.iter()).map(|(row, &x)| row[p] * x).sum();
assert_eq!(sum, 0, "Invariant {:?} must satisfy C*x=0", inv);
}
}
}
#[test]
fn test_replay_sequence_ok() {
let mut net = simple_net();
let result = replay_sequence(&mut net, &[0]);
assert_eq!(result, Ok(vec![0, 1]));
}
#[test]
fn test_replay_sequence_fail() {
let mut net = simple_net();
let result = replay_sequence(&mut net, &[0, 0]);
assert_eq!(result, Err(1));
}
#[test]
fn test_compute_metrics() {
let net = simple_net();
let m = compute_metrics(&net);
assert_eq!(m.n_places, 2);
assert_eq!(m.n_transitions, 1);
assert_eq!(m.n_arcs, 2);
}
#[test]
fn test_disjoint_union() {
let a = simple_net();
let b = simple_net();
let u = disjoint_union(&a, &b);
assert_eq!(u.places.len(), 4);
assert_eq!(u.transitions.len(), 2);
}
#[test]
fn test_is_live_approx() {
let net = simple_net();
assert!(is_live_approx(&net, 0));
}
#[test]
fn test_workflow_net() {
let net = workflow_with_resource(2);
assert_eq!(net.places.len(), 4);
assert_eq!(net.transitions.len(), 2);
}
#[test]
fn test_dense_sparse_roundtrip() {
let net = simple_net();
let marking = net.marking();
let sparse = dense_to_sparse(&net, &marking);
let dense = sparse_to_dense(&net, &sparse);
assert_eq!(marking, dense);
}
#[test]
fn test_guarded_transition() {
let mut net = PetriNet::new();
let p0 = net.add_place(Place::with_tokens("p0", 2));
let p1 = net.add_place(Place::with_tokens("p1", 0));
let t0 = net.add_transition(Transition::with_guard("t0", |tokens| {
tokens.iter().all(|&t| t >= 2)
}));
net.add_arc_in(t0, p0, 1);
net.add_arc_out(t0, p1, 1);
let enabled = net.enabled_transitions();
assert_eq!(enabled.len(), 1);
net.fire(0).unwrap();
assert!(net.enabled_transitions().is_empty());
}
#[test]
fn test_coverability_tree_node_count() {
let net = simple_net();
let tree = CoverabilityTree::build(&net);
assert!(tree.node_count() >= 2);
}
#[test]
fn test_colored_transition_guard() {
let mut cpn = ColoredPetriNet::new();
let p0 = cpn.add_place(ColoredPlace::new("p0"));
let p1 = cpn.add_place(ColoredPlace::new("p1"));
let t0 = cpn.add_transition(ColoredTransition::with_guard("t0", |tok| {
tok.color.first().copied().unwrap_or(0.0) > 0.5
}));
cpn.add_arc_in(t0, p0);
cpn.add_arc_out(t0, p1);
cpn.places[p0].add_token(ColoredToken::new(vec![0.1]));
assert!(cpn.enabled_transitions().is_empty());
cpn.places[p0].tokens.clear();
cpn.places[p0].add_token(ColoredToken::new(vec![0.9]));
assert_eq!(cpn.enabled_transitions(), vec![0]);
}
}