1use crate::_utils::get_utc_now;
10use crate::graph::{
11 Edge, Graph, MaxSalvos, NodeName, PacketCount, Port, PortName, PortRef, PortSlotSpec, PortType,
12 SalvoConditionName, SalvoConditionTerm, evaluate_salvo_condition,
13};
14use indexmap::IndexSet;
15use std::collections::{HashMap, HashSet};
16use ulid::Ulid;
17
/// Unique identifier of a packet (a ULID).
pub type PacketID = Ulid;

/// Unique identifier of an epoch (a ULID).
pub type EpochID = Ulid;
23
/// Where a packet currently resides.
///
/// Values of this enum are the keys of the simulator's location index
/// (`NetSim::_packets_by_location`).
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum PacketLocation {
    /// Inside the epoch with the given ID.
    Node(EpochID),
    /// At the named input port of the named node.
    InputPort(NodeName, PortName),
    /// In the named output port belonging to the epoch with the given ID.
    OutputPort(EpochID, PortName),
    /// On the given edge.
    Edge(Edge),
    /// Outside the net. Packets created without an epoch start here, and packets sent from an
    /// output port that has no outgoing edge are moved here.
    OutsideNet,
}
44
/// A packet tracked by the simulator: an identity plus its current location.
#[derive(Debug)]
pub struct Packet {
    /// Unique ID of this packet.
    pub id: PacketID,
    /// Where the packet currently is.
    pub location: PacketLocation,
}
53
/// A group of packets that entered or left a node together under one salvo condition.
#[derive(Debug, Clone)]
pub struct Salvo {
    /// Name of the salvo condition this salvo belongs to.
    pub salvo_condition: SalvoConditionName,
    /// The packets of the salvo, each paired with the name of the port it was taken from.
    pub packets: Vec<(PortName, PacketID)>,
}
66
/// Lifecycle state of an [`Epoch`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "python", pyo3::pyclass(eq, eq_int))]
pub enum EpochState {
    /// The epoch has been created but not started yet.
    Startable,
    /// The epoch has been started and has not finished.
    Running,
    /// The epoch has been finished.
    Finished,
}
78
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl EpochState {
    /// Python `repr()` of the state, in the form `EpochState.<Variant>`.
    fn __repr__(&self) -> String {
        let text = match self {
            EpochState::Startable => "EpochState.Startable",
            EpochState::Running => "EpochState.Running",
            EpochState::Finished => "EpochState.Finished",
        };
        String::from(text)
    }
}
90
/// One execution of a node, started from an input salvo.
#[derive(Debug, Clone)]
pub struct Epoch {
    /// Unique ID of this epoch.
    pub id: EpochID,
    /// Name of the node this epoch belongs to.
    pub node_name: NodeName,
    /// The salvo of packets the epoch was created from.
    pub in_salvo: Salvo,
    /// Output salvos sent by this epoch so far, in the order they were sent.
    pub out_salvos: Vec<Salvo>,
    /// Current lifecycle state.
    pub state: EpochState,
    /// Packets that were sent from an output port without an outgoing edge.
    pub orphaned_packets: Vec<OrphanedPacketInfo>,
}
111
/// Record of a packet that was sent from an output port that has no outgoing edge.
#[derive(Debug, Clone)]
pub struct OrphanedPacketInfo {
    /// ID of the orphaned packet.
    pub packet_id: PacketID,
    /// Name of the output port the packet was sent from.
    pub from_port: PortName,
    /// Name of the output salvo condition under which the packet was sent.
    pub salvo_condition: SalvoConditionName,
}
125
126impl Epoch {
127 pub fn start_time(&self) -> u64 {
129 self.id.timestamp_ms()
130 }
131}
132
/// Timestamp carried as the first field of every [`NetEvent`]; produced by `get_utc_now`.
// NOTE(review): the unit (ms/us/ns) is defined by `get_utc_now`, which is not visible here.
pub type EventUTC = i128;
135
/// A request that can be issued against the simulator.
#[derive(Debug, Clone)]
pub enum NetAction {
    /// Advance the net by one automatic step.
    RunStep,
    /// Create a packet inside the given epoch, or outside the net when `None`.
    CreatePacket(Option<EpochID>),
    /// Consume (remove) the given packet.
    ConsumePacket(PacketID),
    /// Destroy (remove) the given packet.
    DestroyPacket(PacketID),
    /// Start the given startable epoch.
    StartEpoch(EpochID),
    /// Finish the given running epoch.
    FinishEpoch(EpochID),
    /// Cancel the given epoch.
    CancelEpoch(EpochID),
    /// Create an epoch on the named node from the given salvo of input-port packets.
    CreateEpoch(NodeName, Salvo),
    /// Move the given packet into the named output port of the epoch it is in.
    LoadPacketIntoOutputPort(PacketID, PortName),
    /// Send the output salvo with the given condition name from the given epoch.
    SendOutputSalvo(EpochID, SalvoConditionName),
    /// Move the given packet to the given location.
    TransportPacketToLocation(PacketID, PacketLocation),
}
178
/// Errors reported when undoing an action fails; each variant carries a free-form description.
#[derive(Debug, thiserror::Error)]
pub enum UndoError {
    /// The current state does not match what the undo expected.
    #[error("state mismatch: {0}")]
    StateMismatch(String),

    /// An entity needed for the undo could not be found.
    #[error("entity not found: {0}")]
    NotFound(String),

    /// The action cannot be undone.
    #[error("action not undoable: {0}")]
    NotUndoable(String),

    /// An internal error occurred.
    #[error("internal error: {0}")]
    InternalError(String),
}
198
/// Errors returned when a [`NetAction`] is rejected.
#[derive(Debug, thiserror::Error)]
pub enum NetActionError {
    /// No packet with this ID exists.
    #[error("packet not found: {packet_id}")]
    PacketNotFound { packet_id: PacketID },

    /// No epoch with this ID exists.
    #[error("epoch not found: {epoch_id}")]
    EpochNotFound { epoch_id: EpochID },

    /// The epoch exists but is not in the `Running` state.
    #[error("epoch {epoch_id} is not running")]
    EpochNotRunning { epoch_id: EpochID },

    /// The epoch exists but is not startable.
    #[error("epoch {epoch_id} is not startable")]
    EpochNotStartable { epoch_id: EpochID },

    /// The epoch cannot be finished because packets are still inside it.
    #[error("cannot finish epoch {epoch_id}: epoch still contains packets")]
    CannotFinishNonEmptyEpoch { epoch_id: EpochID },

    /// The epoch cannot be finished because an output port still holds packets.
    #[error("cannot finish epoch {epoch_id}: output port '{port_name}' has unsent packets")]
    UnsentOutputSalvo {
        epoch_id: EpochID,
        port_name: PortName,
    },

    /// The packet is not located inside the epoch.
    #[error("packet {packet_id} is not inside epoch {epoch_id}")]
    PacketNotInNode {
        packet_id: PacketID,
        epoch_id: EpochID,
    },

    /// The epoch's node has no output port with this name.
    #[error("output port '{port_name}' not found on node for epoch {epoch_id}")]
    OutputPortNotFound {
        port_name: PortName,
        epoch_id: EpochID,
    },

    /// The epoch's node has no output salvo condition with this name.
    #[error("output salvo condition '{condition_name}' not found on node for epoch {epoch_id}")]
    OutputSalvoConditionNotFound {
        condition_name: SalvoConditionName,
        epoch_id: EpochID,
    },

    /// The epoch has already sent the maximum number of salvos for this condition.
    #[error("max output salvos reached for condition '{condition_name}' on epoch {epoch_id}")]
    MaxOutputSalvosReached {
        condition_name: SalvoConditionName,
        epoch_id: EpochID,
    },

    /// The salvo condition does not currently hold for the epoch.
    #[error("salvo condition '{condition_name}' not met for epoch {epoch_id}")]
    SalvoConditionNotMet {
        condition_name: SalvoConditionName,
        epoch_id: EpochID,
    },

