netrun_sim/
net.rs

1//! Runtime state and operations for flow-based development networks.
2//!
3//! This module provides the [`Net`] type which tracks the runtime state of a network,
4//! including packet locations, epoch lifecycles, and provides actions to control packet flow.
5//!
6//! All mutations to the network state go through [`Net::do_action`] which accepts a
7//! [`NetAction`] and returns a [`NetActionResponse`] containing any events that occurred.
8
9use crate::_utils::get_utc_now;
10use crate::graph::{Edge, Graph, NodeName, Port, PortSlotSpec, PortName, PortType, PortRef, SalvoConditionName, SalvoConditionTerm, evaluate_salvo_condition};
11use indexmap::IndexSet;
12use std::collections::{HashMap, HashSet};
13use ulid::Ulid;
14
/// Unique identifier for a packet.
///
/// Packet IDs are ULIDs; a fresh one is minted with `Ulid::new()` whenever a packet is created.
pub type PacketID = Ulid;

/// Unique identifier for an epoch.
///
/// Epoch IDs are ULIDs; [`Epoch::start_time`] reads the millisecond timestamp embedded in the ID.
pub type EpochID = Ulid;
20
/// Where a packet is located in the network.
///
/// Packets move through these locations as they flow through the network:
/// - They start outside the net or are created inside an epoch
/// - They move to edges, then to input ports
/// - They are consumed into epochs via salvo conditions
/// - They can be loaded into output ports and sent back onto edges
///
/// The type is hashable because [`Net`] uses it as the key of its
/// location-to-packets index.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum PacketLocation {
    /// Inside the epoch with the given ID (either startable or running).
    Node(EpochID),
    /// Waiting at the named input port of the named node.
    InputPort(NodeName, PortName),
    /// Loaded into the named output port of the given epoch, ready to be sent.
    OutputPort(EpochID, PortName),
    /// In transit on an edge between nodes.
    Edge(Edge),
    /// External to the network (not yet injected or already extracted).
    OutsideNet,
}
41
/// A unit that flows through the network.
///
/// A packet carries no payload in this model: it is only an identity plus
/// the place where it currently sits.
#[derive(Debug)]
pub struct Packet {
    /// Unique identifier for this packet.
    pub id: PacketID,
    /// Current location of this packet.
    pub location: PacketLocation,
}
50
/// A collection of packets that enter or exit a node together.
///
/// Salvos are created when salvo conditions are satisfied:
/// - Input salvos are created when packets at input ports trigger an epoch
/// - Output salvos are created when packets at output ports are sent out
#[derive(Debug, Clone)]
pub struct Salvo {
    /// The name of the salvo condition that was triggered.
    pub salvo_condition: SalvoConditionName,
    /// The packets in this salvo, each paired with the name of the port it
    /// was taken from (input salvo) or sent through (output salvo).
    pub packets: Vec<(PortName, PacketID)>,
}
63
/// The lifecycle state of an epoch.
///
/// An epoch moves `Startable` -> `Running` -> `Finished`. The enum carries no
/// data, so it is `Copy` and supports full equality (`Eq`) in addition to
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EpochState {
    /// Epoch is created but not yet started. External code must call StartEpoch.
    Startable,
    /// Epoch is actively running. Packets can be created, loaded, and sent.
    Running,
    /// Epoch has completed. No further operations are allowed.
    Finished,
}
74
/// An execution instance of a node.
///
/// A single node can have multiple simultaneous epochs. Each epoch tracks
/// which packets entered (`in_salvo`), which have been sent out (`out_salvos`),
/// and its current lifecycle state.
#[derive(Debug, Clone)]
pub struct Epoch {
    /// Unique identifier for this epoch.
    pub id: EpochID,
    /// The node this epoch is executing on.
    pub node_name: NodeName,
    /// The salvo of packets that triggered this epoch.
    pub in_salvo: Salvo,
    /// Salvos that have been sent out from this epoch (empty when the epoch is created).
    pub out_salvos: Vec<Salvo>,
    /// Current lifecycle state.
    pub state: EpochState,
}
93
94impl Epoch {
95    /// Returns the timestamp when this epoch was created (milliseconds since Unix epoch).
96    pub fn start_time(&self) -> u64 {
97        self.id.timestamp_ms()
98    }
99}
100
/// Timestamp in milliseconds (UTC).
///
/// This is the first field of every [`NetEvent`]; the values in this module
/// are produced by `get_utc_now()`.
pub type EventUTC = i128;
103
/// An action that can be performed on the network.
///
/// All mutations to [`Net`] state must go through these actions via [`Net::do_action`].
/// This ensures all operations are tracked and produce appropriate events.
#[derive(Debug)]
pub enum NetAction {
    /// Run automatic packet flow until no more progress can be made.
    /// Moves packets from edges to input ports and triggers input salvo conditions.
    RunNetUntilBlocked,
    /// Create a new packet, optionally inside an epoch.
    /// If `Some(epoch_id)`, the epoch must exist and be running.
    /// If `None`, the packet is created outside the network.
    CreatePacket(Option<EpochID>),
    /// Remove a packet from the network entirely.
    ConsumePacket(PacketID),
    /// Transition a startable epoch to running state.
    StartEpoch(EpochID),
    /// Complete a running epoch. Fails if the epoch still contains packets.
    FinishEpoch(EpochID),
    /// Cancel an epoch and destroy all packets inside it
    /// (including packets loaded into its output ports).
    CancelEpoch(EpochID),
    /// Manually create and start an epoch with specified packets.
    /// Bypasses the normal salvo condition triggering mechanism.
    CreateAndStartEpoch(NodeName, Salvo),
    /// Move a packet from inside an epoch to one of its output ports.
    LoadPacketIntoOutputPort(PacketID, PortName),
    /// Send packets from output ports onto edges according to a salvo condition.
    SendOutputSalvo(EpochID, SalvoConditionName),
    /// Transport a packet to a new location.
    /// Restrictions:
    /// - Cannot move packets into or out of Running epochs (only Startable allowed)
    /// - Input ports are checked for capacity
    TransportPacketToLocation(PacketID, PacketLocation),
}
137
/// Errors that can occur when performing a [`NetAction`].
///
/// Each variant carries the identifiers needed to build its message; the
/// `Display` text comes from the `#[error(...)]` attribute on the variant.
#[derive(Debug, thiserror::Error)]
pub enum NetActionError {
    /// Packet with the given ID was not found in the network.
    #[error("packet not found: {packet_id}")]
    PacketNotFound { packet_id: PacketID },

    /// Epoch with the given ID was not found.
    #[error("epoch not found: {epoch_id}")]
    EpochNotFound { epoch_id: EpochID },

    /// Epoch exists but is not in Running state.
    #[error("epoch {epoch_id} is not running")]
    EpochNotRunning { epoch_id: EpochID },

    /// Epoch exists but is not in Startable state.
    #[error("epoch {epoch_id} is not startable")]
    EpochNotStartable { epoch_id: EpochID },

    /// Cannot finish epoch because it still contains packets.
    #[error("cannot finish epoch {epoch_id}: epoch still contains packets")]
    CannotFinishNonEmptyEpoch { epoch_id: EpochID },

    /// Packet is not inside the specified epoch's node location.
    #[error("packet {packet_id} is not inside epoch {epoch_id}")]
    PacketNotInNode { packet_id: PacketID, epoch_id: EpochID },

    /// Output port does not exist on the node.
    #[error("output port '{port_name}' not found on node for epoch {epoch_id}")]
    OutputPortNotFound { port_name: PortName, epoch_id: EpochID },

    /// Output salvo condition does not exist on the node.
    #[error("output salvo condition '{condition_name}' not found on node for epoch {epoch_id}")]
    OutputSalvoConditionNotFound { condition_name: SalvoConditionName, epoch_id: EpochID },

    /// Maximum number of output salvos reached for this condition.
    #[error("max output salvos reached for condition '{condition_name}' on epoch {epoch_id}")]
    MaxOutputSalvosReached { condition_name: SalvoConditionName, epoch_id: EpochID },

    /// Output salvo condition is not satisfied.
    #[error("salvo condition '{condition_name}' not met for epoch {epoch_id}")]
    SalvoConditionNotMet { condition_name: SalvoConditionName, epoch_id: EpochID },

