1use crate::_utils::get_utc_now;
10use crate::graph::{
11 Edge, Graph, MaxSalvos, NodeName, PacketCount, Port, PortName, PortRef, PortSlotSpec, PortType,
12 SalvoConditionName, SalvoConditionTerm, evaluate_salvo_condition,
13};
14use indexmap::IndexSet;
15use std::collections::{HashMap, HashSet};
16use ulid::Ulid;
17
18pub type PacketID = Ulid;
20
21pub type EpochID = Ulid;
23
24#[derive(Debug, PartialEq, Eq, Hash, Clone)]
32pub enum PacketLocation {
33 Node(EpochID),
35 InputPort(NodeName, PortName),
37 OutputPort(EpochID, PortName),
39 Edge(Edge),
41 OutsideNet,
43}
44
45#[derive(Debug)]
47pub struct Packet {
48 pub id: PacketID,
50 pub location: PacketLocation,
52}
53
54#[derive(Debug, Clone)]
60pub struct Salvo {
61 pub salvo_condition: SalvoConditionName,
63 pub packets: Vec<(PortName, PacketID)>,
65}
66
/// Lifecycle state of an [`Epoch`].
///
/// Epochs are created as `Startable`, become `Running` when started, and are
/// marked `Finished` when they are finished.
///
/// The variant order is significant when the `python` feature is enabled
/// (the class is exported with `eq_int`), so do not reorder variants.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "python", pyo3::pyclass(eq, eq_int))]
pub enum EpochState {
    /// Created but not yet started.
    Startable,
    /// Started and not yet finished.
    Running,
    /// Finished.
    Finished,
}
78
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl EpochState {
    /// Python `repr()` of the state, formatted as `EpochState.<Variant>`.
    fn __repr__(&self) -> String {
        let text = match self {
            EpochState::Startable => "EpochState.Startable",
            EpochState::Running => "EpochState.Running",
            EpochState::Finished => "EpochState.Finished",
        };
        String::from(text)
    }
}
90
91#[derive(Debug, Clone)]
97pub struct Epoch {
98 pub id: EpochID,
100 pub node_name: NodeName,
102 pub in_salvo: Salvo,
104 pub out_salvos: Vec<Salvo>,
106 pub state: EpochState,
108 pub orphaned_packets: Vec<OrphanedPacketInfo>,
110}
111
112#[derive(Debug, Clone)]
117pub struct OrphanedPacketInfo {
118 pub packet_id: PacketID,
120 pub from_port: PortName,
122 pub salvo_condition: SalvoConditionName,
124}
125
126impl Epoch {
127 pub fn start_time(&self) -> u64 {
129 self.id.timestamp_ms()
130 }
131}
132
/// Timestamp attached to every [`NetEvent`]; values come from `get_utc_now()`.
///
/// NOTE(review): the unit is whatever `get_utc_now()` returns — confirm there.
pub type EventUTC = i128;
135
136#[derive(Debug, Clone)]
141pub enum NetAction {
142 RunStep,
151 CreatePacket(Option<EpochID>),
154 ConsumePacket(PacketID),
156 DestroyPacket(PacketID),
158 StartEpoch(EpochID),
160 FinishEpoch(EpochID),
162 CancelEpoch(EpochID),
164 CreateEpoch(NodeName, Salvo),
168 LoadPacketIntoOutputPort(PacketID, PortName),
170 SendOutputSalvo(EpochID, SalvoConditionName),
172 TransportPacketToLocation(PacketID, PacketLocation),
177}
178
179#[derive(Debug, thiserror::Error)]
181pub enum UndoError {
182 #[error("state mismatch: {0}")]
184 StateMismatch(String),
185
186 #[error("entity not found: {0}")]
188 NotFound(String),
189
190 #[error("action not undoable: {0}")]
192 NotUndoable(String),
193
194 #[error("internal error: {0}")]
196 InternalError(String),
197}
198
199#[derive(Debug, thiserror::Error)]
201pub enum NetActionError {
202 #[error("packet not found: {packet_id}")]
204 PacketNotFound { packet_id: PacketID },
205
206 #[error("epoch not found: {epoch_id}")]
208 EpochNotFound { epoch_id: EpochID },
209
210 #[error("epoch {epoch_id} is not running")]
212 EpochNotRunning { epoch_id: EpochID },
213
214 #[error("epoch {epoch_id} is not startable")]
216 EpochNotStartable { epoch_id: EpochID },
217
218 #[error("cannot finish epoch {epoch_id}: epoch still contains packets")]
220 CannotFinishNonEmptyEpoch { epoch_id: EpochID },
221
222 #[error("cannot finish epoch {epoch_id}: output port '{port_name}' has unsent packets")]
224 UnsentOutputSalvo {
225 epoch_id: EpochID,
226 port_name: PortName,
227 },
228
229 #[error("packet {packet_id} is not inside any epoch")]
231 PacketNotInAnyNode {
232 packet_id: PacketID,
233 },
234
235 #[error("output port '{port_name}' not found on node for epoch {epoch_id}")]
237 OutputPortNotFound {
238 port_name: PortName,
239 epoch_id: EpochID,
240 },
241
242 #[error("output salvo condition '{condition_name}' not found on node for epoch {epoch_id}")]
244 OutputSalvoConditionNotFound {
245 condition_name: SalvoConditionName,
246 epoch_id: EpochID,
247 },
248
249 #[error("max output salvos reached for condition '{condition_name}' on epoch {epoch_id}")]
251 MaxOutputSalvosReached {
252 condition_name: SalvoConditionName,
253 epoch_id: EpochID,
254 },
255
256 #[error("salvo condition '{condition_name}' not met for epoch {epoch_id}")]
258 SalvoConditionNotMet {
259 condition_name: SalvoConditionName,
260 epoch_id: EpochID,
261 },
262
263 #[error("output port '{port_name}' is full for epoch {epoch_id}")]
265 OutputPortFull {
266 port_name: PortName,
267 epoch_id: EpochID,
268 },
269
270 #[error("node not found: '{node_name}'")]
272 NodeNotFound { node_name: NodeName },
273
274 #[error("packet {packet_id} is not at input port '{port_name}' of node '{node_name}'")]
276 PacketNotAtInputPort {
277 packet_id: PacketID,
278 port_name: PortName,
279 node_name: NodeName,
280 },
281
282 #[error("input port '{port_name}' not found on node '{node_name}'")]
284 InputPortNotFound {
285 port_name: PortName,
286 node_name: NodeName,
287 },
288
289 #[error("input port '{port_name}' on node '{node_name}' is full")]
291 InputPortFull {
292 port_name: PortName,
293 node_name: NodeName,
294 },
295
296 #[error("cannot move packet {packet_id} out of running epoch {epoch_id}")]
298 CannotMovePacketFromRunningEpoch {
299 packet_id: PacketID,
300 epoch_id: EpochID,
301 },
302
303 #[error("cannot move packet {packet_id} into running epoch {epoch_id}")]
305 CannotMovePacketIntoRunningEpoch {
306 packet_id: PacketID,
307 epoch_id: EpochID,
308 },
309
310 #[error("edge not found: {edge}")]
312 EdgeNotFound { edge: Edge },
313}
314
315#[derive(Debug, Clone)]
321pub enum NetEvent {
322 PacketCreated(EventUTC, PacketID),
324 PacketConsumed(EventUTC, PacketID, PacketLocation),
327 PacketDestroyed(EventUTC, PacketID, PacketLocation),
330 EpochCreated(EventUTC, EpochID),
332 EpochStarted(EventUTC, EpochID),
334 EpochFinished(EventUTC, Epoch),
337 EpochCancelled(EventUTC, Epoch),
340 PacketMoved(EventUTC, PacketID, PacketLocation, PacketLocation, usize),
344 InputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
346 OutputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
348 PacketOrphaned(
351 EventUTC,
352 PacketID,
353 EpochID,
354 NodeName,
355 PortName,
356 SalvoConditionName,
357 ),
358}
359
360#[derive(Debug, Clone)]
362pub enum NetActionResponseData {
363 StepResult {
365 made_progress: bool,
368 },
369 Packet(PacketID),
371 CreatedEpoch(Epoch),
373 StartedEpoch(Epoch),
375 FinishedEpoch(Epoch),
377 CancelledEpoch(Epoch, Vec<PacketID>),
379 None,
381}
382
383#[derive(Debug)]
385pub enum NetActionResponse {
386 Success(NetActionResponseData, Vec<NetEvent>),
388 Error(NetActionError),
390}
391
392#[derive(Debug)]
401pub struct NetSim {
402 pub graph: Graph,
404 _packets: HashMap<PacketID, Packet>,
405 _packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>>,
406 _epochs: HashMap<EpochID, Epoch>,
407 _startable_epochs: HashSet<EpochID>,
408 _node_to_epochs: HashMap<NodeName, Vec<EpochID>>,
409}
410
411impl NetSim {
412 pub fn new(graph: Graph) -> Self {
416 let errors = graph.validate();
417 if !errors.is_empty() {
418 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
419 panic!("Cannot create NetSim with invalid graph:\n - {}", msgs.join("\n - "));
420 }
421
422 let mut packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>> = HashMap::new();
423
424 for edge in graph.edges() {
426 packets_by_location.insert(PacketLocation::Edge(edge.clone()), IndexSet::new());
427 }
428
429 for (node_name, node) in graph.nodes() {
431 for port_name in node.in_ports.keys() {
432 packets_by_location.insert(
433 PacketLocation::InputPort(node_name.clone(), port_name.clone()),
434 IndexSet::new(),
435 );
436 }
437 }
438
439 packets_by_location.insert(PacketLocation::OutsideNet, IndexSet::new());
441
442 NetSim {
446 graph,
447 _packets: HashMap::new(),
448 _packets_by_location: packets_by_location,
449 _epochs: HashMap::new(),
450 _startable_epochs: HashSet::new(),
451 _node_to_epochs: HashMap::new(),
452 }
453 }
454
455 fn move_packet(&mut self, packet_id: &PacketID, new_location: PacketLocation) {
456 let packet = self._packets.get_mut(packet_id).unwrap();
457 let packets_at_old_location = self
458 ._packets_by_location
459 .get_mut(&packet.location)
460 .expect("Packet location has no entry in self._packets_by_location.");
461 packets_at_old_location.shift_remove(packet_id);
462 packet.location = new_location;
463 if !self
464 ._packets_by_location
465 .get_mut(&packet.location)
466 .expect("Packet location has no entry in self._packets_by_location")
467 .insert(*packet_id)
468 {
469 panic!("Attempted to move packet to a location that already contains it.");
470 }
471 }
472
473 fn try_trigger_input_salvo(&mut self, node_name: &NodeName) -> (bool, Vec<NetEvent>) {
478 let mut events: Vec<NetEvent> = Vec::new();
479
480 let node = match self.graph.nodes().get(node_name) {
481 Some(n) => n,
482 None => return (false, events),
483 };
484
485 let in_port_names: Vec<PortName> = node.in_ports.keys().cloned().collect();
486 let in_ports_clone: HashMap<PortName, Port> = node.in_ports.clone();
487
488 struct SalvoConditionData {
490 name: SalvoConditionName,
491 ports: HashMap<PortName, PacketCount>,
492 term: SalvoConditionTerm,
493 }
494
495 let salvo_conditions: Vec<SalvoConditionData> = node
496 .in_salvo_conditions
497 .iter()
498 .map(|(name, cond)| SalvoConditionData {
499 name: name.clone(),
500 ports: cond.ports.clone(),
501 term: cond.term.clone(),
502 })
503 .collect();
504
505 for salvo_cond_data in salvo_conditions {
507 let port_packet_counts: HashMap<PortName, u64> = in_port_names
509 .iter()
510 .map(|port_name| {
511 let count = self
512 ._packets_by_location
513 .get(&PacketLocation::InputPort(
514 node_name.clone(),
515 port_name.clone(),
516 ))
517 .map(|packets| packets.len() as u64)
518 .unwrap_or(0);
519 (port_name.clone(), count)
520 })
521 .collect();
522
523 if evaluate_salvo_condition(&salvo_cond_data.term, &port_packet_counts, &in_ports_clone)
525 {
526 let epoch_id = Ulid::new();
528
529 let mut salvo_packets: Vec<(PortName, PacketID)> = Vec::new();
532 let mut packets_to_move: Vec<(PacketID, PortName)> = Vec::new();
533
534 for (port_name, packet_count) in &salvo_cond_data.ports {
535 let port_location =
536 PacketLocation::InputPort(node_name.clone(), port_name.clone());
537 if let Some(packet_ids) = self._packets_by_location.get(&port_location) {
538 let take_count = match packet_count {
539 PacketCount::All => packet_ids.len(),
540 PacketCount::Count(n) => std::cmp::min(*n as usize, packet_ids.len()),
541 };
542 for pid in packet_ids.iter().take(take_count) {
543 salvo_packets.push((port_name.clone(), *pid));
544 packets_to_move.push((*pid, port_name.clone()));
545 }
546 }
547 }
548
549 let in_salvo = Salvo {
551 salvo_condition: salvo_cond_data.name.clone(),
552 packets: salvo_packets,
553 };
554
555 let epoch = Epoch {
557 id: epoch_id,
558 node_name: node_name.clone(),
559 in_salvo,
560 out_salvos: Vec::new(),
561 state: EpochState::Startable,
562 orphaned_packets: Vec::new(),
563 };
564
565 self._epochs.insert(epoch_id, epoch);
567 self._startable_epochs.insert(epoch_id);
568 self._node_to_epochs
569 .entry(node_name.clone())
570 .or_default()
571 .push(epoch_id);
572
573 let epoch_location = PacketLocation::Node(epoch_id);
575 self._packets_by_location
576 .insert(epoch_location.clone(), IndexSet::new());
577
578 let node = self
580 .graph
581 .nodes()
582 .get(node_name)
583 .expect("Node not found for epoch creation");
584 for out_port_name in node.out_ports.keys() {
585 let output_port_location =
586 PacketLocation::OutputPort(epoch_id, out_port_name.clone());
587 self._packets_by_location
588 .insert(output_port_location, IndexSet::new());
589 }
590
591 events.push(NetEvent::InputSalvoTriggered(
596 get_utc_now(),
597 epoch_id,
598 salvo_cond_data.name.clone(),
599 ));
600 events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id));
601
602 for (pid, port_name) in &packets_to_move {
604 let from_location =
605 PacketLocation::InputPort(node_name.clone(), port_name.clone());
606
607 let from_index = self
609 ._packets_by_location
610 .get(&from_location)
611 .and_then(|packets| packets.get_index_of(pid))
612 .expect("Packet should exist at from_location");
613
614 self.move_packet(pid, epoch_location.clone());
615 events.push(NetEvent::PacketMoved(
616 get_utc_now(),
617 *pid,
618 from_location,
619 epoch_location.clone(),
620 from_index,
621 ));
622 }
623
624 return (true, events);
626 }
627 }
628
629 (false, events)
630 }
631
632 fn run_step(&mut self) -> NetActionResponse {
633 let mut all_events: Vec<NetEvent> = Vec::new();
634 let mut made_progress = false;
635
636 struct EdgeMoveCandidate {
640 packet_id: PacketID,
641 from_location: PacketLocation,
642 from_index: usize,
643 input_port_location: PacketLocation,
644 can_move: bool,
645 }
646
647 let mut edge_candidates: Vec<EdgeMoveCandidate> = Vec::new();
648
649 for (location, packets) in &self._packets_by_location {
651 if let PacketLocation::Edge(edge_ref) = location {
652 if let Some(first_packet_id) = packets.first() {
654 let target_node_name = edge_ref.target.node_name.clone();
655 let target_port_name = edge_ref.target.port_name.clone();
656
657 let node = self
659 .graph
660 .nodes()
661 .get(&target_node_name)
662 .expect("Edge targets a non-existent node");
663 let port = node
664 .in_ports
665 .get(&target_port_name)
666 .expect("Edge targets a non-existent input port");
667
668 let input_port_location = PacketLocation::InputPort(
669 target_node_name.clone(),
670 target_port_name.clone(),
671 );
672 let current_count = self
673 ._packets_by_location
674 .get(&input_port_location)
675 .map(|packets| packets.len() as u64)
676 .unwrap_or(0);
677
678 let can_move = match port.slots_spec {
679 PortSlotSpec::Infinite => true,
680 PortSlotSpec::Finite(max_slots) => current_count < max_slots,
681 };
682
683 edge_candidates.push(EdgeMoveCandidate {
684 packet_id: *first_packet_id,
685 from_location: location.clone(),
686 from_index: 0, input_port_location,
688 can_move,
689 });
690 }
691 }
692 }
693
694 for candidate in edge_candidates {
696 if !candidate.can_move {
697 continue;
698 }
699
700 self.move_packet(&candidate.packet_id, candidate.input_port_location.clone());
702 all_events.push(NetEvent::PacketMoved(
703 get_utc_now(),
704 candidate.packet_id,
705 candidate.from_location,
706 candidate.input_port_location.clone(),
707 candidate.from_index,
708 ));
709 made_progress = true;
710 }
711
712 let mut nodes_with_input_packets: Vec<NodeName> = Vec::new();
714 for (location, packets) in &self._packets_by_location {
715 if let PacketLocation::InputPort(node_name, _) = location
716 && !packets.is_empty()
717 && !nodes_with_input_packets.contains(node_name)
718 {
719 nodes_with_input_packets.push(node_name.clone());
720 }
721 }
722
723 for node_name in nodes_with_input_packets {
724 let (triggered, events) = self.try_trigger_input_salvo(&node_name);
725 all_events.extend(events);
726 if triggered {
727 made_progress = true;
728 }
729 }
730
731 NetActionResponse::Success(
732 NetActionResponseData::StepResult { made_progress },
733 all_events,
734 )
735 }
736
737 fn create_packet(&mut self, maybe_epoch_id: &Option<EpochID>) -> NetActionResponse {
738 if let Some(epoch_id) = maybe_epoch_id {
740 if !self._epochs.contains_key(epoch_id) {
741 return NetActionResponse::Error(NetActionError::EpochNotFound {
742 epoch_id: *epoch_id,
743 });
744 }
745 if !matches!(self._epochs[epoch_id].state, EpochState::Running) {
746 return NetActionResponse::Error(NetActionError::EpochNotRunning {
747 epoch_id: *epoch_id,
748 });
749 }
750 }
751
752 let packet_location = match maybe_epoch_id {
753 Some(epoch_id) => PacketLocation::Node(*epoch_id),
754 None => PacketLocation::OutsideNet,
755 };
756
757 let packet = Packet {
758 id: Ulid::new(),
759 location: packet_location.clone(),
760 };
761
762 let packet_id = packet.id;
763 self._packets.insert(packet.id, packet);
764
765 self._packets_by_location
767 .entry(packet_location)
768 .or_default()
769 .insert(packet_id);
770
771 NetActionResponse::Success(
772 NetActionResponseData::Packet(packet_id),
773 vec![NetEvent::PacketCreated(get_utc_now(), packet_id)],
774 )
775 }
776
777 fn consume_packet(&mut self, packet_id: &PacketID) -> NetActionResponse {
778 if !self._packets.contains_key(packet_id) {
779 return NetActionResponse::Error(NetActionError::PacketNotFound {
780 packet_id: *packet_id,
781 });
782 }
783
784 let location = self._packets[packet_id].location.clone();
785
786 if let Some(packets) = self._packets_by_location.get_mut(&location) {
787 if packets.shift_remove(packet_id) {
788 self._packets.remove(packet_id);
789 NetActionResponse::Success(
790 NetActionResponseData::None,
791 vec![NetEvent::PacketConsumed(
792 get_utc_now(),
793 *packet_id,
794 location,
795 )],
796 )
797 } else {
798 panic!(
799 "Packet with ID {} not found in location {:?}",
800 packet_id, location
801 );
802 }
803 } else {
804 panic!("Packet location {:?} not found", location);
805 }
806 }
807
808 fn destroy_packet(&mut self, packet_id: &PacketID) -> NetActionResponse {
809 if !self._packets.contains_key(packet_id) {
810 return NetActionResponse::Error(NetActionError::PacketNotFound {
811 packet_id: *packet_id,
812 });
813 }
814
815 let location = self._packets[packet_id].location.clone();
816
817 if let Some(packets) = self._packets_by_location.get_mut(&location) {
818 if packets.shift_remove(packet_id) {
819 self._packets.remove(packet_id);
820 NetActionResponse::Success(
821 NetActionResponseData::None,
822 vec![NetEvent::PacketDestroyed(
823 get_utc_now(),
824 *packet_id,
825 location,
826 )],
827 )
828 } else {
829 panic!(
830 "Packet with ID {} not found in location {:?}",
831 packet_id, location
832 );
833 }
834 } else {
835 panic!("Packet location {:?} not found", location);
836 }
837 }
838
839 fn start_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
840 if let Some(epoch) = self._epochs.get_mut(epoch_id) {
841 if !self._startable_epochs.contains(epoch_id) {
842 return NetActionResponse::Error(NetActionError::EpochNotStartable {
843 epoch_id: *epoch_id,
844 });
845 }
846 debug_assert!(
847 matches!(epoch.state, EpochState::Startable),
848 "Epoch state is not Startable but was in net._startable_epochs."
849 );
850 epoch.state = EpochState::Running;
851 self._startable_epochs.remove(epoch_id);
852 NetActionResponse::Success(
853 NetActionResponseData::StartedEpoch(epoch.clone()),
854 vec![NetEvent::EpochStarted(get_utc_now(), *epoch_id)],
855 )
856 } else {
857 NetActionResponse::Error(NetActionError::EpochNotFound {
858 epoch_id: *epoch_id,
859 })
860 }
861 }
862
863 fn finish_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
864 let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
866 epoch.clone()
867 } else {
868 return NetActionResponse::Error(NetActionError::EpochNotFound {
869 epoch_id: *epoch_id,
870 });
871 };
872
873 if epoch.state != EpochState::Running {
875 return NetActionResponse::Error(NetActionError::EpochNotRunning {
876 epoch_id: *epoch_id,
877 });
878 }
879
880 let epoch_loc = PacketLocation::Node(*epoch_id);
882 if let Some(packets) = self._packets_by_location.get(&epoch_loc) {
883 if !packets.is_empty() {
884 return NetActionResponse::Error(NetActionError::CannotFinishNonEmptyEpoch {
885 epoch_id: *epoch_id,
886 });
887 }
888 } else {
889 panic!("Epoch {} not found in location {:?}", epoch_id, epoch_loc);
890 }
891
892 let node = self
894 .graph
895 .nodes()
896 .get(&epoch.node_name)
897 .expect("Epoch references non-existent node");
898 for port_name in node.out_ports.keys() {
899 let output_port_loc = PacketLocation::OutputPort(*epoch_id, port_name.clone());
900 if let Some(packets) = self._packets_by_location.get(&output_port_loc)
901 && !packets.is_empty()
902 {
903 return NetActionResponse::Error(NetActionError::UnsentOutputSalvo {
904 epoch_id: *epoch_id,
905 port_name: port_name.clone(),
906 });
907 }
908 }
909
910 let epoch_before_finish = self._epochs[epoch_id].clone();
913
914 let mut epoch = self._epochs.remove(epoch_id).unwrap();
915 epoch.state = EpochState::Finished;
916
917 self._packets_by_location.remove(&epoch_loc);
919 for port_name in node.out_ports.keys() {
920 let output_port_loc = PacketLocation::OutputPort(*epoch_id, port_name.clone());
921 self._packets_by_location.remove(&output_port_loc);
922 }
923
924 if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch.node_name) {
926 epoch_ids.retain(|id| id != epoch_id);
927 }
928
929 NetActionResponse::Success(
930 NetActionResponseData::FinishedEpoch(epoch),
931 vec![NetEvent::EpochFinished(get_utc_now(), epoch_before_finish)],
932 )
933 }
934
935 fn cancel_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
936 let epoch_for_event = if let Some(epoch) = self._epochs.get(epoch_id) {
938 epoch.clone()
939 } else {
940 return NetActionResponse::Error(NetActionError::EpochNotFound {
941 epoch_id: *epoch_id,
942 });
943 };
944
945 let mut events: Vec<NetEvent> = Vec::new();
946 let mut destroyed_packets: Vec<PacketID> = Vec::new();
947
948 let epoch_location = PacketLocation::Node(*epoch_id);
950 if let Some(packet_ids) = self._packets_by_location.get(&epoch_location) {
951 destroyed_packets.extend(packet_ids.iter().cloned());
952 }
953
954 let node = self
956 .graph
957 .nodes()
958 .get(&epoch_for_event.node_name)
959 .expect("Epoch references non-existent node");
960 for port_name in node.out_ports.keys() {
961 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
962 if let Some(packet_ids) = self._packets_by_location.get(&output_port_location) {
963 destroyed_packets.extend(packet_ids.iter().cloned());
964 }
965 }
966
967 for packet_id in &destroyed_packets {
969 let packet = self
970 ._packets
971 .remove(packet_id)
972 .expect("Packet in location map not found in packets map");
973 let packet_location = packet.location.clone();
974 if let Some(packets_at_location) = self._packets_by_location.get_mut(&packet_location) {
975 packets_at_location.shift_remove(packet_id);
976 }
977 events.push(NetEvent::PacketDestroyed(
978 get_utc_now(),
979 *packet_id,
980 packet_location,
981 ));
982 }
983
984 for port_name in node.out_ports.keys() {
986 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
987 self._packets_by_location.remove(&output_port_location);
988 }
989
990 self._packets_by_location.remove(&epoch_location);
992
993 self._startable_epochs.remove(epoch_id);
995
996 if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch_for_event.node_name) {
998 epoch_ids.retain(|id| id != epoch_id);
999 }
1000
1001 let epoch = self._epochs.remove(epoch_id).expect("Epoch should exist");
1003
1004 events.push(NetEvent::EpochCancelled(get_utc_now(), epoch_for_event));
1005
1006 NetActionResponse::Success(
1007 NetActionResponseData::CancelledEpoch(epoch, destroyed_packets),
1008 events,
1009 )
1010 }
1011
1012 fn create_epoch(&mut self, node_name: &NodeName, salvo: &Salvo) -> NetActionResponse {
1013 let node = match self.graph.nodes().get(node_name) {
1015 Some(node) => node,
1016 None => {
1017 return NetActionResponse::Error(NetActionError::NodeNotFound {
1018 node_name: node_name.clone(),
1019 });
1020 }
1021 };
1022
1023 for (port_name, packet_id) in &salvo.packets {
1025 if !node.in_ports.contains_key(port_name) {
1027 return NetActionResponse::Error(NetActionError::InputPortNotFound {
1028 port_name: port_name.clone(),
1029 node_name: node_name.clone(),
1030 });
1031 }
1032
1033 let packet = match self._packets.get(packet_id) {
1035 Some(packet) => packet,
1036 None => {
1037 return NetActionResponse::Error(NetActionError::PacketNotFound {
1038 packet_id: *packet_id,
1039 });
1040 }
1041 };
1042
1043 let expected_location = PacketLocation::InputPort(node_name.clone(), port_name.clone());
1045 if packet.location != expected_location {
1046 return NetActionResponse::Error(NetActionError::PacketNotAtInputPort {
1047 packet_id: *packet_id,
1048 port_name: port_name.clone(),
1049 node_name: node_name.clone(),
1050 });
1051 }
1052 }
1053
1054 let mut events: Vec<NetEvent> = Vec::new();
1055
1056 let epoch_id = Ulid::new();
1058 let epoch = Epoch {
1059 id: epoch_id,
1060 node_name: node_name.clone(),
1061 in_salvo: salvo.clone(),
1062 out_salvos: Vec::new(),
1063 state: EpochState::Startable,
1064 orphaned_packets: Vec::new(),
1065 };
1066
1067 self._epochs.insert(epoch_id, epoch.clone());
1069 self._startable_epochs.insert(epoch_id);
1070 self._node_to_epochs
1071 .entry(node_name.clone())
1072 .or_default()
1073 .push(epoch_id);
1074
1075 let epoch_location = PacketLocation::Node(epoch_id);
1077 self._packets_by_location
1078 .insert(epoch_location.clone(), IndexSet::new());
1079
1080 for port_name in node.out_ports.keys() {
1082 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
1083 self._packets_by_location
1084 .insert(output_port_location, IndexSet::new());
1085 }
1086
1087 events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id));
1088
1089 for (port_name, packet_id) in &salvo.packets {
1091 let from_location = PacketLocation::InputPort(node_name.clone(), port_name.clone());
1092
1093 let from_index = self
1095 ._packets_by_location
1096 .get(&from_location)
1097 .and_then(|packets| packets.get_index_of(packet_id))
1098 .expect("Packet should exist at from_location");
1099
1100 self.move_packet(packet_id, epoch_location.clone());
1101 events.push(NetEvent::PacketMoved(
1102 get_utc_now(),
1103 *packet_id,
1104 from_location,
1105 epoch_location.clone(),
1106 from_index,
1107 ));
1108 }
1109
1110 NetActionResponse::Success(NetActionResponseData::CreatedEpoch(epoch), events)
1111 }
1112
1113 fn load_packet_into_output_port(
1114 &mut self,
1115 packet_id: &PacketID,
1116 port_name: &str,
1117 ) -> NetActionResponse {
1118 let (epoch_id, old_location) = if let Some(packet) = self._packets.get(packet_id) {
1119 if let PacketLocation::Node(epoch_id) = packet.location {
1120 (epoch_id, packet.location.clone())
1121 } else {
1122 return NetActionResponse::Error(NetActionError::PacketNotInAnyNode {
1123 packet_id: *packet_id,
1124 });
1125 }
1126 } else {
1127 return NetActionResponse::Error(NetActionError::PacketNotFound {
1128 packet_id: *packet_id,
1129 });
1130 };
1131
1132 let node_name = self
1133 ._epochs
1134 .get(&epoch_id)
1135 .expect("The epoch id in the location of a packet could not be found.")
1136 .node_name
1137 .clone();
1138 let node = self
1139 .graph
1140 .nodes()
1141 .get(&node_name)
1142 .expect("Packet located in a non-existing node (yet the node has an epoch).");
1143
1144 if !node.out_ports.contains_key(port_name) {
1145 return NetActionResponse::Error(NetActionError::OutputPortNotFound {
1146 port_name: port_name.to_string(),
1147 epoch_id,
1148 });
1149 }
1150
1151 let port = node.out_ports.get(port_name).unwrap();
1152 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.to_string());
1153 let port_packets = self
1154 ._packets_by_location
1155 .get(&output_port_location)
1156 .expect("No entry in NetSim._packets_by_location found for output port.");
1157
1158 if let PortSlotSpec::Finite(num_slots) = port.slots_spec
1160 && port_packets.len() as u64 >= num_slots
1161 {
1162 return NetActionResponse::Error(NetActionError::OutputPortFull {
1163 port_name: port_name.to_string(),
1164 epoch_id,
1165 });
1166 }
1167
1168 let from_index = self
1170 ._packets_by_location
1171 .get(&old_location)
1172 .and_then(|packets| packets.get_index_of(packet_id))
1173 .expect("Packet should exist at old_location");
1174
1175 let new_location = output_port_location;
1176 self.move_packet(packet_id, new_location.clone());
1177 NetActionResponse::Success(
1178 NetActionResponseData::None,
1179 vec![NetEvent::PacketMoved(
1180 get_utc_now(),
1181 *packet_id,
1182 old_location,
1183 new_location,
1184 from_index,
1185 )],
1186 )
1187 }
1188
1189 fn send_output_salvo(
1190 &mut self,
1191 epoch_id: &EpochID,
1192 salvo_condition_name: &SalvoConditionName,
1193 ) -> NetActionResponse {
1194 let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
1196 epoch
1197 } else {
1198 return NetActionResponse::Error(NetActionError::EpochNotFound {
1199 epoch_id: *epoch_id,
1200 });
1201 };
1202
1203 let node = self
1205 .graph
1206 .nodes()
1207 .get(&epoch.node_name)
1208 .expect("Node associated with epoch could not be found.");
1209 let node_name = node.name.clone();
1210
1211 let salvo_condition =
1213 if let Some(salvo_condition) = node.out_salvo_conditions.get(salvo_condition_name) {
1214 salvo_condition
1215 } else {
1216 return NetActionResponse::Error(NetActionError::OutputSalvoConditionNotFound {
1217 condition_name: salvo_condition_name.clone(),
1218 epoch_id: *epoch_id,
1219 });
1220 };
1221
1222 if let MaxSalvos::Finite(max) = salvo_condition.max_salvos {
1224 let condition_salvo_count = epoch
1225 .out_salvos
1226 .iter()
1227 .filter(|s| s.salvo_condition == *salvo_condition_name)
1228 .count() as u64;
1229 if condition_salvo_count >= max {
1230 return NetActionResponse::Error(NetActionError::MaxOutputSalvosReached {
1231 condition_name: salvo_condition_name.clone(),
1232 epoch_id: *epoch_id,
1233 });
1234 }
1235 }
1236
1237 let port_packet_counts: HashMap<PortName, u64> = node
1239 .out_ports
1240 .keys()
1241 .map(|port_name| {
1242 let count = self
1243 ._packets_by_location
1244 .get(&PacketLocation::OutputPort(*epoch_id, port_name.clone()))
1245 .map(|packets| packets.len() as u64)
1246 .unwrap_or(0);
1247 (port_name.clone(), count)
1248 })
1249 .collect();
1250 if !evaluate_salvo_condition(&salvo_condition.term, &port_packet_counts, &node.out_ports) {
1251 return NetActionResponse::Error(NetActionError::SalvoConditionNotMet {
1252 condition_name: salvo_condition_name.clone(),
1253 epoch_id: *epoch_id,
1254 });
1255 }
1256
1257 let mut packets_to_move: Vec<(
1260 PacketID,
1261 PortName,
1262 PacketLocation,
1263 PacketLocation,
1264 bool,
1265 )> = Vec::new();
1266 for (port_name, packet_count) in &salvo_condition.ports {
1267 let from_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1268 let packets = self
1269 ._packets_by_location
1270 .get(&from_location)
1271 .unwrap_or_else(|| {
1272 panic!(
1273 "Output port '{}' of node '{}' does not have an entry in self._packets_by_location",
1274 port_name,
1275 node_name
1276 )
1277 })
1278 .clone();
1279
1280 let (to_location, is_orphaned) = if let Some(edge_ref) =
1282 self.graph.get_edge_by_tail(&PortRef {
1283 node_name: node_name.clone(),
1284 port_type: PortType::Output,
1285 port_name: port_name.clone(),
1286 }) {
1287 (PacketLocation::Edge(edge_ref.clone()), false)
1289 } else {
1290 (PacketLocation::OutsideNet, true)
1292 };
1293
1294 let take_count = match packet_count {
1295 PacketCount::All => packets.len(),
1296 PacketCount::Count(n) => std::cmp::min(*n as usize, packets.len()),
1297 };
1298 for packet_id in packets.into_iter().take(take_count) {
1299 packets_to_move.push((
1300 packet_id,
1301 port_name.clone(),
1302 from_location.clone(),
1303 to_location.clone(),
1304 is_orphaned,
1305 ));
1306 }
1307 }
1308
1309 let salvo = Salvo {
1311 salvo_condition: salvo_condition_name.clone(),
1312 packets: packets_to_move
1313 .iter()
1314 .map(|(packet_id, port_name, _, _, _)| (port_name.clone(), *packet_id))
1315 .collect(),
1316 };
1317 self._epochs
1318 .get_mut(epoch_id)
1319 .unwrap()
1320 .out_salvos
1321 .push(salvo);
1322
1323 let mut net_events = Vec::new();
1325 let mut orphaned_infos: Vec<OrphanedPacketInfo> = Vec::new();
1326
1327 for (packet_id, port_name, from_location, to_location, is_orphaned) in
1328 packets_to_move
1329 {
1330 if is_orphaned {
1331 net_events.push(NetEvent::PacketOrphaned(
1333 get_utc_now(),
1334 packet_id,
1335 *epoch_id,
1336 node_name.clone(),
1337 port_name.clone(),
1338 salvo_condition_name.clone(),
1339 ));
1340 orphaned_infos.push(OrphanedPacketInfo {
1341 packet_id,
1342 from_port: port_name,
1343 salvo_condition: salvo_condition_name.clone(),
1344 });
1345 } else {
1346 let from_index = self
1348 ._packets_by_location
1349 .get(&from_location)
1350 .and_then(|packets| packets.get_index_of(&packet_id))
1351 .expect("Packet should exist at from_location");
1352
1353 net_events.push(NetEvent::PacketMoved(
1355 get_utc_now(),
1356 packet_id,
1357 from_location,
1358 to_location.clone(),
1359 from_index,
1360 ));
1361 }
1362 self.move_packet(&packet_id, to_location);
1363 }
1364
1365 if !orphaned_infos.is_empty() {
1367 self._epochs
1368 .get_mut(epoch_id)
1369 .unwrap()
1370 .orphaned_packets
1371 .extend(orphaned_infos);
1372 }
1373
1374 net_events.push(NetEvent::OutputSalvoTriggered(
1376 get_utc_now(),
1377 *epoch_id,
1378 salvo_condition_name.clone(),
1379 ));
1380
1381 NetActionResponse::Success(NetActionResponseData::None, net_events)
1382 }
1383
1384 fn transport_packet_to_location(
1385 &mut self,
1386 packet_id: &PacketID,
1387 destination: &PacketLocation,
1388 ) -> NetActionResponse {
1389 let packet = if let Some(p) = self._packets.get(packet_id) {
1391 p
1392 } else {
1393 return NetActionResponse::Error(NetActionError::PacketNotFound {
1394 packet_id: *packet_id,
1395 });
1396 };
1397 let current_location = packet.location.clone();
1398
1399 match ¤t_location {
1401 PacketLocation::Node(epoch_id) => {
1402 if let Some(epoch) = self._epochs.get(epoch_id)
1403 && epoch.state == EpochState::Running
1404 {
1405 return NetActionResponse::Error(
1406 NetActionError::CannotMovePacketFromRunningEpoch {
1407 packet_id: *packet_id,
1408 epoch_id: *epoch_id,
1409 },
1410 );
1411 }
1412 }
1413 PacketLocation::OutputPort(epoch_id, _) => {
1414 if let Some(epoch) = self._epochs.get(epoch_id)
1415 && epoch.state == EpochState::Running
1416 {
1417 return NetActionResponse::Error(
1418 NetActionError::CannotMovePacketFromRunningEpoch {
1419 packet_id: *packet_id,
1420 epoch_id: *epoch_id,
1421 },
1422 );
1423 }
1424 }
1425 _ => {}
1426 }
1427
1428 match destination {
1430 PacketLocation::Node(epoch_id) => {
1431 if let Some(epoch) = self._epochs.get(epoch_id) {
1432 if epoch.state == EpochState::Running {
1433 return NetActionResponse::Error(
1434 NetActionError::CannotMovePacketIntoRunningEpoch {
1435 packet_id: *packet_id,
1436 epoch_id: *epoch_id,
1437 },
1438 );
1439 }
1440 } else {
1441 return NetActionResponse::Error(NetActionError::EpochNotFound {
1442 epoch_id: *epoch_id,
1443 });
1444 }
1445 }
1446 PacketLocation::OutputPort(epoch_id, port_name) => {
1447 if let Some(epoch) = self._epochs.get(epoch_id) {
1448 if epoch.state == EpochState::Running {
1449 return NetActionResponse::Error(
1450 NetActionError::CannotMovePacketIntoRunningEpoch {
1451 packet_id: *packet_id,
1452 epoch_id: *epoch_id,
1453 },
1454 );
1455 }
1456 let node = self
1458 .graph
1459 .nodes()
1460 .get(&epoch.node_name)
1461 .expect("Node associated with epoch could not be found.");
1462 if !node.out_ports.contains_key(port_name) {
1463 return NetActionResponse::Error(NetActionError::OutputPortNotFound {
1464 port_name: port_name.clone(),
1465 epoch_id: *epoch_id,
1466 });
1467 }
1468 } else {
1469 return NetActionResponse::Error(NetActionError::EpochNotFound {
1470 epoch_id: *epoch_id,
1471 });
1472 }
1473 }
1474 PacketLocation::InputPort(node_name, port_name) => {
1475 let node = if let Some(n) = self.graph.nodes().get(node_name) {
1477 n
1478 } else {
1479 return NetActionResponse::Error(NetActionError::NodeNotFound {
1480 node_name: node_name.clone(),
1481 });
1482 };
1483 let port = if let Some(p) = node.in_ports.get(port_name) {
1485 p
1486 } else {
1487 return NetActionResponse::Error(NetActionError::InputPortNotFound {
1488 port_name: port_name.clone(),
1489 node_name: node_name.clone(),
1490 });
1491 };
1492 let current_count = self
1494 ._packets_by_location
1495 .get(destination)
1496 .map(|s| s.len())
1497 .unwrap_or(0);
1498 let is_full = match &port.slots_spec {
1499 PortSlotSpec::Infinite => false,
1500 PortSlotSpec::Finite(capacity) => current_count >= *capacity as usize,
1501 };
1502 if is_full {
1503 return NetActionResponse::Error(NetActionError::InputPortFull {
1504 port_name: port_name.clone(),
1505 node_name: node_name.clone(),
1506 });
1507 }
1508 }
1509 PacketLocation::Edge(edge) => {
1510 if !self.graph.edges().contains(edge) {
1512 return NetActionResponse::Error(NetActionError::EdgeNotFound {
1513 edge: edge.clone(),
1514 });
1515 }
1516 }
1517 PacketLocation::OutsideNet => {
1518 }
1520 }
1521
1522 let from_index = self
1524 ._packets_by_location
1525 .get(¤t_location)
1526 .and_then(|packets| packets.get_index_of(packet_id))
1527 .expect("Packet should exist at current_location");
1528
1529 self.move_packet(packet_id, destination.clone());
1531
1532 NetActionResponse::Success(
1533 NetActionResponseData::None,
1534 vec![NetEvent::PacketMoved(
1535 get_utc_now(),
1536 *packet_id,
1537 current_location,
1538 destination.clone(),
1539 from_index,
1540 )],
1541 )
1542 }
1543
1544 pub fn do_action(&mut self, action: &NetAction) -> NetActionResponse {
1577 match action {
1578 NetAction::RunStep => self.run_step(),
1579 NetAction::CreatePacket(maybe_epoch_id) => self.create_packet(maybe_epoch_id),
1580 NetAction::ConsumePacket(packet_id) => self.consume_packet(packet_id),
1581 NetAction::DestroyPacket(packet_id) => self.destroy_packet(packet_id),
1582 NetAction::StartEpoch(epoch_id) => self.start_epoch(epoch_id),
1583 NetAction::FinishEpoch(epoch_id) => self.finish_epoch(epoch_id),
1584 NetAction::CancelEpoch(epoch_id) => self.cancel_epoch(epoch_id),
1585 NetAction::CreateEpoch(node_name, salvo) => self.create_epoch(node_name, salvo),
1586 NetAction::LoadPacketIntoOutputPort(packet_id, port_name) => {
1587 self.load_packet_into_output_port(packet_id, port_name)
1588 }
1589 NetAction::SendOutputSalvo(epoch_id, salvo_condition_name) => {
1590 self.send_output_salvo(epoch_id, salvo_condition_name)
1591 }
1592 NetAction::TransportPacketToLocation(packet_id, location) => {
1593 self.transport_packet_to_location(packet_id, location)
1594 }
1595 }
1596 }
1597
1598 pub fn packet_count_at(&self, location: &PacketLocation) -> usize {
1602 self._packets_by_location
1603 .get(location)
1604 .map(|s| s.len())
1605 .unwrap_or(0)
1606 }
1607
1608 pub fn get_packets_at_location(&self, location: &PacketLocation) -> Vec<PacketID> {
1610 self._packets_by_location
1611 .get(location)
1612 .map(|s| s.iter().cloned().collect())
1613 .unwrap_or_default()
1614 }
1615
1616 pub fn get_epoch(&self, epoch_id: &EpochID) -> Option<&Epoch> {
1618 self._epochs.get(epoch_id)
1619 }
1620
1621 pub fn get_startable_epochs(&self) -> Vec<EpochID> {
1623 self._startable_epochs.iter().cloned().collect()
1624 }
1625
1626 pub fn get_packet(&self, packet_id: &PacketID) -> Option<&Packet> {
1628 self._packets.get(packet_id)
1629 }
1630
1631 pub fn run_until_blocked(&mut self) -> Vec<NetEvent> {
1641 let mut all_events = Vec::new();
1642 while !self.is_blocked() {
1643 if let NetActionResponse::Success(_, events) = self.do_action(&NetAction::RunStep) {
1644 all_events.extend(events);
1645 }
1646 }
1647 all_events
1648 }
1649
1650 pub fn is_blocked(&self) -> bool {
1656 for (location, packets) in &self._packets_by_location {
1658 if let PacketLocation::Edge(edge_ref) = location {
1659 if packets.is_empty() {
1660 continue;
1661 }
1662
1663 let target_node_name = &edge_ref.target.node_name;
1664 let target_port_name = &edge_ref.target.port_name;
1665
1666 let node = match self.graph.nodes().get(target_node_name) {
1667 Some(n) => n,
1668 None => continue,
1669 };
1670 let port = match node.in_ports.get(target_port_name) {
1671 Some(p) => p,
1672 None => continue,
1673 };
1674
1675 let input_port_location =
1676 PacketLocation::InputPort(target_node_name.clone(), target_port_name.clone());
1677 let current_count = self
1678 ._packets_by_location
1679 .get(&input_port_location)
1680 .map(|p| p.len() as u64)
1681 .unwrap_or(0);
1682
1683 let can_move = match port.slots_spec {
1684 PortSlotSpec::Infinite => true,
1685 PortSlotSpec::Finite(max_slots) => current_count < max_slots,
1686 };
1687
1688 if can_move {
1689 return false; }
1691 }
1692 }
1693
1694 for (location, packets) in &self._packets_by_location {
1696 if let PacketLocation::InputPort(node_name, _) = location {
1697 if packets.is_empty() {
1698 continue;
1699 }
1700
1701 if self.can_trigger_input_salvo(node_name) {
1703 return false; }
1705 }
1706 }
1707
1708 true }
1710
1711 fn can_trigger_input_salvo(&self, node_name: &NodeName) -> bool {
1713 let node = match self.graph.nodes().get(node_name) {
1714 Some(n) => n,
1715 None => return false,
1716 };
1717
1718 let in_port_names: Vec<PortName> = node.in_ports.keys().cloned().collect();
1719
1720 let port_packet_counts: HashMap<PortName, u64> = in_port_names
1722 .iter()
1723 .map(|port_name| {
1724 let count = self
1725 ._packets_by_location
1726 .get(&PacketLocation::InputPort(
1727 node_name.clone(),
1728 port_name.clone(),
1729 ))
1730 .map(|packets| packets.len() as u64)
1731 .unwrap_or(0);
1732 (port_name.clone(), count)
1733 })
1734 .collect();
1735
1736 for cond in node.in_salvo_conditions.values() {
1738 if evaluate_salvo_condition(&cond.term, &port_packet_counts, &node.in_ports) {
1739 return true;
1740 }
1741 }
1742
1743 false
1744 }
1745
1746 pub fn undo_action(
1767 &mut self,
1768 action: &NetAction,
1769 events: &[NetEvent],
1770 ) -> Result<(), UndoError> {
1771 for event in events.iter().rev() {
1773 self.undo_event(action, event)?;
1774 }
1775 Ok(())
1776 }
1777
1778 fn undo_event(&mut self, action: &NetAction, event: &NetEvent) -> Result<(), UndoError> {
1780 match event {
1781 NetEvent::PacketCreated(_, packet_id) => self.undo_packet_created(packet_id),
1782 NetEvent::PacketConsumed(_, packet_id, location) => {
1783 self.undo_packet_consumed(packet_id, location)
1784 }
1785 NetEvent::PacketDestroyed(_, packet_id, location) => {
1786 self.undo_packet_destroyed(packet_id, location)
1787 }
1788 NetEvent::EpochCreated(_, epoch_id) => self.undo_epoch_created(epoch_id),
1789 NetEvent::EpochStarted(_, epoch_id) => self.undo_epoch_started(epoch_id),
1790 NetEvent::EpochFinished(_, epoch) => self.undo_epoch_finished(epoch),
1791 NetEvent::EpochCancelled(_, epoch) => self.undo_epoch_cancelled(epoch),
1792 NetEvent::PacketMoved(_, packet_id, from, to, from_index) => {
1793 self.undo_packet_moved(packet_id, from, to, *from_index)
1794 }
1795 NetEvent::InputSalvoTriggered(_, _, _) => {
1796 Ok(())
1798 }
1799 NetEvent::OutputSalvoTriggered(_, epoch_id, _) => {
1800 self.undo_output_salvo_triggered(epoch_id, action)
1802 }
1803 NetEvent::PacketOrphaned(_, packet_id, epoch_id, _, port_name, _) => {
1804 self.undo_packet_orphaned(packet_id, epoch_id, port_name)
1806 }
1807 }
1808 }
1809
1810 fn undo_packet_created(&mut self, packet_id: &PacketID) -> Result<(), UndoError> {
1812 let location = match self._packets.get(packet_id) {
1814 Some(p) => p.location.clone(),
1815 None => {
1816 return Err(UndoError::NotFound(format!(
1817 "packet {} not found",
1818 packet_id
1819 )));
1820 }
1821 };
1822
1823 if let Some(packets) = self._packets_by_location.get_mut(&location) {
1825 packets.shift_remove(packet_id);
1826 }
1827
1828 self._packets.remove(packet_id);
1830
1831 Ok(())
1832 }
1833
1834 fn undo_packet_consumed(
1836 &mut self,
1837 packet_id: &PacketID,
1838 location: &PacketLocation,
1839 ) -> Result<(), UndoError> {
1840 self.recreate_packet(packet_id, location)
1841 }
1842
1843 fn undo_packet_destroyed(
1845 &mut self,
1846 packet_id: &PacketID,
1847 location: &PacketLocation,
1848 ) -> Result<(), UndoError> {
1849 self.recreate_packet(packet_id, location)
1850 }
1851
1852 fn recreate_packet(
1854 &mut self,
1855 packet_id: &PacketID,
1856 location: &PacketLocation,
1857 ) -> Result<(), UndoError> {
1858 if self._packets.contains_key(packet_id) {
1860 return Err(UndoError::StateMismatch(format!(
1861 "packet {} already exists",
1862 packet_id
1863 )));
1864 }
1865
1866 let packet = Packet {
1868 id: *packet_id,
1869 location: location.clone(),
1870 };
1871 self._packets.insert(*packet_id, packet);
1872
1873 self._packets_by_location
1875 .entry(location.clone())
1876 .or_default()
1877 .insert(*packet_id);
1878
1879 Ok(())
1880 }
1881
1882 fn undo_epoch_created(&mut self, epoch_id: &EpochID) -> Result<(), UndoError> {
1884 let epoch = match self._epochs.get(epoch_id) {
1886 Some(e) => e.clone(),
1887 None => {
1888 return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
1889 }
1890 };
1891
1892 self._epochs.remove(epoch_id);
1894
1895 self._startable_epochs.remove(epoch_id);
1897
1898 if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch.node_name) {
1900 epoch_ids.retain(|id| id != epoch_id);
1901 if epoch_ids.is_empty() {
1903 self._node_to_epochs.remove(&epoch.node_name);
1904 }
1905 }
1906
1907 let epoch_location = PacketLocation::Node(*epoch_id);
1909 self._packets_by_location.remove(&epoch_location);
1910
1911 if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
1913 for port_name in node.out_ports.keys() {
1914 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1915 self._packets_by_location.remove(&output_port_location);
1916 }
1917 }
1918
1919 Ok(())
1920 }
1921
1922 fn undo_epoch_started(&mut self, epoch_id: &EpochID) -> Result<(), UndoError> {
1924 let epoch = match self._epochs.get_mut(epoch_id) {
1925 Some(e) => e,
1926 None => {
1927 return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
1928 }
1929 };
1930
1931 if epoch.state != EpochState::Running {
1933 return Err(UndoError::StateMismatch(format!(
1934 "epoch {} is not in Running state, cannot undo start",
1935 epoch_id
1936 )));
1937 }
1938
1939 epoch.state = EpochState::Startable;
1941
1942 self._startable_epochs.insert(*epoch_id);
1944
1945 Ok(())
1946 }
1947
1948 fn undo_epoch_finished(&mut self, epoch: &Epoch) -> Result<(), UndoError> {
1950 let epoch_id = epoch.id;
1951
1952 if self._epochs.contains_key(&epoch_id) {
1954 return Err(UndoError::StateMismatch(format!(
1955 "epoch {} already exists",
1956 epoch_id
1957 )));
1958 }
1959
1960 self._epochs.insert(epoch_id, epoch.clone());
1963
1964 let epoch_location = PacketLocation::Node(epoch_id);
1966 self._packets_by_location
1967 .insert(epoch_location, IndexSet::new());
1968
1969 if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
1971 for port_name in node.out_ports.keys() {
1972 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
1973 self._packets_by_location
1974 .insert(output_port_location, IndexSet::new());
1975 }
1976 }
1977
1978 self._node_to_epochs
1980 .entry(epoch.node_name.clone())
1981 .or_default()
1982 .push(epoch_id);
1983
1984 Ok(())
1985 }
1986
1987 fn undo_epoch_cancelled(&mut self, epoch: &Epoch) -> Result<(), UndoError> {
1990 let epoch_id = epoch.id;
1991
1992 if self._epochs.contains_key(&epoch_id) {
1994 return Err(UndoError::StateMismatch(format!(
1995 "epoch {} already exists",
1996 epoch_id
1997 )));
1998 }
1999
2000 self._epochs.insert(epoch_id, epoch.clone());
2002
2003 let epoch_location = PacketLocation::Node(epoch_id);
2005 self._packets_by_location
2006 .insert(epoch_location, IndexSet::new());
2007
2008 if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
2010 for port_name in node.out_ports.keys() {
2011 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
2012 self._packets_by_location
2013 .insert(output_port_location, IndexSet::new());
2014 }
2015 }
2016
2017 self._node_to_epochs
2019 .entry(epoch.node_name.clone())
2020 .or_default()
2021 .push(epoch_id);
2022
2023 if epoch.state == EpochState::Startable {
2025 self._startable_epochs.insert(epoch_id);
2026 }
2027
2028 Ok(())
2029 }
2030
2031 fn undo_packet_moved(
2033 &mut self,
2034 packet_id: &PacketID,
2035 from: &PacketLocation,
2036 to: &PacketLocation,
2037 from_index: usize,
2038 ) -> Result<(), UndoError> {
2039 let packet = match self._packets.get(packet_id) {
2041 Some(p) => p,
2042 None => {
2043 return Err(UndoError::NotFound(format!(
2044 "packet {} not found",
2045 packet_id
2046 )));
2047 }
2048 };
2049
2050 if packet.location != *to {
2051 return Err(UndoError::StateMismatch(format!(
2052 "packet {} is not at expected location {:?}, found at {:?}",
2053 packet_id, to, packet.location
2054 )));
2055 }
2056
2057 if let Some(packets) = self._packets_by_location.get_mut(to) {
2059 packets.shift_remove(packet_id);
2060 }
2061
2062 let packets_at_from = self._packets_by_location.entry(from.clone()).or_default();
2064 packets_at_from.shift_insert(from_index, *packet_id);
2065
2066 self._packets.get_mut(packet_id).unwrap().location = from.clone();
2068
2069 Ok(())
2070 }
2071
2072 fn undo_output_salvo_triggered(
2074 &mut self,
2075 epoch_id: &EpochID,
2076 action: &NetAction,
2077 ) -> Result<(), UndoError> {
2078 if !matches!(action, NetAction::SendOutputSalvo(_, _)) {
2081 return Ok(());
2082 }
2083
2084 let epoch = match self._epochs.get_mut(epoch_id) {
2085 Some(e) => e,
2086 None => {
2087 return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
2088 }
2089 };
2090
2091 if epoch.out_salvos.pop().is_none() {
2093 return Err(UndoError::StateMismatch(format!(
2094 "epoch {} has no out_salvos to pop",
2095 epoch_id
2096 )));
2097 }
2098
2099 Ok(())
2102 }
2103
2104 fn undo_packet_orphaned(
2106 &mut self,
2107 packet_id: &PacketID,
2108 epoch_id: &EpochID,
2109 port_name: &PortName,
2110 ) -> Result<(), UndoError> {
2111 let packet = match self._packets.get(packet_id) {
2113 Some(p) => p,
2114 None => {
2115 return Err(UndoError::NotFound(format!(
2116 "packet {} not found",
2117 packet_id
2118 )));
2119 }
2120 };
2121
2122 if packet.location != PacketLocation::OutsideNet {
2123 return Err(UndoError::StateMismatch(format!(
2124 "packet {} is not at OutsideNet, found at {:?}",
2125 packet_id, packet.location
2126 )));
2127 }
2128
2129 if let Some(packets) = self
2131 ._packets_by_location
2132 .get_mut(&PacketLocation::OutsideNet)
2133 {
2134 packets.shift_remove(packet_id);
2135 }
2136
2137 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
2139 self._packets_by_location
2140 .entry(output_port_location.clone())
2141 .or_default()
2142 .insert(*packet_id);
2143
2144 self._packets.get_mut(packet_id).unwrap().location = output_port_location;
2146
2147 if let Some(epoch) = self._epochs.get_mut(epoch_id) {
2149 epoch
2150 .orphaned_packets
2151 .retain(|info| info.packet_id != *packet_id);
2152 }
2153
2154 Ok(())
2155 }
2156
#[cfg(test)]
/// Test-only alias that returns exactly what `get_startable_epochs` returns.
pub fn startable_epoch_ids(&self) -> Vec<EpochID> {
    let ids = self.get_startable_epochs();
    ids
}
2163}
2164
2165#[cfg(test)]
2166#[path = "net_tests.rs"]
2167mod tests;