    /// The output port has no free slot.
    #[error("output port '{port_name}' is full for epoch {epoch_id}")]
    OutputPortFull {
        port_name: PortName,
        epoch_id: EpochID,
    },

    /// The output port has no outgoing edge.
    #[error("output port '{port_name}' on node '{node_name}' is not connected to any edge")]
    CannotPutPacketIntoUnconnectedOutputPort {
        port_name: PortName,
        node_name: NodeName,
    },

    /// The graph has no node with this name.
    #[error("node not found: '{node_name}'")]
    NodeNotFound { node_name: NodeName },

    /// The packet is not at the stated input port.
    #[error("packet {packet_id} is not at input port '{port_name}' of node '{node_name}'")]
    PacketNotAtInputPort {
        packet_id: PacketID,
        port_name: PortName,
        node_name: NodeName,
    },

    /// The node has no input port with this name.
    #[error("input port '{port_name}' not found on node '{node_name}'")]
    InputPortNotFound {
        port_name: PortName,
        node_name: NodeName,
    },

    /// The input port has no free slot.
    #[error("input port '{port_name}' on node '{node_name}' is full")]
    InputPortFull {
        port_name: PortName,
        node_name: NodeName,
    },

    /// Packets may not be transported out of a running epoch.
    #[error("cannot move packet {packet_id} out of running epoch {epoch_id}")]
    CannotMovePacketFromRunningEpoch {
        packet_id: PacketID,
        epoch_id: EpochID,
    },

    /// Packets may not be transported into a running epoch.
    #[error("cannot move packet {packet_id} into running epoch {epoch_id}")]
    CannotMovePacketIntoRunningEpoch {
        packet_id: PacketID,
        epoch_id: EpochID,
    },