    /// Output port has reached its capacity.
    #[error("output port '{port_name}' is full for epoch {epoch_id}")]
    OutputPortFull { port_name: PortName, epoch_id: EpochID },

    /// Cannot put packets into an output port that has no connected edge.
    #[error("output port '{port_name}' on node '{node_name}' is not connected to any edge")]
    CannotPutPacketIntoUnconnectedOutputPort { port_name: PortName, node_name: NodeName },

    /// Node with the given name was not found in the graph.
    #[error("node not found: '{node_name}'")]
    NodeNotFound { node_name: NodeName },

    /// Packet is not at the expected input port.
    #[error("packet {packet_id} is not at input port '{port_name}' of node '{node_name}'")]
    PacketNotAtInputPort { packet_id: PacketID, port_name: PortName, node_name: NodeName },

    /// Input port does not exist on the node.
    #[error("input port '{port_name}' not found on node '{node_name}'")]
    InputPortNotFound { port_name: PortName, node_name: NodeName },

    /// Input port has reached its capacity.
    #[error("input port '{port_name}' on node '{node_name}' is full")]
    InputPortFull { port_name: PortName, node_name: NodeName },

    /// Cannot move packet out of a running epoch.
    #[error("cannot move packet {packet_id} out of running epoch {epoch_id}")]
    CannotMovePacketFromRunningEpoch { packet_id: PacketID, epoch_id: EpochID },

    /// Cannot move packet into a running epoch.
    #[error("cannot move packet {packet_id} into running epoch {epoch_id}")]
    CannotMovePacketIntoRunningEpoch { packet_id: PacketID, epoch_id: EpochID },

