Skip to main content

netrun_sim/
net.rs

1//! Runtime state and operations for flow-based development networks.
2//!
3//! This module provides the [`NetSim`] type, which tracks the runtime state of a network
4//! (including packet locations and epoch lifecycles) and provides actions to control packet flow.
5//!
6//! All mutations to the network state go through [`NetSim::do_action`] which accepts a
7//! [`NetAction`] and returns a [`NetActionResponse`] containing any events that occurred.
8
9use crate::_utils::get_utc_now;
10use crate::graph::{
11    DependencyRequestTrigger, Edge, Graph, MaxSalvos, NodeName, PacketCount, Port, PortName,
12    PortRef, PortSlotSpec, PortType, SalvoConditionName, SalvoConditionTerm,
13    evaluate_salvo_condition,
14};
15use indexmap::IndexSet;
16use std::collections::{HashMap, HashSet};
17use ulid::Ulid;
18
/// Name recorded as the salvo condition of epochs that were created by a request.
pub const REQUEST_SALVO_CONDITION: &str = "__request__";
21
22/// Unique identifier for a packet (ULID).
23pub type PacketID = Ulid;
24
25/// Unique identifier for an epoch (ULID).
26pub type EpochID = Ulid;
27
28/// Where a packet is located in the network.
29///
30/// Packets move through these locations as they flow through the network:
31/// - Start outside the net or get created inside an epoch
32/// - Move to edges, then to input ports
33/// - Get consumed into epochs via salvo conditions
34/// - Can be loaded into output ports and sent back to edges
35#[derive(Debug, PartialEq, Eq, Hash, Clone)]
36pub enum PacketLocation {
37    /// Inside an epoch (either startable or running).
38    Node(EpochID),
39    /// Waiting at a node's input port.
40    InputPort(NodeName, PortName),
41    /// Loaded into an epoch's output port, ready to be sent.
42    OutputPort(EpochID, PortName),
43    /// In transit on an edge between nodes.
44    Edge(Edge),
45    /// External to the network (not yet injected or already extracted).
46    OutsideNet,
47}
48
49/// A unit that flows through the network.
50#[derive(Debug)]
51pub struct Packet {
52    /// Unique identifier for this packet.
53    pub id: PacketID,
54    /// Current location of this packet.
55    pub location: PacketLocation,
56}
57
58/// A collection of packets that enter or exit a node together.
59///
60/// Salvos are created when salvo conditions are satisfied:
61/// - Input salvos are created when packets at input ports trigger an epoch
62/// - Output salvos are created when packets at output ports are sent out
63#[derive(Debug, Clone)]
64pub struct Salvo {
65    /// The name of the salvo condition that was triggered.
66    pub salvo_condition: SalvoConditionName,
67    /// The packets in this salvo, paired with their port names.
68    pub packets: Vec<(PortName, PacketID)>,
69}
70
/// Where an epoch is in its lifecycle.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "python", pyo3::pyclass(eq, eq_int))]
pub enum EpochState {
    /// Created but not started yet; external code has to issue `StartEpoch`.
    Startable,
    /// Actively executing: packets may be created, loaded, and sent.
    Running,
    /// Completed; the epoch accepts no further operations.
    Finished,
}
82
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl EpochState {
    /// Python `repr()` text: the variant name prefixed with `EpochState.`.
    fn __repr__(&self) -> String {
        let text = match self {
            EpochState::Startable => "EpochState.Startable",
            EpochState::Running => "EpochState.Running",
            EpochState::Finished => "EpochState.Finished",
        };
        text.to_string()
    }
}
94
95/// An execution instance of a node.
96///
97/// A single node can have multiple simultaneous epochs. Each epoch tracks
98/// which packets entered (in_salvo), which have been sent out (out_salvos),
99/// and its current lifecycle state.
100#[derive(Debug, Clone)]
101pub struct Epoch {
102    /// Unique identifier for this epoch.
103    pub id: EpochID,
104    /// The node this epoch is executing on.
105    pub node_name: NodeName,
106    /// The salvo of packets that triggered this epoch.
107    pub in_salvo: Salvo,
108    /// Salvos that have been sent out from this epoch.
109    pub out_salvos: Vec<Salvo>,
110    /// Current lifecycle state.
111    pub state: EpochState,
112    /// Packets that were sent to unconnected output ports (moved to OutsideNet).
113    pub orphaned_packets: Vec<OrphanedPacketInfo>,
114}
115
116/// Information about a packet that was sent to an unconnected output port.
117///
118/// When `send_output_salvo` is called and a port has no connected edge,
119/// the packet is moved to `OutsideNet` and tracked here.
120#[derive(Debug, Clone)]
121pub struct OrphanedPacketInfo {
122    /// The packet that was orphaned.
123    pub packet_id: PacketID,
124    /// The output port name the packet was sent from.
125    pub from_port: PortName,
126    /// The salvo condition that triggered the send.
127    pub salvo_condition: SalvoConditionName,
128}
129
130impl Epoch {
131    /// Returns the timestamp when this epoch was created (milliseconds since Unix epoch).
132    pub fn start_time(&self) -> u64 {
133        self.id.timestamp_ms()
134    }
135}
136
/// A UTC timestamp expressed in milliseconds.
pub type EventUTC = i128;
139
/// The origin of a request, recorded so that undo knows which piece of state to reverse.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RequestCreatedSource {
    /// Came from a `CreateRequest` action, which pushed onto `_pending_requests`.
    External,
    /// Came from RunStep Phase 2b on_startup, which set `_startup_requests_sent = true`.
    OnStartup,
    /// Came from RunStep Phase 2b on_no_salvo_triggered, which spent `_request_tokens[node]`.
    OnNoSalvoTriggered,
}
150
151/// A pending packet request waiting to be resolved during RunStep.
152#[derive(Debug, Clone)]
153pub struct PendingRequest {
154    /// The node whose dependency edges initiated the request.
155    pub node_name: NodeName,
156    /// Label for deduplication (same label + same source = one epoch).
157    pub label: String,
158}
159
160/// An action that can be performed on the network.
161///
162/// All mutations to [`NetSim`] state must go through these actions via [`NetSim::do_action`].
163/// This ensures all operations are tracked and produce appropriate events.
164#[derive(Debug, Clone)]
165pub enum NetAction {
166    /// Execute one step of automatic packet flow.
167    ///
168    /// A single step performs one full iteration of the flow loop:
169    /// 1. Move all movable packets from edges to input ports
170    /// 2. Trigger all satisfied input salvo conditions
171    ///
172    /// Returns `StepResult { made_progress }` indicating whether any progress was made.
173    /// If no progress was made, the network is "blocked".
174    RunStep,
175    /// Create a new packet, optionally inside an epoch.
176    /// If `None`, packet is created outside the network.
177    CreatePacket(Option<EpochID>),
178    /// Consume a packet (normal removal from the network).
179    ConsumePacket(PacketID),
180    /// Destroy a packet (abnormal removal, e.g., due to error or cancellation).
181    DestroyPacket(PacketID),
182    /// Transition a startable epoch to running state.
183    StartEpoch(EpochID),
184    /// Complete a running epoch. Fails if epoch still contains packets.
185    FinishEpoch(EpochID),
186    /// Cancel an epoch and destroy all packets inside it.
187    CancelEpoch(EpochID),
188    /// Manually create an epoch with specified packets.
189    /// Bypasses the normal salvo condition triggering mechanism.
190    /// The epoch is created in Startable state - call StartEpoch to begin execution.
191    CreateEpoch(NodeName, Salvo),
192    /// Move a packet from inside an epoch to one of its output ports.
193    LoadPacketIntoOutputPort(PacketID, PortName),
194    /// Send packets from output ports onto edges according to a salvo condition.
195    SendOutputSalvo(EpochID, SalvoConditionName),
196    /// Transport a packet to a new location.
197    /// Restrictions:
198    /// - Cannot move packets into or out of Running epochs (only Startable allowed)
199    /// - Input ports are checked for capacity
200    TransportPacketToLocation(PacketID, PacketLocation),
201    /// Register a pending packet request on a node with a given label.
202    /// The request will be resolved during the next RunStep's Phase 3.
203    CreateRequest(NodeName, String),
204}
205
206/// Errors that can occur when undoing a NetAction
207#[derive(Debug, thiserror::Error)]
208pub enum UndoError {
209    /// Cannot undo because the expected state does not match
210    #[error("state mismatch: {0}")]
211    StateMismatch(String),
212
213    /// Cannot undo because a required entity was not found
214    #[error("entity not found: {0}")]
215    NotFound(String),
216
217    /// Cannot undo because the action type is not undoable
218    #[error("action not undoable: {0}")]
219    NotUndoable(String),
220
221    /// Internal error during undo
222    #[error("internal error: {0}")]
223    InternalError(String),
224}
225
226/// Errors that can occur when performing a NetAction
227#[derive(Debug, thiserror::Error)]
228pub enum NetActionError {
229    /// Packet with the given ID was not found in the network
230    #[error("packet not found: {packet_id}")]
231    PacketNotFound { packet_id: PacketID },
232
233    /// Epoch with the given ID was not found
234    #[error("epoch not found: {epoch_id}")]
235    EpochNotFound { epoch_id: EpochID },
236
237    /// Epoch exists but is not in Running state
238    #[error("epoch {epoch_id} is not running")]
239    EpochNotRunning { epoch_id: EpochID },
240
241    /// Epoch exists but is not in Startable state
242    #[error("epoch {epoch_id} is not startable")]
243    EpochNotStartable { epoch_id: EpochID },
244
245    /// Cannot finish epoch because it still contains packets
246    #[error("cannot finish epoch {epoch_id}: epoch still contains packets")]
247    CannotFinishNonEmptyEpoch { epoch_id: EpochID },
248
249    /// Cannot finish epoch because output port still has unsent packets
250    #[error("cannot finish epoch {epoch_id}: output port '{port_name}' has unsent packets")]
251    UnsentOutputSalvo {
252        epoch_id: EpochID,
253        port_name: PortName,
254    },
255
256    /// Packet is not inside any epoch (not at a Node location)
257    #[error("packet {packet_id} is not inside any epoch")]
258    PacketNotInAnyNode { packet_id: PacketID },
259
260    /// Output port does not exist on the node
261    #[error("output port '{port_name}' not found on node for epoch {epoch_id}")]
262    OutputPortNotFound {
263        port_name: PortName,
264        epoch_id: EpochID,
265    },
266
267    /// Output salvo condition does not exist on the node
268    #[error("output salvo condition '{condition_name}' not found on node for epoch {epoch_id}")]
269    OutputSalvoConditionNotFound {
270        condition_name: SalvoConditionName,
271        epoch_id: EpochID,
272    },
273
274    /// Maximum number of output salvos reached for this condition
275    #[error("max output salvos reached for condition '{condition_name}' on epoch {epoch_id}")]
276    MaxOutputSalvosReached {
277        condition_name: SalvoConditionName,
278        epoch_id: EpochID,
279    },
280
281    /// Output salvo condition is not satisfied
282    #[error("salvo condition '{condition_name}' not met for epoch {epoch_id}")]
283    SalvoConditionNotMet {
284        condition_name: SalvoConditionName,
285        epoch_id: EpochID,
286    },
287
288    /// Output port has reached its capacity
289    #[error("output port '{port_name}' is full for epoch {epoch_id}")]
290    OutputPortFull {
291        port_name: PortName,
292        epoch_id: EpochID,
293    },
294
295    /// Node with the given name was not found in the graph
296    #[error("node not found: '{node_name}'")]
297    NodeNotFound { node_name: NodeName },
298
299    /// Packet is not at the expected input port
300    #[error("packet {packet_id} is not at input port '{port_name}' of node '{node_name}'")]
301    PacketNotAtInputPort {
302        packet_id: PacketID,
303        port_name: PortName,
304        node_name: NodeName,
305    },
306
307    /// Input port does not exist on the node
308    #[error("input port '{port_name}' not found on node '{node_name}'")]
309    InputPortNotFound {
310        port_name: PortName,
311        node_name: NodeName,
312    },
313
314    /// Input port has reached its capacity
315    #[error("input port '{port_name}' on node '{node_name}' is full")]
316    InputPortFull {
317        port_name: PortName,
318        node_name: NodeName,
319    },
320
321    /// Cannot move packet out of a running epoch
322    #[error("cannot move packet {packet_id} out of running epoch {epoch_id}")]
323    CannotMovePacketFromRunningEpoch {
324        packet_id: PacketID,
325        epoch_id: EpochID,
326    },
327
328    /// Cannot move packet into a running epoch
329    #[error("cannot move packet {packet_id} into running epoch {epoch_id}")]
330    CannotMovePacketIntoRunningEpoch {
331        packet_id: PacketID,
332        epoch_id: EpochID,
333    },
334
335    /// Edge does not exist in the graph
336    #[error("edge not found: {edge}")]
337    EdgeNotFound { edge: Edge },
338
339    /// Cycle detected during request cascade
340    #[error("request cascade cycle detected at node '{node_name}'")]
341    RequestCycleDetected { node_name: NodeName },
342
343    /// Unconnected input port encountered during request cascade
344    #[error("request cascade reached unconnected input port '{port_name}' on node '{node_name}'")]
345    RequestUnconnectedPort {
346        node_name: NodeName,
347        port_name: PortName,
348    },
349
350    /// Request targets a non-existent node
351    #[error("request targets non-existent node '{node_name}'")]
352    RequestNodeNotFound { node_name: NodeName },
353}
354
355/// An event that occurred during a network action.
356///
357/// Events provide a complete audit trail of all state changes in the network.
358/// Each event includes a timestamp and relevant identifiers.
359/// Events contain all information needed to undo the operation.
360#[derive(Debug, Clone)]
361pub enum NetEvent {
362    /// A new packet was created.
363    PacketCreated(EventUTC, PacketID),
364    /// A packet was consumed (normal removal from the network).
365    /// Includes the packet's location before consumption for undo support.
366    PacketConsumed(EventUTC, PacketID, PacketLocation),
367    /// A packet was destroyed (abnormal removal, e.g., epoch cancellation).
368    /// Includes the packet's location before destruction for undo support.
369    PacketDestroyed(EventUTC, PacketID, PacketLocation),
370    /// A new epoch was created (in Startable state).
371    EpochCreated(EventUTC, EpochID),
372    /// An epoch transitioned from Startable to Running.
373    EpochStarted(EventUTC, EpochID),
374    /// An epoch completed successfully.
375    /// Includes the full epoch state for undo support.
376    EpochFinished(EventUTC, Epoch),
377    /// An epoch was cancelled.
378    /// Includes the full epoch state for undo support.
379    EpochCancelled(EventUTC, Epoch),
380    /// A packet moved from one location to another.
381    /// Includes the index in the source location for perfect undo restoration.
382    /// (timestamp, packet_id, from_location, to_location, from_index)
383    PacketMoved(EventUTC, PacketID, PacketLocation, PacketLocation, usize),
384    /// An input salvo condition was triggered, creating an epoch.
385    InputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
386    /// An output salvo condition was triggered, sending packets.
387    OutputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
388    /// A packet was sent to an unconnected output port and moved to OutsideNet.
389    /// (timestamp, packet_id, epoch_id, node_name, port_name, salvo_condition)
390    PacketOrphaned(
391        EventUTC,
392        PacketID,
393        EpochID,
394        NodeName,
395        PortName,
396        SalvoConditionName,
397    ),
398    /// A packet request was created (registered as pending).
399    /// (timestamp, node_name, label, source)
400    RequestCreated(EventUTC, NodeName, String, RequestCreatedSource),
401    /// A request cascade was resolved, identifying source nodes.
402    /// (timestamp, source_nodes, label)
403    RequestCascadeResolved(EventUTC, Vec<NodeName>, String),
404    /// An epoch was created at a source node as a result of a request cascade.
405    /// (timestamp, epoch_id, source_node_name, label)
406    RequestEpochCreated(EventUTC, EpochID, NodeName, String),
407}
408
409/// Data returned by a successful network action.
410#[derive(Debug, Clone)]
411pub enum NetActionResponseData {
412    /// Result of RunStep: whether any progress was made.
413    StepResult {
414        /// True if any packets moved or epochs were created.
415        /// False if the network is blocked.
416        made_progress: bool,
417    },
418    /// A packet ID (returned by CreatePacket).
419    Packet(PacketID),
420    /// The created epoch in Startable state (returned by CreateEpoch).
421    CreatedEpoch(Epoch),
422    /// The started epoch (returned by StartEpoch).
423    StartedEpoch(Epoch),
424    /// The finished epoch (returned by FinishEpoch).
425    FinishedEpoch(Epoch),
426    /// The cancelled epoch and IDs of destroyed packets (returned by CancelEpoch).
427    CancelledEpoch(Epoch, Vec<PacketID>),
428    /// No specific data (returned by ConsumePacket, DestroyPacket, etc.).
429    None,
430}
431
432/// The result of performing a network action.
433#[derive(Debug)]
434pub enum NetActionResponse {
435    /// Action succeeded, with optional data and a list of events that occurred.
436    Success(NetActionResponseData, Vec<NetEvent>),
437    /// Action failed with an error.
438    Error(NetActionError),
439}
440
441/// The runtime state of a flow-based network.
442///
443/// A `NetSim` is created from a [`Graph`] and tracks:
444/// - All packets and their locations
445/// - All epochs and their states
446/// - Which epochs are startable
447///
448/// All mutations must go through [`NetSim::do_action`] to ensure proper event tracking.
449#[derive(Debug)]
450pub struct NetSim {
451    /// The graph topology this network is running on.
452    pub graph: Graph,
453    _packets: HashMap<PacketID, Packet>,
454    _packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>>,
455    _epochs: HashMap<EpochID, Epoch>,
456    _startable_epochs: HashSet<EpochID>,
457    _node_to_epochs: HashMap<NodeName, Vec<EpochID>>,
458    _pending_requests: Vec<PendingRequest>,
459    _request_tokens: HashMap<NodeName, bool>,
460    _startup_requests_sent: bool,
461}
462
463impl NetSim {
464    /// Creates a new net simulation from a Graph.
465    ///
466    /// Initializes packet location tracking for all edges and input ports.
467    pub fn new(graph: Graph) -> Self {
468        let errors = graph.validate();
469        if !errors.is_empty() {
470            let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
471            panic!(
472                "Cannot create NetSim with invalid graph:\n  - {}",
473                msgs.join("\n  - ")
474            );
475        }
476
477        let mut packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>> = HashMap::new();
478
479        // Initialize empty packet sets for all edges
480        for edge in graph.edges() {
481            packets_by_location.insert(PacketLocation::Edge(edge.clone()), IndexSet::new());
482        }
483
484        // Initialize empty packet sets for all input ports
485        for (node_name, node) in graph.nodes() {
486            for port_name in node.in_ports.keys() {
487                packets_by_location.insert(
488                    PacketLocation::InputPort(node_name.clone(), port_name.clone()),
489                    IndexSet::new(),
490                );
491            }
492        }
493
494        // Initialize OutsideNet location for packets created outside the network
495        packets_by_location.insert(PacketLocation::OutsideNet, IndexSet::new());
496
497        // Note: Output port locations are created per-epoch when epochs are created
498        // Note: Node locations (inside epochs) are created when epochs are created
499
500        // Initialize request tokens for nodes with OnNoSalvoTriggered trigger
501        let mut request_tokens: HashMap<NodeName, bool> = HashMap::new();
502        for (node_name, node) in graph.nodes() {
503            if let Some(config) = &node.dependency_request_config
504                && config
505                    .triggers
506                    .contains(&DependencyRequestTrigger::OnNoSalvoTriggered)
507            {
508                request_tokens.insert(node_name.clone(), true);
509            }
510        }
511
512        NetSim {
513            graph,
514            _packets: HashMap::new(),
515            _packets_by_location: packets_by_location,
516            _epochs: HashMap::new(),
517            _startable_epochs: HashSet::new(),
518            _node_to_epochs: HashMap::new(),
519            _pending_requests: Vec::new(),
520            _request_tokens: request_tokens,
521            _startup_requests_sent: false,
522        }
523    }
524
525    fn move_packet(&mut self, packet_id: &PacketID, new_location: PacketLocation) {
526        let packet = self._packets.get_mut(packet_id).unwrap();
527        let packets_at_old_location = self
528            ._packets_by_location
529            .get_mut(&packet.location)
530            .expect("Packet location has no entry in self._packets_by_location.");
531        packets_at_old_location.shift_remove(packet_id);
532        packet.location = new_location;
533        if !self
534            ._packets_by_location
535            .get_mut(&packet.location)
536            .expect("Packet location has no entry in self._packets_by_location")
537            .insert(*packet_id)
538        {
539            panic!("Attempted to move packet to a location that already contains it.");
540        }
541    }
542
543    // NetActions
544
545    /// Helper: Try to trigger an input salvo condition for a node.
546    /// Returns (triggered: bool, events: Vec<NetEvent>).
547    fn try_trigger_input_salvo(&mut self, node_name: &NodeName) -> (bool, Vec<NetEvent>) {
548        let mut events: Vec<NetEvent> = Vec::new();
549
550        let node = match self.graph.nodes().get(node_name) {
551            Some(n) => n,
552            None => return (false, events),
553        };
554
555        let in_port_names: Vec<PortName> = node.in_ports.keys().cloned().collect();
556        let in_ports_clone: HashMap<PortName, Port> = node.in_ports.clone();
557
558        // Collect salvo condition data
559        struct SalvoConditionData {
560            name: SalvoConditionName,
561            ports: HashMap<PortName, PacketCount>,
562            term: SalvoConditionTerm,
563        }
564
565        let salvo_conditions: Vec<SalvoConditionData> = node
566            .in_salvo_conditions
567            .iter()
568            .map(|(name, cond)| SalvoConditionData {
569                name: name.clone(),
570                ports: cond.ports.clone(),
571                term: cond.term.clone(),
572            })
573            .collect();
574
575        // Check salvo conditions in order - first satisfied one wins
576        for salvo_cond_data in salvo_conditions {
577            // Calculate packet counts for all input ports
578            let port_packet_counts: HashMap<PortName, u64> = in_port_names
579                .iter()
580                .map(|port_name| {
581                    let count = self
582                        ._packets_by_location
583                        .get(&PacketLocation::InputPort(
584                            node_name.clone(),
585                            port_name.clone(),
586                        ))
587                        .map(|packets| packets.len() as u64)
588                        .unwrap_or(0);
589                    (port_name.clone(), count)
590                })
591                .collect();
592
593            // Check if salvo condition is satisfied
594            if evaluate_salvo_condition(&salvo_cond_data.term, &port_packet_counts, &in_ports_clone)
595            {
596                // Create a new epoch
597                let epoch_id = Ulid::new();
598
599                // Collect packets from the ports listed in salvo_condition.ports
600                // Store (packet_id, port_name) for each packet to move
601                let mut salvo_packets: Vec<(PortName, PacketID)> = Vec::new();
602                let mut packets_to_move: Vec<(PacketID, PortName)> = Vec::new();
603
604                for (port_name, packet_count) in &salvo_cond_data.ports {
605                    let port_location =
606                        PacketLocation::InputPort(node_name.clone(), port_name.clone());
607                    if let Some(packet_ids) = self._packets_by_location.get(&port_location) {
608                        let take_count = match packet_count {
609                            PacketCount::All => packet_ids.len(),
610                            PacketCount::Count(n) => std::cmp::min(*n as usize, packet_ids.len()),
611                        };
612                        for pid in packet_ids.iter().take(take_count) {
613                            salvo_packets.push((port_name.clone(), *pid));
614                            packets_to_move.push((*pid, port_name.clone()));
615                        }
616                    }
617                }
618
619                // Create the salvo
620                let in_salvo = Salvo {
621                    salvo_condition: salvo_cond_data.name.clone(),
622                    packets: salvo_packets,
623                };
624
625                // Create the epoch
626                let epoch = Epoch {
627                    id: epoch_id,
628                    node_name: node_name.clone(),
629                    in_salvo,
630                    out_salvos: Vec::new(),
631                    state: EpochState::Startable,
632                    orphaned_packets: Vec::new(),
633                };
634
635                // Register the epoch
636                self._epochs.insert(epoch_id, epoch);
637                self._startable_epochs.insert(epoch_id);
638                self._node_to_epochs
639                    .entry(node_name.clone())
640                    .or_default()
641                    .push(epoch_id);
642
643                // Create a location entry for packets inside the epoch
644                let epoch_location = PacketLocation::Node(epoch_id);
645                self._packets_by_location
646                    .insert(epoch_location.clone(), IndexSet::new());
647
648                // Create output port location entries for this epoch
649                let node = self
650                    .graph
651                    .nodes()
652                    .get(node_name)
653                    .expect("Node not found for epoch creation");
654                for out_port_name in node.out_ports.keys() {
655                    let output_port_location =
656                        PacketLocation::OutputPort(epoch_id, out_port_name.clone());
657                    self._packets_by_location
658                        .insert(output_port_location, IndexSet::new());
659                }
660
661                // Emit events in logical order:
662                // 1. InputSalvoTriggered - the condition was met, triggering epoch creation
663                // 2. EpochCreated - the epoch is created as a result
664                // 3. PacketMoved - packets move into the newly created epoch
665                events.push(NetEvent::InputSalvoTriggered(
666                    get_utc_now(),
667                    epoch_id,
668                    salvo_cond_data.name.clone(),
669                ));
670                events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id));
671
672                // Move packets from input ports into the epoch
673                for (pid, port_name) in &packets_to_move {
674                    let from_location =
675                        PacketLocation::InputPort(node_name.clone(), port_name.clone());
676
677                    // Get the fresh index of this packet before moving (indices shift after each move)
678                    let from_index = self
679                        ._packets_by_location
680                        .get(&from_location)
681                        .and_then(|packets| packets.get_index_of(pid))
682                        .expect("Packet should exist at from_location");
683
684                    self.move_packet(pid, epoch_location.clone());
685                    events.push(NetEvent::PacketMoved(
686                        get_utc_now(),
687                        *pid,
688                        from_location,
689                        epoch_location.clone(),
690                        from_index,
691                    ));
692                }
693
694                // Only one salvo condition can trigger per node per call
695                return (true, events);
696            }
697        }
698
699        (false, events)
700    }
701
702    fn run_step(&mut self) -> NetActionResponse {
703        let mut all_events: Vec<NetEvent> = Vec::new();
704        let mut made_progress = false;
705
706        // Phase 1: Move packets from edges to input ports
707        // Collect all edge locations and their first packet (FIFO)
708        // We need to extract data before mutating to avoid borrow issues
709        struct EdgeMoveCandidate {
710            packet_id: PacketID,
711            from_location: PacketLocation,
712            from_index: usize,
713            input_port_location: PacketLocation,
714            can_move: bool,
715        }
716
717        let mut edge_candidates: Vec<EdgeMoveCandidate> = Vec::new();
718
719        // Iterate through all edge locations in _packets_by_location
720        for (location, packets) in &self._packets_by_location {
721            if let PacketLocation::Edge(edge_ref) = location {
722                // Get the first packet (FIFO order)
723                if let Some(first_packet_id) = packets.first() {
724                    let target_node_name = edge_ref.target.node_name.clone();
725                    let target_port_name = edge_ref.target.port_name.clone();
726
727                    // Check if the target input port has space
728                    let node = self
729                        .graph
730                        .nodes()
731                        .get(&target_node_name)
732                        .expect("Edge targets a non-existent node");
733                    let port = node
734                        .in_ports
735                        .get(&target_port_name)
736                        .expect("Edge targets a non-existent input port");
737
738                    let input_port_location = PacketLocation::InputPort(
739                        target_node_name.clone(),
740                        target_port_name.clone(),
741                    );
742                    let current_count = self
743                        ._packets_by_location
744                        .get(&input_port_location)
745                        .map(|packets| packets.len() as u64)
746                        .unwrap_or(0);
747
748                    let can_move = match port.slots_spec {
749                        PortSlotSpec::Infinite => true,
750                        PortSlotSpec::Finite(max_slots) => current_count < max_slots,
751                    };
752
753                    edge_candidates.push(EdgeMoveCandidate {
754                        packet_id: *first_packet_id,
755                        from_location: location.clone(),
756                        from_index: 0, // First packet is always at index 0
757                        input_port_location,
758                        can_move,
759                    });
760                }
761            }
762        }
763
764        // Phase 1: Move packets from edges to input ports
765        for candidate in edge_candidates {
766            if !candidate.can_move {
767                continue;
768            }
769
770            // Move the packet to the input port
771            self.move_packet(&candidate.packet_id, candidate.input_port_location.clone());
772            all_events.push(NetEvent::PacketMoved(
773                get_utc_now(),
774                candidate.packet_id,
775                candidate.from_location,
776                candidate.input_port_location.clone(),
777                candidate.from_index,
778            ));
779            made_progress = true;
780        }
781
782        // Phase 2: Check salvo conditions for all nodes with packets at input ports
783        let mut nodes_with_input_packets: Vec<NodeName> = Vec::new();
784        for (location, packets) in &self._packets_by_location {
785            if let PacketLocation::InputPort(node_name, _) = location
786                && !packets.is_empty()
787                && !nodes_with_input_packets.contains(node_name)
788            {
789                nodes_with_input_packets.push(node_name.clone());
790            }
791        }
792
793        let mut nodes_that_triggered_salvo: HashSet<NodeName> = HashSet::new();
794        for node_name in &nodes_with_input_packets {
795            let (triggered, events) = self.try_trigger_input_salvo(node_name);
796            all_events.extend(events);
797            if triggered {
798                made_progress = true;
799                nodes_that_triggered_salvo.insert(node_name.clone());
800            }
801        }
802
803        // Phase 2b: Auto-request generation
804
805        // on_startup: first RunStep only (flag set only when startup nodes exist,
806        // so graphs without them skip this cheaply; undo reverses via OnStartup events)
807        if !self._startup_requests_sent {
808            let mut found_startup = false;
809            for (node_name, node) in self.graph.nodes() {
810                if let Some(config) = &node.dependency_request_config
811                    && config
812                        .triggers
813                        .contains(&DependencyRequestTrigger::OnStartup)
814                {
815                    found_startup = true;
816                    all_events.push(NetEvent::RequestCreated(
817                        get_utc_now(),
818                        node_name.clone(),
819                        config.label.clone(),
820                        RequestCreatedSource::OnStartup,
821                    ));
822                    self._pending_requests.push(PendingRequest {
823                        node_name: node_name.clone(),
824                        label: config.label.clone(),
825                    });
826                }
827            }
828            if found_startup {
829                self._startup_requests_sent = true;
830            }
831        }
832
833        // on_no_salvo_triggered: for nodes that had packets but no salvo triggered
834        {
835            let no_salvo_nodes: Vec<(NodeName, String)> = nodes_with_input_packets
836                .iter()
837                .filter(|node_name| !nodes_that_triggered_salvo.contains(*node_name))
838                .filter_map(|node_name| {
839                    let node = self.graph.nodes().get(node_name)?;
840                    let config = node.dependency_request_config.as_ref()?;
841                    if config
842                        .triggers
843                        .contains(&DependencyRequestTrigger::OnNoSalvoTriggered)
844                        && self._request_tokens.get(node_name) == Some(&true)
845                    {
846                        Some((node_name.clone(), config.label.clone()))
847                    } else {
848                        None
849                    }
850                })
851                .collect();
852
853            for (node_name, label) in no_salvo_nodes {
854                // Spend token
855                self._request_tokens.insert(node_name.clone(), false);
856                all_events.push(NetEvent::RequestCreated(
857                    get_utc_now(),
858                    node_name.clone(),
859                    label.clone(),
860                    RequestCreatedSource::OnNoSalvoTriggered,
861                ));
862                self._pending_requests
863                    .push(PendingRequest { node_name, label });
864            }
865        }
866
867        // Phase 3: Resolve pending packet requests
868        if !self._pending_requests.is_empty() {
869            let pending_requests: Vec<PendingRequest> = self._pending_requests.drain(..).collect();
870
871            // For each request, find dependency-edge input ports and cascade backward
872            // Group results by (source_node, label) for deduplication
873            let mut source_label_pairs: Vec<(NodeName, String)> = Vec::new();
874
875            for request in &pending_requests {
876                // Find input ports on this node connected via dependency edges
877                let start_ports: Vec<PortRef> = self
878                    .graph
879                    .dependency_edges()
880                    .iter()
881                    .filter(|dep_edge| dep_edge.target.node_name == request.node_name)
882                    .map(|dep_edge| dep_edge.target.clone())
883                    .collect();
884
885                if start_ports.is_empty() {
886                    continue;
887                }
888
889                // Run backward cascade
890                match self.graph.cascade_backward(&start_ports) {
891                    Ok(result) => {
892                        all_events.push(NetEvent::RequestCascadeResolved(
893                            get_utc_now(),
894                            result.source_nodes.clone(),
895                            request.label.clone(),
896                        ));
897
898                        for source_node in &result.source_nodes {
899                            let pair = (source_node.clone(), request.label.clone());
900                            if !source_label_pairs.contains(&pair) {
901                                source_label_pairs.push(pair);
902                            }
903                        }
904                    }
905                    Err(crate::graph::CascadeError::CycleDetected { node_name }) => {
906                        return NetActionResponse::Error(NetActionError::RequestCycleDetected {
907                            node_name,
908                        });
909                    }
910                    Err(crate::graph::CascadeError::UnconnectedInputPort {
911                        node_name,
912                        port_name,
913                    }) => {
914                        return NetActionResponse::Error(NetActionError::RequestUnconnectedPort {
915                            node_name,
916                            port_name,
917                        });
918                    }
919                }
920            }
921
922            // Create epochs at source nodes (one per unique (source_node, label) pair)
923            for (source_node_name, label) in &source_label_pairs {
924                let epoch_id = Ulid::new();
925                let in_salvo = Salvo {
926                    salvo_condition: REQUEST_SALVO_CONDITION.to_string(),
927                    packets: vec![],
928                };
929
930                let node = self
931                    .graph
932                    .nodes()
933                    .get(source_node_name)
934                    .expect("Source node from cascade should exist");
935
936                let epoch = Epoch {
937                    id: epoch_id,
938                    node_name: source_node_name.clone(),
939                    in_salvo,
940                    out_salvos: Vec::new(),
941                    state: EpochState::Startable,
942                    orphaned_packets: Vec::new(),
943                };
944
945                self._epochs.insert(epoch_id, epoch);
946                self._startable_epochs.insert(epoch_id);
947                self._node_to_epochs
948                    .entry(source_node_name.clone())
949                    .or_default()
950                    .push(epoch_id);
951
952                // Create location entries
953                let epoch_location = PacketLocation::Node(epoch_id);
954                self._packets_by_location
955                    .insert(epoch_location, IndexSet::new());
956                for out_port_name in node.out_ports.keys() {
957                    let output_port_location =
958                        PacketLocation::OutputPort(epoch_id, out_port_name.clone());
959                    self._packets_by_location
960                        .insert(output_port_location, IndexSet::new());
961                }
962
963                all_events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id));
964                all_events.push(NetEvent::RequestEpochCreated(
965                    get_utc_now(),
966                    epoch_id,
967                    source_node_name.clone(),
968                    label.clone(),
969                ));
970                made_progress = true;
971            }
972        }
973
974        NetActionResponse::Success(
975            NetActionResponseData::StepResult { made_progress },
976            all_events,
977        )
978    }
979
980    fn create_packet(&mut self, maybe_epoch_id: &Option<EpochID>) -> NetActionResponse {
981        // Check that epoch_id exists and is running
982        if let Some(epoch_id) = maybe_epoch_id {
983            if !self._epochs.contains_key(epoch_id) {
984                return NetActionResponse::Error(NetActionError::EpochNotFound {
985                    epoch_id: *epoch_id,
986                });
987            }
988            if !matches!(self._epochs[epoch_id].state, EpochState::Running) {
989                return NetActionResponse::Error(NetActionError::EpochNotRunning {
990                    epoch_id: *epoch_id,
991                });
992            }
993        }
994
995        let packet_location = match maybe_epoch_id {
996            Some(epoch_id) => PacketLocation::Node(*epoch_id),
997            None => PacketLocation::OutsideNet,
998        };
999
1000        let packet = Packet {
1001            id: Ulid::new(),
1002            location: packet_location.clone(),
1003        };
1004
1005        let packet_id = packet.id;
1006        self._packets.insert(packet.id, packet);
1007
1008        // Add packet to location index
1009        self._packets_by_location
1010            .entry(packet_location)
1011            .or_default()
1012            .insert(packet_id);
1013
1014        NetActionResponse::Success(
1015            NetActionResponseData::Packet(packet_id),
1016            vec![NetEvent::PacketCreated(get_utc_now(), packet_id)],
1017        )
1018    }
1019
1020    fn consume_packet(&mut self, packet_id: &PacketID) -> NetActionResponse {
1021        if !self._packets.contains_key(packet_id) {
1022            return NetActionResponse::Error(NetActionError::PacketNotFound {
1023                packet_id: *packet_id,
1024            });
1025        }
1026
1027        let location = self._packets[packet_id].location.clone();
1028
1029        if let Some(packets) = self._packets_by_location.get_mut(&location) {
1030            if packets.shift_remove(packet_id) {
1031                self._packets.remove(packet_id);
1032                NetActionResponse::Success(
1033                    NetActionResponseData::None,
1034                    vec![NetEvent::PacketConsumed(
1035                        get_utc_now(),
1036                        *packet_id,
1037                        location,
1038                    )],
1039                )
1040            } else {
1041                panic!(
1042                    "Packet with ID {} not found in location {:?}",
1043                    packet_id, location
1044                );
1045            }
1046        } else {
1047            panic!("Packet location {:?} not found", location);
1048        }
1049    }
1050
1051    fn destroy_packet(&mut self, packet_id: &PacketID) -> NetActionResponse {
1052        if !self._packets.contains_key(packet_id) {
1053            return NetActionResponse::Error(NetActionError::PacketNotFound {
1054                packet_id: *packet_id,
1055            });
1056        }
1057
1058        let location = self._packets[packet_id].location.clone();
1059
1060        if let Some(packets) = self._packets_by_location.get_mut(&location) {
1061            if packets.shift_remove(packet_id) {
1062                self._packets.remove(packet_id);
1063                NetActionResponse::Success(
1064                    NetActionResponseData::None,
1065                    vec![NetEvent::PacketDestroyed(
1066                        get_utc_now(),
1067                        *packet_id,
1068                        location,
1069                    )],
1070                )
1071            } else {
1072                panic!(
1073                    "Packet with ID {} not found in location {:?}",
1074                    packet_id, location
1075                );
1076            }
1077        } else {
1078            panic!("Packet location {:?} not found", location);
1079        }
1080    }
1081
1082    fn start_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
1083        if let Some(epoch) = self._epochs.get_mut(epoch_id) {
1084            if !self._startable_epochs.contains(epoch_id) {
1085                return NetActionResponse::Error(NetActionError::EpochNotStartable {
1086                    epoch_id: *epoch_id,
1087                });
1088            }
1089            debug_assert!(
1090                matches!(epoch.state, EpochState::Startable),
1091                "Epoch state is not Startable but was in net._startable_epochs."
1092            );
1093            epoch.state = EpochState::Running;
1094            self._startable_epochs.remove(epoch_id);
1095            NetActionResponse::Success(
1096                NetActionResponseData::StartedEpoch(epoch.clone()),
1097                vec![NetEvent::EpochStarted(get_utc_now(), *epoch_id)],
1098            )
1099        } else {
1100            NetActionResponse::Error(NetActionError::EpochNotFound {
1101                epoch_id: *epoch_id,
1102            })
1103        }
1104    }
1105
1106    /// Set the request token for a node if it has an `OnNoSalvoTriggered` trigger.
1107    /// Called with `true` to replenish (on epoch finish/cancel) or `false` to reverse (on undo).
1108    fn set_request_token(&mut self, node_name: &NodeName, value: bool) {
1109        if let Some(node) = self.graph.nodes().get(node_name)
1110            && let Some(config) = &node.dependency_request_config
1111            && config
1112                .triggers
1113                .contains(&DependencyRequestTrigger::OnNoSalvoTriggered)
1114        {
1115            self._request_tokens.insert(node_name.clone(), value);
1116        }
1117    }
1118
1119    fn finish_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
1120        // Check if epoch exists
1121        let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
1122            epoch.clone()
1123        } else {
1124            return NetActionResponse::Error(NetActionError::EpochNotFound {
1125                epoch_id: *epoch_id,
1126            });
1127        };
1128
1129        // Check if epoch is running
1130        if epoch.state != EpochState::Running {
1131            return NetActionResponse::Error(NetActionError::EpochNotRunning {
1132                epoch_id: *epoch_id,
1133            });
1134        }
1135
1136        // No packets may remain inside the epoch
1137        let epoch_loc = PacketLocation::Node(*epoch_id);
1138        if let Some(packets) = self._packets_by_location.get(&epoch_loc) {
1139            if !packets.is_empty() {
1140                return NetActionResponse::Error(NetActionError::CannotFinishNonEmptyEpoch {
1141                    epoch_id: *epoch_id,
1142                });
1143            }
1144        } else {
1145            panic!("Epoch {} not found in location {:?}", epoch_id, epoch_loc);
1146        }
1147
1148        // No packets may remain in output ports (unsent salvos)
1149        let node = self
1150            .graph
1151            .nodes()
1152            .get(&epoch.node_name)
1153            .expect("Epoch references non-existent node");
1154        for port_name in node.out_ports.keys() {
1155            let output_port_loc = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1156            if let Some(packets) = self._packets_by_location.get(&output_port_loc)
1157                && !packets.is_empty()
1158            {
1159                return NetActionResponse::Error(NetActionError::UnsentOutputSalvo {
1160                    epoch_id: *epoch_id,
1161                    port_name: port_name.clone(),
1162                });
1163            }
1164        }
1165
1166        // All checks passed - finish the epoch
1167        // Collect port names before we release the borrow on graph
1168        let out_port_names: Vec<String> = node.out_ports.keys().cloned().collect();
1169
1170        // Clone epoch state before modifying for the event (captures Running state)
1171        let epoch_before_finish = self._epochs[epoch_id].clone();
1172
1173        // Replenish request token if node has OnNoSalvoTriggered trigger
1174        let epoch_node_name = self._epochs[epoch_id].node_name.clone();
1175        self.set_request_token(&epoch_node_name, true);
1176
1177        let mut epoch = self._epochs.remove(epoch_id).unwrap();
1178        epoch.state = EpochState::Finished;
1179
1180        // Clean up location entries
1181        self._packets_by_location.remove(&epoch_loc);
1182        for port_name in &out_port_names {
1183            let output_port_loc = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1184            self._packets_by_location.remove(&output_port_loc);
1185        }
1186
1187        // Remove from _node_to_epochs
1188        if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch.node_name) {
1189            epoch_ids.retain(|id| id != epoch_id);
1190        }
1191
1192        NetActionResponse::Success(
1193            NetActionResponseData::FinishedEpoch(epoch),
1194            vec![NetEvent::EpochFinished(get_utc_now(), epoch_before_finish)],
1195        )
1196    }
1197
1198    fn cancel_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
1199        // Check if epoch exists and capture it for the event
1200        let epoch_for_event = if let Some(epoch) = self._epochs.get(epoch_id) {
1201            epoch.clone()
1202        } else {
1203            return NetActionResponse::Error(NetActionError::EpochNotFound {
1204                epoch_id: *epoch_id,
1205            });
1206        };
1207
1208        // Replenish request token if node has OnNoSalvoTriggered trigger
1209        self.set_request_token(&epoch_for_event.node_name, true);
1210
1211        let mut events: Vec<NetEvent> = Vec::new();
1212        let mut destroyed_packets: Vec<PacketID> = Vec::new();
1213
1214        // Collect packets inside the epoch (Node location)
1215        let epoch_location = PacketLocation::Node(*epoch_id);
1216        if let Some(packet_ids) = self._packets_by_location.get(&epoch_location) {
1217            destroyed_packets.extend(packet_ids.iter().cloned());
1218        }
1219
1220        // Collect packets in the epoch's output ports
1221        let node = self
1222            .graph
1223            .nodes()
1224            .get(&epoch_for_event.node_name)
1225            .expect("Epoch references non-existent node");
1226        for port_name in node.out_ports.keys() {
1227            let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1228            if let Some(packet_ids) = self._packets_by_location.get(&output_port_location) {
1229                destroyed_packets.extend(packet_ids.iter().cloned());
1230            }
1231        }
1232
1233        // Remove packets from _packets and _packets_by_location, emit events with location
1234        for packet_id in &destroyed_packets {
1235            let packet = self
1236                ._packets
1237                .remove(packet_id)
1238                .expect("Packet in location map not found in packets map");
1239            let packet_location = packet.location.clone();
1240            if let Some(packets_at_location) = self._packets_by_location.get_mut(&packet_location) {
1241                packets_at_location.shift_remove(packet_id);
1242            }
1243            events.push(NetEvent::PacketDestroyed(
1244                get_utc_now(),
1245                *packet_id,
1246                packet_location,
1247            ));
1248        }
1249
1250        // Remove output port location entries for this epoch
1251        for port_name in node.out_ports.keys() {
1252            let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1253            self._packets_by_location.remove(&output_port_location);
1254        }
1255
1256        // Remove the epoch's node location entry
1257        self._packets_by_location.remove(&epoch_location);
1258
1259        // Update _startable_epochs if epoch was startable
1260        self._startable_epochs.remove(epoch_id);
1261
1262        // Update _node_to_epochs
1263        if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch_for_event.node_name) {
1264            epoch_ids.retain(|id| id != epoch_id);
1265        }
1266
1267        // Remove epoch from _epochs
1268        let epoch = self._epochs.remove(epoch_id).expect("Epoch should exist");
1269
1270        events.push(NetEvent::EpochCancelled(get_utc_now(), epoch_for_event));
1271
1272        NetActionResponse::Success(
1273            NetActionResponseData::CancelledEpoch(epoch, destroyed_packets),
1274            events,
1275        )
1276    }
1277
1278    fn create_epoch(&mut self, node_name: &NodeName, salvo: &Salvo) -> NetActionResponse {
1279        // Validate node exists
1280        let node = match self.graph.nodes().get(node_name) {
1281            Some(node) => node,
1282            None => {
1283                return NetActionResponse::Error(NetActionError::NodeNotFound {
1284                    node_name: node_name.clone(),
1285                });
1286            }
1287        };
1288
1289        // Validate all packets in salvo
1290        for (port_name, packet_id) in &salvo.packets {
1291            // Validate input port exists
1292            if !node.in_ports.contains_key(port_name) {
1293                return NetActionResponse::Error(NetActionError::InputPortNotFound {
1294                    port_name: port_name.clone(),
1295                    node_name: node_name.clone(),
1296                });
1297            }
1298
1299            // Validate packet exists
1300            let packet = match self._packets.get(packet_id) {
1301                Some(packet) => packet,
1302                None => {
1303                    return NetActionResponse::Error(NetActionError::PacketNotFound {
1304                        packet_id: *packet_id,
1305                    });
1306                }
1307            };
1308
1309            // Validate packet is at the input port of this node
1310            let expected_location = PacketLocation::InputPort(node_name.clone(), port_name.clone());
1311            if packet.location != expected_location {
1312                return NetActionResponse::Error(NetActionError::PacketNotAtInputPort {
1313                    packet_id: *packet_id,
1314                    port_name: port_name.clone(),
1315                    node_name: node_name.clone(),
1316                });
1317            }
1318        }
1319
1320        let mut events: Vec<NetEvent> = Vec::new();
1321
1322        // Create the epoch in Startable state
1323        let epoch_id = Ulid::new();
1324        let epoch = Epoch {
1325            id: epoch_id,
1326            node_name: node_name.clone(),
1327            in_salvo: salvo.clone(),
1328            out_salvos: Vec::new(),
1329            state: EpochState::Startable,
1330            orphaned_packets: Vec::new(),
1331        };
1332
1333        // Register the epoch
1334        self._epochs.insert(epoch_id, epoch.clone());
1335        self._startable_epochs.insert(epoch_id);
1336        self._node_to_epochs
1337            .entry(node_name.clone())
1338            .or_default()
1339            .push(epoch_id);
1340
1341        // Create location entry for packets inside the epoch
1342        let epoch_location = PacketLocation::Node(epoch_id);
1343        self._packets_by_location
1344            .insert(epoch_location.clone(), IndexSet::new());
1345
1346        // Create output port location entries for this epoch
1347        for port_name in node.out_ports.keys() {
1348            let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
1349            self._packets_by_location
1350                .insert(output_port_location, IndexSet::new());
1351        }
1352
1353        events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id));
1354
1355        // Move packets from input ports into the epoch
1356        for (port_name, packet_id) in &salvo.packets {
1357            let from_location = PacketLocation::InputPort(node_name.clone(), port_name.clone());
1358
1359            // Get the index of this packet in its source location before moving
1360            let from_index = self
1361                ._packets_by_location
1362                .get(&from_location)
1363                .and_then(|packets| packets.get_index_of(packet_id))
1364                .expect("Packet should exist at from_location");
1365
1366            self.move_packet(packet_id, epoch_location.clone());
1367            events.push(NetEvent::PacketMoved(
1368                get_utc_now(),
1369                *packet_id,
1370                from_location,
1371                epoch_location.clone(),
1372                from_index,
1373            ));
1374        }
1375
1376        NetActionResponse::Success(NetActionResponseData::CreatedEpoch(epoch), events)
1377    }
1378
1379    fn load_packet_into_output_port(
1380        &mut self,
1381        packet_id: &PacketID,
1382        port_name: &str,
1383    ) -> NetActionResponse {
1384        let (epoch_id, old_location) = if let Some(packet) = self._packets.get(packet_id) {
1385            if let PacketLocation::Node(epoch_id) = packet.location {
1386                (epoch_id, packet.location.clone())
1387            } else {
1388                return NetActionResponse::Error(NetActionError::PacketNotInAnyNode {
1389                    packet_id: *packet_id,
1390                });
1391            }
1392        } else {
1393            return NetActionResponse::Error(NetActionError::PacketNotFound {
1394                packet_id: *packet_id,
1395            });
1396        };
1397
1398        let node_name = self
1399            ._epochs
1400            .get(&epoch_id)
1401            .expect("The epoch id in the location of a packet could not be found.")
1402            .node_name
1403            .clone();
1404        let node = self
1405            .graph
1406            .nodes()
1407            .get(&node_name)
1408            .expect("Packet located in a non-existing node (yet the node has an epoch).");
1409
1410        if !node.out_ports.contains_key(port_name) {
1411            return NetActionResponse::Error(NetActionError::OutputPortNotFound {
1412                port_name: port_name.to_string(),
1413                epoch_id,
1414            });
1415        }
1416
1417        let port = node.out_ports.get(port_name).unwrap();
1418        let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.to_string());
1419        let port_packets = self
1420            ._packets_by_location
1421            .get(&output_port_location)
1422            .expect("No entry in NetSim._packets_by_location found for output port.");
1423
1424        // Check if the output port is full
1425        if let PortSlotSpec::Finite(num_slots) = port.slots_spec
1426            && port_packets.len() as u64 >= num_slots
1427        {
1428            return NetActionResponse::Error(NetActionError::OutputPortFull {
1429                port_name: port_name.to_string(),
1430                epoch_id,
1431            });
1432        }
1433
1434        // Get the index before moving
1435        let from_index = self
1436            ._packets_by_location
1437            .get(&old_location)
1438            .and_then(|packets| packets.get_index_of(packet_id))
1439            .expect("Packet should exist at old_location");
1440
1441        let new_location = output_port_location;
1442        self.move_packet(packet_id, new_location.clone());
1443        NetActionResponse::Success(
1444            NetActionResponseData::None,
1445            vec![NetEvent::PacketMoved(
1446                get_utc_now(),
1447                *packet_id,
1448                old_location,
1449                new_location,
1450                from_index,
1451            )],
1452        )
1453    }
1454
1455    fn send_output_salvo(
1456        &mut self,
1457        epoch_id: &EpochID,
1458        salvo_condition_name: &SalvoConditionName,
1459    ) -> NetActionResponse {
1460        // Get epoch
1461        let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
1462            epoch
1463        } else {
1464            return NetActionResponse::Error(NetActionError::EpochNotFound {
1465                epoch_id: *epoch_id,
1466            });
1467        };
1468
1469        // Get node and capture node_name early to avoid borrow issues
1470        let node = self
1471            .graph
1472            .nodes()
1473            .get(&epoch.node_name)
1474            .expect("Node associated with epoch could not be found.");
1475        let node_name = node.name.clone();
1476
1477        // Get salvo condition
1478        let salvo_condition =
1479            if let Some(salvo_condition) = node.out_salvo_conditions.get(salvo_condition_name) {
1480                salvo_condition
1481            } else {
1482                return NetActionResponse::Error(NetActionError::OutputSalvoConditionNotFound {
1483                    condition_name: salvo_condition_name.clone(),
1484                    epoch_id: *epoch_id,
1485                });
1486            };
1487
1488        // Check if max salvos reached for this specific condition
1489        if let MaxSalvos::Finite(max) = salvo_condition.max_salvos {
1490            let condition_salvo_count = epoch
1491                .out_salvos
1492                .iter()
1493                .filter(|s| s.salvo_condition == *salvo_condition_name)
1494                .count() as u64;
1495            if condition_salvo_count >= max {
1496                return NetActionResponse::Error(NetActionError::MaxOutputSalvosReached {
1497                    condition_name: salvo_condition_name.clone(),
1498                    epoch_id: *epoch_id,
1499                });
1500            }
1501        }
1502
1503        // Check that the salvo condition is met
1504        let port_packet_counts: HashMap<PortName, u64> = node
1505            .out_ports
1506            .keys()
1507            .map(|port_name| {
1508                let count = self
1509                    ._packets_by_location
1510                    .get(&PacketLocation::OutputPort(*epoch_id, port_name.clone()))
1511                    .map(|packets| packets.len() as u64)
1512                    .unwrap_or(0);
1513                (port_name.clone(), count)
1514            })
1515            .collect();
1516        if !evaluate_salvo_condition(&salvo_condition.term, &port_packet_counts, &node.out_ports) {
1517            return NetActionResponse::Error(NetActionError::SalvoConditionNotMet {
1518                condition_name: salvo_condition_name.clone(),
1519                epoch_id: *epoch_id,
1520            });
1521        }
1522
1523        // Get the locations to send packets to
1524        // Tuple: (packet_id, port_name, from_location, to_location, is_orphaned)
1525        let mut packets_to_move: Vec<(PacketID, PortName, PacketLocation, PacketLocation, bool)> =
1526            Vec::new();
1527        for (port_name, packet_count) in &salvo_condition.ports {
1528            let from_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1529            let packets = self
1530                ._packets_by_location
1531                .get(&from_location)
1532                .unwrap_or_else(|| {
1533                    panic!(
1534                        "Output port '{}' of node '{}' does not have an entry in self._packets_by_location",
1535                        port_name,
1536                        node_name
1537                    )
1538                })
1539                .clone();
1540
1541            // Check if there's an edge connected to this output port
1542            let (to_location, is_orphaned) = if let Some(edge_ref) =
1543                self.graph.get_edge_by_tail(&PortRef {
1544                    node_name: node_name.clone(),
1545                    port_type: PortType::Output,
1546                    port_name: port_name.clone(),
1547                }) {
1548                // Connected: send to edge
1549                (PacketLocation::Edge(edge_ref.clone()), false)
1550            } else {
1551                // Unconnected: send to OutsideNet (orphaned)
1552                (PacketLocation::OutsideNet, true)
1553            };
1554
1555            let take_count = match packet_count {
1556                PacketCount::All => packets.len(),
1557                PacketCount::Count(n) => std::cmp::min(*n as usize, packets.len()),
1558            };
1559            for packet_id in packets.into_iter().take(take_count) {
1560                packets_to_move.push((
1561                    packet_id,
1562                    port_name.clone(),
1563                    from_location.clone(),
1564                    to_location.clone(),
1565                    is_orphaned,
1566                ));
1567            }
1568        }
1569
1570        // Create a Salvo and add it to the epoch
1571        let salvo = Salvo {
1572            salvo_condition: salvo_condition_name.clone(),
1573            packets: packets_to_move
1574                .iter()
1575                .map(|(packet_id, port_name, _, _, _)| (port_name.clone(), *packet_id))
1576                .collect(),
1577        };
1578        self._epochs
1579            .get_mut(epoch_id)
1580            .unwrap()
1581            .out_salvos
1582            .push(salvo);
1583
1584        // Move packets and track orphaned ones
1585        let mut net_events = Vec::new();
1586        let mut orphaned_infos: Vec<OrphanedPacketInfo> = Vec::new();
1587
1588        for (packet_id, port_name, from_location, to_location, is_orphaned) in packets_to_move {
1589            if is_orphaned {
1590                // Emit PacketOrphaned event for unconnected port
1591                net_events.push(NetEvent::PacketOrphaned(
1592                    get_utc_now(),
1593                    packet_id,
1594                    *epoch_id,
1595                    node_name.clone(),
1596                    port_name.clone(),
1597                    salvo_condition_name.clone(),
1598                ));
1599                orphaned_infos.push(OrphanedPacketInfo {
1600                    packet_id,
1601                    from_port: port_name,
1602                    salvo_condition: salvo_condition_name.clone(),
1603                });
1604            } else {
1605                // Get the fresh index of this packet before moving (indices shift after each move)
1606                let from_index = self
1607                    ._packets_by_location
1608                    .get(&from_location)
1609                    .and_then(|packets| packets.get_index_of(&packet_id))
1610                    .expect("Packet should exist at from_location");
1611
1612                // Emit PacketMoved event for connected port
1613                net_events.push(NetEvent::PacketMoved(
1614                    get_utc_now(),
1615                    packet_id,
1616                    from_location,
1617                    to_location.clone(),
1618                    from_index,
1619                ));
1620            }
1621            self.move_packet(&packet_id, to_location);
1622        }
1623
1624        // Add orphaned packets to the epoch
1625        if !orphaned_infos.is_empty() {
1626            self._epochs
1627                .get_mut(epoch_id)
1628                .unwrap()
1629                .orphaned_packets
1630                .extend(orphaned_infos);
1631        }
1632
1633        // Emit OutputSalvoTriggered event
1634        net_events.push(NetEvent::OutputSalvoTriggered(
1635            get_utc_now(),
1636            *epoch_id,
1637            salvo_condition_name.clone(),
1638        ));
1639
1640        NetActionResponse::Success(NetActionResponseData::None, net_events)
1641    }
1642
1643    fn create_request(&mut self, node_name: &NodeName, label: &str) -> NetActionResponse {
1644        // Validate node exists
1645        if !self.graph.nodes().contains_key(node_name) {
1646            return NetActionResponse::Error(NetActionError::RequestNodeNotFound {
1647                node_name: node_name.clone(),
1648            });
1649        }
1650
1651        self._pending_requests.push(PendingRequest {
1652            node_name: node_name.clone(),
1653            label: label.to_string(),
1654        });
1655
1656        NetActionResponse::Success(
1657            NetActionResponseData::None,
1658            vec![NetEvent::RequestCreated(
1659                get_utc_now(),
1660                node_name.clone(),
1661                label.to_string(),
1662                RequestCreatedSource::External,
1663            )],
1664        )
1665    }
1666
1667    fn transport_packet_to_location(
1668        &mut self,
1669        packet_id: &PacketID,
1670        destination: &PacketLocation,
1671    ) -> NetActionResponse {
1672        // Validate packet exists
1673        let packet = if let Some(p) = self._packets.get(packet_id) {
1674            p
1675        } else {
1676            return NetActionResponse::Error(NetActionError::PacketNotFound {
1677                packet_id: *packet_id,
1678            });
1679        };
1680        let current_location = packet.location.clone();
1681
1682        // Check if moving FROM a running epoch
1683        match &current_location {
1684            PacketLocation::Node(epoch_id) => {
1685                if let Some(epoch) = self._epochs.get(epoch_id)
1686                    && epoch.state == EpochState::Running
1687                {
1688                    return NetActionResponse::Error(
1689                        NetActionError::CannotMovePacketFromRunningEpoch {
1690                            packet_id: *packet_id,
1691                            epoch_id: *epoch_id,
1692                        },
1693                    );
1694                }
1695            }
1696            PacketLocation::OutputPort(epoch_id, _) => {
1697                if let Some(epoch) = self._epochs.get(epoch_id)
1698                    && epoch.state == EpochState::Running
1699                {
1700                    return NetActionResponse::Error(
1701                        NetActionError::CannotMovePacketFromRunningEpoch {
1702                            packet_id: *packet_id,
1703                            epoch_id: *epoch_id,
1704                        },
1705                    );
1706                }
1707            }
1708            _ => {}
1709        }
1710
1711        // Check if moving TO a running epoch
1712        match destination {
1713            PacketLocation::Node(epoch_id) => {
1714                if let Some(epoch) = self._epochs.get(epoch_id) {
1715                    if epoch.state == EpochState::Running {
1716                        return NetActionResponse::Error(
1717                            NetActionError::CannotMovePacketIntoRunningEpoch {
1718                                packet_id: *packet_id,
1719                                epoch_id: *epoch_id,
1720                            },
1721                        );
1722                    }
1723                } else {
1724                    return NetActionResponse::Error(NetActionError::EpochNotFound {
1725                        epoch_id: *epoch_id,
1726                    });
1727                }
1728            }
1729            PacketLocation::OutputPort(epoch_id, port_name) => {
1730                if let Some(epoch) = self._epochs.get(epoch_id) {
1731                    if epoch.state == EpochState::Running {
1732                        return NetActionResponse::Error(
1733                            NetActionError::CannotMovePacketIntoRunningEpoch {
1734                                packet_id: *packet_id,
1735                                epoch_id: *epoch_id,
1736                            },
1737                        );
1738                    }
1739                    // Check that output port exists on the node
1740                    let node = self
1741                        .graph
1742                        .nodes()
1743                        .get(&epoch.node_name)
1744                        .expect("Node associated with epoch could not be found.");
1745                    if !node.out_ports.contains_key(port_name) {
1746                        return NetActionResponse::Error(NetActionError::OutputPortNotFound {
1747                            port_name: port_name.clone(),
1748                            epoch_id: *epoch_id,
1749                        });
1750                    }
1751                } else {
1752                    return NetActionResponse::Error(NetActionError::EpochNotFound {
1753                        epoch_id: *epoch_id,
1754                    });
1755                }
1756            }
1757            PacketLocation::InputPort(node_name, port_name) => {
1758                // Check node exists
1759                let node = if let Some(n) = self.graph.nodes().get(node_name) {
1760                    n
1761                } else {
1762                    return NetActionResponse::Error(NetActionError::NodeNotFound {
1763                        node_name: node_name.clone(),
1764                    });
1765                };
1766                // Check port exists
1767                let port = if let Some(p) = node.in_ports.get(port_name) {
1768                    p
1769                } else {
1770                    return NetActionResponse::Error(NetActionError::InputPortNotFound {
1771                        port_name: port_name.clone(),
1772                        node_name: node_name.clone(),
1773                    });
1774                };
1775                // Check capacity
1776                let current_count = self
1777                    ._packets_by_location
1778                    .get(destination)
1779                    .map(|s| s.len())
1780                    .unwrap_or(0);
1781                let is_full = match &port.slots_spec {
1782                    PortSlotSpec::Infinite => false,
1783                    PortSlotSpec::Finite(capacity) => current_count >= *capacity as usize,
1784                };
1785                if is_full {
1786                    return NetActionResponse::Error(NetActionError::InputPortFull {
1787                        port_name: port_name.clone(),
1788                        node_name: node_name.clone(),
1789                    });
1790                }
1791            }
1792            PacketLocation::Edge(edge) => {
1793                // Check edge exists in graph
1794                if !self.graph.edges().contains(edge) {
1795                    return NetActionResponse::Error(NetActionError::EdgeNotFound {
1796                        edge: edge.clone(),
1797                    });
1798                }
1799            }
1800            PacketLocation::OutsideNet => {
1801                // Always allowed
1802            }
1803        }
1804
1805        // Get the index before moving
1806        let from_index = self
1807            ._packets_by_location
1808            .get(&current_location)
1809            .and_then(|packets| packets.get_index_of(packet_id))
1810            .expect("Packet should exist at current_location");
1811
1812        // Move the packet
1813        self.move_packet(packet_id, destination.clone());
1814
1815        NetActionResponse::Success(
1816            NetActionResponseData::None,
1817            vec![NetEvent::PacketMoved(
1818                get_utc_now(),
1819                *packet_id,
1820                current_location,
1821                destination.clone(),
1822                from_index,
1823            )],
1824        )
1825    }
1826
1827    /// Perform an action on the network.
1828    ///
1829    /// This is the primary way to mutate the network state. All actions produce
1830    /// a response containing either success data and events, or an error.
1831    ///
1832    /// # Example
1833    ///
1834    /// ```
1835    /// use netrun_sim::net::{NetSim, NetAction, NetActionResponse, NetActionResponseData};
1836    /// use netrun_sim::graph::{Graph, Node, Port, PortSlotSpec};
1837    /// use indexmap::IndexMap;
1838    /// use std::collections::HashMap;
1839    ///
1840    /// let node = Node {
1841    ///     name: "A".to_string(),
1842    ///     in_ports: HashMap::new(),
1843    ///     out_ports: HashMap::new(),
1844    ///     in_salvo_conditions: IndexMap::new(),
1845    ///     out_salvo_conditions: IndexMap::new(),
1846    ///     dependency_request_config: None,
1847    /// };
1848    /// let graph = Graph::new(vec![node], vec![]);
1849    /// let mut net = NetSim::new(graph);
1850    ///
1851    /// // Create a packet outside the network
1852    /// let response = net.do_action(&NetAction::CreatePacket(None));
1853    /// match response {
1854    ///     NetActionResponse::Success(NetActionResponseData::Packet(id), events) => {
1855    ///         println!("Created packet {}", id);
1856    ///     }
1857    ///     _ => panic!("Expected success"),
1858    /// }
1859    /// ```
1860    pub fn do_action(&mut self, action: &NetAction) -> NetActionResponse {
1861        match action {
1862            NetAction::RunStep => self.run_step(),
1863            NetAction::CreatePacket(maybe_epoch_id) => self.create_packet(maybe_epoch_id),
1864            NetAction::ConsumePacket(packet_id) => self.consume_packet(packet_id),
1865            NetAction::DestroyPacket(packet_id) => self.destroy_packet(packet_id),
1866            NetAction::StartEpoch(epoch_id) => self.start_epoch(epoch_id),
1867            NetAction::FinishEpoch(epoch_id) => self.finish_epoch(epoch_id),
1868            NetAction::CancelEpoch(epoch_id) => self.cancel_epoch(epoch_id),
1869            NetAction::CreateEpoch(node_name, salvo) => self.create_epoch(node_name, salvo),
1870            NetAction::LoadPacketIntoOutputPort(packet_id, port_name) => {
1871                self.load_packet_into_output_port(packet_id, port_name)
1872            }
1873            NetAction::SendOutputSalvo(epoch_id, salvo_condition_name) => {
1874                self.send_output_salvo(epoch_id, salvo_condition_name)
1875            }
1876            NetAction::TransportPacketToLocation(packet_id, location) => {
1877                self.transport_packet_to_location(packet_id, location)
1878            }
1879            NetAction::CreateRequest(node_name, label) => self.create_request(node_name, label),
1880        }
1881    }
1882
1883    // ========== Public Accessors ==========
1884
1885    /// Get the number of packets at a given location.
1886    pub fn packet_count_at(&self, location: &PacketLocation) -> usize {
1887        self._packets_by_location
1888            .get(location)
1889            .map(|s| s.len())
1890            .unwrap_or(0)
1891    }
1892
1893    /// Get all packets at a given location.
1894    pub fn get_packets_at_location(&self, location: &PacketLocation) -> Vec<PacketID> {
1895        self._packets_by_location
1896            .get(location)
1897            .map(|s| s.iter().cloned().collect())
1898            .unwrap_or_default()
1899    }
1900
1901    /// Get an epoch by ID.
1902    pub fn get_epoch(&self, epoch_id: &EpochID) -> Option<&Epoch> {
1903        self._epochs.get(epoch_id)
1904    }
1905
1906    /// Get all startable epoch IDs.
1907    pub fn get_startable_epochs(&self) -> Vec<EpochID> {
1908        self._startable_epochs.iter().cloned().collect()
1909    }
1910
1911    /// Get a packet by ID.
1912    pub fn get_packet(&self, packet_id: &PacketID) -> Option<&Packet> {
1913        self._packets.get(packet_id)
1914    }
1915
1916    /// Run the network until blocked, returning all events that occurred.
1917    ///
1918    /// This is a convenience method that repeatedly calls `RunStep` until no more
1919    /// progress can be made. Equivalent to:
1920    /// ```ignore
1921    /// while !net.is_blocked() {
1922    ///     net.do_action(&NetAction::RunStep);
1923    /// }
1924    /// ```
1925    pub fn run_until_blocked(&mut self) -> Vec<NetEvent> {
1926        let mut all_events = Vec::new();
1927        while !self.is_blocked() {
1928            if let NetActionResponse::Success(_, events) = self.do_action(&NetAction::RunStep) {
1929                all_events.extend(events);
1930            }
1931        }
1932        all_events
1933    }
1934
1935    /// Check if the network is blocked (no progress can be made by RunStep).
1936    ///
1937    /// Returns true if:
1938    /// - No packets can move from edges to input ports (all destinations full or no packets on edges)
1939    /// - No input salvo conditions can be triggered
1940    pub fn is_blocked(&self) -> bool {
1941        // Check Phase 1: Can any packet move from an edge to an input port?
1942        for (location, packets) in &self._packets_by_location {
1943            if let PacketLocation::Edge(edge_ref) = location {
1944                if packets.is_empty() {
1945                    continue;
1946                }
1947
1948                let target_node_name = &edge_ref.target.node_name;
1949                let target_port_name = &edge_ref.target.port_name;
1950
1951                let node = match self.graph.nodes().get(target_node_name) {
1952                    Some(n) => n,
1953                    None => continue,
1954                };
1955                let port = match node.in_ports.get(target_port_name) {
1956                    Some(p) => p,
1957                    None => continue,
1958                };
1959
1960                let input_port_location =
1961                    PacketLocation::InputPort(target_node_name.clone(), target_port_name.clone());
1962                let current_count = self
1963                    ._packets_by_location
1964                    .get(&input_port_location)
1965                    .map(|p| p.len() as u64)
1966                    .unwrap_or(0);
1967
1968                let can_move = match port.slots_spec {
1969                    PortSlotSpec::Infinite => true,
1970                    PortSlotSpec::Finite(max_slots) => current_count < max_slots,
1971                };
1972
1973                if can_move {
1974                    return false; // Not blocked - a packet can move
1975                }
1976            }
1977        }
1978
1979        // Check Phase 2: Can any salvo condition be triggered?
1980        for (location, packets) in &self._packets_by_location {
1981            if let PacketLocation::InputPort(node_name, _) = location {
1982                if packets.is_empty() {
1983                    continue;
1984                }
1985
1986                // Check if any salvo condition on this node can be triggered
1987                if self.can_trigger_input_salvo(node_name) {
1988                    return false; // Not blocked - a salvo condition can trigger
1989                }
1990            }
1991        }
1992
1993        // Check Phase 2b/3: Are there pending requests or unsent startup triggers?
1994        if !self._pending_requests.is_empty() {
1995            return false;
1996        }
1997        if !self._startup_requests_sent {
1998            // Check if any node has OnStartup trigger
1999            let has_startup = self.graph.nodes().values().any(|node| {
2000                node.dependency_request_config
2001                    .as_ref()
2002                    .is_some_and(|c| c.triggers.contains(&DependencyRequestTrigger::OnStartup))
2003            });
2004            if has_startup {
2005                return false;
2006            }
2007        }
2008
2009        true // Blocked - no progress possible
2010    }
2011
2012    /// Helper: Check if any input salvo condition can be triggered for a node.
2013    fn can_trigger_input_salvo(&self, node_name: &NodeName) -> bool {
2014        let node = match self.graph.nodes().get(node_name) {
2015            Some(n) => n,
2016            None => return false,
2017        };
2018
2019        let in_port_names: Vec<PortName> = node.in_ports.keys().cloned().collect();
2020
2021        // Calculate packet counts for all input ports
2022        let port_packet_counts: HashMap<PortName, u64> = in_port_names
2023            .iter()
2024            .map(|port_name| {
2025                let count = self
2026                    ._packets_by_location
2027                    .get(&PacketLocation::InputPort(
2028                        node_name.clone(),
2029                        port_name.clone(),
2030                    ))
2031                    .map(|packets| packets.len() as u64)
2032                    .unwrap_or(0);
2033                (port_name.clone(), count)
2034            })
2035            .collect();
2036
2037        // Check if any salvo condition is satisfied
2038        for cond in node.in_salvo_conditions.values() {
2039            if evaluate_salvo_condition(&cond.term, &port_packet_counts, &node.in_ports) {
2040                return true;
2041            }
2042        }
2043
2044        false
2045    }
2046
2047    // ========== Undo Implementation ==========
2048
2049    /// Undo a previously executed action.
2050    ///
2051    /// Takes the original action and the events it produced.
2052    /// Returns `Ok(())` on success, or an error if undo is not possible.
2053    ///
2054    /// # Restrictions
2055    /// - Actions must be undone in reverse order (LIFO)
2056    /// - State may have changed since the action (undo may fail)
2057    ///
2058    /// # Example
2059    /// ```ignore
2060    /// let action = NetAction::CreatePacket(None);
2061    /// let response = net.do_action(&action);
2062    /// if let NetActionResponse::Success(_, events) = response {
2063    ///     // Later, to undo:
2064    ///     net.undo_action(&action, &events)?;
2065    /// }
2066    /// ```
2067    pub fn undo_action(
2068        &mut self,
2069        action: &NetAction,
2070        events: &[NetEvent],
2071    ) -> Result<(), UndoError> {
2072        // Process events in reverse order
2073        for event in events.iter().rev() {
2074            self.undo_event(action, event)?;
2075        }
2076        Ok(())
2077    }
2078
2079    /// Undo a single event.
2080    fn undo_event(&mut self, action: &NetAction, event: &NetEvent) -> Result<(), UndoError> {
2081        match event {
2082            NetEvent::PacketCreated(_, packet_id) => self.undo_packet_created(packet_id),
2083            NetEvent::PacketConsumed(_, packet_id, location) => {
2084                self.undo_packet_consumed(packet_id, location)
2085            }
2086            NetEvent::PacketDestroyed(_, packet_id, location) => {
2087                self.undo_packet_destroyed(packet_id, location)
2088            }
2089            NetEvent::EpochCreated(_, epoch_id) => self.undo_epoch_created(epoch_id),
2090            NetEvent::EpochStarted(_, epoch_id) => self.undo_epoch_started(epoch_id),
2091            NetEvent::EpochFinished(_, epoch) => self.undo_epoch_finished(epoch),
2092            NetEvent::EpochCancelled(_, epoch) => self.undo_epoch_cancelled(epoch),
2093            NetEvent::PacketMoved(_, packet_id, from, to, from_index) => {
2094                self.undo_packet_moved(packet_id, from, to, *from_index)
2095            }
2096            NetEvent::InputSalvoTriggered(_, _, _) => {
2097                // Informational only - no state to undo
2098                Ok(())
2099            }
2100            NetEvent::OutputSalvoTriggered(_, epoch_id, _) => {
2101                // Pop the last out_salvo from the epoch
2102                self.undo_output_salvo_triggered(epoch_id, action)
2103            }
2104            NetEvent::PacketOrphaned(_, packet_id, epoch_id, _, port_name, _) => {
2105                // Move packet back from OutsideNet to output port
2106                self.undo_packet_orphaned(packet_id, epoch_id, port_name)
2107            }
2108            NetEvent::RequestCreated(_, node_name, label, source) => {
2109                match source {
2110                    RequestCreatedSource::External => {
2111                        // Pop the last matching pending request (LIFO)
2112                        if let Some(pos) = self
2113                            ._pending_requests
2114                            .iter()
2115                            .rposition(|r| r.node_name == *node_name && r.label == *label)
2116                        {
2117                            self._pending_requests.remove(pos);
2118                        }
2119                        Ok(())
2120                    }
2121                    RequestCreatedSource::OnStartup => {
2122                        self._startup_requests_sent = false;
2123                        Ok(())
2124                    }
2125                    RequestCreatedSource::OnNoSalvoTriggered => {
2126                        // Un-spend the token
2127                        self._request_tokens.insert(node_name.clone(), true);
2128                        Ok(())
2129                    }
2130                }
2131            }
2132            NetEvent::RequestCascadeResolved(_, _, _)
2133            | NetEvent::RequestEpochCreated(_, _, _, _) => {
2134                // Informational only - no state to undo
2135                // (Request-created epochs are undone via EpochCreated events)
2136                Ok(())
2137            }
2138        }
2139    }
2140
2141    /// Undo PacketCreated: Remove the packet from the network.
2142    fn undo_packet_created(&mut self, packet_id: &PacketID) -> Result<(), UndoError> {
2143        // Get packet's location
2144        let location = match self._packets.get(packet_id) {
2145            Some(p) => p.location.clone(),
2146            None => {
2147                return Err(UndoError::NotFound(format!(
2148                    "packet {} not found",
2149                    packet_id
2150                )));
2151            }
2152        };
2153
2154        // Remove from location index
2155        if let Some(packets) = self._packets_by_location.get_mut(&location) {
2156            packets.shift_remove(packet_id);
2157        }
2158
2159        // Remove from packets map
2160        self._packets.remove(packet_id);
2161
2162        Ok(())
2163    }
2164
2165    /// Undo PacketConsumed: Recreate the packet at its previous location.
2166    fn undo_packet_consumed(
2167        &mut self,
2168        packet_id: &PacketID,
2169        location: &PacketLocation,
2170    ) -> Result<(), UndoError> {
2171        self.recreate_packet(packet_id, location)
2172    }
2173
2174    /// Undo PacketDestroyed: Recreate the packet at its previous location.
2175    fn undo_packet_destroyed(
2176        &mut self,
2177        packet_id: &PacketID,
2178        location: &PacketLocation,
2179    ) -> Result<(), UndoError> {
2180        self.recreate_packet(packet_id, location)
2181    }
2182
2183    /// Helper: Recreate a packet at a given location.
2184    fn recreate_packet(
2185        &mut self,
2186        packet_id: &PacketID,
2187        location: &PacketLocation,
2188    ) -> Result<(), UndoError> {
2189        // Check packet doesn't already exist
2190        if self._packets.contains_key(packet_id) {
2191            return Err(UndoError::StateMismatch(format!(
2192                "packet {} already exists",
2193                packet_id
2194            )));
2195        }
2196
2197        // Create the packet
2198        let packet = Packet {
2199            id: *packet_id,
2200            location: location.clone(),
2201        };
2202        self._packets.insert(*packet_id, packet);
2203
2204        // Add to location index
2205        self._packets_by_location
2206            .entry(location.clone())
2207            .or_default()
2208            .insert(*packet_id);
2209
2210        Ok(())
2211    }
2212
2213    /// Undo EpochCreated: Remove the epoch from all indices.
2214    fn undo_epoch_created(&mut self, epoch_id: &EpochID) -> Result<(), UndoError> {
2215        // Get epoch info before removing
2216        let epoch = match self._epochs.get(epoch_id) {
2217            Some(e) => e.clone(),
2218            None => {
2219                return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
2220            }
2221        };
2222
2223        // Remove from _epochs
2224        self._epochs.remove(epoch_id);
2225
2226        // Remove from _startable_epochs if present
2227        self._startable_epochs.remove(epoch_id);
2228
2229        // Remove from _node_to_epochs (and clean up empty entries)
2230        if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch.node_name) {
2231            epoch_ids.retain(|id| id != epoch_id);
2232            // Remove the entry entirely if empty to restore exact state
2233            if epoch_ids.is_empty() {
2234                self._node_to_epochs.remove(&epoch.node_name);
2235            }
2236        }
2237
2238        // Remove location entries for the epoch
2239        let epoch_location = PacketLocation::Node(*epoch_id);
2240        self._packets_by_location.remove(&epoch_location);
2241
2242        // Remove output port location entries
2243        if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
2244            for port_name in node.out_ports.keys() {
2245                let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
2246                self._packets_by_location.remove(&output_port_location);
2247            }
2248        }
2249
2250        Ok(())
2251    }
2252
2253    /// Undo EpochStarted: Change state back to Startable, add to _startable_epochs.
2254    fn undo_epoch_started(&mut self, epoch_id: &EpochID) -> Result<(), UndoError> {
2255        let epoch = match self._epochs.get_mut(epoch_id) {
2256            Some(e) => e,
2257            None => {
2258                return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
2259            }
2260        };
2261
2262        // Verify epoch is in Running state
2263        if epoch.state != EpochState::Running {
2264            return Err(UndoError::StateMismatch(format!(
2265                "epoch {} is not in Running state, cannot undo start",
2266                epoch_id
2267            )));
2268        }
2269
2270        // Change state back to Startable
2271        epoch.state = EpochState::Startable;
2272
2273        // Add back to _startable_epochs
2274        self._startable_epochs.insert(*epoch_id);
2275
2276        Ok(())
2277    }
2278
2279    /// Undo EpochFinished: Restore the epoch from the event.
2280    fn undo_epoch_finished(&mut self, epoch: &Epoch) -> Result<(), UndoError> {
2281        let epoch_id = epoch.id;
2282
2283        // Check epoch doesn't already exist
2284        if self._epochs.contains_key(&epoch_id) {
2285            return Err(UndoError::StateMismatch(format!(
2286                "epoch {} already exists",
2287                epoch_id
2288            )));
2289        }
2290
2291        // Restore the epoch with its original state (from before finish)
2292        // Note: epoch in the event captures state before finish (Running)
2293        self._epochs.insert(epoch_id, epoch.clone());
2294
2295        // Recreate location entries
2296        let epoch_location = PacketLocation::Node(epoch_id);
2297        self._packets_by_location
2298            .insert(epoch_location, IndexSet::new());
2299
2300        // Recreate output port location entries
2301        if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
2302            for port_name in node.out_ports.keys() {
2303                let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
2304                self._packets_by_location
2305                    .insert(output_port_location, IndexSet::new());
2306            }
2307        }
2308
2309        // Add back to _node_to_epochs
2310        self._node_to_epochs
2311            .entry(epoch.node_name.clone())
2312            .or_default()
2313            .push(epoch_id);
2314
2315        // Reverse token replenishment
2316        self.set_request_token(&epoch.node_name, false);
2317
2318        Ok(())
2319    }
2320
2321    /// Undo EpochCancelled: Restore the epoch from the event.
2322    /// Note: Packets are restored via PacketDestroyed events (processed in reverse order).
2323    fn undo_epoch_cancelled(&mut self, epoch: &Epoch) -> Result<(), UndoError> {
2324        let epoch_id = epoch.id;
2325
2326        // Check epoch doesn't already exist
2327        if self._epochs.contains_key(&epoch_id) {
2328            return Err(UndoError::StateMismatch(format!(
2329                "epoch {} already exists",
2330                epoch_id
2331            )));
2332        }
2333
2334        // Restore the epoch with its original state
2335        self._epochs.insert(epoch_id, epoch.clone());
2336
2337        // Recreate location entries
2338        let epoch_location = PacketLocation::Node(epoch_id);
2339        self._packets_by_location
2340            .insert(epoch_location, IndexSet::new());
2341
2342        // Recreate output port location entries
2343        if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
2344            for port_name in node.out_ports.keys() {
2345                let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
2346                self._packets_by_location
2347                    .insert(output_port_location, IndexSet::new());
2348            }
2349        }
2350
2351        // Add back to _node_to_epochs
2352        self._node_to_epochs
2353            .entry(epoch.node_name.clone())
2354            .or_default()
2355            .push(epoch_id);
2356
2357        // If epoch was startable, add to _startable_epochs
2358        if epoch.state == EpochState::Startable {
2359            self._startable_epochs.insert(epoch_id);
2360        }
2361
2362        // Reverse token replenishment
2363        self.set_request_token(&epoch.node_name, false);
2364
2365        Ok(())
2366    }
2367
2368    /// Undo PacketMoved: Move packet back from `to` to `from` at `from_index`.
2369    fn undo_packet_moved(
2370        &mut self,
2371        packet_id: &PacketID,
2372        from: &PacketLocation,
2373        to: &PacketLocation,
2374        from_index: usize,
2375    ) -> Result<(), UndoError> {
2376        // Verify packet exists and is at `to` location
2377        let packet = match self._packets.get(packet_id) {
2378            Some(p) => p,
2379            None => {
2380                return Err(UndoError::NotFound(format!(
2381                    "packet {} not found",
2382                    packet_id
2383                )));
2384            }
2385        };
2386
2387        if packet.location != *to {
2388            return Err(UndoError::StateMismatch(format!(
2389                "packet {} is not at expected location {:?}, found at {:?}",
2390                packet_id, to, packet.location
2391            )));
2392        }
2393
2394        // Remove from `to` location
2395        if let Some(packets) = self._packets_by_location.get_mut(to) {
2396            packets.shift_remove(packet_id);
2397        }
2398
2399        // Insert back into `from` at original index using shift_insert
2400        let packets_at_from = self._packets_by_location.entry(from.clone()).or_default();
2401        packets_at_from.shift_insert(from_index, *packet_id);
2402
2403        // Update packet's location
2404        self._packets.get_mut(packet_id).unwrap().location = from.clone();
2405
2406        Ok(())
2407    }
2408
2409    /// Undo OutputSalvoTriggered: Pop the last out_salvo from the epoch and clear orphaned packets.
2410    fn undo_output_salvo_triggered(
2411        &mut self,
2412        epoch_id: &EpochID,
2413        action: &NetAction,
2414    ) -> Result<(), UndoError> {
2415        // Only pop out_salvo for SendOutputSalvo action
2416        // For RunStep, salvo info isn't stored in out_salvos
2417        if !matches!(action, NetAction::SendOutputSalvo(_, _)) {
2418            return Ok(());
2419        }
2420
2421        let epoch = match self._epochs.get_mut(epoch_id) {
2422            Some(e) => e,
2423            None => {
2424                return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
2425            }
2426        };
2427
2428        // Pop the last out_salvo
2429        if epoch.out_salvos.pop().is_none() {
2430            return Err(UndoError::StateMismatch(format!(
2431                "epoch {} has no out_salvos to pop",
2432                epoch_id
2433            )));
2434        }
2435
2436        // Note: orphaned_packets are removed via undo_packet_orphaned (called for each PacketOrphaned event)
2437
2438        Ok(())
2439    }
2440
2441    /// Undo PacketOrphaned: Move packet back from OutsideNet to output port.
2442    fn undo_packet_orphaned(
2443        &mut self,
2444        packet_id: &PacketID,
2445        epoch_id: &EpochID,
2446        port_name: &PortName,
2447    ) -> Result<(), UndoError> {
2448        // Verify packet exists and is at OutsideNet
2449        let packet = match self._packets.get(packet_id) {
2450            Some(p) => p,
2451            None => {
2452                return Err(UndoError::NotFound(format!(
2453                    "packet {} not found",
2454                    packet_id
2455                )));
2456            }
2457        };
2458
2459        if packet.location != PacketLocation::OutsideNet {
2460            return Err(UndoError::StateMismatch(format!(
2461                "packet {} is not at OutsideNet, found at {:?}",
2462                packet_id, packet.location
2463            )));
2464        }
2465
2466        // Remove from OutsideNet
2467        if let Some(packets) = self
2468            ._packets_by_location
2469            .get_mut(&PacketLocation::OutsideNet)
2470        {
2471            packets.shift_remove(packet_id);
2472        }
2473
2474        // Move back to output port
2475        let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
2476        self._packets_by_location
2477            .entry(output_port_location.clone())
2478            .or_default()
2479            .insert(*packet_id);
2480
2481        // Update packet's location
2482        self._packets.get_mut(packet_id).unwrap().location = output_port_location;
2483
2484        // Remove from epoch's orphaned_packets list
2485        if let Some(epoch) = self._epochs.get_mut(epoch_id) {
2486            epoch
2487                .orphaned_packets
2488                .retain(|info| info.packet_id != *packet_id);
2489        }
2490
2491        Ok(())
2492    }
2493
2494    // ========== Internal Test Helpers ==========
2495
2496    #[cfg(test)]
2497    pub fn startable_epoch_ids(&self) -> Vec<EpochID> {
2498        self.get_startable_epochs()
2499    }
2500}
2501
2502#[cfg(test)]
2503#[path = "net_tests.rs"]
2504mod tests;