    /// The graph has no such edge.
    #[error("edge not found: {edge}")]
    EdgeNotFound { edge: Edge },
}
322
/// Something that happened in the net as the result of an action.
///
/// The first field of every variant is the time the event was recorded.
#[derive(Debug, Clone)]
pub enum NetEvent {
    /// A packet with the given ID was created.
    PacketCreated(EventUTC, PacketID),
    /// A packet was consumed; the location is where it was at that moment.
    PacketConsumed(EventUTC, PacketID, PacketLocation),
    /// A packet was destroyed; the location is where it was at that moment.
    PacketDestroyed(EventUTC, PacketID, PacketLocation),
    /// An epoch with the given ID was created.
    EpochCreated(EventUTC, EpochID),
    /// The epoch with the given ID was started.
    EpochStarted(EventUTC, EpochID),
    /// An epoch was finished; carries a snapshot of the epoch taken before it was marked finished.
    EpochFinished(EventUTC, Epoch),
    /// An epoch was cancelled; carries a snapshot of the epoch.
    EpochCancelled(EventUTC, Epoch),
    /// A packet moved: (time, packet, from location, to location, index it had at the source).
    PacketMoved(EventUTC, PacketID, PacketLocation, PacketLocation, usize),
    /// An input salvo condition fired and produced the given epoch.
    InputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
    /// The given epoch sent an output salvo under the given condition.
    OutputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
    /// A packet was sent from an output port without an outgoing edge:
    /// (time, packet, epoch, node, output port, salvo condition).
    PacketOrphaned(
        EventUTC,
        PacketID,
        EpochID,
        NodeName,
        PortName,
        SalvoConditionName,
    ),
}
367
/// Payload of a successful [`NetActionResponse`].
#[derive(Debug, Clone)]
pub enum NetActionResponseData {
    /// Result of a run step.
    StepResult {
        /// Whether the step moved at least one packet or created at least one epoch.
        made_progress: bool,
    },
    /// ID of a newly created packet.
    Packet(PacketID),
    /// The epoch that was created.
    CreatedEpoch(Epoch),
    /// The epoch that was started.
    StartedEpoch(Epoch),
    /// The epoch that was finished.
    FinishedEpoch(Epoch),
    /// The epoch that was cancelled, plus the IDs of the packets destroyed with it.
    CancelledEpoch(Epoch, Vec<PacketID>),
    /// The action has no payload.
    None,
}
390
/// Outcome of performing a [`NetAction`].
#[derive(Debug)]
pub enum NetActionResponse {
    /// The action succeeded; carries its payload and the events it produced, in order.
    Success(NetActionResponseData, Vec<NetEvent>),
    /// The action was rejected.
    Error(NetActionError),
}
399
/// Simulator state for a net: the graph plus all live packets and epochs.
#[derive(Debug)]
pub struct NetSim {
    /// The graph being simulated.
    pub graph: Graph,
    /// All live packets, keyed by ID.
    _packets: HashMap<PacketID, Packet>,
    /// Ordered set of packet IDs at each known location.
    _packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>>,
    /// All live epochs, keyed by ID.
    _epochs: HashMap<EpochID, Epoch>,
    /// IDs of epochs that have been created but not started yet.
    _startable_epochs: HashSet<EpochID>,
    /// IDs of the live epochs of each node.
    _node_to_epochs: HashMap<NodeName, Vec<EpochID>>,
}
418
419impl NetSim {
420 pub fn new(graph: Graph) -> Self {
424 let mut packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>> = HashMap::new();
425
426 for edge in graph.edges() {
428 packets_by_location.insert(PacketLocation::Edge(edge.clone()), IndexSet::new());
429 }
430
431 for (node_name, node) in graph.nodes() {
433 for port_name in node.in_ports.keys() {
434 packets_by_location.insert(
435 PacketLocation::InputPort(node_name.clone(), port_name.clone()),
436 IndexSet::new(),
437 );
438 }
439 }
440
441 packets_by_location.insert(PacketLocation::OutsideNet, IndexSet::new());
443
444 NetSim {
448 graph,
449 _packets: HashMap::new(),
450 _packets_by_location: packets_by_location,
451 _epochs: HashMap::new(),
452 _startable_epochs: HashSet::new(),
453 _node_to_epochs: HashMap::new(),
454 }
455 }
456
457 fn move_packet(&mut self, packet_id: &PacketID, new_location: PacketLocation) {
458 let packet = self._packets.get_mut(packet_id).unwrap();
459 let packets_at_old_location = self
460 ._packets_by_location
461 .get_mut(&packet.location)
462 .expect("Packet location has no entry in self._packets_by_location.");
463 packets_at_old_location.shift_remove(packet_id);
464 packet.location = new_location;
465 if !self
466 ._packets_by_location
467 .get_mut(&packet.location)
468 .expect("Packet location has no entry in self._packets_by_location")
469 .insert(*packet_id)
470 {
471 panic!("Attempted to move packet to a location that already contains it.");
472 }
473 }
474
475 fn try_trigger_input_salvo(&mut self, node_name: &NodeName) -> (bool, Vec<NetEvent>) {
480 let mut events: Vec<NetEvent> = Vec::new();
481
482 let node = match self.graph.nodes().get(node_name) {
483 Some(n) => n,
484 None => return (false, events),
485 };
486
487 let in_port_names: Vec<PortName> = node.in_ports.keys().cloned().collect();
488 let in_ports_clone: HashMap<PortName, Port> = node
489 .in_ports
490 .iter()
491 .map(|(k, v)| {
492 (
493 k.clone(),
494 Port {
495 slots_spec: match v.slots_spec {
496 PortSlotSpec::Infinite => PortSlotSpec::Infinite,
497 PortSlotSpec::Finite(n) => PortSlotSpec::Finite(n),
498 },
499 },
500 )
501 })
502 .collect();
503
504 struct SalvoConditionData {
506 name: SalvoConditionName,
507 ports: HashMap<PortName, PacketCount>,
508 term: SalvoConditionTerm,
509 }
510
511 let salvo_conditions: Vec<SalvoConditionData> = node
512 .in_salvo_conditions
513 .iter()
514 .map(|(name, cond)| SalvoConditionData {
515 name: name.clone(),
516 ports: cond.ports.clone(),
517 term: cond.term.clone(),
518 })
519 .collect();
520
521 for salvo_cond_data in salvo_conditions {
523 let port_packet_counts: HashMap<PortName, u64> = in_port_names
525 .iter()
526 .map(|port_name| {
527 let count = self
528 ._packets_by_location
529 .get(&PacketLocation::InputPort(
530 node_name.clone(),
531 port_name.clone(),
532 ))
533 .map(|packets| packets.len() as u64)
534 .unwrap_or(0);
535 (port_name.clone(), count)
536 })
537 .collect();
538
539 if evaluate_salvo_condition(&salvo_cond_data.term, &port_packet_counts, &in_ports_clone)
541 {
542 let epoch_id = Ulid::new();
544
545 let mut salvo_packets: Vec<(PortName, PacketID)> = Vec::new();
548 let mut packets_to_move: Vec<(PacketID, PortName, usize)> = Vec::new();
549
550 for (port_name, packet_count) in &salvo_cond_data.ports {
551 let port_location =
552 PacketLocation::InputPort(node_name.clone(), port_name.clone());
553 if let Some(packet_ids) = self._packets_by_location.get(&port_location) {
554 let take_count = match packet_count {
555 PacketCount::All => packet_ids.len(),
556 PacketCount::Count(n) => std::cmp::min(*n as usize, packet_ids.len()),
557 };
558 for (idx, pid) in packet_ids.iter().enumerate().take(take_count) {
559 salvo_packets.push((port_name.clone(), *pid));
560 packets_to_move.push((*pid, port_name.clone(), idx));
561 }
562 }
563 }
564
565 let in_salvo = Salvo {
567 salvo_condition: salvo_cond_data.name.clone(),
568 packets: salvo_packets,
569 };
570
571 let epoch = Epoch {
573 id: epoch_id,
574 node_name: node_name.clone(),
575 in_salvo,
576 out_salvos: Vec::new(),
577 state: EpochState::Startable,
578 orphaned_packets: Vec::new(),
579 };
580
581 self._epochs.insert(epoch_id, epoch);
583 self._startable_epochs.insert(epoch_id);
584 self._node_to_epochs
585 .entry(node_name.clone())
586 .or_default()
587 .push(epoch_id);
588
589 let epoch_location = PacketLocation::Node(epoch_id);
591 self._packets_by_location
592 .insert(epoch_location.clone(), IndexSet::new());
593
594 let node = self
596 .graph
597 .nodes()
598 .get(node_name)
599 .expect("Node not found for epoch creation");
600 for out_port_name in node.out_ports.keys() {
601 let output_port_location =
602 PacketLocation::OutputPort(epoch_id, out_port_name.clone());
603 self._packets_by_location
604 .insert(output_port_location, IndexSet::new());
605 }
606
607 events.push(NetEvent::InputSalvoTriggered(
612 get_utc_now(),
613 epoch_id,
614 salvo_cond_data.name.clone(),
615 ));
616 events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id));
617
618 for (pid, port_name, from_index) in &packets_to_move {
620 let from_location =
621 PacketLocation::InputPort(node_name.clone(), port_name.clone());
622 self.move_packet(pid, epoch_location.clone());
623 events.push(NetEvent::PacketMoved(
624 get_utc_now(),
625 *pid,
626 from_location,
627 epoch_location.clone(),
628 *from_index,
629 ));
630 }
631
632 return (true, events);
634 }
635 }
636
637 (false, events)
638 }
639
640 fn run_step(&mut self) -> NetActionResponse {
641 let mut all_events: Vec<NetEvent> = Vec::new();
642 let mut made_progress = false;
643
644 struct EdgeMoveCandidate {
648 packet_id: PacketID,
649 from_location: PacketLocation,
650 from_index: usize,
651 input_port_location: PacketLocation,
652 can_move: bool,
653 }
654
655 let mut edge_candidates: Vec<EdgeMoveCandidate> = Vec::new();
656
657 for (location, packets) in &self._packets_by_location {
659 if let PacketLocation::Edge(edge_ref) = location {
660 if let Some(first_packet_id) = packets.first() {
662 let target_node_name = edge_ref.target.node_name.clone();
663 let target_port_name = edge_ref.target.port_name.clone();
664
665 let node = self
667 .graph
668 .nodes()
669 .get(&target_node_name)
670 .expect("Edge targets a non-existent node");
671 let port = node
672 .in_ports
673 .get(&target_port_name)
674 .expect("Edge targets a non-existent input port");
675
676 let input_port_location = PacketLocation::InputPort(
677 target_node_name.clone(),
678 target_port_name.clone(),
679 );
680 let current_count = self
681 ._packets_by_location
682 .get(&input_port_location)
683 .map(|packets| packets.len() as u64)
684 .unwrap_or(0);
685
686 let can_move = match port.slots_spec {
687 PortSlotSpec::Infinite => true,
688 PortSlotSpec::Finite(max_slots) => current_count < max_slots,
689 };
690
691 edge_candidates.push(EdgeMoveCandidate {
692 packet_id: *first_packet_id,
693 from_location: location.clone(),
694 from_index: 0, input_port_location,
696 can_move,
697 });
698 }
699 }
700 }
701
702 for candidate in edge_candidates {
704 if !candidate.can_move {
705 continue;
706 }
707
708 self.move_packet(&candidate.packet_id, candidate.input_port_location.clone());
710 all_events.push(NetEvent::PacketMoved(
711 get_utc_now(),
712 candidate.packet_id,
713 candidate.from_location,
714 candidate.input_port_location.clone(),
715 candidate.from_index,
716 ));
717 made_progress = true;
718 }
719
720 let mut nodes_with_input_packets: Vec<NodeName> = Vec::new();
722 for (location, packets) in &self._packets_by_location {
723 if let PacketLocation::InputPort(node_name, _) = location
724 && !packets.is_empty()
725 && !nodes_with_input_packets.contains(node_name)
726 {
727 nodes_with_input_packets.push(node_name.clone());
728 }
729 }
730
731 for node_name in nodes_with_input_packets {
732 let (triggered, events) = self.try_trigger_input_salvo(&node_name);
733 all_events.extend(events);
734 if triggered {
735 made_progress = true;
736 }
737 }
738
739 NetActionResponse::Success(
740 NetActionResponseData::StepResult { made_progress },
741 all_events,
742 )
743 }
744
745 fn create_packet(&mut self, maybe_epoch_id: &Option<EpochID>) -> NetActionResponse {
746 if let Some(epoch_id) = maybe_epoch_id {
748 if !self._epochs.contains_key(epoch_id) {
749 return NetActionResponse::Error(NetActionError::EpochNotFound {
750 epoch_id: *epoch_id,
751 });
752 }
753 if !matches!(self._epochs[epoch_id].state, EpochState::Running) {
754 return NetActionResponse::Error(NetActionError::EpochNotRunning {
755 epoch_id: *epoch_id,
756 });
757 }
758 }
759
760 let packet_location = match maybe_epoch_id {
761 Some(epoch_id) => PacketLocation::Node(*epoch_id),
762 None => PacketLocation::OutsideNet,
763 };
764
765 let packet = Packet {
766 id: Ulid::new(),
767 location: packet_location.clone(),
768 };
769
770 let packet_id = packet.id;
771 self._packets.insert(packet.id, packet);
772
773 self._packets_by_location
775 .entry(packet_location)
776 .or_default()
777 .insert(packet_id);
778
779 NetActionResponse::Success(
780 NetActionResponseData::Packet(packet_id),
781 vec![NetEvent::PacketCreated(get_utc_now(), packet_id)],
782 )
783 }
784
785 fn consume_packet(&mut self, packet_id: &PacketID) -> NetActionResponse {
786 if !self._packets.contains_key(packet_id) {
787 return NetActionResponse::Error(NetActionError::PacketNotFound {
788 packet_id: *packet_id,
789 });
790 }
791
792 let location = self._packets[packet_id].location.clone();
793
794 if let Some(packets) = self._packets_by_location.get_mut(&location) {
795 if packets.shift_remove(packet_id) {
796 self._packets.remove(packet_id);
797 NetActionResponse::Success(
798 NetActionResponseData::None,
799 vec![NetEvent::PacketConsumed(
800 get_utc_now(),
801 *packet_id,
802 location,
803 )],
804 )
805 } else {
806 panic!(
807 "Packet with ID {} not found in location {:?}",
808 packet_id, location
809 );
810 }
811 } else {
812 panic!("Packet location {:?} not found", location);
813 }
814 }
815
816 fn destroy_packet(&mut self, packet_id: &PacketID) -> NetActionResponse {
817 if !self._packets.contains_key(packet_id) {
818 return NetActionResponse::Error(NetActionError::PacketNotFound {
819 packet_id: *packet_id,
820 });
821 }
822
823 let location = self._packets[packet_id].location.clone();
824
825 if let Some(packets) = self._packets_by_location.get_mut(&location) {
826 if packets.shift_remove(packet_id) {
827 self._packets.remove(packet_id);
828 NetActionResponse::Success(
829 NetActionResponseData::None,
830 vec![NetEvent::PacketDestroyed(
831 get_utc_now(),
832 *packet_id,
833 location,
834 )],
835 )
836 } else {
837 panic!(
838 "Packet with ID {} not found in location {:?}",
839 packet_id, location
840 );
841 }
842 } else {
843 panic!("Packet location {:?} not found", location);
844 }
845 }
846
847 fn start_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
848 if let Some(epoch) = self._epochs.get_mut(epoch_id) {
849 if !self._startable_epochs.contains(epoch_id) {
850 return NetActionResponse::Error(NetActionError::EpochNotStartable {
851 epoch_id: *epoch_id,
852 });
853 }
854 debug_assert!(
855 matches!(epoch.state, EpochState::Startable),
856 "Epoch state is not Startable but was in net._startable_epochs."
857 );
858 epoch.state = EpochState::Running;
859 self._startable_epochs.remove(epoch_id);
860 NetActionResponse::Success(
861 NetActionResponseData::StartedEpoch(epoch.clone()),
862 vec![NetEvent::EpochStarted(get_utc_now(), *epoch_id)],
863 )
864 } else {
865 NetActionResponse::Error(NetActionError::EpochNotFound {
866 epoch_id: *epoch_id,
867 })
868 }
869 }
870
871 fn finish_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
872 let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
874 epoch.clone()
875 } else {
876 return NetActionResponse::Error(NetActionError::EpochNotFound {
877 epoch_id: *epoch_id,
878 });
879 };
880
881 if epoch.state != EpochState::Running {
883 return NetActionResponse::Error(NetActionError::EpochNotRunning {
884 epoch_id: *epoch_id,
885 });
886 }
887
888 let epoch_loc = PacketLocation::Node(*epoch_id);
890 if let Some(packets) = self._packets_by_location.get(&epoch_loc) {
891 if !packets.is_empty() {
892 return NetActionResponse::Error(NetActionError::CannotFinishNonEmptyEpoch {
893 epoch_id: *epoch_id,
894 });
895 }
896 } else {
897 panic!("Epoch {} not found in location {:?}", epoch_id, epoch_loc);
898 }
899
900 let node = self
902 .graph
903 .nodes()
904 .get(&epoch.node_name)
905 .expect("Epoch references non-existent node");
906 for port_name in node.out_ports.keys() {
907 let output_port_loc = PacketLocation::OutputPort(*epoch_id, port_name.clone());
908 if let Some(packets) = self._packets_by_location.get(&output_port_loc)
909 && !packets.is_empty()
910 {
911 return NetActionResponse::Error(NetActionError::UnsentOutputSalvo {
912 epoch_id: *epoch_id,
913 port_name: port_name.clone(),
914 });
915 }
916 }
917
918 let epoch_before_finish = self._epochs[epoch_id].clone();
921
922 let mut epoch = self._epochs.remove(epoch_id).unwrap();
923 epoch.state = EpochState::Finished;
924
925 self._packets_by_location.remove(&epoch_loc);
927 for port_name in node.out_ports.keys() {
928 let output_port_loc = PacketLocation::OutputPort(*epoch_id, port_name.clone());
929 self._packets_by_location.remove(&output_port_loc);
930 }
931
932 if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch.node_name) {
934 epoch_ids.retain(|id| id != epoch_id);
935 }
936
937 NetActionResponse::Success(
938 NetActionResponseData::FinishedEpoch(epoch),
939 vec![NetEvent::EpochFinished(get_utc_now(), epoch_before_finish)],
940 )
941 }
942
943 fn cancel_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
944 let epoch_for_event = if let Some(epoch) = self._epochs.get(epoch_id) {
946 epoch.clone()
947 } else {
948 return NetActionResponse::Error(NetActionError::EpochNotFound {
949 epoch_id: *epoch_id,
950 });
951 };
952
953 let mut events: Vec<NetEvent> = Vec::new();
954 let mut destroyed_packets: Vec<PacketID> = Vec::new();
955
956 let epoch_location = PacketLocation::Node(*epoch_id);
958 if let Some(packet_ids) = self._packets_by_location.get(&epoch_location) {
959 destroyed_packets.extend(packet_ids.iter().cloned());
960 }
961
962 let node = self
964 .graph
965 .nodes()
966 .get(&epoch_for_event.node_name)
967 .expect("Epoch references non-existent node");
968 for port_name in node.out_ports.keys() {
969 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
970 if let Some(packet_ids) = self._packets_by_location.get(&output_port_location) {
971 destroyed_packets.extend(packet_ids.iter().cloned());
972 }
973 }
974
975 for packet_id in &destroyed_packets {
977 let packet = self
978 ._packets
979 .remove(packet_id)
980 .expect("Packet in location map not found in packets map");
981 let packet_location = packet.location.clone();
982 if let Some(packets_at_location) = self._packets_by_location.get_mut(&packet_location) {
983 packets_at_location.shift_remove(packet_id);
984 }
985 events.push(NetEvent::PacketDestroyed(
986 get_utc_now(),
987 *packet_id,
988 packet_location,
989 ));
990 }
991
992 for port_name in node.out_ports.keys() {
994 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
995 self._packets_by_location.remove(&output_port_location);
996 }
997
998 self._packets_by_location.remove(&epoch_location);
1000
1001 self._startable_epochs.remove(epoch_id);
1003
1004 if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch_for_event.node_name) {
1006 epoch_ids.retain(|id| id != epoch_id);
1007 }
1008
1009 let epoch = self._epochs.remove(epoch_id).expect("Epoch should exist");
1011
1012 events.push(NetEvent::EpochCancelled(get_utc_now(), epoch_for_event));
1013
1014 NetActionResponse::Success(
1015 NetActionResponseData::CancelledEpoch(epoch, destroyed_packets),
1016 events,
1017 )
1018 }
1019
1020 fn create_epoch(&mut self, node_name: &NodeName, salvo: &Salvo) -> NetActionResponse {
1021 let node = match self.graph.nodes().get(node_name) {
1023 Some(node) => node,
1024 None => {
1025 return NetActionResponse::Error(NetActionError::NodeNotFound {
1026 node_name: node_name.clone(),
1027 });
1028 }
1029 };
1030
1031 for (port_name, packet_id) in &salvo.packets {
1033 if !node.in_ports.contains_key(port_name) {
1035 return NetActionResponse::Error(NetActionError::InputPortNotFound {
1036 port_name: port_name.clone(),
1037 node_name: node_name.clone(),
1038 });
1039 }
1040
1041 let packet = match self._packets.get(packet_id) {
1043 Some(packet) => packet,
1044 None => {
1045 return NetActionResponse::Error(NetActionError::PacketNotFound {
1046 packet_id: *packet_id,
1047 });
1048 }
1049 };
1050
1051 let expected_location = PacketLocation::InputPort(node_name.clone(), port_name.clone());
1053 if packet.location != expected_location {
1054 return NetActionResponse::Error(NetActionError::PacketNotAtInputPort {
1055 packet_id: *packet_id,
1056 port_name: port_name.clone(),
1057 node_name: node_name.clone(),
1058 });
1059 }
1060 }
1061
1062 let mut events: Vec<NetEvent> = Vec::new();
1063
1064 let epoch_id = Ulid::new();
1066 let epoch = Epoch {
1067 id: epoch_id,
1068 node_name: node_name.clone(),
1069 in_salvo: salvo.clone(),
1070 out_salvos: Vec::new(),
1071 state: EpochState::Startable,
1072 orphaned_packets: Vec::new(),
1073 };
1074
1075 self._epochs.insert(epoch_id, epoch.clone());
1077 self._startable_epochs.insert(epoch_id);
1078 self._node_to_epochs
1079 .entry(node_name.clone())
1080 .or_default()
1081 .push(epoch_id);
1082
1083 let epoch_location = PacketLocation::Node(epoch_id);
1085 self._packets_by_location
1086 .insert(epoch_location.clone(), IndexSet::new());
1087
1088 for port_name in node.out_ports.keys() {
1090 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
1091 self._packets_by_location
1092 .insert(output_port_location, IndexSet::new());
1093 }
1094
1095 events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id));
1096
1097 for (port_name, packet_id) in &salvo.packets {
1099 let from_location = PacketLocation::InputPort(node_name.clone(), port_name.clone());
1100
1101 let from_index = self
1103 ._packets_by_location
1104 .get(&from_location)
1105 .and_then(|packets| packets.get_index_of(packet_id))
1106 .expect("Packet should exist at from_location");
1107
1108 self.move_packet(packet_id, epoch_location.clone());
1109 events.push(NetEvent::PacketMoved(
1110 get_utc_now(),
1111 *packet_id,
1112 from_location,
1113 epoch_location.clone(),
1114 from_index,
1115 ));
1116 }
1117
1118 NetActionResponse::Success(NetActionResponseData::CreatedEpoch(epoch), events)
1119 }
1120
1121 fn load_packet_into_output_port(
1122 &mut self,
1123 packet_id: &PacketID,
1124 port_name: &String,
1125 ) -> NetActionResponse {
1126 let (epoch_id, old_location) = if let Some(packet) = self._packets.get(packet_id) {
1127 if let PacketLocation::Node(epoch_id) = packet.location {
1128 (epoch_id, packet.location.clone())
1129 } else {
1130 return NetActionResponse::Error(NetActionError::PacketNotInNode {
1133 packet_id: *packet_id,
1134 epoch_id: Ulid::nil(), });
1136 }
1137 } else {
1138 return NetActionResponse::Error(NetActionError::PacketNotFound {
1139 packet_id: *packet_id,
1140 });
1141 };
1142
1143 let node_name = self
1144 ._epochs
1145 .get(&epoch_id)
1146 .expect("The epoch id in the location of a packet could not be found.")
1147 .node_name
1148 .clone();
1149 let node = self
1150 .graph
1151 .nodes()
1152 .get(&node_name)
1153 .expect("Packet located in a non-existing node (yet the node has an epoch).");
1154
1155 if !node.out_ports.contains_key(port_name) {
1156 return NetActionResponse::Error(NetActionError::OutputPortNotFound {
1157 port_name: port_name.clone(),
1158 epoch_id,
1159 });
1160 }
1161
1162 let port = node.out_ports.get(port_name).unwrap();
1163 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
1164 let port_packets = self
1165 ._packets_by_location
1166 .get(&output_port_location)
1167 .expect("No entry in NetSim._packets_by_location found for output port.");
1168
1169 if let PortSlotSpec::Finite(num_slots) = port.slots_spec
1171 && port_packets.len() as u64 >= num_slots
1172 {
1173 return NetActionResponse::Error(NetActionError::OutputPortFull {
1174 port_name: port_name.clone(),
1175 epoch_id,
1176 });
1177 }
1178
1179 let from_index = self
1181 ._packets_by_location
1182 .get(&old_location)
1183 .and_then(|packets| packets.get_index_of(packet_id))
1184 .expect("Packet should exist at old_location");
1185
1186 let new_location = output_port_location;
1187 self.move_packet(packet_id, new_location.clone());
1188 NetActionResponse::Success(
1189 NetActionResponseData::None,
1190 vec![NetEvent::PacketMoved(
1191 get_utc_now(),
1192 *packet_id,
1193 old_location,
1194 new_location,
1195 from_index,
1196 )],
1197 )
1198 }
1199
1200 fn send_output_salvo(
1201 &mut self,
1202 epoch_id: &EpochID,
1203 salvo_condition_name: &SalvoConditionName,
1204 ) -> NetActionResponse {
1205 let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
1207 epoch
1208 } else {
1209 return NetActionResponse::Error(NetActionError::EpochNotFound {
1210 epoch_id: *epoch_id,
1211 });
1212 };
1213
1214 let node = self
1216 .graph
1217 .nodes()
1218 .get(&epoch.node_name)
1219 .expect("Node associated with epoch could not be found.");
1220 let node_name = node.name.clone();
1221
1222 let salvo_condition =
1224 if let Some(salvo_condition) = node.out_salvo_conditions.get(salvo_condition_name) {
1225 salvo_condition
1226 } else {
1227 return NetActionResponse::Error(NetActionError::OutputSalvoConditionNotFound {
1228 condition_name: salvo_condition_name.clone(),
1229 epoch_id: *epoch_id,
1230 });
1231 };
1232
1233 if let MaxSalvos::Finite(max) = salvo_condition.max_salvos {
1235 let condition_salvo_count = epoch
1236 .out_salvos
1237 .iter()
1238 .filter(|s| s.salvo_condition == *salvo_condition_name)
1239 .count() as u64;
1240 if condition_salvo_count >= max {
1241 return NetActionResponse::Error(NetActionError::MaxOutputSalvosReached {
1242 condition_name: salvo_condition_name.clone(),
1243 epoch_id: *epoch_id,
1244 });
1245 }
1246 }
1247
1248 let port_packet_counts: HashMap<PortName, u64> = node
1250 .out_ports
1251 .keys()
1252 .map(|port_name| {
1253 let count = self
1254 ._packets_by_location
1255 .get(&PacketLocation::OutputPort(*epoch_id, port_name.clone()))
1256 .map(|packets| packets.len() as u64)
1257 .unwrap_or(0);
1258 (port_name.clone(), count)
1259 })
1260 .collect();
1261 if !evaluate_salvo_condition(&salvo_condition.term, &port_packet_counts, &node.out_ports) {
1262 return NetActionResponse::Error(NetActionError::SalvoConditionNotMet {
1263 condition_name: salvo_condition_name.clone(),
1264 epoch_id: *epoch_id,
1265 });
1266 }
1267
1268 let mut packets_to_move: Vec<(
1271 PacketID,
1272 PortName,
1273 PacketLocation,
1274 PacketLocation,
1275 usize,
1276 bool,
1277 )> = Vec::new();
1278 for (port_name, packet_count) in &salvo_condition.ports {
1279 let from_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1280 let packets = self
1281 ._packets_by_location
1282 .get(&from_location)
1283 .unwrap_or_else(|| {
1284 panic!(
1285 "Output port '{}' of node '{}' does not have an entry in self._packets_by_location",
1286 port_name,
1287 node_name
1288 )
1289 })
1290 .clone();
1291
1292 let (to_location, is_orphaned) = if let Some(edge_ref) =
1294 self.graph.get_edge_by_tail(&PortRef {
1295 node_name: node_name.clone(),
1296 port_type: PortType::Output,
1297 port_name: port_name.clone(),
1298 }) {
1299 (PacketLocation::Edge(edge_ref.clone()), false)
1301 } else {
1302 (PacketLocation::OutsideNet, true)
1304 };
1305
1306 let take_count = match packet_count {
1307 PacketCount::All => packets.len(),
1308 PacketCount::Count(n) => std::cmp::min(*n as usize, packets.len()),
1309 };
1310 for (idx, packet_id) in packets.into_iter().enumerate().take(take_count) {
1312 packets_to_move.push((
1313 packet_id,
1314 port_name.clone(),
1315 from_location.clone(),
1316 to_location.clone(),
1317 idx,
1318 is_orphaned,
1319 ));
1320 }
1321 }
1322
1323 let salvo = Salvo {
1325 salvo_condition: salvo_condition_name.clone(),
1326 packets: packets_to_move
1327 .iter()
1328 .map(|(packet_id, port_name, _, _, _, _)| (port_name.clone(), *packet_id))
1329 .collect(),
1330 };
1331 self._epochs
1332 .get_mut(epoch_id)
1333 .unwrap()
1334 .out_salvos
1335 .push(salvo);
1336
1337 let mut net_events = Vec::new();
1339 let mut orphaned_infos: Vec<OrphanedPacketInfo> = Vec::new();
1340
1341 for (packet_id, port_name, from_location, to_location, from_index, is_orphaned) in
1342 packets_to_move
1343 {
1344 if is_orphaned {
1345 net_events.push(NetEvent::PacketOrphaned(
1347 get_utc_now(),
1348 packet_id,
1349 *epoch_id,
1350 node_name.clone(),
1351 port_name.clone(),
1352 salvo_condition_name.clone(),
1353 ));
1354 orphaned_infos.push(OrphanedPacketInfo {
1355 packet_id,
1356 from_port: port_name,
1357 salvo_condition: salvo_condition_name.clone(),
1358 });
1359 } else {
1360 net_events.push(NetEvent::PacketMoved(
1362 get_utc_now(),
1363 packet_id,
1364 from_location,
1365 to_location.clone(),
1366 from_index,
1367 ));
1368 }
1369 self.move_packet(&packet_id, to_location);
1370 }
1371
1372 if !orphaned_infos.is_empty() {
1374 self._epochs
1375 .get_mut(epoch_id)
1376 .unwrap()
1377 .orphaned_packets
1378 .extend(orphaned_infos);
1379 }
1380
1381 net_events.push(NetEvent::OutputSalvoTriggered(
1383 get_utc_now(),
1384 *epoch_id,
1385 salvo_condition_name.clone(),
1386 ));
1387
1388 NetActionResponse::Success(NetActionResponseData::None, net_events)
1389 }
1390
1391 fn transport_packet_to_location(
1392 &mut self,
1393 packet_id: &PacketID,
1394 destination: &PacketLocation,
1395 ) -> NetActionResponse {
1396 let packet = if let Some(p) = self._packets.get(packet_id) {
1398 p
1399 } else {
1400 return NetActionResponse::Error(NetActionError::PacketNotFound {
1401 packet_id: *packet_id,
1402 });
1403 };
1404 let current_location = packet.location.clone();
1405
1406 match ¤t_location {
1408 PacketLocation::Node(epoch_id) => {
1409 if let Some(epoch) = self._epochs.get(epoch_id)
1410 && epoch.state == EpochState::Running
1411 {
1412 return NetActionResponse::Error(
1413 NetActionError::CannotMovePacketFromRunningEpoch {
1414 packet_id: *packet_id,
1415 epoch_id: *epoch_id,
1416 },
1417 );
1418 }
1419 }
1420 PacketLocation::OutputPort(epoch_id, _) => {
1421 if let Some(epoch) = self._epochs.get(epoch_id)
1422 && epoch.state == EpochState::Running
1423 {
1424 return NetActionResponse::Error(
1425 NetActionError::CannotMovePacketFromRunningEpoch {
1426 packet_id: *packet_id,
1427 epoch_id: *epoch_id,
1428 },
1429 );
1430 }
1431 }
1432 _ => {}
1433 }
1434
1435 match destination {
1437 PacketLocation::Node(epoch_id) => {
1438 if let Some(epoch) = self._epochs.get(epoch_id) {
1439 if epoch.state == EpochState::Running {
1440 return NetActionResponse::Error(
1441 NetActionError::CannotMovePacketIntoRunningEpoch {
1442 packet_id: *packet_id,
1443 epoch_id: *epoch_id,
1444 },
1445 );
1446 }
1447 } else {
1448 return NetActionResponse::Error(NetActionError::EpochNotFound {
1449 epoch_id: *epoch_id,
1450 });
1451 }
1452 }
1453 PacketLocation::OutputPort(epoch_id, port_name) => {
1454 if let Some(epoch) = self._epochs.get(epoch_id) {
1455 if epoch.state == EpochState::Running {
1456 return NetActionResponse::Error(
1457 NetActionError::CannotMovePacketIntoRunningEpoch {
1458 packet_id: *packet_id,
1459 epoch_id: *epoch_id,
1460 },
1461 );
1462 }
1463 let node = self
1465 .graph
1466 .nodes()
1467 .get(&epoch.node_name)
1468 .expect("Node associated with epoch could not be found.");
1469 if !node.out_ports.contains_key(port_name) {
1470 return NetActionResponse::Error(NetActionError::OutputPortNotFound {
1471 port_name: port_name.clone(),
1472 epoch_id: *epoch_id,
1473 });
1474 }
1475 } else {
1476 return NetActionResponse::Error(NetActionError::EpochNotFound {
1477 epoch_id: *epoch_id,
1478 });
1479 }
1480 }
1481 PacketLocation::InputPort(node_name, port_name) => {
1482 let node = if let Some(n) = self.graph.nodes().get(node_name) {
1484 n
1485 } else {
1486 return NetActionResponse::Error(NetActionError::NodeNotFound {
1487 node_name: node_name.clone(),
1488 });
1489 };
1490 let port = if let Some(p) = node.in_ports.get(port_name) {
1492 p
1493 } else {
1494 return NetActionResponse::Error(NetActionError::InputPortNotFound {
1495 port_name: port_name.clone(),
1496 node_name: node_name.clone(),
1497 });
1498 };
1499 let current_count = self
1501 ._packets_by_location
1502 .get(destination)
1503 .map(|s| s.len())
1504 .unwrap_or(0);
1505 let is_full = match &port.slots_spec {
1506 PortSlotSpec::Infinite => false,
1507 PortSlotSpec::Finite(capacity) => current_count >= *capacity as usize,
1508 };
1509 if is_full {
1510 return NetActionResponse::Error(NetActionError::InputPortFull {
1511 port_name: port_name.clone(),
1512 node_name: node_name.clone(),
1513 });
1514 }
1515 }
1516 PacketLocation::Edge(edge) => {
1517 if !self.graph.edges().contains(edge) {
1519 return NetActionResponse::Error(NetActionError::EdgeNotFound {
1520 edge: edge.clone(),
1521 });
1522 }
1523 }
1524 PacketLocation::OutsideNet => {
1525 }
1527 }
1528
1529 let from_index = self
1531 ._packets_by_location
1532 .get(¤t_location)
1533 .and_then(|packets| packets.get_index_of(packet_id))
1534 .expect("Packet should exist at current_location");
1535
1536 self.move_packet(packet_id, destination.clone());
1538
1539 NetActionResponse::Success(
1540 NetActionResponseData::None,
1541 vec![NetEvent::PacketMoved(
1542 get_utc_now(),
1543 *packet_id,
1544 current_location,
1545 destination.clone(),
1546 from_index,
1547 )],
1548 )
1549 }
1550
1551 pub fn do_action(&mut self, action: &NetAction) -> NetActionResponse {
1583 match action {
1584 NetAction::RunStep => self.run_step(),
1585 NetAction::CreatePacket(maybe_epoch_id) => self.create_packet(maybe_epoch_id),
1586 NetAction::ConsumePacket(packet_id) => self.consume_packet(packet_id),
1587 NetAction::DestroyPacket(packet_id) => self.destroy_packet(packet_id),
1588 NetAction::StartEpoch(epoch_id) => self.start_epoch(epoch_id),
1589 NetAction::FinishEpoch(epoch_id) => self.finish_epoch(epoch_id),
1590 NetAction::CancelEpoch(epoch_id) => self.cancel_epoch(epoch_id),
1591 NetAction::CreateEpoch(node_name, salvo) => self.create_epoch(node_name, salvo),
1592 NetAction::LoadPacketIntoOutputPort(packet_id, port_name) => {
1593 self.load_packet_into_output_port(packet_id, port_name)
1594 }
1595 NetAction::SendOutputSalvo(epoch_id, salvo_condition_name) => {
1596 self.send_output_salvo(epoch_id, salvo_condition_name)
1597 }
1598 NetAction::TransportPacketToLocation(packet_id, location) => {
1599 self.transport_packet_to_location(packet_id, location)
1600 }
1601 }
1602 }
1603
1604 pub fn packet_count_at(&self, location: &PacketLocation) -> usize {
1608 self._packets_by_location
1609 .get(location)
1610 .map(|s| s.len())
1611 .unwrap_or(0)
1612 }
1613
1614 pub fn get_packets_at_location(&self, location: &PacketLocation) -> Vec<PacketID> {
1616 self._packets_by_location
1617 .get(location)
1618 .map(|s| s.iter().cloned().collect())
1619 .unwrap_or_default()
1620 }
1621
1622 pub fn get_epoch(&self, epoch_id: &EpochID) -> Option<&Epoch> {
1624 self._epochs.get(epoch_id)
1625 }
1626
1627 pub fn get_startable_epochs(&self) -> Vec<EpochID> {
1629 self._startable_epochs.iter().cloned().collect()
1630 }
1631
1632 pub fn get_packet(&self, packet_id: &PacketID) -> Option<&Packet> {
1634 self._packets.get(packet_id)
1635 }
1636
1637 pub fn run_until_blocked(&mut self) -> Vec<NetEvent> {
1647 let mut all_events = Vec::new();
1648 while !self.is_blocked() {
1649 if let NetActionResponse::Success(_, events) = self.do_action(&NetAction::RunStep) {
1650 all_events.extend(events);
1651 }
1652 }
1653 all_events
1654 }
1655
1656 pub fn is_blocked(&self) -> bool {
1662 for (location, packets) in &self._packets_by_location {
1664 if let PacketLocation::Edge(edge_ref) = location {
1665 if packets.is_empty() {
1666 continue;
1667 }
1668
1669 let target_node_name = &edge_ref.target.node_name;
1670 let target_port_name = &edge_ref.target.port_name;
1671
1672 let node = match self.graph.nodes().get(target_node_name) {
1673 Some(n) => n,
1674 None => continue,
1675 };
1676 let port = match node.in_ports.get(target_port_name) {
1677 Some(p) => p,
1678 None => continue,
1679 };
1680
1681 let input_port_location =
1682 PacketLocation::InputPort(target_node_name.clone(), target_port_name.clone());
1683 let current_count = self
1684 ._packets_by_location
1685 .get(&input_port_location)
1686 .map(|p| p.len() as u64)
1687 .unwrap_or(0);
1688
1689 let can_move = match port.slots_spec {
1690 PortSlotSpec::Infinite => true,
1691 PortSlotSpec::Finite(max_slots) => current_count < max_slots,
1692 };
1693
1694 if can_move {
1695 return false; }
1697 }
1698 }
1699
1700 for (location, packets) in &self._packets_by_location {
1702 if let PacketLocation::InputPort(node_name, _) = location {
1703 if packets.is_empty() {
1704 continue;
1705 }
1706
1707 if self.can_trigger_input_salvo(node_name) {
1709 return false; }
1711 }
1712 }
1713
1714 true }
1716
1717 fn can_trigger_input_salvo(&self, node_name: &NodeName) -> bool {
1719 let node = match self.graph.nodes().get(node_name) {
1720 Some(n) => n,
1721 None => return false,
1722 };
1723
1724 let in_port_names: Vec<PortName> = node.in_ports.keys().cloned().collect();
1725
1726 let port_packet_counts: HashMap<PortName, u64> = in_port_names
1728 .iter()
1729 .map(|port_name| {
1730 let count = self
1731 ._packets_by_location
1732 .get(&PacketLocation::InputPort(
1733 node_name.clone(),
1734 port_name.clone(),
1735 ))
1736 .map(|packets| packets.len() as u64)
1737 .unwrap_or(0);
1738 (port_name.clone(), count)
1739 })
1740 .collect();
1741
1742 for cond in node.in_salvo_conditions.values() {
1744 if evaluate_salvo_condition(&cond.term, &port_packet_counts, &node.in_ports) {
1745 return true;
1746 }
1747 }
1748
1749 false
1750 }
1751
1752 pub fn undo_action(
1773 &mut self,
1774 action: &NetAction,
1775 events: &[NetEvent],
1776 ) -> Result<(), UndoError> {
1777 for event in events.iter().rev() {
1779 self.undo_event(action, event)?;
1780 }
1781 Ok(())
1782 }
1783
1784 fn undo_event(&mut self, action: &NetAction, event: &NetEvent) -> Result<(), UndoError> {
1786 match event {
1787 NetEvent::PacketCreated(_, packet_id) => self.undo_packet_created(packet_id),
1788 NetEvent::PacketConsumed(_, packet_id, location) => {
1789 self.undo_packet_consumed(packet_id, location)
1790 }
1791 NetEvent::PacketDestroyed(_, packet_id, location) => {
1792 self.undo_packet_destroyed(packet_id, location)
1793 }
1794 NetEvent::EpochCreated(_, epoch_id) => self.undo_epoch_created(epoch_id),
1795 NetEvent::EpochStarted(_, epoch_id) => self.undo_epoch_started(epoch_id),
1796 NetEvent::EpochFinished(_, epoch) => self.undo_epoch_finished(epoch),
1797 NetEvent::EpochCancelled(_, epoch) => self.undo_epoch_cancelled(epoch),
1798 NetEvent::PacketMoved(_, packet_id, from, to, from_index) => {
1799 self.undo_packet_moved(packet_id, from, to, *from_index)
1800 }
1801 NetEvent::InputSalvoTriggered(_, _, _) => {
1802 Ok(())
1804 }
1805 NetEvent::OutputSalvoTriggered(_, epoch_id, _) => {
1806 self.undo_output_salvo_triggered(epoch_id, action)
1808 }
1809 NetEvent::PacketOrphaned(_, packet_id, epoch_id, _, port_name, _) => {
1810 self.undo_packet_orphaned(packet_id, epoch_id, port_name)
1812 }
1813 }
1814 }
1815
1816 fn undo_packet_created(&mut self, packet_id: &PacketID) -> Result<(), UndoError> {
1818 let location = match self._packets.get(packet_id) {
1820 Some(p) => p.location.clone(),
1821 None => {
1822 return Err(UndoError::NotFound(format!(
1823 "packet {} not found",
1824 packet_id
1825 )));
1826 }
1827 };
1828
1829 if let Some(packets) = self._packets_by_location.get_mut(&location) {
1831 packets.shift_remove(packet_id);
1832 }
1833
1834 self._packets.remove(packet_id);
1836
1837 Ok(())
1838 }
1839
1840 fn undo_packet_consumed(
1842 &mut self,
1843 packet_id: &PacketID,
1844 location: &PacketLocation,
1845 ) -> Result<(), UndoError> {
1846 self.recreate_packet(packet_id, location)
1847 }
1848
1849 fn undo_packet_destroyed(
1851 &mut self,
1852 packet_id: &PacketID,
1853 location: &PacketLocation,
1854 ) -> Result<(), UndoError> {
1855 self.recreate_packet(packet_id, location)
1856 }
1857
1858 fn recreate_packet(
1860 &mut self,
1861 packet_id: &PacketID,
1862 location: &PacketLocation,
1863 ) -> Result<(), UndoError> {
1864 if self._packets.contains_key(packet_id) {
1866 return Err(UndoError::StateMismatch(format!(
1867 "packet {} already exists",
1868 packet_id
1869 )));
1870 }
1871
1872 let packet = Packet {
1874 id: *packet_id,
1875 location: location.clone(),
1876 };
1877 self._packets.insert(*packet_id, packet);
1878
1879 self._packets_by_location
1881 .entry(location.clone())
1882 .or_default()
1883 .insert(*packet_id);
1884
1885 Ok(())
1886 }
1887
1888 fn undo_epoch_created(&mut self, epoch_id: &EpochID) -> Result<(), UndoError> {
1890 let epoch = match self._epochs.get(epoch_id) {
1892 Some(e) => e.clone(),
1893 None => {
1894 return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
1895 }
1896 };
1897
1898 self._epochs.remove(epoch_id);
1900
1901 self._startable_epochs.remove(epoch_id);
1903
1904 if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch.node_name) {
1906 epoch_ids.retain(|id| id != epoch_id);
1907 if epoch_ids.is_empty() {
1909 self._node_to_epochs.remove(&epoch.node_name);
1910 }
1911 }
1912
1913 let epoch_location = PacketLocation::Node(*epoch_id);
1915 self._packets_by_location.remove(&epoch_location);
1916
1917 if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
1919 for port_name in node.out_ports.keys() {
1920 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1921 self._packets_by_location.remove(&output_port_location);
1922 }
1923 }
1924
1925 Ok(())
1926 }
1927
1928 fn undo_epoch_started(&mut self, epoch_id: &EpochID) -> Result<(), UndoError> {
1930 let epoch = match self._epochs.get_mut(epoch_id) {
1931 Some(e) => e,
1932 None => {
1933 return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
1934 }
1935 };
1936
1937 if epoch.state != EpochState::Running {
1939 return Err(UndoError::StateMismatch(format!(
1940 "epoch {} is not in Running state, cannot undo start",
1941 epoch_id
1942 )));
1943 }
1944
1945 epoch.state = EpochState::Startable;
1947
1948 self._startable_epochs.insert(*epoch_id);
1950
1951 Ok(())
1952 }
1953
1954 fn undo_epoch_finished(&mut self, epoch: &Epoch) -> Result<(), UndoError> {
1956 let epoch_id = epoch.id;
1957
1958 if self._epochs.contains_key(&epoch_id) {
1960 return Err(UndoError::StateMismatch(format!(
1961 "epoch {} already exists",
1962 epoch_id
1963 )));
1964 }
1965
1966 self._epochs.insert(epoch_id, epoch.clone());
1969
1970 let epoch_location = PacketLocation::Node(epoch_id);
1972 self._packets_by_location
1973 .insert(epoch_location, IndexSet::new());
1974
1975 if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
1977 for port_name in node.out_ports.keys() {
1978 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
1979 self._packets_by_location
1980 .insert(output_port_location, IndexSet::new());
1981 }
1982 }
1983
1984 self._node_to_epochs
1986 .entry(epoch.node_name.clone())
1987 .or_default()
1988 .push(epoch_id);
1989
1990 Ok(())
1991 }
1992
1993 fn undo_epoch_cancelled(&mut self, epoch: &Epoch) -> Result<(), UndoError> {
1996 let epoch_id = epoch.id;
1997
1998 if self._epochs.contains_key(&epoch_id) {
2000 return Err(UndoError::StateMismatch(format!(
2001 "epoch {} already exists",
2002 epoch_id
2003 )));
2004 }
2005
2006 self._epochs.insert(epoch_id, epoch.clone());
2008
2009 let epoch_location = PacketLocation::Node(epoch_id);
2011 self._packets_by_location
2012 .insert(epoch_location, IndexSet::new());
2013
2014 if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
2016 for port_name in node.out_ports.keys() {
2017 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
2018 self._packets_by_location
2019 .insert(output_port_location, IndexSet::new());
2020 }
2021 }
2022
2023 self._node_to_epochs
2025 .entry(epoch.node_name.clone())
2026 .or_default()
2027 .push(epoch_id);
2028
2029 if epoch.state == EpochState::Startable {
2031 self._startable_epochs.insert(epoch_id);
2032 }
2033
2034 Ok(())
2035 }
2036
2037 fn undo_packet_moved(
2039 &mut self,
2040 packet_id: &PacketID,
2041 from: &PacketLocation,
2042 to: &PacketLocation,
2043 from_index: usize,
2044 ) -> Result<(), UndoError> {
2045 let packet = match self._packets.get(packet_id) {
2047 Some(p) => p,
2048 None => {
2049 return Err(UndoError::NotFound(format!(
2050 "packet {} not found",
2051 packet_id
2052 )));
2053 }
2054 };
2055
2056 if packet.location != *to {
2057 return Err(UndoError::StateMismatch(format!(
2058 "packet {} is not at expected location {:?}, found at {:?}",
2059 packet_id, to, packet.location
2060 )));
2061 }
2062
2063 if let Some(packets) = self._packets_by_location.get_mut(to) {
2065 packets.shift_remove(packet_id);
2066 }
2067
2068 let packets_at_from = self._packets_by_location.entry(from.clone()).or_default();
2070 packets_at_from.shift_insert(from_index, *packet_id);
2071
2072 self._packets.get_mut(packet_id).unwrap().location = from.clone();
2074
2075 Ok(())
2076 }
2077
2078 fn undo_output_salvo_triggered(
2080 &mut self,
2081 epoch_id: &EpochID,
2082 action: &NetAction,
2083 ) -> Result<(), UndoError> {
2084 if !matches!(action, NetAction::SendOutputSalvo(_, _)) {
2087 return Ok(());
2088 }
2089
2090 let epoch = match self._epochs.get_mut(epoch_id) {
2091 Some(e) => e,
2092 None => {
2093 return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
2094 }
2095 };
2096
2097 if epoch.out_salvos.pop().is_none() {
2099 return Err(UndoError::StateMismatch(format!(
2100 "epoch {} has no out_salvos to pop",
2101 epoch_id
2102 )));
2103 }
2104
2105 Ok(())
2108 }
2109
2110 fn undo_packet_orphaned(
2112 &mut self,
2113 packet_id: &PacketID,
2114 epoch_id: &EpochID,
2115 port_name: &PortName,
2116 ) -> Result<(), UndoError> {
2117 let packet = match self._packets.get(packet_id) {
2119 Some(p) => p,
2120 None => {
2121 return Err(UndoError::NotFound(format!(
2122 "packet {} not found",
2123 packet_id
2124 )));
2125 }
2126 };
2127
2128 if packet.location != PacketLocation::OutsideNet {
2129 return Err(UndoError::StateMismatch(format!(
2130 "packet {} is not at OutsideNet, found at {:?}",
2131 packet_id, packet.location
2132 )));
2133 }
2134
2135 if let Some(packets) = self
2137 ._packets_by_location
2138 .get_mut(&PacketLocation::OutsideNet)
2139 {
2140 packets.shift_remove(packet_id);
2141 }
2142
2143 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
2145 self._packets_by_location
2146 .entry(output_port_location.clone())
2147 .or_default()
2148 .insert(*packet_id);
2149
2150 self._packets.get_mut(packet_id).unwrap().location = output_port_location;
2152
2153 if let Some(epoch) = self._epochs.get_mut(epoch_id) {
2155 epoch
2156 .orphaned_packets
2157 .retain(|info| info.packet_id != *packet_id);
2158 }
2159
2160 Ok(())
2161 }
2162
/// Test-only alias for [`Self::get_startable_epochs`].
#[cfg(test)]
pub fn startable_epoch_ids(&self) -> Vec<EpochID> {
    self.get_startable_epochs()
}
2169}
2170
// The tests for this module live in the separate file `net_tests.rs` and are compiled
// only for test builds.
#[cfg(test)]
#[path = "net_tests.rs"]
mod tests;