    /// Edge does not exist in the graph.
    #[error("edge not found: {edge}")]
    EdgeNotFound { edge: Edge },
}
217
/// An event that occurred during a network action.
///
/// Events provide a complete audit trail of all state changes in the network.
/// Every variant starts with the [`EventUTC`] timestamp at which the event
/// was recorded, followed by the relevant identifiers.
#[derive(Debug, Clone)]
pub enum NetEvent {
    /// A new packet was created.
    PacketCreated(EventUTC, PacketID),
    /// A packet was removed from the network (consumed directly or destroyed
    /// when its epoch was cancelled).
    PacketConsumed(EventUTC, PacketID),
    /// A new epoch was created (in Startable state).
    EpochCreated(EventUTC, EpochID),
    /// An epoch transitioned from Startable to Running.
    EpochStarted(EventUTC, EpochID),
    /// An epoch completed successfully.
    EpochFinished(EventUTC, EpochID),
    /// An epoch was cancelled.
    EpochCancelled(EventUTC, EpochID),
    /// A packet moved to a new location (the location given is the destination).
    PacketMoved(EventUTC, PacketID, PacketLocation),
    /// An input salvo condition was triggered, creating an epoch.
    InputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
    /// An output salvo condition was triggered, sending packets.
    OutputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
}
243
/// Data returned by a successful network action.
///
/// Which variant is returned depends on the action that was performed.
#[derive(Debug)]
pub enum NetActionResponseData {
    /// The ID of the newly created packet (returned by CreatePacket).
    Packet(PacketID),
    /// A snapshot of the started epoch (returned by StartEpoch, CreateAndStartEpoch).
    StartedEpoch(Epoch),
    /// The finished epoch, removed from the net (returned by FinishEpoch).
    FinishedEpoch(Epoch),
    /// The cancelled epoch and IDs of destroyed packets (returned by CancelEpoch).
    CancelledEpoch(Epoch, Vec<PacketID>),
    /// No specific data (returned by RunNetUntilBlocked, ConsumePacket, etc.).
    None,
}
258
/// The result of performing a network action.
///
/// This is a dedicated two-variant enum rather than a `Result`, so callers
/// match on `Success` / `Error` explicitly.
#[derive(Debug)]
pub enum NetActionResponse {
    /// Action succeeded, with optional data and a list of events that occurred.
    Success(NetActionResponseData, Vec<NetEvent>),
    /// Action failed with an error.
    Error(NetActionError),
}
267
/// The runtime state of a flow-based network.
///
/// A `Net` is created from a [`Graph`] and tracks:
/// - All packets and their locations
/// - All epochs and their states
/// - Which epochs are startable
///
/// All mutations must go through [`Net::do_action`] to ensure proper event tracking.
#[derive(Debug)]
pub struct Net {
    /// The graph topology this network is running on.
    pub graph: Graph,
    /// Every live packet, keyed by its ID.
    _packets: HashMap<PacketID, Packet>,
    /// Reverse index: the packets currently at each location. The `IndexSet`
    /// keeps insertion order, which is what gives edges their FIFO behavior.
    _packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>>,
    /// Every epoch that has been created and not yet finished or cancelled, keyed by its ID.
    _epochs: HashMap<EpochID, Epoch>,
    /// IDs of the epochs that are currently in the Startable state.
    _startable_epochs: HashSet<EpochID>,
    /// The epochs created on each node, in creation order.
    _node_to_epochs: HashMap<NodeName, Vec<EpochID>>,
}
286
287impl Net {
288    /// Creates a new Net from a Graph.
289    ///
290    /// Initializes packet location tracking for all edges and input ports.
291    pub fn new(graph: Graph) -> Self {
292        let mut packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>> = HashMap::new();
293
294        // Initialize empty packet sets for all edges
295        for edge in graph.edges() {
296            packets_by_location.insert(PacketLocation::Edge(edge.clone()), IndexSet::new());
297        }
298
299        // Initialize empty packet sets for all input ports
300        for (node_name, node) in graph.nodes() {
301            for port_name in node.in_ports.keys() {
302                packets_by_location.insert(
303                    PacketLocation::InputPort(node_name.clone(), port_name.clone()),
304                    IndexSet::new(),
305                );
306            }
307        }
308
309        // Initialize OutsideNet location for packets created outside the network
310        packets_by_location.insert(PacketLocation::OutsideNet, IndexSet::new());
311
312        // Note: Output port locations are created per-epoch when epochs are created
313        // Note: Node locations (inside epochs) are created when epochs are created
314
315        Net {
316            graph,
317            _packets: HashMap::new(),
318            _packets_by_location: packets_by_location,
319            _epochs: HashMap::new(),
320            _startable_epochs: HashSet::new(),
321            _node_to_epochs: HashMap::new(),
322        }
323    }
324
325    fn move_packet(&mut self, packet_id: &PacketID, new_location: PacketLocation) {
326        let packet = self._packets.get_mut(&packet_id).unwrap();
327        let packets_at_old_location = self._packets_by_location.get_mut(&packet.location)
328            .expect("Packet location has no entry in self._packets_by_location.");
329        packets_at_old_location.shift_remove(packet_id);
330        packet.location = new_location;
331        if !self._packets_by_location.get_mut(&packet.location)
332                .expect("Packet location has no entry in self._packets_by_location")
333                .insert(packet_id.clone()) {
334            panic!("Attempted to move packet to a location that already contains it.");
335        }
336    }
337
338    // NetActions
339
340    fn run_until_blocked(&mut self) -> NetActionResponse {
341        let mut all_events: Vec<NetEvent> = Vec::new();
342
343        loop {
344            let mut made_progress = false;
345
346            // Collect all edge locations and their first packet (FIFO)
347            // We need to extract data before mutating to avoid borrow issues
348            struct EdgeMoveCandidate {
349                packet_id: PacketID,
350                target_node_name: NodeName,
351                input_port_location: PacketLocation,
352                can_move: bool,
353            }
354
355            let mut edge_candidates: Vec<EdgeMoveCandidate> = Vec::new();
356
357            // Iterate through all edge locations in _packets_by_location
358            for (location, packets) in &self._packets_by_location {
359                if let PacketLocation::Edge(edge_ref) = location {
360                    // Get the first packet (FIFO order)
361                    if let Some(first_packet_id) = packets.first() {
362                        let target_node_name = edge_ref.target.node_name.clone();
363                        let target_port_name = edge_ref.target.port_name.clone();
364
365                        // Check if the target input port has space
366                        let node = self.graph.nodes().get(&target_node_name)
367                            .expect("Edge targets a non-existent node");
368                        let port = node.in_ports.get(&target_port_name)
369                            .expect("Edge targets a non-existent input port");
370
371                        let input_port_location = PacketLocation::InputPort(target_node_name.clone(), target_port_name.clone());
372                        let current_count = self._packets_by_location
373                            .get(&input_port_location)
374                            .map(|packets| packets.len() as u64)
375                            .unwrap_or(0);
376
377                        let can_move = match port.slots_spec {
378                            PortSlotSpec::Infinite => true,
379                            PortSlotSpec::Finite(max_slots) => current_count < max_slots,
380                        };
381
382                        edge_candidates.push(EdgeMoveCandidate {
383                            packet_id: first_packet_id.clone(),
384                            target_node_name,
385                            input_port_location,
386                            can_move,
387                        });
388                    }
389                }
390            }
391
392            // Process each edge that can move a packet
393            for candidate in edge_candidates {
394                if !candidate.can_move {
395                    continue;
396                }
397
398                // Move the packet to the input port
399                self.move_packet(&candidate.packet_id, candidate.input_port_location.clone());
400                all_events.push(NetEvent::PacketMoved(get_utc_now(), candidate.packet_id.clone(), candidate.input_port_location.clone()));
401                made_progress = true;
402
403                // Check input salvo conditions on the target node
404                // Extract all needed data from the graph first
405                let node = self.graph.nodes().get(&candidate.target_node_name)
406                    .expect("Edge targets a non-existent node");
407
408                let in_port_names: Vec<PortName> = node.in_ports.keys().cloned().collect();
409                let in_ports_clone: HashMap<PortName, Port> = node.in_ports.iter()
410                    .map(|(k, v)| (k.clone(), Port { slots_spec: match v.slots_spec {
411                        PortSlotSpec::Infinite => PortSlotSpec::Infinite,
412                        PortSlotSpec::Finite(n) => PortSlotSpec::Finite(n),
413                    }}))
414                    .collect();
415
416                // Collect salvo condition data
417                struct SalvoConditionData {
418                    name: SalvoConditionName,
419                    ports: Vec<PortName>,
420                    term: SalvoConditionTerm,
421                }
422
423                let salvo_conditions: Vec<SalvoConditionData> = node.in_salvo_conditions.iter()
424                    .map(|(name, cond)| SalvoConditionData {
425                        name: name.clone(),
426                        ports: cond.ports.clone(),
427                        term: cond.term.clone(),
428                    })
429                    .collect();
430
431                // Check salvo conditions in order - first satisfied one wins
432                for salvo_cond_data in salvo_conditions {
433                    // Calculate packet counts for all input ports
434                    let port_packet_counts: HashMap<PortName, u64> = in_port_names.iter()
435                        .map(|port_name| {
436                            let count = self._packets_by_location
437                                .get(&PacketLocation::InputPort(candidate.target_node_name.clone(), port_name.clone()))
438                                .map(|packets| packets.len() as u64)
439                                .unwrap_or(0);
440                            (port_name.clone(), count)
441                        })
442                        .collect();
443
444                    // Check if salvo condition is satisfied
445                    if evaluate_salvo_condition(&salvo_cond_data.term, &port_packet_counts, &in_ports_clone) {
446                        // Create a new epoch
447                        let epoch_id = Ulid::new();
448
449                        // Collect packets from the ports listed in salvo_condition.ports
450                        let mut salvo_packets: Vec<(PortName, PacketID)> = Vec::new();
451                        let mut packets_to_move: Vec<(PacketID, PortName)> = Vec::new();
452
453                        for port_name in &salvo_cond_data.ports {
454                            let port_location = PacketLocation::InputPort(candidate.target_node_name.clone(), port_name.clone());
455                            if let Some(packet_ids) = self._packets_by_location.get(&port_location) {
456                                for pid in packet_ids.iter() {
457                                    salvo_packets.push((port_name.clone(), pid.clone()));
458                                    packets_to_move.push((pid.clone(), port_name.clone()));
459                                }
460                            }
461                        }
462
463                        // Create the salvo
464                        let in_salvo = Salvo {
465                            salvo_condition: salvo_cond_data.name.clone(),
466                            packets: salvo_packets,
467                        };
468
469                        // Create the epoch
470                        let epoch = Epoch {
471                            id: epoch_id.clone(),
472                            node_name: candidate.target_node_name.clone(),
473                            in_salvo,
474                            out_salvos: Vec::new(),
475                            state: EpochState::Startable,
476                        };
477
478                        // Register the epoch
479                        self._epochs.insert(epoch_id.clone(), epoch);
480                        self._startable_epochs.insert(epoch_id.clone());
481                        self._node_to_epochs
482                            .entry(candidate.target_node_name.clone())
483                            .or_insert_with(Vec::new)
484                            .push(epoch_id.clone());
485
486                        // Create a location entry for packets inside the epoch
487                        let epoch_location = PacketLocation::Node(epoch_id.clone());
488                        self._packets_by_location.insert(epoch_location.clone(), IndexSet::new());
489
490                        // Create output port location entries for this epoch
491                        let node = self.graph.nodes().get(&candidate.target_node_name)
492                            .expect("Node not found for epoch creation");
493                        for port_name in node.out_ports.keys() {
494                            let output_port_location = PacketLocation::OutputPort(epoch_id.clone(), port_name.clone());
495                            self._packets_by_location.insert(output_port_location, IndexSet::new());
496                        }
497
498                        // Move packets from input ports into the epoch
499                        for (pid, _port_name) in &packets_to_move {
500                            self.move_packet(pid, epoch_location.clone());
501                            all_events.push(NetEvent::PacketMoved(get_utc_now(), pid.clone(), epoch_location.clone()));
502                        }
503
504                        all_events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id.clone()));
505                        all_events.push(NetEvent::InputSalvoTriggered(get_utc_now(), epoch_id.clone(), salvo_cond_data.name.clone()));
506
507                        // Only one salvo condition can trigger per node per iteration
508                        break;
509                    }
510                }
511            }
512
513            // If no progress was made, we're blocked
514            if !made_progress {
515                break;
516            }
517        }
518
519        NetActionResponse::Success(NetActionResponseData::None, all_events)
520    }
521
522    fn create_packet(&mut self, maybe_epoch_id: &Option<EpochID>) -> NetActionResponse {
523        // Check that epoch_id exists and is running
524        if let Some(epoch_id) = maybe_epoch_id {
525            if !self._epochs.contains_key(&epoch_id) {
526                return NetActionResponse::Error(NetActionError::EpochNotFound {
527                    epoch_id: epoch_id.clone(),
528                });
529            }
530            if !matches!(self._epochs[&epoch_id].state, EpochState::Running) {
531                return NetActionResponse::Error(NetActionError::EpochNotRunning {
532                    epoch_id: epoch_id.clone(),
533                });
534            }
535        }
536
537        let packet_location = match maybe_epoch_id {
538            Some(epoch_id) => PacketLocation::Node(epoch_id.clone()),
539            None => PacketLocation::OutsideNet,
540        };
541
542        let packet = Packet {
543            id: Ulid::new(),
544            location: packet_location.clone(),
545        };
546
547        let packet_id = packet.id.clone();
548        self._packets.insert(packet.id.clone(), packet);
549
550        // Add packet to location index
551        self._packets_by_location
552            .entry(packet_location)
553            .or_insert_with(IndexSet::new)
554            .insert(packet_id.clone());
555
556        NetActionResponse::Success(
557            NetActionResponseData::Packet(packet_id),
558            vec![NetEvent::PacketCreated(get_utc_now(), packet_id)]
559        )
560    }
561
562    fn consume_packet(&mut self, packet_id: &PacketID) -> NetActionResponse {
563        if !self._packets.contains_key(packet_id) {
564            return NetActionResponse::Error(NetActionError::PacketNotFound {
565                packet_id: packet_id.clone(),
566            });
567        }
568
569        if let Some(packets) = self
570            ._packets_by_location
571            .get_mut(&self._packets[packet_id].location)
572        {
573            if packets.shift_remove(packet_id) {
574                self._packets.remove(packet_id);
575                NetActionResponse::Success(
576                    NetActionResponseData::None,
577                    vec![NetEvent::PacketConsumed(get_utc_now(), packet_id.clone())]
578                )
579            } else {
580                panic!(
581                    "Packet with ID {} not found in location {:?}",
582                    packet_id, self._packets[packet_id].location
583                );
584            }
585        } else {
586            panic!("Packet location {:?} not found", self._packets[packet_id].location);
587        }
588    }
589
590    fn start_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
591        if let Some(epoch) = self._epochs.get_mut(epoch_id) {
592            if !self._startable_epochs.contains(epoch_id) {
593                return NetActionResponse::Error(NetActionError::EpochNotStartable {
594                    epoch_id: epoch_id.clone(),
595                });
596            }
597            debug_assert!(matches!(epoch.state, EpochState::Startable),
598                "Epoch state is not Startable but was in net._startable_epochs.");
599            epoch.state = EpochState::Running;
600            self._startable_epochs.remove(epoch_id);
601            NetActionResponse::Success(
602                NetActionResponseData::StartedEpoch(epoch.clone()),
603                vec![NetEvent::EpochStarted(get_utc_now(), epoch_id.clone())]
604            )
605        } else {
606            return NetActionResponse::Error(NetActionError::EpochNotFound {
607                epoch_id: epoch_id.clone(),
608            });
609        }
610    }
611
612    fn finish_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
613        if let Some(epoch) = self._epochs.get(&epoch_id) {
614            if let EpochState::Running = epoch.state {
615                // No packets may remain in the epoch by the time it has ended.
616                let epoch_loc = PacketLocation::Node(epoch_id.clone());
617                if let Some(packets) = self._packets_by_location.get(&epoch_loc) {
618                    if packets.len() > 0 {
619                        return NetActionResponse::Error(NetActionError::CannotFinishNonEmptyEpoch {
620                            epoch_id: epoch_id.clone(),
621                        });
622                    }
623
624                    let mut epoch = self._epochs.remove(&epoch_id).unwrap();
625                    epoch.state = EpochState::Finished;
626                    self._packets_by_location.remove(&epoch_loc);
627                    NetActionResponse::Success(
628                        NetActionResponseData::FinishedEpoch(epoch),
629                        vec![NetEvent::EpochFinished(get_utc_now(), epoch_id.clone())]
630                    )
631                } else {
632                    panic!("Epoch {} not found in location {:?}", epoch_id, epoch_loc);
633                }
634            } else {
635                return NetActionResponse::Error(NetActionError::EpochNotRunning {
636                    epoch_id: epoch_id.clone(),
637                });
638            }
639        } else {
640            return NetActionResponse::Error(NetActionError::EpochNotFound {
641                epoch_id: epoch_id.clone(),
642            });
643        }
644    }
645
646    fn cancel_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
647        // Check if epoch exists
648        let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
649            epoch.clone()
650        } else {
651            return NetActionResponse::Error(NetActionError::EpochNotFound {
652                epoch_id: epoch_id.clone(),
653            });
654        };
655
656        let mut events: Vec<NetEvent> = Vec::new();
657        let mut destroyed_packets: Vec<PacketID> = Vec::new();
658
659        // Collect packets inside the epoch (Node location)
660        let epoch_location = PacketLocation::Node(epoch_id.clone());
661        if let Some(packet_ids) = self._packets_by_location.get(&epoch_location) {
662            destroyed_packets.extend(packet_ids.iter().cloned());
663        }
664
665        // Collect packets in the epoch's output ports
666        let node = self.graph.nodes().get(&epoch.node_name)
667            .expect("Epoch references non-existent node");
668        for port_name in node.out_ports.keys() {
669            let output_port_location = PacketLocation::OutputPort(epoch_id.clone(), port_name.clone());
670            if let Some(packet_ids) = self._packets_by_location.get(&output_port_location) {
671                destroyed_packets.extend(packet_ids.iter().cloned());
672            }
673        }
674
675        // Remove packets from _packets and _packets_by_location, emit events
676        for packet_id in &destroyed_packets {
677            let packet = self._packets.remove(packet_id)
678                .expect("Packet in location map not found in packets map");
679            if let Some(packets_at_location) = self._packets_by_location.get_mut(&packet.location) {
680                packets_at_location.shift_remove(packet_id);
681            }
682            events.push(NetEvent::PacketConsumed(get_utc_now(), packet_id.clone()));
683        }
684
685        // Remove output port location entries for this epoch
686        for port_name in node.out_ports.keys() {
687            let output_port_location = PacketLocation::OutputPort(epoch_id.clone(), port_name.clone());
688            self._packets_by_location.remove(&output_port_location);
689        }
690
691        // Remove the epoch's node location entry
692        self._packets_by_location.remove(&epoch_location);
693
694        // Update _startable_epochs if epoch was startable
695        self._startable_epochs.remove(epoch_id);
696
697        // Update _node_to_epochs
698        if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch.node_name) {
699            epoch_ids.retain(|id| id != epoch_id);
700        }
701
702        // Remove epoch from _epochs
703        let epoch = self._epochs.remove(epoch_id)
704            .expect("Epoch should exist");
705
706        events.push(NetEvent::EpochCancelled(get_utc_now(), epoch_id.clone()));
707
708        NetActionResponse::Success(
709            NetActionResponseData::CancelledEpoch(epoch, destroyed_packets),
710            events
711        )
712    }
713
714    fn create_and_start_epoch(&mut self, node_name: &NodeName, salvo: &Salvo) -> NetActionResponse {
715        // Validate node exists
716        let node = match self.graph.nodes().get(node_name) {
717            Some(node) => node,
718            None => {
719                return NetActionResponse::Error(NetActionError::NodeNotFound {
720                    node_name: node_name.clone(),
721                });
722            }
723        };
724
725        // Validate all packets in salvo
726        for (port_name, packet_id) in &salvo.packets {
727            // Validate input port exists
728            if !node.in_ports.contains_key(port_name) {
729                return NetActionResponse::Error(NetActionError::InputPortNotFound {
730                    port_name: port_name.clone(),
731                    node_name: node_name.clone(),
732                });
733            }
734
735            // Validate packet exists
736            let packet = match self._packets.get(packet_id) {
737                Some(packet) => packet,
738                None => {
739                    return NetActionResponse::Error(NetActionError::PacketNotFound {
740                        packet_id: packet_id.clone(),
741                    });
742                }
743            };
744
745            // Validate packet is at the input port of this node
746            let expected_location = PacketLocation::InputPort(node_name.clone(), port_name.clone());
747            if packet.location != expected_location {
748                return NetActionResponse::Error(NetActionError::PacketNotAtInputPort {
749                    packet_id: packet_id.clone(),
750                    port_name: port_name.clone(),
751                    node_name: node_name.clone(),
752                });
753            }
754        }
755
756        let mut events: Vec<NetEvent> = Vec::new();
757
758        // Create the epoch
759        let epoch_id = Ulid::new();
760        let epoch = Epoch {
761            id: epoch_id.clone(),
762            node_name: node_name.clone(),
763            in_salvo: salvo.clone(),
764            out_salvos: Vec::new(),
765            state: EpochState::Running,
766        };
767
768        // Register the epoch
769        self._epochs.insert(epoch_id.clone(), epoch.clone());
770        self._node_to_epochs
771            .entry(node_name.clone())
772            .or_insert_with(Vec::new)
773            .push(epoch_id.clone());
774
775        // Create location entry for packets inside the epoch
776        let epoch_location = PacketLocation::Node(epoch_id.clone());
777        self._packets_by_location.insert(epoch_location.clone(), IndexSet::new());
778
779        // Create output port location entries for this epoch
780        for port_name in node.out_ports.keys() {
781            let output_port_location = PacketLocation::OutputPort(epoch_id.clone(), port_name.clone());
782            self._packets_by_location.insert(output_port_location, IndexSet::new());
783        }
784
785        events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id.clone()));
786
787        // Move packets from input ports into the epoch
788        for (_, packet_id) in &salvo.packets {
789            self.move_packet(packet_id, epoch_location.clone());
790            events.push(NetEvent::PacketMoved(get_utc_now(), packet_id.clone(), epoch_location.clone()));
791        }
792
793        events.push(NetEvent::EpochStarted(get_utc_now(), epoch_id.clone()));
794
795        NetActionResponse::Success(
796            NetActionResponseData::StartedEpoch(epoch),
797            events
798        )
799    }
800
801    fn load_packet_into_output_port(&mut self, packet_id: &PacketID, port_name: &String) -> NetActionResponse {
802        let (epoch_id, old_location) = if let Some(packet) = self._packets.get(packet_id) {
803            if let PacketLocation::Node(epoch_id) = packet.location {
804                (epoch_id, packet.location.clone())
805            } else {
806                // We don't know the epoch_id since the packet isn't in a node
807                // Use a placeholder - this is an edge case where we can't provide full context
808                return NetActionResponse::Error(NetActionError::PacketNotInNode {
809                    packet_id: packet_id.clone(),
810                    epoch_id: Ulid::nil(), // Placeholder since packet isn't in any epoch
811                })
812            }
813        } else {
814            return NetActionResponse::Error(NetActionError::PacketNotFound {
815                packet_id: packet_id.clone(),
816            });
817        };
818
819        let node_name = self._epochs.get(&epoch_id)
820            .expect("The epoch id in the location of a packet could not be found.")
821            .node_name.clone();
822        let node = self.graph.nodes().get(&node_name)
823            .expect("Packet located in a non-existing node (yet the node has an epoch).");
824
825        if !node.out_ports.contains_key(port_name) {
826            return NetActionResponse::Error(NetActionError::OutputPortNotFound {
827                port_name: port_name.clone(),
828                epoch_id: epoch_id.clone(),
829            })
830        }
831
832        let port = node.out_ports.get(port_name).unwrap();
833        let port_packets = self._packets_by_location.get(&old_location)
834            .expect("No entry in Net._packets_by_location found for output port.");
835
836        // Check if the output port is full
837        if let PortSlotSpec::Finite(num_slots) = port.slots_spec {
838            if num_slots >= port_packets.len() as u64 {
839                return NetActionResponse::Error(NetActionError::OutputPortFull {
840                    port_name: port_name.clone(),
841                    epoch_id: epoch_id.clone(),
842                })
843            }
844        }
845
846        let new_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
847        self.move_packet(&packet_id, new_location.clone());
848        NetActionResponse::Success(
849            NetActionResponseData::None,
850            vec![NetEvent::PacketMoved(get_utc_now(), epoch_id, new_location)]
851        )
852    }
853
854    fn send_output_salvo(&mut self, epoch_id: &EpochID, salvo_condition_name: &SalvoConditionName) -> NetActionResponse {
855        // Get epoch
856        let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
857            epoch
858        } else {
859            return NetActionResponse::Error(NetActionError::EpochNotFound {
860                epoch_id: epoch_id.clone(),
861            });
862        };
863
864        // Get node
865        let node = self.graph.nodes().get(&epoch.node_name)
866            .expect("Node associated with epoch could not be found.");
867
868        // Get salvo condition
869        let salvo_condition = if let Some(salvo_condition) = node.out_salvo_conditions.get(salvo_condition_name) {
870            salvo_condition
871        } else {
872            return NetActionResponse::Error(NetActionError::OutputSalvoConditionNotFound {
873                condition_name: salvo_condition_name.clone(),
874                epoch_id: epoch_id.clone(),
875            });
876        };
877
878        // Check if max salvos reached (max_salvos = 0 means unlimited)
879        if salvo_condition.max_salvos > 0 && epoch.out_salvos.len() as u64 >= salvo_condition.max_salvos {
880            return NetActionResponse::Error(NetActionError::MaxOutputSalvosReached {
881                condition_name: salvo_condition_name.clone(),
882                epoch_id: epoch_id.clone(),
883            });
884        }
885
886        // Check that the salvo condition is met
887        let port_packet_counts: HashMap<PortName, u64> = node.out_ports.keys()
888            .map(|port_name| {
889                let count = self._packets_by_location
890                    .get(&PacketLocation::OutputPort(epoch_id.clone(), port_name.clone()))
891                    .map(|packets| packets.len() as u64)
892                    .unwrap_or(0);
893                (port_name.clone(), count)
894            })
895            .collect();
896        if !evaluate_salvo_condition(&salvo_condition.term, &port_packet_counts, &node.out_ports) {
897            return NetActionResponse::Error(NetActionError::SalvoConditionNotMet {
898                condition_name: salvo_condition_name.clone(),
899                epoch_id: epoch_id.clone(),
900            });
901        }
902
903        // Get the locations to send packets to
904        let mut packets_to_move: Vec<(PacketID, PortName, PacketLocation)> = Vec::new();
905        for port_name in &salvo_condition.ports {
906            let packets = self._packets_by_location.get(&PacketLocation::OutputPort(epoch_id.clone(), port_name.clone()))
907                .expect(format!("Output port '{}' of node '{}' does not have an entry in self._packets_by_location", port_name, node.name.clone()).as_str())
908                .clone();
909            let edge_ref = if let Some(edge_ref) = self.graph.get_edge_by_tail(&PortRef { node_name: node.name.clone(), port_type: PortType::Output, port_name: port_name.clone() }) {
910                edge_ref.clone()
911            } else {
912                return NetActionResponse::Error(NetActionError::CannotPutPacketIntoUnconnectedOutputPort {
913                    port_name: port_name.clone(),
914                    node_name: node.name.clone(),
915                });
916            };
917            let new_location = PacketLocation::Edge(edge_ref.clone());
918            for packet_id in packets {
919                packets_to_move.push((packet_id.clone(), port_name.clone(), new_location.clone()));
920            }
921        }
922
923        // Create a Salvo and add it to the epoch
924        let salvo = Salvo {
925            salvo_condition: salvo_condition_name.clone(),
926            packets: packets_to_move.iter().map(|(packet_id, port_name, _)| {
927                (port_name.clone(), packet_id.clone())
928            }).collect()
929        };
930        self._epochs.get_mut(&epoch_id).unwrap().out_salvos.push(salvo);
931
932        // Move packets
933        let mut net_events = Vec::new();
934        for (packet_id, _port_name, new_location) in packets_to_move {
935            net_events.push(NetEvent::PacketMoved(get_utc_now(), packet_id.clone(), new_location.clone()));
936            self.move_packet(&packet_id, new_location);
937        }
938
939        NetActionResponse::Success(
940            NetActionResponseData::None,
941            net_events
942        )
943    }
944
945    fn transport_packet_to_location(&mut self, packet_id: &PacketID, destination: &PacketLocation) -> NetActionResponse {
946        // Validate packet exists
947        let packet = if let Some(p) = self._packets.get(packet_id) {
948            p
949        } else {
950            return NetActionResponse::Error(NetActionError::PacketNotFound {
951                packet_id: packet_id.clone(),
952            });
953        };
954        let current_location = packet.location.clone();
955
956        // Check if moving FROM a running epoch
957        match &current_location {
958            PacketLocation::Node(epoch_id) => {
959                if let Some(epoch) = self._epochs.get(epoch_id) {
960                    if epoch.state == EpochState::Running {
961                        return NetActionResponse::Error(NetActionError::CannotMovePacketFromRunningEpoch {
962                            packet_id: packet_id.clone(),
963                            epoch_id: epoch_id.clone(),
964                        });
965                    }
966                }
967            }
968            PacketLocation::OutputPort(epoch_id, _) => {
969                if let Some(epoch) = self._epochs.get(epoch_id) {
970                    if epoch.state == EpochState::Running {
971                        return NetActionResponse::Error(NetActionError::CannotMovePacketFromRunningEpoch {
972                            packet_id: packet_id.clone(),
973                            epoch_id: epoch_id.clone(),
974                        });
975                    }
976                }
977            }
978            _ => {}
979        }
980
981        // Check if moving TO a running epoch
982        match destination {
983            PacketLocation::Node(epoch_id) => {
984                if let Some(epoch) = self._epochs.get(epoch_id) {
985                    if epoch.state == EpochState::Running {
986                        return NetActionResponse::Error(NetActionError::CannotMovePacketIntoRunningEpoch {
987                            packet_id: packet_id.clone(),
988                            epoch_id: epoch_id.clone(),
989                        });
990                    }
991                } else {
992                    return NetActionResponse::Error(NetActionError::EpochNotFound {
993                        epoch_id: epoch_id.clone(),
994                    });
995                }
996            }
997            PacketLocation::OutputPort(epoch_id, port_name) => {
998                if let Some(epoch) = self._epochs.get(epoch_id) {
999                    if epoch.state == EpochState::Running {
1000                        return NetActionResponse::Error(NetActionError::CannotMovePacketIntoRunningEpoch {
1001                            packet_id: packet_id.clone(),
1002                            epoch_id: epoch_id.clone(),
1003                        });
1004                    }
1005                    // Check that output port exists on the node
1006                    let node = self.graph.nodes().get(&epoch.node_name)
1007                        .expect("Node associated with epoch could not be found.");
1008                    if !node.out_ports.contains_key(port_name) {
1009                        return NetActionResponse::Error(NetActionError::OutputPortNotFound {
1010                            port_name: port_name.clone(),
1011                            epoch_id: epoch_id.clone(),
1012                        });
1013                    }
1014                } else {
1015                    return NetActionResponse::Error(NetActionError::EpochNotFound {
1016                        epoch_id: epoch_id.clone(),
1017                    });
1018                }
1019            }
1020            PacketLocation::InputPort(node_name, port_name) => {
1021                // Check node exists
1022                let node = if let Some(n) = self.graph.nodes().get(node_name) {
1023                    n
1024                } else {
1025                    return NetActionResponse::Error(NetActionError::NodeNotFound {
1026                        node_name: node_name.clone(),
1027                    });
1028                };
1029                // Check port exists
1030                let port = if let Some(p) = node.in_ports.get(port_name) {
1031                    p
1032                } else {
1033                    return NetActionResponse::Error(NetActionError::InputPortNotFound {
1034                        port_name: port_name.clone(),
1035                        node_name: node_name.clone(),
1036                    });
1037                };
1038                // Check capacity
1039                let current_count = self._packets_by_location
1040                    .get(destination)
1041                    .map(|s| s.len())
1042                    .unwrap_or(0);
1043                let is_full = match &port.slots_spec {
1044                    PortSlotSpec::Infinite => false,
1045                    PortSlotSpec::Finite(capacity) => current_count >= *capacity as usize,
1046                };
1047                if is_full {
1048                    return NetActionResponse::Error(NetActionError::InputPortFull {
1049                        port_name: port_name.clone(),
1050                        node_name: node_name.clone(),
1051                    });
1052                }
1053            }
1054            PacketLocation::Edge(edge) => {
1055                // Check edge exists in graph
1056                if !self.graph.edges().contains(edge) {
1057                    return NetActionResponse::Error(NetActionError::EdgeNotFound {
1058                        edge: edge.clone(),
1059                    });
1060                }
1061            }
1062            PacketLocation::OutsideNet => {
1063                // Always allowed
1064            }
1065        }
1066
1067        // Move the packet
1068        self.move_packet(packet_id, destination.clone());
1069
1070        NetActionResponse::Success(
1071            NetActionResponseData::None,
1072            vec![NetEvent::PacketMoved(get_utc_now(), packet_id.clone(), destination.clone())]
1073        )
1074    }
1075
1076    /// Perform an action on the network.
1077    ///
1078    /// This is the primary way to mutate the network state. All actions produce
1079    /// a response containing either success data and events, or an error.
1080    ///
1081    /// # Example
1082    ///
1083    /// ```
1084    /// use netrun_sim::net::{Net, NetAction, NetActionResponse, NetActionResponseData};
1085    /// use netrun_sim::graph::{Graph, Node, Port, PortSlotSpec};
1086    /// use std::collections::HashMap;
1087    ///
1088    /// let node = Node {
1089    ///     name: "A".to_string(),
1090    ///     in_ports: HashMap::new(),
1091    ///     out_ports: HashMap::new(),
1092    ///     in_salvo_conditions: HashMap::new(),
1093    ///     out_salvo_conditions: HashMap::new(),
1094    /// };
1095    /// let graph = Graph::new(vec![node], vec![]);
1096    /// let mut net = Net::new(graph);
1097    ///
1098    /// // Create a packet outside the network
1099    /// let response = net.do_action(&NetAction::CreatePacket(None));
1100    /// match response {
1101    ///     NetActionResponse::Success(NetActionResponseData::Packet(id), events) => {
1102    ///         println!("Created packet {}", id);
1103    ///     }
1104    ///     _ => panic!("Expected success"),
1105    /// }
1106    /// ```
1107    pub fn do_action(&mut self, action: &NetAction) -> NetActionResponse {
1108        match action {
1109            NetAction::RunNetUntilBlocked => self.run_until_blocked(),
1110            NetAction::CreatePacket(maybe_epoch_id) => self.create_packet(maybe_epoch_id),
1111            NetAction::ConsumePacket(packet_id) => self.consume_packet(packet_id),
1112            NetAction::StartEpoch(epoch_id) => self.start_epoch(epoch_id),
1113            NetAction::FinishEpoch(epoch_id) => self.finish_epoch(epoch_id),
1114            NetAction::CancelEpoch(epoch_id) => self.cancel_epoch(epoch_id),
1115            NetAction::CreateAndStartEpoch(node_name, salvo) => self.create_and_start_epoch(node_name, salvo),
1116            NetAction::LoadPacketIntoOutputPort(packet_id, port_name) => self.load_packet_into_output_port(packet_id, port_name),
1117            NetAction::SendOutputSalvo(epoch_id, salvo_condition_name) => self.send_output_salvo(epoch_id, salvo_condition_name),
1118            NetAction::TransportPacketToLocation(packet_id, location) => self.transport_packet_to_location(packet_id, location),
1119        }
1120    }
1121
1122    // ========== Public Accessors ==========
1123
1124    /// Get the number of packets at a given location.
1125    pub fn packet_count_at(&self, location: &PacketLocation) -> usize {
1126        self._packets_by_location.get(location).map(|s| s.len()).unwrap_or(0)
1127    }
1128
1129    /// Get all packets at a given location.
1130    pub fn get_packets_at_location(&self, location: &PacketLocation) -> Vec<PacketID> {
1131        self._packets_by_location
1132            .get(location)
1133            .map(|s| s.iter().cloned().collect())
1134            .unwrap_or_default()
1135    }
1136
1137    /// Get an epoch by ID.
1138    pub fn get_epoch(&self, epoch_id: &EpochID) -> Option<&Epoch> {
1139        self._epochs.get(epoch_id)
1140    }
1141
1142    /// Get all startable epoch IDs.
1143    pub fn get_startable_epochs(&self) -> Vec<EpochID> {
1144        self._startable_epochs.iter().cloned().collect()
1145    }
1146
1147    /// Get a packet by ID.
1148    pub fn get_packet(&self, packet_id: &PacketID) -> Option<&Packet> {
1149        self._packets.get(packet_id)
1150    }
1151
1152    // ========== Internal Test Helpers ==========
1153
1154    #[cfg(test)]
1155    pub fn startable_epoch_ids(&self) -> Vec<EpochID> {
1156        self.get_startable_epochs()
1157    }
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162    use super::*;
1163    use crate::test_fixtures::*;
1164
1165    // Helper to extract packet ID from create response
1166    fn get_packet_id(response: &NetActionResponse) -> PacketID {
1167        match response {
1168            NetActionResponse::Success(NetActionResponseData::Packet(id), _) => id.clone(),
1169            _ => panic!("Expected Packet response, got: {:?}", response),
1170        }
1171    }
1172
1173    // Helper to extract epoch from start response
1174    fn get_started_epoch(response: &NetActionResponse) -> Epoch {
1175        match response {
1176            NetActionResponse::Success(NetActionResponseData::StartedEpoch(epoch), _) => epoch.clone(),
1177            _ => panic!("Expected StartedEpoch response, got: {:?}", response),
1178        }
1179    }
1180
1181    // ========== Packet Creation and Consumption Tests ==========
1182
1183    #[test]
1184    fn test_create_packet_outside_net() {
1185        let graph = linear_graph_3();
1186        let mut net = Net::new(graph);
1187
1188        let response = net.do_action(&NetAction::CreatePacket(None));
1189        assert!(matches!(response, NetActionResponse::Success(NetActionResponseData::Packet(_), _)));
1190    }
1191
1192    #[test]
1193    fn test_consume_packet() {
1194        let graph = linear_graph_3();
1195        let mut net = Net::new(graph);
1196
1197        // Create a packet
1198        let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1199
1200        // Consume it
1201        let response = net.do_action(&NetAction::ConsumePacket(packet_id));
1202        assert!(matches!(response, NetActionResponse::Success(NetActionResponseData::None, _)));
1203    }
1204
1205    #[test]
1206    fn test_consume_nonexistent_packet_fails() {
1207        let graph = linear_graph_3();
1208        let mut net = Net::new(graph);
1209
1210        let fake_id = Ulid::new();
1211        let response = net.do_action(&NetAction::ConsumePacket(fake_id));
1212        assert!(matches!(response, NetActionResponse::Error(NetActionError::PacketNotFound { .. })));
1213    }
1214
1215    // ========== Epoch Lifecycle Tests ==========
1216
1217    #[test]
1218    fn test_epoch_lifecycle_via_run_until_blocked() {
1219        let graph = linear_graph_3();
1220        let mut net = Net::new(graph);
1221
1222        // Put a packet on edge A->B
1223        let response = net.do_action(&NetAction::CreatePacket(None));
1224        let packet_id = get_packet_id(&response);
1225
1226        // Manually place packet on edge (simulating it came from node A)
1227        let edge_location = PacketLocation::Edge(Edge {
1228            source: PortRef {
1229                node_name: "A".to_string(),
1230                port_type: PortType::Output,
1231                port_name: "out".to_string(),
1232            },
1233            target: PortRef {
1234                node_name: "B".to_string(),
1235                port_type: PortType::Input,
1236                port_name: "in".to_string(),
1237            },
1238        });
1239        net._packets.get_mut(&packet_id).unwrap().location = edge_location.clone();
1240        net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1241        net._packets_by_location.get_mut(&edge_location).unwrap().insert(packet_id.clone());
1242
1243        // Run until blocked - packet should move to input port and trigger epoch
1244        net.do_action(&NetAction::RunNetUntilBlocked);
1245
1246        // Should have one startable epoch
1247        let startable = net.startable_epoch_ids();
1248        assert_eq!(startable.len(), 1);
1249
1250        // Start the epoch
1251        let epoch_id = startable[0].clone();
1252        let epoch = get_started_epoch(&net.do_action(&NetAction::StartEpoch(epoch_id.clone())));
1253        assert!(matches!(epoch.state, EpochState::Running));
1254
1255        // Consume the packet (simulating node processing)
1256        net.do_action(&NetAction::ConsumePacket(packet_id));
1257
1258        // Finish the epoch
1259        let response = net.do_action(&NetAction::FinishEpoch(epoch_id));
1260        assert!(matches!(response, NetActionResponse::Success(NetActionResponseData::FinishedEpoch(_), _)));
1261    }
1262
1263    #[test]
1264    fn test_cannot_start_nonexistent_epoch() {
1265        let graph = linear_graph_3();
1266        let mut net = Net::new(graph);
1267
1268        let fake_id = Ulid::new();
1269        let response = net.do_action(&NetAction::StartEpoch(fake_id));
1270        assert!(matches!(response, NetActionResponse::Error(NetActionError::EpochNotFound { .. })));
1271    }
1272
1273    #[test]
1274    fn test_cannot_finish_epoch_with_packets() {
1275        let graph = linear_graph_3();
1276        let mut net = Net::new(graph);
1277
1278        // Create epoch with packet via create_and_start_epoch
1279        // First put packet at input port
1280        let response = net.do_action(&NetAction::CreatePacket(None));
1281        let packet_id = get_packet_id(&response);
1282
1283        let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1284        net._packets.get_mut(&packet_id).unwrap().location = input_port_loc.clone();
1285        net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1286        net._packets_by_location.get_mut(&input_port_loc).unwrap().insert(packet_id.clone());
1287
1288        // Create and start epoch
1289        let salvo = Salvo {
1290            salvo_condition: "manual".to_string(),
1291            packets: vec![("in".to_string(), packet_id)],
1292        };
1293        let epoch = get_started_epoch(&net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo)));
1294
1295        // Try to finish without consuming packet
1296        let response = net.do_action(&NetAction::FinishEpoch(epoch.id));
1297        assert!(matches!(response, NetActionResponse::Error(NetActionError::CannotFinishNonEmptyEpoch { .. })));
1298    }
1299
1300    // ========== Cancel Epoch Tests ==========
1301
1302    #[test]
1303    fn test_cancel_epoch_destroys_packets() {
1304        let graph = linear_graph_3();
1305        let mut net = Net::new(graph);
1306
1307        // Create packet and place at input port
1308        let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1309        let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1310        net._packets.get_mut(&packet_id).unwrap().location = input_port_loc.clone();
1311        net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1312        net._packets_by_location.get_mut(&input_port_loc).unwrap().insert(packet_id.clone());
1313
1314        // Create and start epoch
1315        let salvo = Salvo {
1316            salvo_condition: "manual".to_string(),
1317            packets: vec![("in".to_string(), packet_id.clone())],
1318        };
1319        let epoch = get_started_epoch(&net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo)));
1320
1321        // Cancel the epoch
1322        let response = net.do_action(&NetAction::CancelEpoch(epoch.id));
1323        match response {
1324            NetActionResponse::Success(NetActionResponseData::CancelledEpoch(_, destroyed), _) => {
1325                assert_eq!(destroyed.len(), 1);
1326                assert_eq!(destroyed[0], packet_id);
1327            }
1328            _ => panic!("Expected CancelledEpoch response"),
1329        }
1330
1331        // Packet should be gone
1332        assert!(!net._packets.contains_key(&packet_id));
1333    }
1334
1335    // ========== Run Until Blocked Tests ==========
1336
1337    #[test]
1338    fn test_run_until_blocked_moves_packet_to_input_port() {
1339        let graph = linear_graph_3();
1340        let mut net = Net::new(graph);
1341
1342        // Create packet on edge A->B
1343        let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1344        let edge_location = PacketLocation::Edge(Edge {
1345            source: PortRef {
1346                node_name: "A".to_string(),
1347                port_type: PortType::Output,
1348                port_name: "out".to_string(),
1349            },
1350            target: PortRef {
1351                node_name: "B".to_string(),
1352                port_type: PortType::Input,
1353                port_name: "in".to_string(),
1354            },
1355        });
1356        net._packets.get_mut(&packet_id).unwrap().location = edge_location.clone();
1357        net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1358        net._packets_by_location.get_mut(&edge_location).unwrap().insert(packet_id.clone());
1359
1360        // Run until blocked
1361        net.do_action(&NetAction::RunNetUntilBlocked);
1362
1363        // Packet should have triggered an epoch (moved into node)
1364        assert_eq!(net.startable_epoch_ids().len(), 1);
1365    }
1366
1367    #[test]
1368    fn test_run_until_blocked_respects_port_capacity() {
1369        // Create a graph where node B has finite capacity input port but NO salvo conditions
1370        // This tests that port capacity limits how many packets can wait at the input port
1371        use std::collections::HashMap;
1372        use crate::graph::Node;
1373        let node_b = Node {
1374            name: "B".to_string(),
1375            in_ports: {
1376                let mut ports = HashMap::new();
1377                ports.insert("in".to_string(), Port { slots_spec: PortSlotSpec::Finite(1) });
1378                ports
1379            },
1380            out_ports: HashMap::new(),
1381            in_salvo_conditions: HashMap::new(),  // No salvo conditions = packets wait at input port
1382            out_salvo_conditions: HashMap::new(),
1383        };
1384
1385        let nodes = vec![
1386            simple_node("A", vec![], vec!["out"]),
1387            node_b,
1388        ];
1389        let edges = vec![edge("A", "out", "B", "in")];
1390        let graph = Graph::new(nodes, edges);
1391        let mut net = Net::new(graph);
1392
1393        // Create two packets on edge A->B
1394        let edge_location = PacketLocation::Edge(Edge {
1395            source: PortRef {
1396                node_name: "A".to_string(),
1397                port_type: PortType::Output,
1398                port_name: "out".to_string(),
1399            },
1400            target: PortRef {
1401                node_name: "B".to_string(),
1402                port_type: PortType::Input,
1403                port_name: "in".to_string(),
1404            },
1405        });
1406
1407        let packet1 = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1408        let packet2 = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1409
1410        // Move both to edge
1411        for pid in [&packet1, &packet2] {
1412            net._packets.get_mut(pid).unwrap().location = edge_location.clone();
1413            net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(pid);
1414            net._packets_by_location.get_mut(&edge_location).unwrap().insert(pid.clone());
1415        }
1416
1417        // Run until blocked - only first packet should move (capacity = 1)
1418        net.do_action(&NetAction::RunNetUntilBlocked);
1419
1420        // No epochs created (no salvo conditions), one packet at input port, one still on edge
1421        assert_eq!(net.startable_epoch_ids().len(), 0);
1422        let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1423        assert_eq!(net.packet_count_at(&input_port_loc), 1);
1424        assert_eq!(net.packet_count_at(&edge_location), 1);
1425    }
1426
1427    #[test]
1428    fn test_fifo_packet_ordering() {
1429        // Test that packets are processed in FIFO order on edges
1430        // We verify this by examining the events emitted during run_until_blocked
1431        let graph = linear_graph_3();
1432        let mut net = Net::new(graph);
1433
1434        let edge_location = PacketLocation::Edge(Edge {
1435            source: PortRef {
1436                node_name: "A".to_string(),
1437                port_type: PortType::Output,
1438                port_name: "out".to_string(),
1439            },
1440            target: PortRef {
1441                node_name: "B".to_string(),
1442                port_type: PortType::Input,
1443                port_name: "in".to_string(),
1444            },
1445        });
1446
1447        // Create packets in order
1448        let packet1 = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1449        let packet2 = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1450        let packet3 = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1451
1452        // Add to edge in order (packet1 first)
1453        for pid in [&packet1, &packet2, &packet3] {
1454            net._packets.get_mut(pid).unwrap().location = edge_location.clone();
1455            net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(pid);
1456            net._packets_by_location.get_mut(&edge_location).unwrap().insert(pid.clone());
1457        }
1458
1459        // Run - each packet triggers its own epoch (default salvo condition)
1460        let response = net.do_action(&NetAction::RunNetUntilBlocked);
1461
1462        // Extract PacketMoved events to verify FIFO order
1463        let events = match response {
1464            NetActionResponse::Success(_, events) => events,
1465            _ => panic!("Expected success response"),
1466        };
1467
1468        // Find the first PacketMoved event for each packet (when it moved from edge to input port)
1469        // The order of these events should match the FIFO order: packet1, packet2, packet3
1470        let packet_move_order: Vec<PacketID> = events.iter()
1471            .filter_map(|event| {
1472                if let NetEvent::PacketMoved(_, packet_id, PacketLocation::InputPort(_, _)) = event {
1473                    Some(packet_id.clone())
1474                } else {
1475                    None
1476                }
1477            })
1478            .collect();
1479
1480        // We should have 3 packet moves to input ports, in FIFO order
1481        assert_eq!(packet_move_order.len(), 3, "Expected 3 packets to move to input port");
1482        assert_eq!(packet_move_order[0], packet1, "First packet to move should be packet1");
1483        assert_eq!(packet_move_order[1], packet2, "Second packet to move should be packet2");
1484        assert_eq!(packet_move_order[2], packet3, "Third packet to move should be packet3");
1485    }
1486
1487    // ========== Output Salvo Tests ==========
1488
1489    #[test]
1490    fn test_load_packet_into_output_port() {
1491        let graph = linear_graph_3();
1492        let mut net = Net::new(graph);
1493
1494        // Create packet and place at input port of B
1495        let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1496        let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1497        net._packets.get_mut(&packet_id).unwrap().location = input_port_loc.clone();
1498        net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1499        net._packets_by_location.get_mut(&input_port_loc).unwrap().insert(packet_id.clone());
1500
1501        // Create and start epoch
1502        let salvo = Salvo {
1503            salvo_condition: "manual".to_string(),
1504            packets: vec![("in".to_string(), packet_id.clone())],
1505        };
1506        let epoch = get_started_epoch(&net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo)));
1507
1508        // Load packet into output port
1509        let response = net.do_action(&NetAction::LoadPacketIntoOutputPort(packet_id.clone(), "out".to_string()));
1510        assert!(matches!(response, NetActionResponse::Success(NetActionResponseData::None, _)));
1511
1512        // Packet should be at output port
1513        let output_loc = PacketLocation::OutputPort(epoch.id, "out".to_string());
1514        assert_eq!(net.packet_count_at(&output_loc), 1);
1515    }
1516
1517    #[test]
1518    fn test_send_output_salvo() {
1519        let graph = linear_graph_3();
1520        let mut net = Net::new(graph);
1521
1522        // Create packet and place at input port of B
1523        let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1524        let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1525        net._packets.get_mut(&packet_id).unwrap().location = input_port_loc.clone();
1526        net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1527        net._packets_by_location.get_mut(&input_port_loc).unwrap().insert(packet_id.clone());
1528
1529        // Create and start epoch
1530        let salvo = Salvo {
1531            salvo_condition: "manual".to_string(),
1532            packets: vec![("in".to_string(), packet_id.clone())],
1533        };
1534        let epoch = get_started_epoch(&net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo)));
1535
1536        // Load packet into output port
1537        net.do_action(&NetAction::LoadPacketIntoOutputPort(packet_id.clone(), "out".to_string()));
1538
1539        // Send output salvo
1540        let response = net.do_action(&NetAction::SendOutputSalvo(epoch.id.clone(), "default".to_string()));
1541        assert!(matches!(response, NetActionResponse::Success(NetActionResponseData::None, _)));
1542
1543        // Packet should now be on edge B->C
1544        let edge_loc = PacketLocation::Edge(Edge {
1545            source: PortRef {
1546                node_name: "B".to_string(),
1547                port_type: PortType::Output,
1548                port_name: "out".to_string(),
1549            },
1550            target: PortRef {
1551                node_name: "C".to_string(),
1552                port_type: PortType::Input,
1553                port_name: "in".to_string(),
1554            },
1555        });
1556        assert_eq!(net.packet_count_at(&edge_loc), 1);
1557    }
1558
1559    // ========== Create And Start Epoch Tests ==========
1560
1561    #[test]
1562    fn test_create_and_start_epoch() {
1563        let graph = linear_graph_3();
1564        let mut net = Net::new(graph);
1565
1566        // Create packet at input port
1567        let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1568        let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1569        net._packets.get_mut(&packet_id).unwrap().location = input_port_loc.clone();
1570        net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1571        net._packets_by_location.get_mut(&input_port_loc).unwrap().insert(packet_id.clone());
1572
1573        // Create and start epoch manually
1574        let salvo = Salvo {
1575            salvo_condition: "manual".to_string(),
1576            packets: vec![("in".to_string(), packet_id.clone())],
1577        };
1578        let epoch = get_started_epoch(&net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo)));
1579
1580        assert!(matches!(epoch.state, EpochState::Running));
1581        assert_eq!(epoch.node_name, "B");
1582    }
1583
1584    #[test]
1585    fn test_create_and_start_epoch_validates_node() {
1586        let graph = linear_graph_3();
1587        let mut net = Net::new(graph);
1588
1589        let salvo = Salvo {
1590            salvo_condition: "manual".to_string(),
1591            packets: vec![],
1592        };
1593        let response = net.do_action(&NetAction::CreateAndStartEpoch("NonExistent".to_string(), salvo));
1594        assert!(matches!(response, NetActionResponse::Error(NetActionError::NodeNotFound { .. })));
1595    }
1596
1597    #[test]
1598    fn test_create_and_start_epoch_validates_packet_location() {
1599        let graph = linear_graph_3();
1600        let mut net = Net::new(graph);
1601
1602        // Create packet but leave it OutsideNet
1603        let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1604
1605        let salvo = Salvo {
1606            salvo_condition: "manual".to_string(),
1607            packets: vec![("in".to_string(), packet_id)],
1608        };
1609        let response = net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo));
1610        assert!(matches!(response, NetActionResponse::Error(NetActionError::PacketNotAtInputPort { .. })));
1611    }
1612}