Skip to main content

netrun_sim/
graph.rs

1//! Graph topology types for flow-based development networks.
2//!
3//! This module defines the static structure of a network: nodes, ports, edges,
4//! and the conditions that govern packet flow (salvo conditions).
5
6use std::collections::{HashMap, HashSet};
7
/// The name of a port on a node.
///
/// Port names are the keys of a node's `in_ports` / `out_ports` maps and are the
/// identifiers that salvo conditions use to refer to a port.
pub type PortName = String;
10
/// Specifies the capacity of a port (how many packets it can hold).
///
/// The capacity is what the `Full` / `NonFull` port-state predicates compare the
/// current packet count against.
#[derive(Debug, Clone)]
pub enum PortSlotSpec {
    /// Port can hold unlimited packets; such a port is never considered full.
    Infinite,
    /// Port can hold at most this many packets.
    Finite(u64),
}
19
/// A port on a node where packets can enter or exit.
///
/// Whether a port is an input or an output is not stored here; it is determined
/// by which of the owning node's port maps (`in_ports` or `out_ports`) holds it.
#[derive(Debug, Clone)]
pub struct Port {
    /// The capacity specification for this port.
    pub slots_spec: PortSlotSpec,
}
26
/// A predicate on the state of a port, used in salvo conditions.
///
/// These predicates test the current packet count at a port against
/// various conditions like empty, full, or numeric comparisons.
///
/// When evaluated by `evaluate_salvo_condition`, a port with no recorded packet
/// count is treated as holding zero packets, and a port with no known definition
/// is treated like an infinite port (never full, always non-full).
#[derive(Debug, Clone)]
pub enum PortState {
    /// Port has zero packets.
    Empty,
    /// Port is at capacity (always false for infinite ports).
    Full,
    /// Port has at least one packet.
    NonEmpty,
    /// Port is below capacity (always true for infinite ports).
    NonFull,
    /// Port has exactly this many packets.
    Equals(u64),
    /// Port has fewer than this many packets.
    LessThan(u64),
    /// Port has more than this many packets.
    GreaterThan(u64),
    /// Port has at most this many packets.
    EqualsOrLessThan(u64),
    /// Port has at least this many packets.
    EqualsOrGreaterThan(u64),
}
52
/// Specifies how many packets to take from a port in a salvo.
///
/// Used as the value type of `SalvoCondition::ports`, one entry per port that
/// contributes packets to the salvo.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PacketCount {
    /// Take all packets from the port.
    All,
    /// Take at most this many packets (takes fewer if port has fewer).
    Count(u64),
}
61
/// Specifies the maximum number of times a salvo condition can trigger.
///
/// Graph validation requires input salvo conditions to use `Finite(1)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MaxSalvos {
    /// No limit on how many times the condition can trigger.
    Infinite,
    /// Can trigger at most this many times.
    Finite(u64),
}
70
/// A boolean expression over port states, used to define when salvos can trigger.
///
/// This forms a simple expression tree that can combine port state checks
/// with logical operators (And, Or, Not). Terms are evaluated with
/// `evaluate_salvo_condition`.
#[derive(Debug, Clone)]
pub enum SalvoConditionTerm {
    /// Always true. Useful for source nodes with no input ports.
    True,
    /// Always false. Useful as a placeholder or with Not.
    False,
    /// Check if a specific port matches a state predicate.
    Port { port_name: String, state: PortState },
    /// All sub-terms must be true. An empty list evaluates to true.
    And(Vec<Self>),
    /// At least one sub-term must be true. An empty list evaluates to false.
    Or(Vec<Self>),
    /// The sub-term must be false.
    Not(Box<Self>),
}
90
91/// Evaluates a salvo condition term against the current port packet counts.
92///
93/// # Arguments
94/// * `term` - The condition term to evaluate
95/// * `port_packet_counts` - Map of port names to their current packet counts
96/// * `ports` - Map of port names to their definitions (needed for capacity checks)
97///
98/// # Returns
99/// `true` if the condition is satisfied, `false` otherwise.
100pub fn evaluate_salvo_condition(
101    term: &SalvoConditionTerm,
102    port_packet_counts: &HashMap<PortName, u64>,
103    ports: &HashMap<PortName, Port>,
104) -> bool {
105    match term {
106        SalvoConditionTerm::True => true,
107        SalvoConditionTerm::False => false,
108        SalvoConditionTerm::Port { port_name, state } => {
109            let count = *port_packet_counts.get(port_name).unwrap_or(&0);
110            let port = ports.get(port_name);
111
112            match state {
113                PortState::Empty => count == 0,
114                PortState::Full => match port {
115                    Some(p) => match p.slots_spec {
116                        PortSlotSpec::Infinite => false, // Infinite port can never be full
117                        PortSlotSpec::Finite(max) => count >= max,
118                    },
119                    None => false,
120                },
121                PortState::NonEmpty => count > 0,
122                PortState::NonFull => match port {
123                    Some(p) => match p.slots_spec {
124                        PortSlotSpec::Infinite => true, // Infinite port is always non-full
125                        PortSlotSpec::Finite(max) => count < max,
126                    },
127                    None => true,
128                },
129                PortState::Equals(n) => count == *n,
130                PortState::LessThan(n) => count < *n,
131                PortState::GreaterThan(n) => count > *n,
132                PortState::EqualsOrLessThan(n) => count <= *n,
133                PortState::EqualsOrGreaterThan(n) => count >= *n,
134            }
135        }
136        SalvoConditionTerm::And(terms) => terms
137            .iter()
138            .all(|t| evaluate_salvo_condition(t, port_packet_counts, ports)),
139        SalvoConditionTerm::Or(terms) => terms
140            .iter()
141            .any(|t| evaluate_salvo_condition(t, port_packet_counts, ports)),
142        SalvoConditionTerm::Not(inner) => {
143            !evaluate_salvo_condition(inner, port_packet_counts, ports)
144        }
145    }
146}
147
/// The name of a salvo condition.
///
/// Salvo condition names are the keys of a node's `in_salvo_conditions` and
/// `out_salvo_conditions` maps.
pub type SalvoConditionName = String;
150
/// A condition that defines when packets can trigger an epoch or be sent.
///
/// Salvo conditions are attached to nodes and control the flow of packets:
/// - **Input salvo conditions**: Define when packets at input ports can trigger a new epoch
/// - **Output salvo conditions**: Define when packets at output ports can be sent out
///
/// Graph validation checks that every port named in `ports` or referenced by `term`
/// exists on the owning node (as an input port for input conditions, as an output
/// port for output conditions).
#[derive(Debug, Clone)]
pub struct SalvoCondition {
    /// Maximum number of times this condition can trigger per epoch.
    /// For input salvo conditions, this must be `Finite(1)`.
    pub max_salvos: MaxSalvos,
    /// The ports whose packets are included when this condition triggers,
    /// and how many packets to take from each port.
    pub ports: HashMap<PortName, PacketCount>,
    /// The boolean condition that must be satisfied for this salvo to trigger.
    pub term: SalvoConditionTerm,
}
167
168/// Extracts all port names referenced in a SalvoConditionTerm.
169fn collect_ports_from_term(term: &SalvoConditionTerm, ports: &mut HashSet<PortName>) {
170    match term {
171        SalvoConditionTerm::True | SalvoConditionTerm::False => {
172            // No ports referenced
173        }
174        SalvoConditionTerm::Port { port_name, .. } => {
175            ports.insert(port_name.clone());
176        }
177        SalvoConditionTerm::And(terms) | SalvoConditionTerm::Or(terms) => {
178            for t in terms {
179                collect_ports_from_term(t, ports);
180            }
181        }
182        SalvoConditionTerm::Not(inner) => {
183            collect_ports_from_term(inner, ports);
184        }
185    }
186}
187
/// Errors that can occur during graph validation.
///
/// Each variant carries enough context (edge endpoints, node and condition names)
/// to locate the offending element; the `#[error]` attribute on each variant
/// defines its human-readable message.
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum GraphValidationError {
    /// Multiple edges from same output port (fan-out not allowed)
    #[error("output port {output_port} has {edge_count} outgoing edges (only 1 allowed)")]
    MultipleEdgesFromOutputPort {
        /// The source port that more than one edge starts from.
        output_port: PortRef,
        /// How many edges start from `output_port`.
        edge_count: usize,
    },
    /// Edge references a node that doesn't exist
    #[error("edge {edge_source} -> {edge_target} references non-existent node '{missing_node}'")]
    EdgeReferencesNonexistentNode {
        edge_source: PortRef,
        edge_target: PortRef,
        /// The node name (taken from the edge's source or target) that is not in the graph.
        missing_node: NodeName,
    },
    /// Edge references a port that doesn't exist on the node
    #[error("edge {edge_source} -> {edge_target} references non-existent port {missing_port}")]
    EdgeReferencesNonexistentPort {
        edge_source: PortRef,
        edge_target: PortRef,
        /// The endpoint (either `edge_source` or `edge_target`) whose port is missing.
        missing_port: PortRef,
    },
    /// Edge source is not an output port
    #[error("edge source {edge_source} must be an output port")]
    EdgeSourceNotOutputPort {
        edge_source: PortRef,
        edge_target: PortRef,
    },
    /// Edge target is not an input port
    #[error("edge target {edge_target} must be an input port")]
    EdgeTargetNotInputPort {
        edge_source: PortRef,
        edge_target: PortRef,
    },
    /// SalvoCondition.ports references a port that doesn't exist
    #[error("{condition_type} salvo condition '{condition_name}' on node '{node_name}' references non-existent port '{missing_port}'", condition_type = if *is_input_condition { "input" } else { "output" })]
    SalvoConditionReferencesNonexistentPort {
        node_name: NodeName,
        condition_name: SalvoConditionName,
        /// `true` for an input salvo condition, `false` for an output one.
        is_input_condition: bool,
        missing_port: PortName,
    },
    /// SalvoCondition.term references a port that doesn't exist
    #[error("{condition_type} salvo condition '{condition_name}' on node '{node_name}' has term referencing non-existent port '{missing_port}'", condition_type = if *is_input_condition { "input" } else { "output" })]
    SalvoConditionTermReferencesNonexistentPort {
        node_name: NodeName,
        condition_name: SalvoConditionName,
        /// `true` for an input salvo condition, `false` for an output one.
        is_input_condition: bool,
        missing_port: PortName,
    },
    /// Input salvo condition has max_salvos != Finite(1)
    #[error(
        "input salvo condition '{condition_name}' on node '{node_name}' has max_salvos={max_salvos:?}, but must be Finite(1). Input salvos must have exactly one packet to trigger an epoch."
    )]
    InputSalvoConditionInvalidMaxSalvos {
        node_name: NodeName,
        condition_name: SalvoConditionName,
        /// The offending value found on the condition.
        max_salvos: MaxSalvos,
    },
    /// Duplicate edge (same source and target)
    #[error("duplicate edge: {edge_source} -> {edge_target}")]
    DuplicateEdge {
        edge_source: PortRef,
        edge_target: PortRef,
    },
}
255
/// The name of a node in the graph.
///
/// Node names key the graph's node map and are how a `PortRef` identifies the
/// node that owns a port.
pub type NodeName = String;
258
/// A processing unit in the graph with input and output ports.
///
/// Nodes are the fundamental building blocks of a flow-based network.
/// They have:
/// - Input ports where packets arrive
/// - Output ports where packets are sent
/// - Input salvo conditions that define when arriving packets trigger an epoch
/// - Output salvo conditions that define when output packets can be sent
///
/// Input and output ports live in separate maps, so the same port name may be
/// used once as an input and once as an output of the same node.
#[derive(Debug, Clone)]
pub struct Node {
    /// The unique name of this node. The graph keys its node map by this value.
    pub name: NodeName,
    /// Input ports where packets can arrive.
    pub in_ports: HashMap<PortName, Port>,
    /// Output ports where packets can be sent.
    pub out_ports: HashMap<PortName, Port>,
    /// Conditions that trigger new epochs when satisfied by packets at input ports.
    pub in_salvo_conditions: HashMap<SalvoConditionName, SalvoCondition>,
    /// Conditions that must be satisfied to send packets from output ports.
    pub out_salvo_conditions: HashMap<SalvoConditionName, SalvoCondition>,
}
280
// Direction tag carried by every `PortRef`. Graph validation requires `Output` on
// an edge's source and `Input` on an edge's target.
// NOTE(review): with the `python` feature enabled the `///` text on this type is
// presumably exported as the Python class docstring by pyo3, so it is left
// untouched here. Confirm before rewording it.
/// Whether a port is for input or output.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "python", pyo3::pyclass(eq, eq_int, frozen, hash))]
pub enum PortType {
    /// An input port (packets flow into the node).
    Input,
    /// An output port (packets flow out of the node).
    Output,
}
290
// Python-only methods for `PortType`, compiled when the `python` feature is enabled.
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl PortType {
    // Python `repr()`: the variant name qualified with the class name.
    fn __repr__(&self) -> String {
        let text = match self {
            Self::Input => "PortType.Input",
            Self::Output => "PortType.Output",
        };
        text.to_string()
    }
}
301
// A `PortRef` is the key type of the graph's edge indexes, hence the `Eq` + `Hash`
// derives. Its `Display` form is `<node>.<in|out>.<port>`.
// NOTE(review): with the `python` feature enabled the `///` text on this type is
// presumably exported as the Python class docstring by pyo3, so it is left
// untouched here. Confirm before rewording it.
/// A reference to a specific port on a specific node.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "python", pyo3::pyclass(eq, frozen, hash, get_all))]
pub struct PortRef {
    /// The name of the node containing this port.
    pub node_name: NodeName,
    /// Whether this is an input or output port.
    pub port_type: PortType,
    /// The name of the port on the node.
    pub port_name: PortName,
}
313
// Python-only methods for `PortRef`, compiled when the `python` feature is enabled.
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl PortRef {
    // Python constructor: `PortRef(node_name, port_type, port_name)`.
    #[new]
    fn py_new(node_name: String, port_type: PortType, port_name: String) -> Self {
        Self {
            node_name,
            port_type,
            port_name,
        }
    }

    // Python `repr()`: constructor-like text; the port type is rendered with its
    // Rust `Debug` form (`Input` / `Output`).
    fn __repr__(&self) -> String {
        let Self {
            node_name,
            port_type,
            port_name,
        } = self;
        format!("PortRef('{node_name}', {port_type:?}, '{port_name}')")
    }

    // Python `str()`: same text as the Rust `Display` implementation.
    fn __str__(&self) -> String {
        ToString::to_string(self)
    }
}
337
338impl std::fmt::Display for PortRef {
339    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
340        let port_type_str = match self.port_type {
341            PortType::Input => "in",
342            PortType::Output => "out",
343        };
344        write!(f, "{}.{}.{}", self.node_name, port_type_str, self.port_name)
345    }
346}
347
// Two edges are equal (and hash alike) when both endpoints are equal; the graph
// stores its edges in a `HashSet<Edge>` and relies on this.
// Its `Display` form is `<source> -> <target>`.
// NOTE(review): with the `python` feature enabled the `///` text on this type is
// presumably exported as the Python class docstring by pyo3, so it is left
// untouched here. Confirm before rewording it.
/// A connection between two ports in the graph.
///
/// Edges connect output ports to input ports, allowing packets to flow
/// between nodes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "python", pyo3::pyclass(eq, frozen, hash, get_all))]
pub struct Edge {
    /// The output port where this edge originates.
    pub source: PortRef,
    /// The input port where this edge terminates.
    pub target: PortRef,
}
360
// Python-only methods for `Edge`, compiled when the `python` feature is enabled.
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl Edge {
    // Python constructor: `Edge(source, target)`.
    #[new]
    fn py_new(source: PortRef, target: PortRef) -> Self {
        Self { source, target }
    }

    // Python `repr()`: both endpoints in their `Display` form.
    fn __repr__(&self) -> String {
        let Self { source, target } = self;
        format!("Edge({source}, {target})")
    }

    // Python `str()`: same text as the Rust `Display` implementation.
    fn __str__(&self) -> String {
        ToString::to_string(self)
    }
}
377
378impl std::fmt::Display for Edge {
379    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380        write!(f, "{} -> {}", self.source, self.target)
381    }
382}
383
/// The static topology of a flow-based network.
///
/// A Graph defines the structure of a network: which nodes exist, how they're
/// connected, and what conditions govern packet flow. The graph is immutable
/// after creation.
///
/// # Example
///
/// ```
/// use netrun_sim::graph::{Graph, Node, Edge, PortRef, PortType, Port, PortSlotSpec};
/// use std::collections::HashMap;
///
/// // Create a simple A -> B graph
/// let node_a = Node {
///     name: "A".to_string(),
///     in_ports: HashMap::new(),
///     out_ports: [("out".to_string(), Port { slots_spec: PortSlotSpec::Infinite })].into(),
///     in_salvo_conditions: HashMap::new(),
///     out_salvo_conditions: HashMap::new(),
/// };
/// let node_b = Node {
///     name: "B".to_string(),
///     in_ports: [("in".to_string(), Port { slots_spec: PortSlotSpec::Infinite })].into(),
///     out_ports: HashMap::new(),
///     in_salvo_conditions: HashMap::new(),
///     out_salvo_conditions: HashMap::new(),
/// };
///
/// let edge = Edge {
///     source: PortRef { node_name: "A".to_string(), port_type: PortType::Output, port_name: "out".to_string() },
///     target: PortRef { node_name: "B".to_string(), port_type: PortType::Input, port_name: "in".to_string() },
/// };
///
/// let graph = Graph::new(vec![node_a, node_b], vec![edge]);
/// assert!(graph.validate().is_empty());
/// ```
#[derive(Debug, Clone)]
pub struct Graph {
    /// All nodes, keyed by node name.
    nodes: HashMap<NodeName, Node>,
    /// The distinct edges of the graph (identical edges collapse to one entry).
    edges: HashSet<Edge>,
    /// Lookup from an edge's source port to that edge. Holds one edge per source
    /// port; if several edges share a source, the one inserted last is kept.
    edges_by_tail: HashMap<PortRef, Edge>,
    /// Lookup from a target port to the edges arriving there, one entry per edge
    /// handed to `Graph::new` (repeated edges appear repeatedly).
    edges_by_head: HashMap<PortRef, Vec<Edge>>,
}
427
428impl Graph {
429    /// Creates a new Graph from a list of nodes and edges.
430    ///
431    /// Builds internal indexes for efficient edge lookups by source (tail) and target (head) ports.
432    pub fn new(nodes: Vec<Node>, edges: Vec<Edge>) -> Self {
433        let nodes_map: HashMap<NodeName, Node> = nodes
434            .into_iter()
435            .map(|node| (node.name.clone(), node))
436            .collect();
437
438        let mut edges_set: HashSet<Edge> = HashSet::new();
439        let mut edges_by_tail: HashMap<PortRef, Edge> = HashMap::new();
440        let mut edges_by_head: HashMap<PortRef, Vec<Edge>> = HashMap::new();
441
442        for edge in edges {
443            edges_by_tail.insert(edge.source.clone(), edge.clone());
444            edges_by_head
445                .entry(edge.target.clone())
446                .or_default()
447                .push(edge.clone());
448            edges_set.insert(edge);
449        }
450
451        Graph {
452            nodes: nodes_map,
453            edges: edges_set,
454            edges_by_tail,
455            edges_by_head,
456        }
457    }
458
459    /// Returns a reference to all nodes in the graph, keyed by name.
460    pub fn nodes(&self) -> &HashMap<NodeName, Node> {
461        &self.nodes
462    }
463
464    /// Returns a reference to all edges in the graph.
465    pub fn edges(&self) -> &HashSet<Edge> {
466        &self.edges
467    }
468
469    /// Returns the edge that has the given output port as its source (tail).
470    pub fn get_edge_by_tail(&self, output_port_ref: &PortRef) -> Option<&Edge> {
471        self.edges_by_tail.get(output_port_ref)
472    }
473
474    /// Returns all edges that have the given input port as their target (head).
475    /// Fan-in is allowed, so multiple edges can connect to the same input port.
476    pub fn get_edges_by_head(&self, input_port_ref: &PortRef) -> &[Edge] {
477        self.edges_by_head
478            .get(input_port_ref)
479            .map(|v| v.as_slice())
480            .unwrap_or(&[])
481    }
482
483    /// Validates the graph structure.
484    ///
485    /// Returns a list of all validation errors found. An empty list means the graph is valid.
486    pub fn validate(&self) -> Vec<GraphValidationError> {
487        let mut errors = Vec::new();
488
489        // Track seen edges to detect duplicates
490        let mut seen_edges: HashSet<(&PortRef, &PortRef)> = HashSet::new();
491
492        // Validate edges
493        for edge in &self.edges {
494            let source = &edge.source;
495            let target = &edge.target;
496
497            // Check for duplicate edges
498            if !seen_edges.insert((source, target)) {
499                errors.push(GraphValidationError::DuplicateEdge {
500                    edge_source: source.clone(),
501                    edge_target: target.clone(),
502                });
503                continue;
504            }
505
506            // Validate source node exists
507            let source_node = match self.nodes.get(&source.node_name) {
508                Some(node) => node,
509                None => {
510                    errors.push(GraphValidationError::EdgeReferencesNonexistentNode {
511                        edge_source: source.clone(),
512                        edge_target: target.clone(),
513                        missing_node: source.node_name.clone(),
514                    });
515                    continue;
516                }
517            };
518
519            // Validate target node exists
520            let target_node = match self.nodes.get(&target.node_name) {
521                Some(node) => node,
522                None => {
523                    errors.push(GraphValidationError::EdgeReferencesNonexistentNode {
524                        edge_source: source.clone(),
525                        edge_target: target.clone(),
526                        missing_node: target.node_name.clone(),
527                    });
528                    continue;
529                }
530            };
531
532            // Validate source is an output port
533            if source.port_type != PortType::Output {
534                errors.push(GraphValidationError::EdgeSourceNotOutputPort {
535                    edge_source: source.clone(),
536                    edge_target: target.clone(),
537                });
538            } else if !source_node.out_ports.contains_key(&source.port_name) {
539                errors.push(GraphValidationError::EdgeReferencesNonexistentPort {
540                    edge_source: source.clone(),
541                    edge_target: target.clone(),
542                    missing_port: source.clone(),
543                });
544            }
545
546            // Validate target is an input port
547            if target.port_type != PortType::Input {
548                errors.push(GraphValidationError::EdgeTargetNotInputPort {
549                    edge_source: source.clone(),
550                    edge_target: target.clone(),
551                });
552            } else if !target_node.in_ports.contains_key(&target.port_name) {
553                errors.push(GraphValidationError::EdgeReferencesNonexistentPort {
554                    edge_source: source.clone(),
555                    edge_target: target.clone(),
556                    missing_port: target.clone(),
557                });
558            }
559        }
560
561        // Check for multiple edges from same output port (fan-out not allowed)
562        let mut edges_from_source: HashMap<&PortRef, usize> = HashMap::new();
563        for edge in &self.edges {
564            *edges_from_source.entry(&edge.source).or_insert(0) += 1;
565        }
566        for (port_ref, count) in edges_from_source {
567            if count > 1 {
568                errors.push(GraphValidationError::MultipleEdgesFromOutputPort {
569                    output_port: port_ref.clone(),
570                    edge_count: count,
571                });
572            }
573        }
574
575        // Validate nodes and their salvo conditions
576        for (node_name, node) in &self.nodes {
577            // Validate input salvo conditions
578            for (cond_name, condition) in &node.in_salvo_conditions {
579                // Input salvo conditions must have max_salvos == Finite(1)
580                if condition.max_salvos != MaxSalvos::Finite(1) {
581                    errors.push(GraphValidationError::InputSalvoConditionInvalidMaxSalvos {
582                        node_name: node_name.clone(),
583                        condition_name: cond_name.clone(),
584                        max_salvos: condition.max_salvos.clone(),
585                    });
586                }
587
588                // Validate ports in condition.ports exist as input ports
589                for port_name in condition.ports.keys() {
590                    if !node.in_ports.contains_key(port_name) {
591                        errors.push(
592                            GraphValidationError::SalvoConditionReferencesNonexistentPort {
593                                node_name: node_name.clone(),
594                                condition_name: cond_name.clone(),
595                                is_input_condition: true,
596                                missing_port: port_name.clone(),
597                            },
598                        );
599                    }
600                }
601
602                // Validate ports in condition.term exist as input ports
603                let mut term_ports = HashSet::new();
604                collect_ports_from_term(&condition.term, &mut term_ports);
605                for port_name in term_ports {
606                    if !node.in_ports.contains_key(&port_name) {
607                        errors.push(
608                            GraphValidationError::SalvoConditionTermReferencesNonexistentPort {
609                                node_name: node_name.clone(),
610                                condition_name: cond_name.clone(),
611                                is_input_condition: true,
612                                missing_port: port_name,
613                            },
614                        );
615                    }
616                }
617            }
618
619            // Validate output salvo conditions
620            for (cond_name, condition) in &node.out_salvo_conditions {
621                // Validate ports in condition.ports exist as output ports
622                for port_name in condition.ports.keys() {
623                    if !node.out_ports.contains_key(port_name) {
624                        errors.push(
625                            GraphValidationError::SalvoConditionReferencesNonexistentPort {
626                                node_name: node_name.clone(),
627                                condition_name: cond_name.clone(),
628                                is_input_condition: false,
629                                missing_port: port_name.clone(),
630                            },
631                        );
632                    }
633                }
634
635                // Validate ports in condition.term exist as output ports
636                let mut term_ports = HashSet::new();
637                collect_ports_from_term(&condition.term, &mut term_ports);
638                for port_name in term_ports {
639                    if !node.out_ports.contains_key(&port_name) {
640                        errors.push(
641                            GraphValidationError::SalvoConditionTermReferencesNonexistentPort {
642                                node_name: node_name.clone(),
643                                condition_name: cond_name.clone(),
644                                is_input_condition: false,
645                                missing_port: port_name,
646                            },
647                        );
648                    }
649                }
650            }
651        }
652
653        errors
654    }
655}
656
// Unit tests are kept in the sibling file `graph_tests.rs` and are compiled only
// for test builds.
#[cfg(test)]
#[path = "graph_tests.rs"]
mod tests;