1use crate::_utils::get_utc_now;
10use crate::graph::{Edge, Graph, NodeName, Port, PortSlotSpec, PortName, PortType, PortRef, SalvoConditionName, SalvoConditionTerm, evaluate_salvo_condition};
11use indexmap::IndexSet;
12use std::collections::{HashMap, HashSet};
13use ulid::Ulid;
14
15pub type PacketID = Ulid;
17
18pub type EpochID = Ulid;
20
21#[derive(Debug, PartialEq, Eq, Hash, Clone)]
29pub enum PacketLocation {
30 Node(EpochID),
32 InputPort(NodeName, PortName),
34 OutputPort(EpochID, PortName),
36 Edge(Edge),
38 OutsideNet,
40}
41
42#[derive(Debug)]
44pub struct Packet {
45 pub id: PacketID,
47 pub location: PacketLocation,
49}
50
51#[derive(Debug, Clone)]
57pub struct Salvo {
58 pub salvo_condition: SalvoConditionName,
60 pub packets: Vec<(PortName, PacketID)>,
62}
63
64#[derive(Debug, Clone, PartialEq)]
66pub enum EpochState {
67 Startable,
69 Running,
71 Finished,
73}
74
75#[derive(Debug, Clone)]
81pub struct Epoch {
82 pub id: EpochID,
84 pub node_name: NodeName,
86 pub in_salvo: Salvo,
88 pub out_salvos: Vec<Salvo>,
90 pub state: EpochState,
92}
93
94impl Epoch {
95 pub fn start_time(&self) -> u64 {
97 self.id.timestamp_ms()
98 }
99}
100
101pub type EventUTC = i128;
103
104#[derive(Debug)]
109pub enum NetAction {
110 RunNetUntilBlocked,
113 CreatePacket(Option<EpochID>),
116 ConsumePacket(PacketID),
118 StartEpoch(EpochID),
120 FinishEpoch(EpochID),
122 CancelEpoch(EpochID),
124 CreateAndStartEpoch(NodeName, Salvo),
127 LoadPacketIntoOutputPort(PacketID, PortName),
129 SendOutputSalvo(EpochID, SalvoConditionName),
131 TransportPacketToLocation(PacketID, PacketLocation),
136}
137
138#[derive(Debug, thiserror::Error)]
140pub enum NetActionError {
141 #[error("packet not found: {packet_id}")]
143 PacketNotFound { packet_id: PacketID },
144
145 #[error("epoch not found: {epoch_id}")]
147 EpochNotFound { epoch_id: EpochID },
148
149 #[error("epoch {epoch_id} is not running")]
151 EpochNotRunning { epoch_id: EpochID },
152
153 #[error("epoch {epoch_id} is not startable")]
155 EpochNotStartable { epoch_id: EpochID },
156
157 #[error("cannot finish epoch {epoch_id}: epoch still contains packets")]
159 CannotFinishNonEmptyEpoch { epoch_id: EpochID },
160
161 #[error("packet {packet_id} is not inside epoch {epoch_id}")]
163 PacketNotInNode { packet_id: PacketID, epoch_id: EpochID },
164
165 #[error("output port '{port_name}' not found on node for epoch {epoch_id}")]
167 OutputPortNotFound { port_name: PortName, epoch_id: EpochID },
168
169 #[error("output salvo condition '{condition_name}' not found on node for epoch {epoch_id}")]
171 OutputSalvoConditionNotFound { condition_name: SalvoConditionName, epoch_id: EpochID },
172
173 #[error("max output salvos reached for condition '{condition_name}' on epoch {epoch_id}")]
175 MaxOutputSalvosReached { condition_name: SalvoConditionName, epoch_id: EpochID },
176
177 #[error("salvo condition '{condition_name}' not met for epoch {epoch_id}")]
179 SalvoConditionNotMet { condition_name: SalvoConditionName, epoch_id: EpochID },
180
181 #[error("output port '{port_name}' is full for epoch {epoch_id}")]
183 OutputPortFull { port_name: PortName, epoch_id: EpochID },
184
185 #[error("output port '{port_name}' on node '{node_name}' is not connected to any edge")]
187 CannotPutPacketIntoUnconnectedOutputPort { port_name: PortName, node_name: NodeName },
188
189 #[error("node not found: '{node_name}'")]
191 NodeNotFound { node_name: NodeName },
192
193 #[error("packet {packet_id} is not at input port '{port_name}' of node '{node_name}'")]
195 PacketNotAtInputPort { packet_id: PacketID, port_name: PortName, node_name: NodeName },
196
197 #[error("input port '{port_name}' not found on node '{node_name}'")]
199 InputPortNotFound { port_name: PortName, node_name: NodeName },
200
201 #[error("input port '{port_name}' on node '{node_name}' is full")]
203 InputPortFull { port_name: PortName, node_name: NodeName },
204
205 #[error("cannot move packet {packet_id} out of running epoch {epoch_id}")]
207 CannotMovePacketFromRunningEpoch { packet_id: PacketID, epoch_id: EpochID },
208
209 #[error("cannot move packet {packet_id} into running epoch {epoch_id}")]
211 CannotMovePacketIntoRunningEpoch { packet_id: PacketID, epoch_id: EpochID },
212
213 #[error("edge not found: {edge}")]
215 EdgeNotFound { edge: Edge },
216}
217
218#[derive(Debug, Clone)]
223pub enum NetEvent {
224 PacketCreated(EventUTC, PacketID),
226 PacketConsumed(EventUTC, PacketID),
228 EpochCreated(EventUTC, EpochID),
230 EpochStarted(EventUTC, EpochID),
232 EpochFinished(EventUTC, EpochID),
234 EpochCancelled(EventUTC, EpochID),
236 PacketMoved(EventUTC, PacketID, PacketLocation),
238 InputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
240 OutputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
242}
243
244#[derive(Debug)]
246pub enum NetActionResponseData {
247 Packet(PacketID),
249 StartedEpoch(Epoch),
251 FinishedEpoch(Epoch),
253 CancelledEpoch(Epoch, Vec<PacketID>),
255 None,
257}
258
259#[derive(Debug)]
261pub enum NetActionResponse {
262 Success(NetActionResponseData, Vec<NetEvent>),
264 Error(NetActionError),
266}
267
268#[derive(Debug)]
277pub struct Net {
278 pub graph: Graph,
280 _packets: HashMap<PacketID, Packet>,
281 _packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>>,
282 _epochs: HashMap<EpochID, Epoch>,
283 _startable_epochs: HashSet<EpochID>,
284 _node_to_epochs: HashMap<NodeName, Vec<EpochID>>,
285}
286
287impl Net {
288 pub fn new(graph: Graph) -> Self {
292 let mut packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>> = HashMap::new();
293
294 for edge in graph.edges() {
296 packets_by_location.insert(PacketLocation::Edge(edge.clone()), IndexSet::new());
297 }
298
299 for (node_name, node) in graph.nodes() {
301 for port_name in node.in_ports.keys() {
302 packets_by_location.insert(
303 PacketLocation::InputPort(node_name.clone(), port_name.clone()),
304 IndexSet::new(),
305 );
306 }
307 }
308
309 packets_by_location.insert(PacketLocation::OutsideNet, IndexSet::new());
311
312 Net {
316 graph,
317 _packets: HashMap::new(),
318 _packets_by_location: packets_by_location,
319 _epochs: HashMap::new(),
320 _startable_epochs: HashSet::new(),
321 _node_to_epochs: HashMap::new(),
322 }
323 }
324
325 fn move_packet(&mut self, packet_id: &PacketID, new_location: PacketLocation) {
326 let packet = self._packets.get_mut(&packet_id).unwrap();
327 let packets_at_old_location = self._packets_by_location.get_mut(&packet.location)
328 .expect("Packet location has no entry in self._packets_by_location.");
329 packets_at_old_location.shift_remove(packet_id);
330 packet.location = new_location;
331 if !self._packets_by_location.get_mut(&packet.location)
332 .expect("Packet location has no entry in self._packets_by_location")
333 .insert(packet_id.clone()) {
334 panic!("Attempted to move packet to a location that already contains it.");
335 }
336 }
337
338 fn run_until_blocked(&mut self) -> NetActionResponse {
341 let mut all_events: Vec<NetEvent> = Vec::new();
342
343 loop {
344 let mut made_progress = false;
345
346 struct EdgeMoveCandidate {
349 packet_id: PacketID,
350 target_node_name: NodeName,
351 input_port_location: PacketLocation,
352 can_move: bool,
353 }
354
355 let mut edge_candidates: Vec<EdgeMoveCandidate> = Vec::new();
356
357 for (location, packets) in &self._packets_by_location {
359 if let PacketLocation::Edge(edge_ref) = location {
360 if let Some(first_packet_id) = packets.first() {
362 let target_node_name = edge_ref.target.node_name.clone();
363 let target_port_name = edge_ref.target.port_name.clone();
364
365 let node = self.graph.nodes().get(&target_node_name)
367 .expect("Edge targets a non-existent node");
368 let port = node.in_ports.get(&target_port_name)
369 .expect("Edge targets a non-existent input port");
370
371 let input_port_location = PacketLocation::InputPort(target_node_name.clone(), target_port_name.clone());
372 let current_count = self._packets_by_location
373 .get(&input_port_location)
374 .map(|packets| packets.len() as u64)
375 .unwrap_or(0);
376
377 let can_move = match port.slots_spec {
378 PortSlotSpec::Infinite => true,
379 PortSlotSpec::Finite(max_slots) => current_count < max_slots,
380 };
381
382 edge_candidates.push(EdgeMoveCandidate {
383 packet_id: first_packet_id.clone(),
384 target_node_name,
385 input_port_location,
386 can_move,
387 });
388 }
389 }
390 }
391
392 for candidate in edge_candidates {
394 if !candidate.can_move {
395 continue;
396 }
397
398 self.move_packet(&candidate.packet_id, candidate.input_port_location.clone());
400 all_events.push(NetEvent::PacketMoved(get_utc_now(), candidate.packet_id.clone(), candidate.input_port_location.clone()));
401 made_progress = true;
402
403 let node = self.graph.nodes().get(&candidate.target_node_name)
406 .expect("Edge targets a non-existent node");
407
408 let in_port_names: Vec<PortName> = node.in_ports.keys().cloned().collect();
409 let in_ports_clone: HashMap<PortName, Port> = node.in_ports.iter()
410 .map(|(k, v)| (k.clone(), Port { slots_spec: match v.slots_spec {
411 PortSlotSpec::Infinite => PortSlotSpec::Infinite,
412 PortSlotSpec::Finite(n) => PortSlotSpec::Finite(n),
413 }}))
414 .collect();
415
416 struct SalvoConditionData {
418 name: SalvoConditionName,
419 ports: Vec<PortName>,
420 term: SalvoConditionTerm,
421 }
422
423 let salvo_conditions: Vec<SalvoConditionData> = node.in_salvo_conditions.iter()
424 .map(|(name, cond)| SalvoConditionData {
425 name: name.clone(),
426 ports: cond.ports.clone(),
427 term: cond.term.clone(),
428 })
429 .collect();
430
431 for salvo_cond_data in salvo_conditions {
433 let port_packet_counts: HashMap<PortName, u64> = in_port_names.iter()
435 .map(|port_name| {
436 let count = self._packets_by_location
437 .get(&PacketLocation::InputPort(candidate.target_node_name.clone(), port_name.clone()))
438 .map(|packets| packets.len() as u64)
439 .unwrap_or(0);
440 (port_name.clone(), count)
441 })
442 .collect();
443
444 if evaluate_salvo_condition(&salvo_cond_data.term, &port_packet_counts, &in_ports_clone) {
446 let epoch_id = Ulid::new();
448
449 let mut salvo_packets: Vec<(PortName, PacketID)> = Vec::new();
451 let mut packets_to_move: Vec<(PacketID, PortName)> = Vec::new();
452
453 for port_name in &salvo_cond_data.ports {
454 let port_location = PacketLocation::InputPort(candidate.target_node_name.clone(), port_name.clone());
455 if let Some(packet_ids) = self._packets_by_location.get(&port_location) {
456 for pid in packet_ids.iter() {
457 salvo_packets.push((port_name.clone(), pid.clone()));
458 packets_to_move.push((pid.clone(), port_name.clone()));
459 }
460 }
461 }
462
463 let in_salvo = Salvo {
465 salvo_condition: salvo_cond_data.name.clone(),
466 packets: salvo_packets,
467 };
468
469 let epoch = Epoch {
471 id: epoch_id.clone(),
472 node_name: candidate.target_node_name.clone(),
473 in_salvo,
474 out_salvos: Vec::new(),
475 state: EpochState::Startable,
476 };
477
478 self._epochs.insert(epoch_id.clone(), epoch);
480 self._startable_epochs.insert(epoch_id.clone());
481 self._node_to_epochs
482 .entry(candidate.target_node_name.clone())
483 .or_insert_with(Vec::new)
484 .push(epoch_id.clone());
485
486 let epoch_location = PacketLocation::Node(epoch_id.clone());
488 self._packets_by_location.insert(epoch_location.clone(), IndexSet::new());
489
490 let node = self.graph.nodes().get(&candidate.target_node_name)
492 .expect("Node not found for epoch creation");
493 for port_name in node.out_ports.keys() {
494 let output_port_location = PacketLocation::OutputPort(epoch_id.clone(), port_name.clone());
495 self._packets_by_location.insert(output_port_location, IndexSet::new());
496 }
497
498 for (pid, _port_name) in &packets_to_move {
500 self.move_packet(pid, epoch_location.clone());
501 all_events.push(NetEvent::PacketMoved(get_utc_now(), pid.clone(), epoch_location.clone()));
502 }
503
504 all_events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id.clone()));
505 all_events.push(NetEvent::InputSalvoTriggered(get_utc_now(), epoch_id.clone(), salvo_cond_data.name.clone()));
506
507 break;
509 }
510 }
511 }
512
513 if !made_progress {
515 break;
516 }
517 }
518
519 NetActionResponse::Success(NetActionResponseData::None, all_events)
520 }
521
522 fn create_packet(&mut self, maybe_epoch_id: &Option<EpochID>) -> NetActionResponse {
523 if let Some(epoch_id) = maybe_epoch_id {
525 if !self._epochs.contains_key(&epoch_id) {
526 return NetActionResponse::Error(NetActionError::EpochNotFound {
527 epoch_id: epoch_id.clone(),
528 });
529 }
530 if !matches!(self._epochs[&epoch_id].state, EpochState::Running) {
531 return NetActionResponse::Error(NetActionError::EpochNotRunning {
532 epoch_id: epoch_id.clone(),
533 });
534 }
535 }
536
537 let packet_location = match maybe_epoch_id {
538 Some(epoch_id) => PacketLocation::Node(epoch_id.clone()),
539 None => PacketLocation::OutsideNet,
540 };
541
542 let packet = Packet {
543 id: Ulid::new(),
544 location: packet_location.clone(),
545 };
546
547 let packet_id = packet.id.clone();
548 self._packets.insert(packet.id.clone(), packet);
549
550 self._packets_by_location
552 .entry(packet_location)
553 .or_insert_with(IndexSet::new)
554 .insert(packet_id.clone());
555
556 NetActionResponse::Success(
557 NetActionResponseData::Packet(packet_id),
558 vec![NetEvent::PacketCreated(get_utc_now(), packet_id)]
559 )
560 }
561
562 fn consume_packet(&mut self, packet_id: &PacketID) -> NetActionResponse {
563 if !self._packets.contains_key(packet_id) {
564 return NetActionResponse::Error(NetActionError::PacketNotFound {
565 packet_id: packet_id.clone(),
566 });
567 }
568
569 if let Some(packets) = self
570 ._packets_by_location
571 .get_mut(&self._packets[packet_id].location)
572 {
573 if packets.shift_remove(packet_id) {
574 self._packets.remove(packet_id);
575 NetActionResponse::Success(
576 NetActionResponseData::None,
577 vec![NetEvent::PacketConsumed(get_utc_now(), packet_id.clone())]
578 )
579 } else {
580 panic!(
581 "Packet with ID {} not found in location {:?}",
582 packet_id, self._packets[packet_id].location
583 );
584 }
585 } else {
586 panic!("Packet location {:?} not found", self._packets[packet_id].location);
587 }
588 }
589
590 fn start_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
591 if let Some(epoch) = self._epochs.get_mut(epoch_id) {
592 if !self._startable_epochs.contains(epoch_id) {
593 return NetActionResponse::Error(NetActionError::EpochNotStartable {
594 epoch_id: epoch_id.clone(),
595 });
596 }
597 debug_assert!(matches!(epoch.state, EpochState::Startable),
598 "Epoch state is not Startable but was in net._startable_epochs.");
599 epoch.state = EpochState::Running;
600 self._startable_epochs.remove(epoch_id);
601 NetActionResponse::Success(
602 NetActionResponseData::StartedEpoch(epoch.clone()),
603 vec![NetEvent::EpochStarted(get_utc_now(), epoch_id.clone())]
604 )
605 } else {
606 return NetActionResponse::Error(NetActionError::EpochNotFound {
607 epoch_id: epoch_id.clone(),
608 });
609 }
610 }
611
612 fn finish_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
613 if let Some(epoch) = self._epochs.get(&epoch_id) {
614 if let EpochState::Running = epoch.state {
615 let epoch_loc = PacketLocation::Node(epoch_id.clone());
617 if let Some(packets) = self._packets_by_location.get(&epoch_loc) {
618 if packets.len() > 0 {
619 return NetActionResponse::Error(NetActionError::CannotFinishNonEmptyEpoch {
620 epoch_id: epoch_id.clone(),
621 });
622 }
623
624 let mut epoch = self._epochs.remove(&epoch_id).unwrap();
625 epoch.state = EpochState::Finished;
626 self._packets_by_location.remove(&epoch_loc);
627 NetActionResponse::Success(
628 NetActionResponseData::FinishedEpoch(epoch),
629 vec![NetEvent::EpochFinished(get_utc_now(), epoch_id.clone())]
630 )
631 } else {
632 panic!("Epoch {} not found in location {:?}", epoch_id, epoch_loc);
633 }
634 } else {
635 return NetActionResponse::Error(NetActionError::EpochNotRunning {
636 epoch_id: epoch_id.clone(),
637 });
638 }
639 } else {
640 return NetActionResponse::Error(NetActionError::EpochNotFound {
641 epoch_id: epoch_id.clone(),
642 });
643 }
644 }
645
646 fn cancel_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
647 let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
649 epoch.clone()
650 } else {
651 return NetActionResponse::Error(NetActionError::EpochNotFound {
652 epoch_id: epoch_id.clone(),
653 });
654 };
655
656 let mut events: Vec<NetEvent> = Vec::new();
657 let mut destroyed_packets: Vec<PacketID> = Vec::new();
658
659 let epoch_location = PacketLocation::Node(epoch_id.clone());
661 if let Some(packet_ids) = self._packets_by_location.get(&epoch_location) {
662 destroyed_packets.extend(packet_ids.iter().cloned());
663 }
664
665 let node = self.graph.nodes().get(&epoch.node_name)
667 .expect("Epoch references non-existent node");
668 for port_name in node.out_ports.keys() {
669 let output_port_location = PacketLocation::OutputPort(epoch_id.clone(), port_name.clone());
670 if let Some(packet_ids) = self._packets_by_location.get(&output_port_location) {
671 destroyed_packets.extend(packet_ids.iter().cloned());
672 }
673 }
674
675 for packet_id in &destroyed_packets {
677 let packet = self._packets.remove(packet_id)
678 .expect("Packet in location map not found in packets map");
679 if let Some(packets_at_location) = self._packets_by_location.get_mut(&packet.location) {
680 packets_at_location.shift_remove(packet_id);
681 }
682 events.push(NetEvent::PacketConsumed(get_utc_now(), packet_id.clone()));
683 }
684
685 for port_name in node.out_ports.keys() {
687 let output_port_location = PacketLocation::OutputPort(epoch_id.clone(), port_name.clone());
688 self._packets_by_location.remove(&output_port_location);
689 }
690
691 self._packets_by_location.remove(&epoch_location);
693
694 self._startable_epochs.remove(epoch_id);
696
697 if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch.node_name) {
699 epoch_ids.retain(|id| id != epoch_id);
700 }
701
702 let epoch = self._epochs.remove(epoch_id)
704 .expect("Epoch should exist");
705
706 events.push(NetEvent::EpochCancelled(get_utc_now(), epoch_id.clone()));
707
708 NetActionResponse::Success(
709 NetActionResponseData::CancelledEpoch(epoch, destroyed_packets),
710 events
711 )
712 }
713
714 fn create_and_start_epoch(&mut self, node_name: &NodeName, salvo: &Salvo) -> NetActionResponse {
715 let node = match self.graph.nodes().get(node_name) {
717 Some(node) => node,
718 None => {
719 return NetActionResponse::Error(NetActionError::NodeNotFound {
720 node_name: node_name.clone(),
721 });
722 }
723 };
724
725 for (port_name, packet_id) in &salvo.packets {
727 if !node.in_ports.contains_key(port_name) {
729 return NetActionResponse::Error(NetActionError::InputPortNotFound {
730 port_name: port_name.clone(),
731 node_name: node_name.clone(),
732 });
733 }
734
735 let packet = match self._packets.get(packet_id) {
737 Some(packet) => packet,
738 None => {
739 return NetActionResponse::Error(NetActionError::PacketNotFound {
740 packet_id: packet_id.clone(),
741 });
742 }
743 };
744
745 let expected_location = PacketLocation::InputPort(node_name.clone(), port_name.clone());
747 if packet.location != expected_location {
748 return NetActionResponse::Error(NetActionError::PacketNotAtInputPort {
749 packet_id: packet_id.clone(),
750 port_name: port_name.clone(),
751 node_name: node_name.clone(),
752 });
753 }
754 }
755
756 let mut events: Vec<NetEvent> = Vec::new();
757
758 let epoch_id = Ulid::new();
760 let epoch = Epoch {
761 id: epoch_id.clone(),
762 node_name: node_name.clone(),
763 in_salvo: salvo.clone(),
764 out_salvos: Vec::new(),
765 state: EpochState::Running,
766 };
767
768 self._epochs.insert(epoch_id.clone(), epoch.clone());
770 self._node_to_epochs
771 .entry(node_name.clone())
772 .or_insert_with(Vec::new)
773 .push(epoch_id.clone());
774
775 let epoch_location = PacketLocation::Node(epoch_id.clone());
777 self._packets_by_location.insert(epoch_location.clone(), IndexSet::new());
778
779 for port_name in node.out_ports.keys() {
781 let output_port_location = PacketLocation::OutputPort(epoch_id.clone(), port_name.clone());
782 self._packets_by_location.insert(output_port_location, IndexSet::new());
783 }
784
785 events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id.clone()));
786
787 for (_, packet_id) in &salvo.packets {
789 self.move_packet(packet_id, epoch_location.clone());
790 events.push(NetEvent::PacketMoved(get_utc_now(), packet_id.clone(), epoch_location.clone()));
791 }
792
793 events.push(NetEvent::EpochStarted(get_utc_now(), epoch_id.clone()));
794
795 NetActionResponse::Success(
796 NetActionResponseData::StartedEpoch(epoch),
797 events
798 )
799 }
800
801 fn load_packet_into_output_port(&mut self, packet_id: &PacketID, port_name: &String) -> NetActionResponse {
802 let (epoch_id, old_location) = if let Some(packet) = self._packets.get(packet_id) {
803 if let PacketLocation::Node(epoch_id) = packet.location {
804 (epoch_id, packet.location.clone())
805 } else {
806 return NetActionResponse::Error(NetActionError::PacketNotInNode {
809 packet_id: packet_id.clone(),
810 epoch_id: Ulid::nil(), })
812 }
813 } else {
814 return NetActionResponse::Error(NetActionError::PacketNotFound {
815 packet_id: packet_id.clone(),
816 });
817 };
818
819 let node_name = self._epochs.get(&epoch_id)
820 .expect("The epoch id in the location of a packet could not be found.")
821 .node_name.clone();
822 let node = self.graph.nodes().get(&node_name)
823 .expect("Packet located in a non-existing node (yet the node has an epoch).");
824
825 if !node.out_ports.contains_key(port_name) {
826 return NetActionResponse::Error(NetActionError::OutputPortNotFound {
827 port_name: port_name.clone(),
828 epoch_id: epoch_id.clone(),
829 })
830 }
831
832 let port = node.out_ports.get(port_name).unwrap();
833 let port_packets = self._packets_by_location.get(&old_location)
834 .expect("No entry in Net._packets_by_location found for output port.");
835
836 if let PortSlotSpec::Finite(num_slots) = port.slots_spec {
838 if num_slots >= port_packets.len() as u64 {
839 return NetActionResponse::Error(NetActionError::OutputPortFull {
840 port_name: port_name.clone(),
841 epoch_id: epoch_id.clone(),
842 })
843 }
844 }
845
846 let new_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
847 self.move_packet(&packet_id, new_location.clone());
848 NetActionResponse::Success(
849 NetActionResponseData::None,
850 vec![NetEvent::PacketMoved(get_utc_now(), epoch_id, new_location)]
851 )
852 }
853
854 fn send_output_salvo(&mut self, epoch_id: &EpochID, salvo_condition_name: &SalvoConditionName) -> NetActionResponse {
855 let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
857 epoch
858 } else {
859 return NetActionResponse::Error(NetActionError::EpochNotFound {
860 epoch_id: epoch_id.clone(),
861 });
862 };
863
864 let node = self.graph.nodes().get(&epoch.node_name)
866 .expect("Node associated with epoch could not be found.");
867
868 let salvo_condition = if let Some(salvo_condition) = node.out_salvo_conditions.get(salvo_condition_name) {
870 salvo_condition
871 } else {
872 return NetActionResponse::Error(NetActionError::OutputSalvoConditionNotFound {
873 condition_name: salvo_condition_name.clone(),
874 epoch_id: epoch_id.clone(),
875 });
876 };
877
878 if salvo_condition.max_salvos > 0 && epoch.out_salvos.len() as u64 >= salvo_condition.max_salvos {
880 return NetActionResponse::Error(NetActionError::MaxOutputSalvosReached {
881 condition_name: salvo_condition_name.clone(),
882 epoch_id: epoch_id.clone(),
883 });
884 }
885
886 let port_packet_counts: HashMap<PortName, u64> = node.out_ports.keys()
888 .map(|port_name| {
889 let count = self._packets_by_location
890 .get(&PacketLocation::OutputPort(epoch_id.clone(), port_name.clone()))
891 .map(|packets| packets.len() as u64)
892 .unwrap_or(0);
893 (port_name.clone(), count)
894 })
895 .collect();
896 if !evaluate_salvo_condition(&salvo_condition.term, &port_packet_counts, &node.out_ports) {
897 return NetActionResponse::Error(NetActionError::SalvoConditionNotMet {
898 condition_name: salvo_condition_name.clone(),
899 epoch_id: epoch_id.clone(),
900 });
901 }
902
903 let mut packets_to_move: Vec<(PacketID, PortName, PacketLocation)> = Vec::new();
905 for port_name in &salvo_condition.ports {
906 let packets = self._packets_by_location.get(&PacketLocation::OutputPort(epoch_id.clone(), port_name.clone()))
907 .expect(format!("Output port '{}' of node '{}' does not have an entry in self._packets_by_location", port_name, node.name.clone()).as_str())
908 .clone();
909 let edge_ref = if let Some(edge_ref) = self.graph.get_edge_by_tail(&PortRef { node_name: node.name.clone(), port_type: PortType::Output, port_name: port_name.clone() }) {
910 edge_ref.clone()
911 } else {
912 return NetActionResponse::Error(NetActionError::CannotPutPacketIntoUnconnectedOutputPort {
913 port_name: port_name.clone(),
914 node_name: node.name.clone(),
915 });
916 };
917 let new_location = PacketLocation::Edge(edge_ref.clone());
918 for packet_id in packets {
919 packets_to_move.push((packet_id.clone(), port_name.clone(), new_location.clone()));
920 }
921 }
922
923 let salvo = Salvo {
925 salvo_condition: salvo_condition_name.clone(),
926 packets: packets_to_move.iter().map(|(packet_id, port_name, _)| {
927 (port_name.clone(), packet_id.clone())
928 }).collect()
929 };
930 self._epochs.get_mut(&epoch_id).unwrap().out_salvos.push(salvo);
931
932 let mut net_events = Vec::new();
934 for (packet_id, _port_name, new_location) in packets_to_move {
935 net_events.push(NetEvent::PacketMoved(get_utc_now(), packet_id.clone(), new_location.clone()));
936 self.move_packet(&packet_id, new_location);
937 }
938
939 NetActionResponse::Success(
940 NetActionResponseData::None,
941 net_events
942 )
943 }
944
945 fn transport_packet_to_location(&mut self, packet_id: &PacketID, destination: &PacketLocation) -> NetActionResponse {
946 let packet = if let Some(p) = self._packets.get(packet_id) {
948 p
949 } else {
950 return NetActionResponse::Error(NetActionError::PacketNotFound {
951 packet_id: packet_id.clone(),
952 });
953 };
954 let current_location = packet.location.clone();
955
956 match ¤t_location {
958 PacketLocation::Node(epoch_id) => {
959 if let Some(epoch) = self._epochs.get(epoch_id) {
960 if epoch.state == EpochState::Running {
961 return NetActionResponse::Error(NetActionError::CannotMovePacketFromRunningEpoch {
962 packet_id: packet_id.clone(),
963 epoch_id: epoch_id.clone(),
964 });
965 }
966 }
967 }
968 PacketLocation::OutputPort(epoch_id, _) => {
969 if let Some(epoch) = self._epochs.get(epoch_id) {
970 if epoch.state == EpochState::Running {
971 return NetActionResponse::Error(NetActionError::CannotMovePacketFromRunningEpoch {
972 packet_id: packet_id.clone(),
973 epoch_id: epoch_id.clone(),
974 });
975 }
976 }
977 }
978 _ => {}
979 }
980
981 match destination {
983 PacketLocation::Node(epoch_id) => {
984 if let Some(epoch) = self._epochs.get(epoch_id) {
985 if epoch.state == EpochState::Running {
986 return NetActionResponse::Error(NetActionError::CannotMovePacketIntoRunningEpoch {
987 packet_id: packet_id.clone(),
988 epoch_id: epoch_id.clone(),
989 });
990 }
991 } else {
992 return NetActionResponse::Error(NetActionError::EpochNotFound {
993 epoch_id: epoch_id.clone(),
994 });
995 }
996 }
997 PacketLocation::OutputPort(epoch_id, port_name) => {
998 if let Some(epoch) = self._epochs.get(epoch_id) {
999 if epoch.state == EpochState::Running {
1000 return NetActionResponse::Error(NetActionError::CannotMovePacketIntoRunningEpoch {
1001 packet_id: packet_id.clone(),
1002 epoch_id: epoch_id.clone(),
1003 });
1004 }
1005 let node = self.graph.nodes().get(&epoch.node_name)
1007 .expect("Node associated with epoch could not be found.");
1008 if !node.out_ports.contains_key(port_name) {
1009 return NetActionResponse::Error(NetActionError::OutputPortNotFound {
1010 port_name: port_name.clone(),
1011 epoch_id: epoch_id.clone(),
1012 });
1013 }
1014 } else {
1015 return NetActionResponse::Error(NetActionError::EpochNotFound {
1016 epoch_id: epoch_id.clone(),
1017 });
1018 }
1019 }
1020 PacketLocation::InputPort(node_name, port_name) => {
1021 let node = if let Some(n) = self.graph.nodes().get(node_name) {
1023 n
1024 } else {
1025 return NetActionResponse::Error(NetActionError::NodeNotFound {
1026 node_name: node_name.clone(),
1027 });
1028 };
1029 let port = if let Some(p) = node.in_ports.get(port_name) {
1031 p
1032 } else {
1033 return NetActionResponse::Error(NetActionError::InputPortNotFound {
1034 port_name: port_name.clone(),
1035 node_name: node_name.clone(),
1036 });
1037 };
1038 let current_count = self._packets_by_location
1040 .get(destination)
1041 .map(|s| s.len())
1042 .unwrap_or(0);
1043 let is_full = match &port.slots_spec {
1044 PortSlotSpec::Infinite => false,
1045 PortSlotSpec::Finite(capacity) => current_count >= *capacity as usize,
1046 };
1047 if is_full {
1048 return NetActionResponse::Error(NetActionError::InputPortFull {
1049 port_name: port_name.clone(),
1050 node_name: node_name.clone(),
1051 });
1052 }
1053 }
1054 PacketLocation::Edge(edge) => {
1055 if !self.graph.edges().contains(edge) {
1057 return NetActionResponse::Error(NetActionError::EdgeNotFound {
1058 edge: edge.clone(),
1059 });
1060 }
1061 }
1062 PacketLocation::OutsideNet => {
1063 }
1065 }
1066
1067 self.move_packet(packet_id, destination.clone());
1069
1070 NetActionResponse::Success(
1071 NetActionResponseData::None,
1072 vec![NetEvent::PacketMoved(get_utc_now(), packet_id.clone(), destination.clone())]
1073 )
1074 }
1075
1076 pub fn do_action(&mut self, action: &NetAction) -> NetActionResponse {
1108 match action {
1109 NetAction::RunNetUntilBlocked => self.run_until_blocked(),
1110 NetAction::CreatePacket(maybe_epoch_id) => self.create_packet(maybe_epoch_id),
1111 NetAction::ConsumePacket(packet_id) => self.consume_packet(packet_id),
1112 NetAction::StartEpoch(epoch_id) => self.start_epoch(epoch_id),
1113 NetAction::FinishEpoch(epoch_id) => self.finish_epoch(epoch_id),
1114 NetAction::CancelEpoch(epoch_id) => self.cancel_epoch(epoch_id),
1115 NetAction::CreateAndStartEpoch(node_name, salvo) => self.create_and_start_epoch(node_name, salvo),
1116 NetAction::LoadPacketIntoOutputPort(packet_id, port_name) => self.load_packet_into_output_port(packet_id, port_name),
1117 NetAction::SendOutputSalvo(epoch_id, salvo_condition_name) => self.send_output_salvo(epoch_id, salvo_condition_name),
1118 NetAction::TransportPacketToLocation(packet_id, location) => self.transport_packet_to_location(packet_id, location),
1119 }
1120 }
1121
1122 pub fn packet_count_at(&self, location: &PacketLocation) -> usize {
1126 self._packets_by_location.get(location).map(|s| s.len()).unwrap_or(0)
1127 }
1128
1129 pub fn get_packets_at_location(&self, location: &PacketLocation) -> Vec<PacketID> {
1131 self._packets_by_location
1132 .get(location)
1133 .map(|s| s.iter().cloned().collect())
1134 .unwrap_or_default()
1135 }
1136
1137 pub fn get_epoch(&self, epoch_id: &EpochID) -> Option<&Epoch> {
1139 self._epochs.get(epoch_id)
1140 }
1141
1142 pub fn get_startable_epochs(&self) -> Vec<EpochID> {
1144 self._startable_epochs.iter().cloned().collect()
1145 }
1146
1147 pub fn get_packet(&self, packet_id: &PacketID) -> Option<&Packet> {
1149 self._packets.get(packet_id)
1150 }
1151
1152 #[cfg(test)]
1155 pub fn startable_epoch_ids(&self) -> Vec<EpochID> {
1156 self.get_startable_epochs()
1157 }
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162 use super::*;
1163 use crate::test_fixtures::*;
1164
1165 fn get_packet_id(response: &NetActionResponse) -> PacketID {
1167 match response {
1168 NetActionResponse::Success(NetActionResponseData::Packet(id), _) => id.clone(),
1169 _ => panic!("Expected Packet response, got: {:?}", response),
1170 }
1171 }
1172
1173 fn get_started_epoch(response: &NetActionResponse) -> Epoch {
1175 match response {
1176 NetActionResponse::Success(NetActionResponseData::StartedEpoch(epoch), _) => epoch.clone(),
1177 _ => panic!("Expected StartedEpoch response, got: {:?}", response),
1178 }
1179 }
1180
1181 #[test]
1184 fn test_create_packet_outside_net() {
1185 let graph = linear_graph_3();
1186 let mut net = Net::new(graph);
1187
1188 let response = net.do_action(&NetAction::CreatePacket(None));
1189 assert!(matches!(response, NetActionResponse::Success(NetActionResponseData::Packet(_), _)));
1190 }
1191
1192 #[test]
1193 fn test_consume_packet() {
1194 let graph = linear_graph_3();
1195 let mut net = Net::new(graph);
1196
1197 let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1199
1200 let response = net.do_action(&NetAction::ConsumePacket(packet_id));
1202 assert!(matches!(response, NetActionResponse::Success(NetActionResponseData::None, _)));
1203 }
1204
1205 #[test]
1206 fn test_consume_nonexistent_packet_fails() {
1207 let graph = linear_graph_3();
1208 let mut net = Net::new(graph);
1209
1210 let fake_id = Ulid::new();
1211 let response = net.do_action(&NetAction::ConsumePacket(fake_id));
1212 assert!(matches!(response, NetActionResponse::Error(NetActionError::PacketNotFound { .. })));
1213 }
1214
1215 #[test]
1218 fn test_epoch_lifecycle_via_run_until_blocked() {
1219 let graph = linear_graph_3();
1220 let mut net = Net::new(graph);
1221
1222 let response = net.do_action(&NetAction::CreatePacket(None));
1224 let packet_id = get_packet_id(&response);
1225
1226 let edge_location = PacketLocation::Edge(Edge {
1228 source: PortRef {
1229 node_name: "A".to_string(),
1230 port_type: PortType::Output,
1231 port_name: "out".to_string(),
1232 },
1233 target: PortRef {
1234 node_name: "B".to_string(),
1235 port_type: PortType::Input,
1236 port_name: "in".to_string(),
1237 },
1238 });
1239 net._packets.get_mut(&packet_id).unwrap().location = edge_location.clone();
1240 net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1241 net._packets_by_location.get_mut(&edge_location).unwrap().insert(packet_id.clone());
1242
1243 net.do_action(&NetAction::RunNetUntilBlocked);
1245
1246 let startable = net.startable_epoch_ids();
1248 assert_eq!(startable.len(), 1);
1249
1250 let epoch_id = startable[0].clone();
1252 let epoch = get_started_epoch(&net.do_action(&NetAction::StartEpoch(epoch_id.clone())));
1253 assert!(matches!(epoch.state, EpochState::Running));
1254
1255 net.do_action(&NetAction::ConsumePacket(packet_id));
1257
1258 let response = net.do_action(&NetAction::FinishEpoch(epoch_id));
1260 assert!(matches!(response, NetActionResponse::Success(NetActionResponseData::FinishedEpoch(_), _)));
1261 }
1262
1263 #[test]
1264 fn test_cannot_start_nonexistent_epoch() {
1265 let graph = linear_graph_3();
1266 let mut net = Net::new(graph);
1267
1268 let fake_id = Ulid::new();
1269 let response = net.do_action(&NetAction::StartEpoch(fake_id));
1270 assert!(matches!(response, NetActionResponse::Error(NetActionError::EpochNotFound { .. })));
1271 }
1272
1273 #[test]
1274 fn test_cannot_finish_epoch_with_packets() {
1275 let graph = linear_graph_3();
1276 let mut net = Net::new(graph);
1277
1278 let response = net.do_action(&NetAction::CreatePacket(None));
1281 let packet_id = get_packet_id(&response);
1282
1283 let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1284 net._packets.get_mut(&packet_id).unwrap().location = input_port_loc.clone();
1285 net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1286 net._packets_by_location.get_mut(&input_port_loc).unwrap().insert(packet_id.clone());
1287
1288 let salvo = Salvo {
1290 salvo_condition: "manual".to_string(),
1291 packets: vec![("in".to_string(), packet_id)],
1292 };
1293 let epoch = get_started_epoch(&net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo)));
1294
1295 let response = net.do_action(&NetAction::FinishEpoch(epoch.id));
1297 assert!(matches!(response, NetActionResponse::Error(NetActionError::CannotFinishNonEmptyEpoch { .. })));
1298 }
1299
1300 #[test]
1303 fn test_cancel_epoch_destroys_packets() {
1304 let graph = linear_graph_3();
1305 let mut net = Net::new(graph);
1306
1307 let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1309 let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1310 net._packets.get_mut(&packet_id).unwrap().location = input_port_loc.clone();
1311 net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1312 net._packets_by_location.get_mut(&input_port_loc).unwrap().insert(packet_id.clone());
1313
1314 let salvo = Salvo {
1316 salvo_condition: "manual".to_string(),
1317 packets: vec![("in".to_string(), packet_id.clone())],
1318 };
1319 let epoch = get_started_epoch(&net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo)));
1320
1321 let response = net.do_action(&NetAction::CancelEpoch(epoch.id));
1323 match response {
1324 NetActionResponse::Success(NetActionResponseData::CancelledEpoch(_, destroyed), _) => {
1325 assert_eq!(destroyed.len(), 1);
1326 assert_eq!(destroyed[0], packet_id);
1327 }
1328 _ => panic!("Expected CancelledEpoch response"),
1329 }
1330
1331 assert!(!net._packets.contains_key(&packet_id));
1333 }
1334
1335 #[test]
1338 fn test_run_until_blocked_moves_packet_to_input_port() {
1339 let graph = linear_graph_3();
1340 let mut net = Net::new(graph);
1341
1342 let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1344 let edge_location = PacketLocation::Edge(Edge {
1345 source: PortRef {
1346 node_name: "A".to_string(),
1347 port_type: PortType::Output,
1348 port_name: "out".to_string(),
1349 },
1350 target: PortRef {
1351 node_name: "B".to_string(),
1352 port_type: PortType::Input,
1353 port_name: "in".to_string(),
1354 },
1355 });
1356 net._packets.get_mut(&packet_id).unwrap().location = edge_location.clone();
1357 net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1358 net._packets_by_location.get_mut(&edge_location).unwrap().insert(packet_id.clone());
1359
1360 net.do_action(&NetAction::RunNetUntilBlocked);
1362
1363 assert_eq!(net.startable_epoch_ids().len(), 1);
1365 }
1366
1367 #[test]
1368 fn test_run_until_blocked_respects_port_capacity() {
1369 use std::collections::HashMap;
1372 use crate::graph::Node;
1373 let node_b = Node {
1374 name: "B".to_string(),
1375 in_ports: {
1376 let mut ports = HashMap::new();
1377 ports.insert("in".to_string(), Port { slots_spec: PortSlotSpec::Finite(1) });
1378 ports
1379 },
1380 out_ports: HashMap::new(),
1381 in_salvo_conditions: HashMap::new(), out_salvo_conditions: HashMap::new(),
1383 };
1384
1385 let nodes = vec![
1386 simple_node("A", vec![], vec!["out"]),
1387 node_b,
1388 ];
1389 let edges = vec![edge("A", "out", "B", "in")];
1390 let graph = Graph::new(nodes, edges);
1391 let mut net = Net::new(graph);
1392
1393 let edge_location = PacketLocation::Edge(Edge {
1395 source: PortRef {
1396 node_name: "A".to_string(),
1397 port_type: PortType::Output,
1398 port_name: "out".to_string(),
1399 },
1400 target: PortRef {
1401 node_name: "B".to_string(),
1402 port_type: PortType::Input,
1403 port_name: "in".to_string(),
1404 },
1405 });
1406
1407 let packet1 = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1408 let packet2 = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1409
1410 for pid in [&packet1, &packet2] {
1412 net._packets.get_mut(pid).unwrap().location = edge_location.clone();
1413 net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(pid);
1414 net._packets_by_location.get_mut(&edge_location).unwrap().insert(pid.clone());
1415 }
1416
1417 net.do_action(&NetAction::RunNetUntilBlocked);
1419
1420 assert_eq!(net.startable_epoch_ids().len(), 0);
1422 let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1423 assert_eq!(net.packet_count_at(&input_port_loc), 1);
1424 assert_eq!(net.packet_count_at(&edge_location), 1);
1425 }
1426
1427 #[test]
1428 fn test_fifo_packet_ordering() {
1429 let graph = linear_graph_3();
1432 let mut net = Net::new(graph);
1433
1434 let edge_location = PacketLocation::Edge(Edge {
1435 source: PortRef {
1436 node_name: "A".to_string(),
1437 port_type: PortType::Output,
1438 port_name: "out".to_string(),
1439 },
1440 target: PortRef {
1441 node_name: "B".to_string(),
1442 port_type: PortType::Input,
1443 port_name: "in".to_string(),
1444 },
1445 });
1446
1447 let packet1 = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1449 let packet2 = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1450 let packet3 = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1451
1452 for pid in [&packet1, &packet2, &packet3] {
1454 net._packets.get_mut(pid).unwrap().location = edge_location.clone();
1455 net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(pid);
1456 net._packets_by_location.get_mut(&edge_location).unwrap().insert(pid.clone());
1457 }
1458
1459 let response = net.do_action(&NetAction::RunNetUntilBlocked);
1461
1462 let events = match response {
1464 NetActionResponse::Success(_, events) => events,
1465 _ => panic!("Expected success response"),
1466 };
1467
1468 let packet_move_order: Vec<PacketID> = events.iter()
1471 .filter_map(|event| {
1472 if let NetEvent::PacketMoved(_, packet_id, PacketLocation::InputPort(_, _)) = event {
1473 Some(packet_id.clone())
1474 } else {
1475 None
1476 }
1477 })
1478 .collect();
1479
1480 assert_eq!(packet_move_order.len(), 3, "Expected 3 packets to move to input port");
1482 assert_eq!(packet_move_order[0], packet1, "First packet to move should be packet1");
1483 assert_eq!(packet_move_order[1], packet2, "Second packet to move should be packet2");
1484 assert_eq!(packet_move_order[2], packet3, "Third packet to move should be packet3");
1485 }
1486
1487 #[test]
1490 fn test_load_packet_into_output_port() {
1491 let graph = linear_graph_3();
1492 let mut net = Net::new(graph);
1493
1494 let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1496 let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1497 net._packets.get_mut(&packet_id).unwrap().location = input_port_loc.clone();
1498 net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1499 net._packets_by_location.get_mut(&input_port_loc).unwrap().insert(packet_id.clone());
1500
1501 let salvo = Salvo {
1503 salvo_condition: "manual".to_string(),
1504 packets: vec![("in".to_string(), packet_id.clone())],
1505 };
1506 let epoch = get_started_epoch(&net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo)));
1507
1508 let response = net.do_action(&NetAction::LoadPacketIntoOutputPort(packet_id.clone(), "out".to_string()));
1510 assert!(matches!(response, NetActionResponse::Success(NetActionResponseData::None, _)));
1511
1512 let output_loc = PacketLocation::OutputPort(epoch.id, "out".to_string());
1514 assert_eq!(net.packet_count_at(&output_loc), 1);
1515 }
1516
1517 #[test]
1518 fn test_send_output_salvo() {
1519 let graph = linear_graph_3();
1520 let mut net = Net::new(graph);
1521
1522 let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1524 let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1525 net._packets.get_mut(&packet_id).unwrap().location = input_port_loc.clone();
1526 net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1527 net._packets_by_location.get_mut(&input_port_loc).unwrap().insert(packet_id.clone());
1528
1529 let salvo = Salvo {
1531 salvo_condition: "manual".to_string(),
1532 packets: vec![("in".to_string(), packet_id.clone())],
1533 };
1534 let epoch = get_started_epoch(&net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo)));
1535
1536 net.do_action(&NetAction::LoadPacketIntoOutputPort(packet_id.clone(), "out".to_string()));
1538
1539 let response = net.do_action(&NetAction::SendOutputSalvo(epoch.id.clone(), "default".to_string()));
1541 assert!(matches!(response, NetActionResponse::Success(NetActionResponseData::None, _)));
1542
1543 let edge_loc = PacketLocation::Edge(Edge {
1545 source: PortRef {
1546 node_name: "B".to_string(),
1547 port_type: PortType::Output,
1548 port_name: "out".to_string(),
1549 },
1550 target: PortRef {
1551 node_name: "C".to_string(),
1552 port_type: PortType::Input,
1553 port_name: "in".to_string(),
1554 },
1555 });
1556 assert_eq!(net.packet_count_at(&edge_loc), 1);
1557 }
1558
1559 #[test]
1562 fn test_create_and_start_epoch() {
1563 let graph = linear_graph_3();
1564 let mut net = Net::new(graph);
1565
1566 let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1568 let input_port_loc = PacketLocation::InputPort("B".to_string(), "in".to_string());
1569 net._packets.get_mut(&packet_id).unwrap().location = input_port_loc.clone();
1570 net._packets_by_location.get_mut(&PacketLocation::OutsideNet).unwrap().shift_remove(&packet_id);
1571 net._packets_by_location.get_mut(&input_port_loc).unwrap().insert(packet_id.clone());
1572
1573 let salvo = Salvo {
1575 salvo_condition: "manual".to_string(),
1576 packets: vec![("in".to_string(), packet_id.clone())],
1577 };
1578 let epoch = get_started_epoch(&net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo)));
1579
1580 assert!(matches!(epoch.state, EpochState::Running));
1581 assert_eq!(epoch.node_name, "B");
1582 }
1583
1584 #[test]
1585 fn test_create_and_start_epoch_validates_node() {
1586 let graph = linear_graph_3();
1587 let mut net = Net::new(graph);
1588
1589 let salvo = Salvo {
1590 salvo_condition: "manual".to_string(),
1591 packets: vec![],
1592 };
1593 let response = net.do_action(&NetAction::CreateAndStartEpoch("NonExistent".to_string(), salvo));
1594 assert!(matches!(response, NetActionResponse::Error(NetActionError::NodeNotFound { .. })));
1595 }
1596
1597 #[test]
1598 fn test_create_and_start_epoch_validates_packet_location() {
1599 let graph = linear_graph_3();
1600 let mut net = Net::new(graph);
1601
1602 let packet_id = get_packet_id(&net.do_action(&NetAction::CreatePacket(None)));
1604
1605 let salvo = Salvo {
1606 salvo_condition: "manual".to_string(),
1607 packets: vec![("in".to_string(), packet_id)],
1608 };
1609 let response = net.do_action(&NetAction::CreateAndStartEpoch("B".to_string(), salvo));
1610 assert!(matches!(response, NetActionResponse::Error(NetActionError::PacketNotAtInputPort { .. })));
1611 }
1612}