Skip to main content

netrun_sim/
graph.rs

//! Graph topology types for flow-based development networks.
//!
//! This module defines the static structure of a network: nodes, ports, edges,
//! and the conditions that govern packet flow (salvo conditions).
5
6use indexmap::IndexMap;
7use std::collections::{HashMap, HashSet};
8
/// The name of a port on a node.
///
/// Port names are plain strings; they are the keys of a node's input and
/// output port maps and of the per-port maps inside salvo conditions.
pub type PortName = String;
11
/// Specifies the capacity of a port (how many packets it can hold).
///
/// Implements `PartialEq`/`Eq` so that capacities can be compared directly
/// (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PortSlotSpec {
    /// Port can hold unlimited packets.
    Infinite,
    /// Port can hold at most this many packets.
    Finite(u64),
}
20
21/// A port on a node where packets can enter or exit.
22#[derive(Debug, Clone)]
23pub struct Port {
24    /// The capacity specification for this port.
25    pub slots_spec: PortSlotSpec,
26}
27
/// A predicate on the state of a port, used in salvo conditions.
///
/// These predicates test the current packet count at a port against
/// various conditions like empty, full, or numeric comparisons.
///
/// Implements `PartialEq`/`Eq` so that predicates can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PortState {
    /// Port has zero packets.
    Empty,
    /// Port is at capacity (always false for infinite ports).
    Full,
    /// Port has at least one packet.
    NonEmpty,
    /// Port is below capacity (always true for infinite ports).
    NonFull,
    /// Port has exactly this many packets.
    Equals(u64),
    /// Port has fewer than this many packets.
    LessThan(u64),
    /// Port has more than this many packets.
    GreaterThan(u64),
    /// Port has at most this many packets.
    EqualsOrLessThan(u64),
    /// Port has at least this many packets.
    EqualsOrGreaterThan(u64),
}
53
/// Specifies how many packets to take from a port in a salvo.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PacketCount {
    /// Take all packets from the port.
    All,
    /// Take at most this many packets (takes fewer if port has fewer).
    Count(u64),
}
62
/// Specifies the maximum number of times a salvo condition can trigger.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MaxSalvos {
    /// No limit on how many times the condition can trigger.
    Infinite,
    /// Can trigger at most this many times.
    Finite(u64),
}
71
72/// A boolean expression over port states, used to define when salvos can trigger.
73///
74/// This forms a simple expression tree that can combine port state checks
75/// with logical operators (And, Or, Not).
76#[derive(Debug, Clone)]
77pub enum SalvoConditionTerm {
78    /// Always true. Useful for source nodes with no input ports.
79    True,
80    /// Always false. Useful as a placeholder or with Not.
81    False,
82    /// Check if a specific port matches a state predicate.
83    Port { port_name: String, state: PortState },
84    /// All sub-terms must be true.
85    And(Vec<Self>),
86    /// At least one sub-term must be true.
87    Or(Vec<Self>),
88    /// The sub-term must be false.
89    Not(Box<Self>),
90}
91
92/// Evaluates a salvo condition term against the current port packet counts.
93///
94/// # Arguments
95/// * `term` - The condition term to evaluate
96/// * `port_packet_counts` - Map of port names to their current packet counts
97/// * `ports` - Map of port names to their definitions (needed for capacity checks)
98///
99/// # Returns
100/// `true` if the condition is satisfied, `false` otherwise.
101pub fn evaluate_salvo_condition(
102    term: &SalvoConditionTerm,
103    port_packet_counts: &HashMap<PortName, u64>,
104    ports: &HashMap<PortName, Port>,
105) -> bool {
106    match term {
107        SalvoConditionTerm::True => true,
108        SalvoConditionTerm::False => false,
109        SalvoConditionTerm::Port { port_name, state } => {
110            debug_assert!(
111                port_packet_counts.contains_key(port_name),
112                "Port '{}' not found in packet counts — Graph.validate() should have caught this",
113                port_name
114            );
115            let count = *port_packet_counts.get(port_name).unwrap_or(&0);
116
117            debug_assert!(
118                ports.contains_key(port_name),
119                "Port '{}' not found in port definitions — Graph.validate() should have caught this",
120                port_name
121            );
122            let port = ports.get(port_name);
123
124            match state {
125                PortState::Empty => count == 0,
126                PortState::Full => match port {
127                    Some(p) => match p.slots_spec {
128                        PortSlotSpec::Infinite => false, // Infinite port can never be full
129                        PortSlotSpec::Finite(max) => count >= max,
130                    },
131                    None => false,
132                },
133                PortState::NonEmpty => count > 0,
134                PortState::NonFull => match port {
135                    Some(p) => match p.slots_spec {
136                        PortSlotSpec::Infinite => true, // Infinite port is always non-full
137                        PortSlotSpec::Finite(max) => count < max,
138                    },
139                    None => true,
140                },
141                PortState::Equals(n) => count == *n,
142                PortState::LessThan(n) => count < *n,
143                PortState::GreaterThan(n) => count > *n,
144                PortState::EqualsOrLessThan(n) => count <= *n,
145                PortState::EqualsOrGreaterThan(n) => count >= *n,
146            }
147        }
148        SalvoConditionTerm::And(terms) => terms
149            .iter()
150            .all(|t| evaluate_salvo_condition(t, port_packet_counts, ports)),
151        SalvoConditionTerm::Or(terms) => terms
152            .iter()
153            .any(|t| evaluate_salvo_condition(t, port_packet_counts, ports)),
154        SalvoConditionTerm::Not(inner) => {
155            !evaluate_salvo_condition(inner, port_packet_counts, ports)
156        }
157    }
158}
159
/// The name of a salvo condition.
///
/// Used as the key of a node's input and output salvo-condition maps.
pub type SalvoConditionName = String;
162
163/// A condition that defines when packets can trigger an epoch or be sent.
164///
165/// Salvo conditions are attached to nodes and control the flow of packets:
166/// - **Input salvo conditions**: Define when packets at input ports can trigger a new epoch
167/// - **Output salvo conditions**: Define when packets at output ports can be sent out
168#[derive(Debug, Clone)]
169pub struct SalvoCondition {
170    /// Maximum number of times this condition can trigger per epoch.
171    /// For input salvo conditions, this must be `Finite(1)`.
172    pub max_salvos: MaxSalvos,
173    /// The ports whose packets are included when this condition triggers,
174    /// and how many packets to take from each port.
175    pub ports: HashMap<PortName, PacketCount>,
176    /// The boolean condition that must be satisfied for this salvo to trigger.
177    pub term: SalvoConditionTerm,
178}
179
180/// Extracts all port names referenced in a SalvoConditionTerm.
181fn collect_ports_from_term(term: &SalvoConditionTerm, ports: &mut HashSet<PortName>) {
182    match term {
183        SalvoConditionTerm::True | SalvoConditionTerm::False => {
184            // No ports referenced
185        }
186        SalvoConditionTerm::Port { port_name, .. } => {
187            ports.insert(port_name.clone());
188        }
189        SalvoConditionTerm::And(terms) | SalvoConditionTerm::Or(terms) => {
190            for t in terms {
191                collect_ports_from_term(t, ports);
192            }
193        }
194        SalvoConditionTerm::Not(inner) => {
195            collect_ports_from_term(inner, ports);
196        }
197    }
198}
199
200/// Errors that can occur during graph validation
201#[derive(Debug, Clone, PartialEq, thiserror::Error)]
202pub enum GraphValidationError {
203    /// Multiple edges from same output port (fan-out not allowed)
204    #[error("output port {output_port} has {edge_count} outgoing edges (only 1 allowed)")]
205    MultipleEdgesFromOutputPort {
206        output_port: PortRef,
207        edge_count: usize,
208    },
209    /// Edge references a node that doesn't exist
210    #[error("edge {edge_source} -> {edge_target} references non-existent node '{missing_node}'")]
211    EdgeReferencesNonexistentNode {
212        edge_source: PortRef,
213        edge_target: PortRef,
214        missing_node: NodeName,
215    },
216    /// Edge references a port that doesn't exist on the node
217    #[error("edge {edge_source} -> {edge_target} references non-existent port {missing_port}")]
218    EdgeReferencesNonexistentPort {
219        edge_source: PortRef,
220        edge_target: PortRef,
221        missing_port: PortRef,
222    },
223    /// Edge source is not an output port
224    #[error("edge source {edge_source} must be an output port")]
225    EdgeSourceNotOutputPort {
226        edge_source: PortRef,
227        edge_target: PortRef,
228    },
229    /// Edge target is not an input port
230    #[error("edge target {edge_target} must be an input port")]
231    EdgeTargetNotInputPort {
232        edge_source: PortRef,
233        edge_target: PortRef,
234    },
235    /// SalvoCondition.ports references a port that doesn't exist
236    #[error("{condition_type} salvo condition '{condition_name}' on node '{node_name}' references non-existent port '{missing_port}'", condition_type = if *is_input_condition { "input" } else { "output" })]
237    SalvoConditionReferencesNonexistentPort {
238        node_name: NodeName,
239        condition_name: SalvoConditionName,
240        is_input_condition: bool,
241        missing_port: PortName,
242    },
243    /// SalvoCondition.term references a port that doesn't exist
244    #[error("{condition_type} salvo condition '{condition_name}' on node '{node_name}' has term referencing non-existent port '{missing_port}'", condition_type = if *is_input_condition { "input" } else { "output" })]
245    SalvoConditionTermReferencesNonexistentPort {
246        node_name: NodeName,
247        condition_name: SalvoConditionName,
248        is_input_condition: bool,
249        missing_port: PortName,
250    },
251    /// Input salvo condition has max_salvos != Finite(1)
252    #[error(
253        "input salvo condition '{condition_name}' on node '{node_name}' has max_salvos={max_salvos:?}, but must be Finite(1). Input salvos must have exactly one packet to trigger an epoch."
254    )]
255    InputSalvoConditionInvalidMaxSalvos {
256        node_name: NodeName,
257        condition_name: SalvoConditionName,
258        max_salvos: MaxSalvos,
259    },
260}
261
/// The name of a node in the graph.
///
/// Node names key the graph's node map, so they must be unique within a graph.
pub type NodeName = String;
264
265/// A processing unit in the graph with input and output ports.
266///
267/// Nodes are the fundamental building blocks of a flow-based network.
268/// They have:
269/// - Input ports where packets arrive
270/// - Output ports where packets are sent
271/// - Input salvo conditions that define when arriving packets trigger an epoch
272/// - Output salvo conditions that define when output packets can be sent
273#[derive(Debug, Clone)]
274pub struct Node {
275    /// The unique name of this node.
276    pub name: NodeName,
277    /// Input ports where packets can arrive.
278    pub in_ports: HashMap<PortName, Port>,
279    /// Output ports where packets can be sent.
280    pub out_ports: HashMap<PortName, Port>,
281    /// Conditions that trigger new epochs when satisfied by packets at input ports.
282    /// Uses IndexMap to preserve insertion order — first satisfied condition wins.
283    pub in_salvo_conditions: IndexMap<SalvoConditionName, SalvoCondition>,
284    /// Conditions that must be satisfied to send packets from output ports.
285    /// Uses IndexMap to preserve insertion order — first satisfied condition wins.
286    pub out_salvo_conditions: IndexMap<SalvoConditionName, SalvoCondition>,
287}
288
/// Whether a port is for input or output.
///
/// Exposed to Python as a class when the `python` feature is enabled.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "python", pyo3::pyclass(eq, eq_int, frozen, hash))]
pub enum PortType {
    /// An input port (packets flow into the node).
    Input,
    /// An output port (packets flow out of the node).
    Output,
}
298
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl PortType {
    /// Python `repr()`: the variant name qualified with the class name.
    fn __repr__(&self) -> String {
        let text = match self {
            PortType::Input => "PortType.Input",
            PortType::Output => "PortType.Output",
        };
        text.to_string()
    }
}
309
310/// A reference to a specific port on a specific node.
311#[derive(Debug, Clone, PartialEq, Eq, Hash)]
312#[cfg_attr(feature = "python", pyo3::pyclass(eq, frozen, hash, get_all))]
313pub struct PortRef {
314    /// The name of the node containing this port.
315    pub node_name: NodeName,
316    /// Whether this is an input or output port.
317    pub port_type: PortType,
318    /// The name of the port on the node.
319    pub port_name: PortName,
320}
321
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl PortRef {
    /// Python constructor: `PortRef(node_name, port_type, port_name)`.
    #[new]
    fn py_new(node_name: String, port_type: PortType, port_name: String) -> Self {
        Self {
            node_name,
            port_type,
            port_name,
        }
    }

    /// Python `repr()`: constructor-like form using the Rust `Debug` name of the port type.
    fn __repr__(&self) -> String {
        format!(
            "PortRef('{}', {:?}, '{}')",
            self.node_name, self.port_type, self.port_name
        )
    }

    /// Python `str()`: same text as the Rust `Display` implementation.
    fn __str__(&self) -> String {
        self.to_string()
    }
}
345
346impl std::fmt::Display for PortRef {
347    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348        let port_type_str = match self.port_type {
349            PortType::Input => "in",
350            PortType::Output => "out",
351        };
352        write!(f, "{}.{}.{}", self.node_name, port_type_str, self.port_name)
353    }
354}
355
356/// A connection between two ports in the graph.
357///
358/// Edges connect output ports to input ports, allowing packets to flow
359/// between nodes.
360#[derive(Debug, Clone, PartialEq, Eq, Hash)]
361#[cfg_attr(feature = "python", pyo3::pyclass(eq, frozen, hash, get_all))]
362pub struct Edge {
363    /// The output port where this edge originates.
364    pub source: PortRef,
365    /// The input port where this edge terminates.
366    pub target: PortRef,
367}
368
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl Edge {
    /// Python constructor: `Edge(source, target)`.
    #[new]
    fn py_new(source: PortRef, target: PortRef) -> Self {
        Self { source, target }
    }

    /// Python `repr()`: both endpoints in their `Display` form.
    fn __repr__(&self) -> String {
        format!("Edge({}, {})", self.source, self.target)
    }

    /// Python `str()`: same text as the Rust `Display` implementation.
    fn __str__(&self) -> String {
        self.to_string()
    }
}
385
386impl std::fmt::Display for Edge {
387    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
388        write!(f, "{} -> {}", self.source, self.target)
389    }
390}
391
392/// The static topology of a flow-based network.
393///
394/// A Graph defines the structure of a network: which nodes exist, how they're
395/// connected, and what conditions govern packet flow. The graph is immutable
396/// after creation.
397///
398/// # Example
399///
400/// ```
401/// use netrun_sim::graph::{Graph, Node, Edge, PortRef, PortType, Port, PortSlotSpec};
402/// use indexmap::IndexMap;
403/// use std::collections::HashMap;
404///
405/// // Create a simple A -> B graph
406/// let node_a = Node {
407///     name: "A".to_string(),
408///     in_ports: HashMap::new(),
409///     out_ports: [("out".to_string(), Port { slots_spec: PortSlotSpec::Infinite })].into(),
410///     in_salvo_conditions: IndexMap::new(),
411///     out_salvo_conditions: IndexMap::new(),
412/// };
413/// let node_b = Node {
414///     name: "B".to_string(),
415///     in_ports: [("in".to_string(), Port { slots_spec: PortSlotSpec::Infinite })].into(),
416///     out_ports: HashMap::new(),
417///     in_salvo_conditions: IndexMap::new(),
418///     out_salvo_conditions: IndexMap::new(),
419/// };
420///
421/// let edge = Edge {
422///     source: PortRef { node_name: "A".to_string(), port_type: PortType::Output, port_name: "out".to_string() },
423///     target: PortRef { node_name: "B".to_string(), port_type: PortType::Input, port_name: "in".to_string() },
424/// };
425///
426/// let graph = Graph::new(vec![node_a, node_b], vec![edge]);
427/// assert!(graph.validate().is_empty());
428/// ```
429#[derive(Debug, Clone)]
430pub struct Graph {
431    nodes: HashMap<NodeName, Node>,
432    edges: HashSet<Edge>,
433    edges_by_tail: HashMap<PortRef, Edge>,
434    edges_by_head: HashMap<PortRef, Vec<Edge>>,
435}
436
437impl Graph {
438    /// Creates a new Graph from a list of nodes and edges.
439    ///
440    /// Builds internal indexes for efficient edge lookups by source (tail) and target (head) ports.
441    pub fn new(nodes: Vec<Node>, edges: Vec<Edge>) -> Self {
442        let mut nodes_map: HashMap<NodeName, Node> = HashMap::with_capacity(nodes.len());
443        for node in nodes {
444            if nodes_map.contains_key(&node.name) {
445                panic!("Duplicate node name: '{}'", node.name);
446            }
447            nodes_map.insert(node.name.clone(), node);
448        }
449
450        let mut edges_set: HashSet<Edge> = HashSet::new();
451        let mut edges_by_tail: HashMap<PortRef, Edge> = HashMap::new();
452        let mut edges_by_head: HashMap<PortRef, Vec<Edge>> = HashMap::new();
453
454        for edge in edges {
455            edges_by_tail.insert(edge.source.clone(), edge.clone());
456            edges_by_head
457                .entry(edge.target.clone())
458                .or_default()
459                .push(edge.clone());
460            edges_set.insert(edge);
461        }
462
463        Graph {
464            nodes: nodes_map,
465            edges: edges_set,
466            edges_by_tail,
467            edges_by_head,
468        }
469    }
470
471    /// Returns a reference to all nodes in the graph, keyed by name.
472    pub fn nodes(&self) -> &HashMap<NodeName, Node> {
473        &self.nodes
474    }
475
476    /// Returns a reference to all edges in the graph.
477    pub fn edges(&self) -> &HashSet<Edge> {
478        &self.edges
479    }
480
481    /// Returns the edge that has the given output port as its source (tail).
482    pub fn get_edge_by_tail(&self, output_port_ref: &PortRef) -> Option<&Edge> {
483        self.edges_by_tail.get(output_port_ref)
484    }
485
486    /// Returns all edges that have the given input port as their target (head).
487    /// Fan-in is allowed, so multiple edges can connect to the same input port.
488    pub fn get_edges_by_head(&self, input_port_ref: &PortRef) -> &[Edge] {
489        self.edges_by_head
490            .get(input_port_ref)
491            .map(|v| v.as_slice())
492            .unwrap_or(&[])
493    }
494
495    /// Validates the graph structure.
496    ///
497    /// Returns a list of all validation errors found. An empty list means the graph is valid.
498    pub fn validate(&self) -> Vec<GraphValidationError> {
499        let mut errors = Vec::new();
500
501        // Validate edges (edges is a HashSet, so duplicates are already impossible)
502        for edge in &self.edges {
503            let source = &edge.source;
504            let target = &edge.target;
505
506            // Validate source node exists
507            let source_node = match self.nodes.get(&source.node_name) {
508                Some(node) => node,
509                None => {
510                    errors.push(GraphValidationError::EdgeReferencesNonexistentNode {
511                        edge_source: source.clone(),
512                        edge_target: target.clone(),
513                        missing_node: source.node_name.clone(),
514                    });
515                    continue;
516                }
517            };
518
519            // Validate target node exists
520            let target_node = match self.nodes.get(&target.node_name) {
521                Some(node) => node,
522                None => {
523                    errors.push(GraphValidationError::EdgeReferencesNonexistentNode {
524                        edge_source: source.clone(),
525                        edge_target: target.clone(),
526                        missing_node: target.node_name.clone(),
527                    });
528                    continue;
529                }
530            };
531
532            // Validate source is an output port
533            if source.port_type != PortType::Output {
534                errors.push(GraphValidationError::EdgeSourceNotOutputPort {
535                    edge_source: source.clone(),
536                    edge_target: target.clone(),
537                });
538            } else if !source_node.out_ports.contains_key(&source.port_name) {
539                errors.push(GraphValidationError::EdgeReferencesNonexistentPort {
540                    edge_source: source.clone(),
541                    edge_target: target.clone(),
542                    missing_port: source.clone(),
543                });
544            }
545
546            // Validate target is an input port
547            if target.port_type != PortType::Input {
548                errors.push(GraphValidationError::EdgeTargetNotInputPort {
549                    edge_source: source.clone(),
550                    edge_target: target.clone(),
551                });
552            } else if !target_node.in_ports.contains_key(&target.port_name) {
553                errors.push(GraphValidationError::EdgeReferencesNonexistentPort {
554                    edge_source: source.clone(),
555                    edge_target: target.clone(),
556                    missing_port: target.clone(),
557                });
558            }
559        }
560
561        // Check for multiple edges from same output port (fan-out not allowed)
562        let mut edges_from_source: HashMap<&PortRef, usize> = HashMap::new();
563        for edge in &self.edges {
564            *edges_from_source.entry(&edge.source).or_insert(0) += 1;
565        }
566        for (port_ref, count) in edges_from_source {
567            if count > 1 {
568                errors.push(GraphValidationError::MultipleEdgesFromOutputPort {
569                    output_port: port_ref.clone(),
570                    edge_count: count,
571                });
572            }
573        }
574
575        // Validate nodes and their salvo conditions
576        for (node_name, node) in &self.nodes {
577            // Validate input salvo conditions
578            for (cond_name, condition) in &node.in_salvo_conditions {
579                // Input salvo conditions must have max_salvos == Finite(1)
580                if condition.max_salvos != MaxSalvos::Finite(1) {
581                    errors.push(GraphValidationError::InputSalvoConditionInvalidMaxSalvos {
582                        node_name: node_name.clone(),
583                        condition_name: cond_name.clone(),
584                        max_salvos: condition.max_salvos.clone(),
585                    });
586                }
587
588                // Validate ports in condition.ports exist as input ports
589                for port_name in condition.ports.keys() {
590                    if !node.in_ports.contains_key(port_name) {
591                        errors.push(
592                            GraphValidationError::SalvoConditionReferencesNonexistentPort {
593                                node_name: node_name.clone(),
594                                condition_name: cond_name.clone(),
595                                is_input_condition: true,
596                                missing_port: port_name.clone(),
597                            },
598                        );
599                    }
600                }
601
602                // Validate ports in condition.term exist as input ports
603                let mut term_ports = HashSet::new();
604                collect_ports_from_term(&condition.term, &mut term_ports);
605                for port_name in term_ports {
606                    if !node.in_ports.contains_key(&port_name) {
607                        errors.push(
608                            GraphValidationError::SalvoConditionTermReferencesNonexistentPort {
609                                node_name: node_name.clone(),
610                                condition_name: cond_name.clone(),
611                                is_input_condition: true,
612                                missing_port: port_name,
613                            },
614                        );
615                    }
616                }
617            }
618
619            // Validate output salvo conditions
620            for (cond_name, condition) in &node.out_salvo_conditions {
621                // Validate ports in condition.ports exist as output ports
622                for port_name in condition.ports.keys() {
623                    if !node.out_ports.contains_key(port_name) {
624                        errors.push(
625                            GraphValidationError::SalvoConditionReferencesNonexistentPort {
626                                node_name: node_name.clone(),
627                                condition_name: cond_name.clone(),
628                                is_input_condition: false,
629                                missing_port: port_name.clone(),
630                            },
631                        );
632                    }
633                }
634
635                // Validate ports in condition.term exist as output ports
636                let mut term_ports = HashSet::new();
637                collect_ports_from_term(&condition.term, &mut term_ports);
638                for port_name in term_ports {
639                    if !node.out_ports.contains_key(&port_name) {
640                        errors.push(
641                            GraphValidationError::SalvoConditionTermReferencesNonexistentPort {
642                                node_name: node_name.clone(),
643                                condition_name: cond_name.clone(),
644                                is_input_condition: false,
645                                missing_port: port_name,
646                            },
647                        );
648                    }
649                }
650            }
651        }
652
653        errors
654    }
655}
656
// Unit tests live in the sibling file `graph_tests.rs` and are compiled only for test builds.
#[cfg(test)]
#[path = "graph_tests.rs"]
mod tests;