1use crate::_utils::get_utc_now;
10use crate::graph::{
11 DependencyRequestTrigger, Edge, Graph, MaxSalvos, NodeName, PacketCount, Port, PortName,
12 PortRef, PortSlotSpec, PortType, SalvoConditionName, SalvoConditionTerm,
13 evaluate_salvo_condition,
14};
15use indexmap::IndexSet;
16use std::collections::{HashMap, HashSet};
17use ulid::Ulid;
18
/// Salvo condition name recorded on the (empty) input salvo of epochs that `NetSim::run_step`
/// creates while resolving dependency requests.
pub const REQUEST_SALVO_CONDITION: &str = "__request__";

/// Identifier of a packet (a ULID).
pub type PacketID = Ulid;

/// Identifier of an epoch (a ULID).
pub type EpochID = Ulid;
27
/// Where a packet currently resides.
///
/// Used both as `Packet::location` and as the key of `NetSim::_packets_by_location`.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum PacketLocation {
    /// Inside the epoch with the given id.
    Node(EpochID),
    /// Waiting at the named input port of the named node.
    InputPort(NodeName, PortName),
    /// In the named output port belonging to the given epoch.
    OutputPort(EpochID, PortName),
    /// On the given edge.
    Edge(Edge),
    /// Not inside the network; `NetSim::create_packet` places packets here when it is called
    /// without an epoch id.
    OutsideNet,
}
48
/// A packet tracked by the simulator.
#[derive(Debug)]
pub struct Packet {
    /// Unique id of the packet.
    pub id: PacketID,
    /// Current location; `NetSim::move_packet` keeps it in sync with the location index.
    pub location: PacketLocation,
}
57
/// A group of packets associated with one salvo condition.
#[derive(Debug, Clone)]
pub struct Salvo {
    /// Name of the salvo condition this salvo belongs to.
    pub salvo_condition: SalvoConditionName,
    /// The packets in the salvo, each paired with the name of the port it was taken from.
    pub packets: Vec<(PortName, PacketID)>,
}
70
/// Lifecycle state of an [`Epoch`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "python", pyo3::pyclass(eq, eq_int))]
pub enum EpochState {
    /// Initial state of every newly created epoch.
    Startable,
    /// Set by `NetSim::start_epoch`.
    Running,
    /// Set by `NetSim::finish_epoch` on the epoch it returns.
    Finished,
}
82
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl EpochState {
    /// Python `repr()`: the variant name qualified with the enum name,
    /// e.g. `EpochState.Running`.
    fn __repr__(&self) -> String {
        let variant = match self {
            EpochState::Startable => "Startable",
            EpochState::Running => "Running",
            EpochState::Finished => "Finished",
        };
        format!("EpochState.{variant}")
    }
}
94
/// One execution instance of a node.
#[derive(Debug, Clone)]
pub struct Epoch {
    /// Unique id of the epoch.
    pub id: EpochID,
    /// Name of the node this epoch belongs to.
    pub node_name: NodeName,
    /// The input salvo the epoch was created with (empty for request-created epochs).
    pub in_salvo: Salvo,
    /// Output salvos of the epoch; always created empty in the code visible here.
    // NOTE(review): presumably appended to when an output salvo is sent - confirm.
    pub out_salvos: Vec<Salvo>,
    /// Current lifecycle state.
    pub state: EpochState,
    /// Orphaned-packet records; always created empty in the code visible here.
    // NOTE(review): the code that fills this list is outside this section - confirm semantics.
    pub orphaned_packets: Vec<OrphanedPacketInfo>,
}
115
/// Record describing a packet that was orphaned.
// NOTE(review): the producer of these records is not visible here; the field descriptions
// below are inferred from names and types only - confirm against the sender code.
#[derive(Debug, Clone)]
pub struct OrphanedPacketInfo {
    /// Id of the orphaned packet.
    pub packet_id: PacketID,
    /// Port the packet came from (presumably an output port).
    pub from_port: PortName,
    /// Salvo condition involved when the packet was orphaned.
    pub salvo_condition: SalvoConditionName,
}
129
130impl Epoch {
131 pub fn start_time(&self) -> u64 {
133 self.id.timestamp_ms()
134 }
135}
136
/// Timestamp attached to every [`NetEvent`]; values are produced by `get_utc_now()`.
// NOTE(review): unit (ms/us/ns since the epoch) is defined by `get_utc_now` - confirm there.
pub type EventUTC = i128;
139
/// What caused a dependency request to be created (reported in `NetEvent::RequestCreated`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RequestCreatedSource {
    /// Not emitted by the code visible here.
    // NOTE(review): presumably used for requests created through `NetAction::CreateRequest`.
    External,
    /// Emitted by `run_step` for nodes whose config lists the `OnStartup` trigger.
    OnStartup,
    /// Emitted by `run_step` for nodes that had waiting input packets but triggered no salvo.
    OnNoSalvoTriggered,
}
150
/// A dependency request queued in `NetSim::_pending_requests` until `run_step` resolves it.
#[derive(Debug, Clone)]
pub struct PendingRequest {
    /// Node the request was created for.
    pub node_name: NodeName,
    /// Label of the request, copied from the node's dependency-request config.
    pub label: String,
}
159
/// Commands that can be submitted to the simulator.
// NOTE(review): the dispatcher is outside this section; each variant presumably maps to the
// `NetSim` method of the same (snake_case) name - confirm.
#[derive(Debug, Clone)]
pub enum NetAction {
    /// Advance the network by one step.
    RunStep,
    /// Create a packet, inside the given epoch or outside the net when `None`.
    CreatePacket(Option<EpochID>),
    /// Consume (remove) the given packet.
    ConsumePacket(PacketID),
    /// Destroy (remove) the given packet.
    DestroyPacket(PacketID),
    /// Start the given startable epoch.
    StartEpoch(EpochID),
    /// Finish the given running epoch.
    FinishEpoch(EpochID),
    /// Cancel the given epoch, destroying the packets it holds.
    CancelEpoch(EpochID),
    /// Create an epoch on the named node from an explicit input salvo.
    CreateEpoch(NodeName, Salvo),
    /// Move the given packet into the named output port of the epoch that holds it.
    LoadPacketIntoOutputPort(PacketID, PortName),
    /// Send the output salvo with the given condition name for the given epoch.
    SendOutputSalvo(EpochID, SalvoConditionName),
    /// Move the given packet to an explicit location.
    TransportPacketToLocation(PacketID, PacketLocation),
    /// Create a dependency request for the named node with the given label.
    CreateRequest(NodeName, String),
}
205
/// Errors reported when undoing an action fails.
// NOTE(review): the undo implementation is outside this section; variant meanings below are
// taken from the error messages only.
#[derive(Debug, thiserror::Error)]
pub enum UndoError {
    /// The current state does not match what the undo operation expects.
    #[error("state mismatch: {0}")]
    StateMismatch(String),

    /// An entity required by the undo operation does not exist.
    #[error("entity not found: {0}")]
    NotFound(String),

    /// The action cannot be undone.
    #[error("action not undoable: {0}")]
    NotUndoable(String),

    /// Any other internal failure.
    #[error("internal error: {0}")]
    InternalError(String),
}
225
/// Recoverable failures of a [`NetAction`], returned via `NetActionResponse::Error`.
///
/// The `#[error]` message of each variant is its user-facing description.
#[derive(Debug, thiserror::Error)]
pub enum NetActionError {
    /// No packet with this id is tracked by the simulator.
    #[error("packet not found: {packet_id}")]
    PacketNotFound { packet_id: PacketID },

    /// No epoch with this id exists.
    #[error("epoch not found: {epoch_id}")]
    EpochNotFound { epoch_id: EpochID },

    /// The operation requires the epoch to be in the `Running` state.
    #[error("epoch {epoch_id} is not running")]
    EpochNotRunning { epoch_id: EpochID },

    /// The epoch is not in the set of startable epochs.
    #[error("epoch {epoch_id} is not startable")]
    EpochNotStartable { epoch_id: EpochID },

    /// `finish_epoch` was called while packets are still inside the epoch.
    #[error("cannot finish epoch {epoch_id}: epoch still contains packets")]
    CannotFinishNonEmptyEpoch { epoch_id: EpochID },

    /// `finish_epoch` was called while an output port of the epoch still holds packets.
    #[error("cannot finish epoch {epoch_id}: output port '{port_name}' has unsent packets")]
    UnsentOutputSalvo {
        epoch_id: EpochID,
        port_name: PortName,
    },

    /// The packet's location is not `PacketLocation::Node(_)`.
    #[error("packet {packet_id} is not inside any epoch")]
    PacketNotInAnyNode { packet_id: PacketID },

    /// The epoch's node has no output port with this name.
    #[error("output port '{port_name}' not found on node for epoch {epoch_id}")]
    OutputPortNotFound {
        port_name: PortName,
        epoch_id: EpochID,
    },

    /// The epoch's node has no output salvo condition with this name.
    #[error("output salvo condition '{condition_name}' not found on node for epoch {epoch_id}")]
    OutputSalvoConditionNotFound {
        condition_name: SalvoConditionName,
        epoch_id: EpochID,
    },

    /// The output salvo condition has already been used the maximum number of times.
    #[error("max output salvos reached for condition '{condition_name}' on epoch {epoch_id}")]
    MaxOutputSalvosReached {
        condition_name: SalvoConditionName,
        epoch_id: EpochID,
    },

    /// The salvo condition does not currently evaluate to true.
    #[error("salvo condition '{condition_name}' not met for epoch {epoch_id}")]
    SalvoConditionNotMet {
        condition_name: SalvoConditionName,
        epoch_id: EpochID,
    },

    /// The output port has no free slot left.
    #[error("output port '{port_name}' is full for epoch {epoch_id}")]
    OutputPortFull {
        port_name: PortName,
        epoch_id: EpochID,
    },

    /// The graph has no node with this name.
    #[error("node not found: '{node_name}'")]
    NodeNotFound { node_name: NodeName },

    /// `create_epoch` was given a packet that is not waiting at the stated input port.
    #[error("packet {packet_id} is not at input port '{port_name}' of node '{node_name}'")]
    PacketNotAtInputPort {
        packet_id: PacketID,
        port_name: PortName,
        node_name: NodeName,
    },

    /// The node has no input port with this name.
    #[error("input port '{port_name}' not found on node '{node_name}'")]
    InputPortNotFound {
        port_name: PortName,
        node_name: NodeName,
    },

    /// The input port has no free slot left.
    #[error("input port '{port_name}' on node '{node_name}' is full")]
    InputPortFull {
        port_name: PortName,
        node_name: NodeName,
    },

    /// A packet may not be transported out of an epoch that is running.
    #[error("cannot move packet {packet_id} out of running epoch {epoch_id}")]
    CannotMovePacketFromRunningEpoch {
        packet_id: PacketID,
        epoch_id: EpochID,
    },

    /// A packet may not be transported into an epoch that is running.
    #[error("cannot move packet {packet_id} into running epoch {epoch_id}")]
    CannotMovePacketIntoRunningEpoch {
        packet_id: PacketID,
        epoch_id: EpochID,
    },

    /// The graph has no such edge.
    #[error("edge not found: {edge}")]
    EdgeNotFound { edge: Edge },

    /// The backward request cascade reported a cycle at this node.
    #[error("request cascade cycle detected at node '{node_name}'")]
    RequestCycleDetected { node_name: NodeName },

    /// The backward request cascade reached an input port with no incoming connection.
    #[error("request cascade reached unconnected input port '{port_name}' on node '{node_name}'")]
    RequestUnconnectedPort {
        node_name: NodeName,
        port_name: PortName,
    },

    /// A request was addressed to a node that does not exist.
    #[error("request targets non-existent node '{node_name}'")]
    RequestNodeNotFound { node_name: NodeName },
}
354
/// Record of a state change performed by the simulator.
///
/// The first field of every variant is the timestamp obtained from `get_utc_now()` when the
/// event was recorded.
#[derive(Debug, Clone)]
pub enum NetEvent {
    /// A packet with this id was created.
    PacketCreated(EventUTC, PacketID),
    /// A packet was consumed; the location is where it was at that moment.
    PacketConsumed(EventUTC, PacketID, PacketLocation),
    /// A packet was destroyed; the location is where it was at that moment.
    PacketDestroyed(EventUTC, PacketID, PacketLocation),
    /// An epoch with this id was created.
    EpochCreated(EventUTC, EpochID),
    /// The epoch moved to the `Running` state.
    EpochStarted(EventUTC, EpochID),
    /// The epoch was finished; carries a snapshot taken before its state was set to `Finished`.
    EpochFinished(EventUTC, Epoch),
    /// The epoch was cancelled; carries a snapshot taken before it was removed.
    EpochCancelled(EventUTC, Epoch),
    /// A packet moved: (time, packet, from, to, index the packet had at `from` before moving).
    PacketMoved(EventUTC, PacketID, PacketLocation, PacketLocation, usize),
    /// An input salvo condition fired and led to the creation of the given epoch.
    InputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
    /// An output salvo condition fired for the given epoch.
    OutputSalvoTriggered(EventUTC, EpochID, SalvoConditionName),
    /// A packet was orphaned.
    // NOTE(review): emitted outside this section; the port is presumably the output port the
    // packet was sent from - confirm.
    PacketOrphaned(
        EventUTC,
        PacketID,
        EpochID,
        NodeName,
        PortName,
        SalvoConditionName,
    ),
    /// A dependency request was created for a node: (time, node, label, source).
    RequestCreated(EventUTC, NodeName, String, RequestCreatedSource),
    /// A request cascade was resolved: (time, source nodes found, request label).
    RequestCascadeResolved(EventUTC, Vec<NodeName>, String),
    /// An epoch was created on a source node to serve a request: (time, epoch, node, label).
    RequestEpochCreated(EventUTC, EpochID, NodeName, String),
}
408
/// Payload returned by a successful [`NetAction`].
#[derive(Debug, Clone)]
pub enum NetActionResponseData {
    /// Result of `run_step`.
    StepResult {
        /// `true` when the step moved a packet or created an epoch.
        made_progress: bool,
    },
    /// Id of the packet that was created.
    Packet(PacketID),
    /// The newly created epoch (state `Startable`).
    CreatedEpoch(Epoch),
    /// The epoch after it was switched to `Running`.
    StartedEpoch(Epoch),
    /// The epoch after it was removed and marked `Finished`.
    FinishedEpoch(Epoch),
    /// The removed epoch together with the ids of the packets destroyed with it.
    CancelledEpoch(Epoch, Vec<PacketID>),
    /// The action has no payload.
    None,
}
431
/// Outcome of executing a [`NetAction`].
#[derive(Debug)]
pub enum NetActionResponse {
    /// The action succeeded: its payload and the events it produced, in order.
    Success(NetActionResponseData, Vec<NetEvent>),
    /// The action was rejected.
    Error(NetActionError),
}
440
/// Simulator state: the graph plus all packets, epochs and pending dependency requests.
#[derive(Debug)]
pub struct NetSim {
    /// The graph being simulated; validated by `NetSim::new`.
    pub graph: Graph,
    /// All live packets by id.
    _packets: HashMap<PacketID, Packet>,
    /// Ordered set of packet ids at each known location. Entries for edges, node input ports
    /// and `OutsideNet` are created up front; epoch and output-port entries are added when an
    /// epoch is created and removed when it is finished or cancelled.
    _packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>>,
    /// All live epochs by id.
    _epochs: HashMap<EpochID, Epoch>,
    /// Ids of epochs that have been created but not yet started.
    _startable_epochs: HashSet<EpochID>,
    /// Ids of the live epochs of each node, in creation order.
    _node_to_epochs: HashMap<NodeName, Vec<EpochID>>,
    /// Requests queued for resolution by the next `run_step`.
    _pending_requests: Vec<PendingRequest>,
    /// Per-node token for the `OnNoSalvoTriggered` trigger: a request is only emitted while the
    /// token is `true`; emitting clears it and finishing/cancelling an epoch of the node sets it.
    _request_tokens: HashMap<NodeName, bool>,
    /// Set once `run_step` has emitted the `OnStartup` requests.
    _startup_requests_sent: bool,
}
462
463impl NetSim {
464 pub fn new(graph: Graph) -> Self {
468 let errors = graph.validate();
469 if !errors.is_empty() {
470 let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
471 panic!(
472 "Cannot create NetSim with invalid graph:\n - {}",
473 msgs.join("\n - ")
474 );
475 }
476
477 let mut packets_by_location: HashMap<PacketLocation, IndexSet<PacketID>> = HashMap::new();
478
479 for edge in graph.edges() {
481 packets_by_location.insert(PacketLocation::Edge(edge.clone()), IndexSet::new());
482 }
483
484 for (node_name, node) in graph.nodes() {
486 for port_name in node.in_ports.keys() {
487 packets_by_location.insert(
488 PacketLocation::InputPort(node_name.clone(), port_name.clone()),
489 IndexSet::new(),
490 );
491 }
492 }
493
494 packets_by_location.insert(PacketLocation::OutsideNet, IndexSet::new());
496
497 let mut request_tokens: HashMap<NodeName, bool> = HashMap::new();
502 for (node_name, node) in graph.nodes() {
503 if let Some(config) = &node.dependency_request_config
504 && config
505 .triggers
506 .contains(&DependencyRequestTrigger::OnNoSalvoTriggered)
507 {
508 request_tokens.insert(node_name.clone(), true);
509 }
510 }
511
512 NetSim {
513 graph,
514 _packets: HashMap::new(),
515 _packets_by_location: packets_by_location,
516 _epochs: HashMap::new(),
517 _startable_epochs: HashSet::new(),
518 _node_to_epochs: HashMap::new(),
519 _pending_requests: Vec::new(),
520 _request_tokens: request_tokens,
521 _startup_requests_sent: false,
522 }
523 }
524
525 fn move_packet(&mut self, packet_id: &PacketID, new_location: PacketLocation) {
526 let packet = self._packets.get_mut(packet_id).unwrap();
527 let packets_at_old_location = self
528 ._packets_by_location
529 .get_mut(&packet.location)
530 .expect("Packet location has no entry in self._packets_by_location.");
531 packets_at_old_location.shift_remove(packet_id);
532 packet.location = new_location;
533 if !self
534 ._packets_by_location
535 .get_mut(&packet.location)
536 .expect("Packet location has no entry in self._packets_by_location")
537 .insert(*packet_id)
538 {
539 panic!("Attempted to move packet to a location that already contains it.");
540 }
541 }
542
543 fn try_trigger_input_salvo(&mut self, node_name: &NodeName) -> (bool, Vec<NetEvent>) {
548 let mut events: Vec<NetEvent> = Vec::new();
549
550 let node = match self.graph.nodes().get(node_name) {
551 Some(n) => n,
552 None => return (false, events),
553 };
554
555 let in_port_names: Vec<PortName> = node.in_ports.keys().cloned().collect();
556 let in_ports_clone: HashMap<PortName, Port> = node.in_ports.clone();
557
558 struct SalvoConditionData {
560 name: SalvoConditionName,
561 ports: HashMap<PortName, PacketCount>,
562 term: SalvoConditionTerm,
563 }
564
565 let salvo_conditions: Vec<SalvoConditionData> = node
566 .in_salvo_conditions
567 .iter()
568 .map(|(name, cond)| SalvoConditionData {
569 name: name.clone(),
570 ports: cond.ports.clone(),
571 term: cond.term.clone(),
572 })
573 .collect();
574
575 for salvo_cond_data in salvo_conditions {
577 let port_packet_counts: HashMap<PortName, u64> = in_port_names
579 .iter()
580 .map(|port_name| {
581 let count = self
582 ._packets_by_location
583 .get(&PacketLocation::InputPort(
584 node_name.clone(),
585 port_name.clone(),
586 ))
587 .map(|packets| packets.len() as u64)
588 .unwrap_or(0);
589 (port_name.clone(), count)
590 })
591 .collect();
592
593 if evaluate_salvo_condition(&salvo_cond_data.term, &port_packet_counts, &in_ports_clone)
595 {
596 let epoch_id = Ulid::new();
598
599 let mut salvo_packets: Vec<(PortName, PacketID)> = Vec::new();
602 let mut packets_to_move: Vec<(PacketID, PortName)> = Vec::new();
603
604 for (port_name, packet_count) in &salvo_cond_data.ports {
605 let port_location =
606 PacketLocation::InputPort(node_name.clone(), port_name.clone());
607 if let Some(packet_ids) = self._packets_by_location.get(&port_location) {
608 let take_count = match packet_count {
609 PacketCount::All => packet_ids.len(),
610 PacketCount::Count(n) => std::cmp::min(*n as usize, packet_ids.len()),
611 };
612 for pid in packet_ids.iter().take(take_count) {
613 salvo_packets.push((port_name.clone(), *pid));
614 packets_to_move.push((*pid, port_name.clone()));
615 }
616 }
617 }
618
619 let in_salvo = Salvo {
621 salvo_condition: salvo_cond_data.name.clone(),
622 packets: salvo_packets,
623 };
624
625 let epoch = Epoch {
627 id: epoch_id,
628 node_name: node_name.clone(),
629 in_salvo,
630 out_salvos: Vec::new(),
631 state: EpochState::Startable,
632 orphaned_packets: Vec::new(),
633 };
634
635 self._epochs.insert(epoch_id, epoch);
637 self._startable_epochs.insert(epoch_id);
638 self._node_to_epochs
639 .entry(node_name.clone())
640 .or_default()
641 .push(epoch_id);
642
643 let epoch_location = PacketLocation::Node(epoch_id);
645 self._packets_by_location
646 .insert(epoch_location.clone(), IndexSet::new());
647
648 let node = self
650 .graph
651 .nodes()
652 .get(node_name)
653 .expect("Node not found for epoch creation");
654 for out_port_name in node.out_ports.keys() {
655 let output_port_location =
656 PacketLocation::OutputPort(epoch_id, out_port_name.clone());
657 self._packets_by_location
658 .insert(output_port_location, IndexSet::new());
659 }
660
661 events.push(NetEvent::InputSalvoTriggered(
666 get_utc_now(),
667 epoch_id,
668 salvo_cond_data.name.clone(),
669 ));
670 events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id));
671
672 for (pid, port_name) in &packets_to_move {
674 let from_location =
675 PacketLocation::InputPort(node_name.clone(), port_name.clone());
676
677 let from_index = self
679 ._packets_by_location
680 .get(&from_location)
681 .and_then(|packets| packets.get_index_of(pid))
682 .expect("Packet should exist at from_location");
683
684 self.move_packet(pid, epoch_location.clone());
685 events.push(NetEvent::PacketMoved(
686 get_utc_now(),
687 *pid,
688 from_location,
689 epoch_location.clone(),
690 from_index,
691 ));
692 }
693
694 return (true, events);
696 }
697 }
698
699 (false, events)
700 }
701
702 fn run_step(&mut self) -> NetActionResponse {
703 let mut all_events: Vec<NetEvent> = Vec::new();
704 let mut made_progress = false;
705
706 struct EdgeMoveCandidate {
710 packet_id: PacketID,
711 from_location: PacketLocation,
712 from_index: usize,
713 input_port_location: PacketLocation,
714 can_move: bool,
715 }
716
717 let mut edge_candidates: Vec<EdgeMoveCandidate> = Vec::new();
718
719 for (location, packets) in &self._packets_by_location {
721 if let PacketLocation::Edge(edge_ref) = location {
722 if let Some(first_packet_id) = packets.first() {
724 let target_node_name = edge_ref.target.node_name.clone();
725 let target_port_name = edge_ref.target.port_name.clone();
726
727 let node = self
729 .graph
730 .nodes()
731 .get(&target_node_name)
732 .expect("Edge targets a non-existent node");
733 let port = node
734 .in_ports
735 .get(&target_port_name)
736 .expect("Edge targets a non-existent input port");
737
738 let input_port_location = PacketLocation::InputPort(
739 target_node_name.clone(),
740 target_port_name.clone(),
741 );
742 let current_count = self
743 ._packets_by_location
744 .get(&input_port_location)
745 .map(|packets| packets.len() as u64)
746 .unwrap_or(0);
747
748 let can_move = match port.slots_spec {
749 PortSlotSpec::Infinite => true,
750 PortSlotSpec::Finite(max_slots) => current_count < max_slots,
751 };
752
753 edge_candidates.push(EdgeMoveCandidate {
754 packet_id: *first_packet_id,
755 from_location: location.clone(),
756 from_index: 0, input_port_location,
758 can_move,
759 });
760 }
761 }
762 }
763
764 for candidate in edge_candidates {
766 if !candidate.can_move {
767 continue;
768 }
769
770 self.move_packet(&candidate.packet_id, candidate.input_port_location.clone());
772 all_events.push(NetEvent::PacketMoved(
773 get_utc_now(),
774 candidate.packet_id,
775 candidate.from_location,
776 candidate.input_port_location.clone(),
777 candidate.from_index,
778 ));
779 made_progress = true;
780 }
781
782 let mut nodes_with_input_packets: Vec<NodeName> = Vec::new();
784 for (location, packets) in &self._packets_by_location {
785 if let PacketLocation::InputPort(node_name, _) = location
786 && !packets.is_empty()
787 && !nodes_with_input_packets.contains(node_name)
788 {
789 nodes_with_input_packets.push(node_name.clone());
790 }
791 }
792
793 let mut nodes_that_triggered_salvo: HashSet<NodeName> = HashSet::new();
794 for node_name in &nodes_with_input_packets {
795 let (triggered, events) = self.try_trigger_input_salvo(node_name);
796 all_events.extend(events);
797 if triggered {
798 made_progress = true;
799 nodes_that_triggered_salvo.insert(node_name.clone());
800 }
801 }
802
803 if !self._startup_requests_sent {
808 let mut found_startup = false;
809 for (node_name, node) in self.graph.nodes() {
810 if let Some(config) = &node.dependency_request_config
811 && config
812 .triggers
813 .contains(&DependencyRequestTrigger::OnStartup)
814 {
815 found_startup = true;
816 all_events.push(NetEvent::RequestCreated(
817 get_utc_now(),
818 node_name.clone(),
819 config.label.clone(),
820 RequestCreatedSource::OnStartup,
821 ));
822 self._pending_requests.push(PendingRequest {
823 node_name: node_name.clone(),
824 label: config.label.clone(),
825 });
826 }
827 }
828 if found_startup {
829 self._startup_requests_sent = true;
830 }
831 }
832
833 {
835 let no_salvo_nodes: Vec<(NodeName, String)> = nodes_with_input_packets
836 .iter()
837 .filter(|node_name| !nodes_that_triggered_salvo.contains(*node_name))
838 .filter_map(|node_name| {
839 let node = self.graph.nodes().get(node_name)?;
840 let config = node.dependency_request_config.as_ref()?;
841 if config
842 .triggers
843 .contains(&DependencyRequestTrigger::OnNoSalvoTriggered)
844 && self._request_tokens.get(node_name) == Some(&true)
845 {
846 Some((node_name.clone(), config.label.clone()))
847 } else {
848 None
849 }
850 })
851 .collect();
852
853 for (node_name, label) in no_salvo_nodes {
854 self._request_tokens.insert(node_name.clone(), false);
856 all_events.push(NetEvent::RequestCreated(
857 get_utc_now(),
858 node_name.clone(),
859 label.clone(),
860 RequestCreatedSource::OnNoSalvoTriggered,
861 ));
862 self._pending_requests
863 .push(PendingRequest { node_name, label });
864 }
865 }
866
867 if !self._pending_requests.is_empty() {
869 let pending_requests: Vec<PendingRequest> = self._pending_requests.drain(..).collect();
870
871 let mut source_label_pairs: Vec<(NodeName, String)> = Vec::new();
874
875 for request in &pending_requests {
876 let start_ports: Vec<PortRef> = self
878 .graph
879 .dependency_edges()
880 .iter()
881 .filter(|dep_edge| dep_edge.target.node_name == request.node_name)
882 .map(|dep_edge| dep_edge.target.clone())
883 .collect();
884
885 if start_ports.is_empty() {
886 continue;
887 }
888
889 match self.graph.cascade_backward(&start_ports) {
891 Ok(result) => {
892 all_events.push(NetEvent::RequestCascadeResolved(
893 get_utc_now(),
894 result.source_nodes.clone(),
895 request.label.clone(),
896 ));
897
898 for source_node in &result.source_nodes {
899 let pair = (source_node.clone(), request.label.clone());
900 if !source_label_pairs.contains(&pair) {
901 source_label_pairs.push(pair);
902 }
903 }
904 }
905 Err(crate::graph::CascadeError::CycleDetected { node_name }) => {
906 return NetActionResponse::Error(NetActionError::RequestCycleDetected {
907 node_name,
908 });
909 }
910 Err(crate::graph::CascadeError::UnconnectedInputPort {
911 node_name,
912 port_name,
913 }) => {
914 return NetActionResponse::Error(NetActionError::RequestUnconnectedPort {
915 node_name,
916 port_name,
917 });
918 }
919 }
920 }
921
922 for (source_node_name, label) in &source_label_pairs {
924 let epoch_id = Ulid::new();
925 let in_salvo = Salvo {
926 salvo_condition: REQUEST_SALVO_CONDITION.to_string(),
927 packets: vec![],
928 };
929
930 let node = self
931 .graph
932 .nodes()
933 .get(source_node_name)
934 .expect("Source node from cascade should exist");
935
936 let epoch = Epoch {
937 id: epoch_id,
938 node_name: source_node_name.clone(),
939 in_salvo,
940 out_salvos: Vec::new(),
941 state: EpochState::Startable,
942 orphaned_packets: Vec::new(),
943 };
944
945 self._epochs.insert(epoch_id, epoch);
946 self._startable_epochs.insert(epoch_id);
947 self._node_to_epochs
948 .entry(source_node_name.clone())
949 .or_default()
950 .push(epoch_id);
951
952 let epoch_location = PacketLocation::Node(epoch_id);
954 self._packets_by_location
955 .insert(epoch_location, IndexSet::new());
956 for out_port_name in node.out_ports.keys() {
957 let output_port_location =
958 PacketLocation::OutputPort(epoch_id, out_port_name.clone());
959 self._packets_by_location
960 .insert(output_port_location, IndexSet::new());
961 }
962
963 all_events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id));
964 all_events.push(NetEvent::RequestEpochCreated(
965 get_utc_now(),
966 epoch_id,
967 source_node_name.clone(),
968 label.clone(),
969 ));
970 made_progress = true;
971 }
972 }
973
974 NetActionResponse::Success(
975 NetActionResponseData::StepResult { made_progress },
976 all_events,
977 )
978 }
979
980 fn create_packet(&mut self, maybe_epoch_id: &Option<EpochID>) -> NetActionResponse {
981 if let Some(epoch_id) = maybe_epoch_id {
983 if !self._epochs.contains_key(epoch_id) {
984 return NetActionResponse::Error(NetActionError::EpochNotFound {
985 epoch_id: *epoch_id,
986 });
987 }
988 if !matches!(self._epochs[epoch_id].state, EpochState::Running) {
989 return NetActionResponse::Error(NetActionError::EpochNotRunning {
990 epoch_id: *epoch_id,
991 });
992 }
993 }
994
995 let packet_location = match maybe_epoch_id {
996 Some(epoch_id) => PacketLocation::Node(*epoch_id),
997 None => PacketLocation::OutsideNet,
998 };
999
1000 let packet = Packet {
1001 id: Ulid::new(),
1002 location: packet_location.clone(),
1003 };
1004
1005 let packet_id = packet.id;
1006 self._packets.insert(packet.id, packet);
1007
1008 self._packets_by_location
1010 .entry(packet_location)
1011 .or_default()
1012 .insert(packet_id);
1013
1014 NetActionResponse::Success(
1015 NetActionResponseData::Packet(packet_id),
1016 vec![NetEvent::PacketCreated(get_utc_now(), packet_id)],
1017 )
1018 }
1019
1020 fn consume_packet(&mut self, packet_id: &PacketID) -> NetActionResponse {
1021 if !self._packets.contains_key(packet_id) {
1022 return NetActionResponse::Error(NetActionError::PacketNotFound {
1023 packet_id: *packet_id,
1024 });
1025 }
1026
1027 let location = self._packets[packet_id].location.clone();
1028
1029 if let Some(packets) = self._packets_by_location.get_mut(&location) {
1030 if packets.shift_remove(packet_id) {
1031 self._packets.remove(packet_id);
1032 NetActionResponse::Success(
1033 NetActionResponseData::None,
1034 vec![NetEvent::PacketConsumed(
1035 get_utc_now(),
1036 *packet_id,
1037 location,
1038 )],
1039 )
1040 } else {
1041 panic!(
1042 "Packet with ID {} not found in location {:?}",
1043 packet_id, location
1044 );
1045 }
1046 } else {
1047 panic!("Packet location {:?} not found", location);
1048 }
1049 }
1050
1051 fn destroy_packet(&mut self, packet_id: &PacketID) -> NetActionResponse {
1052 if !self._packets.contains_key(packet_id) {
1053 return NetActionResponse::Error(NetActionError::PacketNotFound {
1054 packet_id: *packet_id,
1055 });
1056 }
1057
1058 let location = self._packets[packet_id].location.clone();
1059
1060 if let Some(packets) = self._packets_by_location.get_mut(&location) {
1061 if packets.shift_remove(packet_id) {
1062 self._packets.remove(packet_id);
1063 NetActionResponse::Success(
1064 NetActionResponseData::None,
1065 vec![NetEvent::PacketDestroyed(
1066 get_utc_now(),
1067 *packet_id,
1068 location,
1069 )],
1070 )
1071 } else {
1072 panic!(
1073 "Packet with ID {} not found in location {:?}",
1074 packet_id, location
1075 );
1076 }
1077 } else {
1078 panic!("Packet location {:?} not found", location);
1079 }
1080 }
1081
1082 fn start_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
1083 if let Some(epoch) = self._epochs.get_mut(epoch_id) {
1084 if !self._startable_epochs.contains(epoch_id) {
1085 return NetActionResponse::Error(NetActionError::EpochNotStartable {
1086 epoch_id: *epoch_id,
1087 });
1088 }
1089 debug_assert!(
1090 matches!(epoch.state, EpochState::Startable),
1091 "Epoch state is not Startable but was in net._startable_epochs."
1092 );
1093 epoch.state = EpochState::Running;
1094 self._startable_epochs.remove(epoch_id);
1095 NetActionResponse::Success(
1096 NetActionResponseData::StartedEpoch(epoch.clone()),
1097 vec![NetEvent::EpochStarted(get_utc_now(), *epoch_id)],
1098 )
1099 } else {
1100 NetActionResponse::Error(NetActionError::EpochNotFound {
1101 epoch_id: *epoch_id,
1102 })
1103 }
1104 }
1105
1106 fn set_request_token(&mut self, node_name: &NodeName, value: bool) {
1109 if let Some(node) = self.graph.nodes().get(node_name)
1110 && let Some(config) = &node.dependency_request_config
1111 && config
1112 .triggers
1113 .contains(&DependencyRequestTrigger::OnNoSalvoTriggered)
1114 {
1115 self._request_tokens.insert(node_name.clone(), value);
1116 }
1117 }
1118
1119 fn finish_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
1120 let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
1122 epoch.clone()
1123 } else {
1124 return NetActionResponse::Error(NetActionError::EpochNotFound {
1125 epoch_id: *epoch_id,
1126 });
1127 };
1128
1129 if epoch.state != EpochState::Running {
1131 return NetActionResponse::Error(NetActionError::EpochNotRunning {
1132 epoch_id: *epoch_id,
1133 });
1134 }
1135
1136 let epoch_loc = PacketLocation::Node(*epoch_id);
1138 if let Some(packets) = self._packets_by_location.get(&epoch_loc) {
1139 if !packets.is_empty() {
1140 return NetActionResponse::Error(NetActionError::CannotFinishNonEmptyEpoch {
1141 epoch_id: *epoch_id,
1142 });
1143 }
1144 } else {
1145 panic!("Epoch {} not found in location {:?}", epoch_id, epoch_loc);
1146 }
1147
1148 let node = self
1150 .graph
1151 .nodes()
1152 .get(&epoch.node_name)
1153 .expect("Epoch references non-existent node");
1154 for port_name in node.out_ports.keys() {
1155 let output_port_loc = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1156 if let Some(packets) = self._packets_by_location.get(&output_port_loc)
1157 && !packets.is_empty()
1158 {
1159 return NetActionResponse::Error(NetActionError::UnsentOutputSalvo {
1160 epoch_id: *epoch_id,
1161 port_name: port_name.clone(),
1162 });
1163 }
1164 }
1165
1166 let out_port_names: Vec<String> = node.out_ports.keys().cloned().collect();
1169
1170 let epoch_before_finish = self._epochs[epoch_id].clone();
1172
1173 let epoch_node_name = self._epochs[epoch_id].node_name.clone();
1175 self.set_request_token(&epoch_node_name, true);
1176
1177 let mut epoch = self._epochs.remove(epoch_id).unwrap();
1178 epoch.state = EpochState::Finished;
1179
1180 self._packets_by_location.remove(&epoch_loc);
1182 for port_name in &out_port_names {
1183 let output_port_loc = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1184 self._packets_by_location.remove(&output_port_loc);
1185 }
1186
1187 if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch.node_name) {
1189 epoch_ids.retain(|id| id != epoch_id);
1190 }
1191
1192 NetActionResponse::Success(
1193 NetActionResponseData::FinishedEpoch(epoch),
1194 vec![NetEvent::EpochFinished(get_utc_now(), epoch_before_finish)],
1195 )
1196 }
1197
1198 fn cancel_epoch(&mut self, epoch_id: &EpochID) -> NetActionResponse {
1199 let epoch_for_event = if let Some(epoch) = self._epochs.get(epoch_id) {
1201 epoch.clone()
1202 } else {
1203 return NetActionResponse::Error(NetActionError::EpochNotFound {
1204 epoch_id: *epoch_id,
1205 });
1206 };
1207
1208 self.set_request_token(&epoch_for_event.node_name, true);
1210
1211 let mut events: Vec<NetEvent> = Vec::new();
1212 let mut destroyed_packets: Vec<PacketID> = Vec::new();
1213
1214 let epoch_location = PacketLocation::Node(*epoch_id);
1216 if let Some(packet_ids) = self._packets_by_location.get(&epoch_location) {
1217 destroyed_packets.extend(packet_ids.iter().cloned());
1218 }
1219
1220 let node = self
1222 .graph
1223 .nodes()
1224 .get(&epoch_for_event.node_name)
1225 .expect("Epoch references non-existent node");
1226 for port_name in node.out_ports.keys() {
1227 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1228 if let Some(packet_ids) = self._packets_by_location.get(&output_port_location) {
1229 destroyed_packets.extend(packet_ids.iter().cloned());
1230 }
1231 }
1232
1233 for packet_id in &destroyed_packets {
1235 let packet = self
1236 ._packets
1237 .remove(packet_id)
1238 .expect("Packet in location map not found in packets map");
1239 let packet_location = packet.location.clone();
1240 if let Some(packets_at_location) = self._packets_by_location.get_mut(&packet_location) {
1241 packets_at_location.shift_remove(packet_id);
1242 }
1243 events.push(NetEvent::PacketDestroyed(
1244 get_utc_now(),
1245 *packet_id,
1246 packet_location,
1247 ));
1248 }
1249
1250 for port_name in node.out_ports.keys() {
1252 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1253 self._packets_by_location.remove(&output_port_location);
1254 }
1255
1256 self._packets_by_location.remove(&epoch_location);
1258
1259 self._startable_epochs.remove(epoch_id);
1261
1262 if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch_for_event.node_name) {
1264 epoch_ids.retain(|id| id != epoch_id);
1265 }
1266
1267 let epoch = self._epochs.remove(epoch_id).expect("Epoch should exist");
1269
1270 events.push(NetEvent::EpochCancelled(get_utc_now(), epoch_for_event));
1271
1272 NetActionResponse::Success(
1273 NetActionResponseData::CancelledEpoch(epoch, destroyed_packets),
1274 events,
1275 )
1276 }
1277
1278 fn create_epoch(&mut self, node_name: &NodeName, salvo: &Salvo) -> NetActionResponse {
1279 let node = match self.graph.nodes().get(node_name) {
1281 Some(node) => node,
1282 None => {
1283 return NetActionResponse::Error(NetActionError::NodeNotFound {
1284 node_name: node_name.clone(),
1285 });
1286 }
1287 };
1288
1289 for (port_name, packet_id) in &salvo.packets {
1291 if !node.in_ports.contains_key(port_name) {
1293 return NetActionResponse::Error(NetActionError::InputPortNotFound {
1294 port_name: port_name.clone(),
1295 node_name: node_name.clone(),
1296 });
1297 }
1298
1299 let packet = match self._packets.get(packet_id) {
1301 Some(packet) => packet,
1302 None => {
1303 return NetActionResponse::Error(NetActionError::PacketNotFound {
1304 packet_id: *packet_id,
1305 });
1306 }
1307 };
1308
1309 let expected_location = PacketLocation::InputPort(node_name.clone(), port_name.clone());
1311 if packet.location != expected_location {
1312 return NetActionResponse::Error(NetActionError::PacketNotAtInputPort {
1313 packet_id: *packet_id,
1314 port_name: port_name.clone(),
1315 node_name: node_name.clone(),
1316 });
1317 }
1318 }
1319
1320 let mut events: Vec<NetEvent> = Vec::new();
1321
1322 let epoch_id = Ulid::new();
1324 let epoch = Epoch {
1325 id: epoch_id,
1326 node_name: node_name.clone(),
1327 in_salvo: salvo.clone(),
1328 out_salvos: Vec::new(),
1329 state: EpochState::Startable,
1330 orphaned_packets: Vec::new(),
1331 };
1332
1333 self._epochs.insert(epoch_id, epoch.clone());
1335 self._startable_epochs.insert(epoch_id);
1336 self._node_to_epochs
1337 .entry(node_name.clone())
1338 .or_default()
1339 .push(epoch_id);
1340
1341 let epoch_location = PacketLocation::Node(epoch_id);
1343 self._packets_by_location
1344 .insert(epoch_location.clone(), IndexSet::new());
1345
1346 for port_name in node.out_ports.keys() {
1348 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
1349 self._packets_by_location
1350 .insert(output_port_location, IndexSet::new());
1351 }
1352
1353 events.push(NetEvent::EpochCreated(get_utc_now(), epoch_id));
1354
1355 for (port_name, packet_id) in &salvo.packets {
1357 let from_location = PacketLocation::InputPort(node_name.clone(), port_name.clone());
1358
1359 let from_index = self
1361 ._packets_by_location
1362 .get(&from_location)
1363 .and_then(|packets| packets.get_index_of(packet_id))
1364 .expect("Packet should exist at from_location");
1365
1366 self.move_packet(packet_id, epoch_location.clone());
1367 events.push(NetEvent::PacketMoved(
1368 get_utc_now(),
1369 *packet_id,
1370 from_location,
1371 epoch_location.clone(),
1372 from_index,
1373 ));
1374 }
1375
1376 NetActionResponse::Success(NetActionResponseData::CreatedEpoch(epoch), events)
1377 }
1378
1379 fn load_packet_into_output_port(
1380 &mut self,
1381 packet_id: &PacketID,
1382 port_name: &str,
1383 ) -> NetActionResponse {
1384 let (epoch_id, old_location) = if let Some(packet) = self._packets.get(packet_id) {
1385 if let PacketLocation::Node(epoch_id) = packet.location {
1386 (epoch_id, packet.location.clone())
1387 } else {
1388 return NetActionResponse::Error(NetActionError::PacketNotInAnyNode {
1389 packet_id: *packet_id,
1390 });
1391 }
1392 } else {
1393 return NetActionResponse::Error(NetActionError::PacketNotFound {
1394 packet_id: *packet_id,
1395 });
1396 };
1397
1398 let node_name = self
1399 ._epochs
1400 .get(&epoch_id)
1401 .expect("The epoch id in the location of a packet could not be found.")
1402 .node_name
1403 .clone();
1404 let node = self
1405 .graph
1406 .nodes()
1407 .get(&node_name)
1408 .expect("Packet located in a non-existing node (yet the node has an epoch).");
1409
1410 if !node.out_ports.contains_key(port_name) {
1411 return NetActionResponse::Error(NetActionError::OutputPortNotFound {
1412 port_name: port_name.to_string(),
1413 epoch_id,
1414 });
1415 }
1416
1417 let port = node.out_ports.get(port_name).unwrap();
1418 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.to_string());
1419 let port_packets = self
1420 ._packets_by_location
1421 .get(&output_port_location)
1422 .expect("No entry in NetSim._packets_by_location found for output port.");
1423
1424 if let PortSlotSpec::Finite(num_slots) = port.slots_spec
1426 && port_packets.len() as u64 >= num_slots
1427 {
1428 return NetActionResponse::Error(NetActionError::OutputPortFull {
1429 port_name: port_name.to_string(),
1430 epoch_id,
1431 });
1432 }
1433
1434 let from_index = self
1436 ._packets_by_location
1437 .get(&old_location)
1438 .and_then(|packets| packets.get_index_of(packet_id))
1439 .expect("Packet should exist at old_location");
1440
1441 let new_location = output_port_location;
1442 self.move_packet(packet_id, new_location.clone());
1443 NetActionResponse::Success(
1444 NetActionResponseData::None,
1445 vec![NetEvent::PacketMoved(
1446 get_utc_now(),
1447 *packet_id,
1448 old_location,
1449 new_location,
1450 from_index,
1451 )],
1452 )
1453 }
1454
1455 fn send_output_salvo(
1456 &mut self,
1457 epoch_id: &EpochID,
1458 salvo_condition_name: &SalvoConditionName,
1459 ) -> NetActionResponse {
1460 let epoch = if let Some(epoch) = self._epochs.get(epoch_id) {
1462 epoch
1463 } else {
1464 return NetActionResponse::Error(NetActionError::EpochNotFound {
1465 epoch_id: *epoch_id,
1466 });
1467 };
1468
1469 let node = self
1471 .graph
1472 .nodes()
1473 .get(&epoch.node_name)
1474 .expect("Node associated with epoch could not be found.");
1475 let node_name = node.name.clone();
1476
1477 let salvo_condition =
1479 if let Some(salvo_condition) = node.out_salvo_conditions.get(salvo_condition_name) {
1480 salvo_condition
1481 } else {
1482 return NetActionResponse::Error(NetActionError::OutputSalvoConditionNotFound {
1483 condition_name: salvo_condition_name.clone(),
1484 epoch_id: *epoch_id,
1485 });
1486 };
1487
1488 if let MaxSalvos::Finite(max) = salvo_condition.max_salvos {
1490 let condition_salvo_count = epoch
1491 .out_salvos
1492 .iter()
1493 .filter(|s| s.salvo_condition == *salvo_condition_name)
1494 .count() as u64;
1495 if condition_salvo_count >= max {
1496 return NetActionResponse::Error(NetActionError::MaxOutputSalvosReached {
1497 condition_name: salvo_condition_name.clone(),
1498 epoch_id: *epoch_id,
1499 });
1500 }
1501 }
1502
1503 let port_packet_counts: HashMap<PortName, u64> = node
1505 .out_ports
1506 .keys()
1507 .map(|port_name| {
1508 let count = self
1509 ._packets_by_location
1510 .get(&PacketLocation::OutputPort(*epoch_id, port_name.clone()))
1511 .map(|packets| packets.len() as u64)
1512 .unwrap_or(0);
1513 (port_name.clone(), count)
1514 })
1515 .collect();
1516 if !evaluate_salvo_condition(&salvo_condition.term, &port_packet_counts, &node.out_ports) {
1517 return NetActionResponse::Error(NetActionError::SalvoConditionNotMet {
1518 condition_name: salvo_condition_name.clone(),
1519 epoch_id: *epoch_id,
1520 });
1521 }
1522
1523 let mut packets_to_move: Vec<(PacketID, PortName, PacketLocation, PacketLocation, bool)> =
1526 Vec::new();
1527 for (port_name, packet_count) in &salvo_condition.ports {
1528 let from_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
1529 let packets = self
1530 ._packets_by_location
1531 .get(&from_location)
1532 .unwrap_or_else(|| {
1533 panic!(
1534 "Output port '{}' of node '{}' does not have an entry in self._packets_by_location",
1535 port_name,
1536 node_name
1537 )
1538 })
1539 .clone();
1540
1541 let (to_location, is_orphaned) = if let Some(edge_ref) =
1543 self.graph.get_edge_by_tail(&PortRef {
1544 node_name: node_name.clone(),
1545 port_type: PortType::Output,
1546 port_name: port_name.clone(),
1547 }) {
1548 (PacketLocation::Edge(edge_ref.clone()), false)
1550 } else {
1551 (PacketLocation::OutsideNet, true)
1553 };
1554
1555 let take_count = match packet_count {
1556 PacketCount::All => packets.len(),
1557 PacketCount::Count(n) => std::cmp::min(*n as usize, packets.len()),
1558 };
1559 for packet_id in packets.into_iter().take(take_count) {
1560 packets_to_move.push((
1561 packet_id,
1562 port_name.clone(),
1563 from_location.clone(),
1564 to_location.clone(),
1565 is_orphaned,
1566 ));
1567 }
1568 }
1569
1570 let salvo = Salvo {
1572 salvo_condition: salvo_condition_name.clone(),
1573 packets: packets_to_move
1574 .iter()
1575 .map(|(packet_id, port_name, _, _, _)| (port_name.clone(), *packet_id))
1576 .collect(),
1577 };
1578 self._epochs
1579 .get_mut(epoch_id)
1580 .unwrap()
1581 .out_salvos
1582 .push(salvo);
1583
1584 let mut net_events = Vec::new();
1586 let mut orphaned_infos: Vec<OrphanedPacketInfo> = Vec::new();
1587
1588 for (packet_id, port_name, from_location, to_location, is_orphaned) in packets_to_move {
1589 if is_orphaned {
1590 net_events.push(NetEvent::PacketOrphaned(
1592 get_utc_now(),
1593 packet_id,
1594 *epoch_id,
1595 node_name.clone(),
1596 port_name.clone(),
1597 salvo_condition_name.clone(),
1598 ));
1599 orphaned_infos.push(OrphanedPacketInfo {
1600 packet_id,
1601 from_port: port_name,
1602 salvo_condition: salvo_condition_name.clone(),
1603 });
1604 } else {
1605 let from_index = self
1607 ._packets_by_location
1608 .get(&from_location)
1609 .and_then(|packets| packets.get_index_of(&packet_id))
1610 .expect("Packet should exist at from_location");
1611
1612 net_events.push(NetEvent::PacketMoved(
1614 get_utc_now(),
1615 packet_id,
1616 from_location,
1617 to_location.clone(),
1618 from_index,
1619 ));
1620 }
1621 self.move_packet(&packet_id, to_location);
1622 }
1623
1624 if !orphaned_infos.is_empty() {
1626 self._epochs
1627 .get_mut(epoch_id)
1628 .unwrap()
1629 .orphaned_packets
1630 .extend(orphaned_infos);
1631 }
1632
1633 net_events.push(NetEvent::OutputSalvoTriggered(
1635 get_utc_now(),
1636 *epoch_id,
1637 salvo_condition_name.clone(),
1638 ));
1639
1640 NetActionResponse::Success(NetActionResponseData::None, net_events)
1641 }
1642
1643 fn create_request(&mut self, node_name: &NodeName, label: &str) -> NetActionResponse {
1644 if !self.graph.nodes().contains_key(node_name) {
1646 return NetActionResponse::Error(NetActionError::RequestNodeNotFound {
1647 node_name: node_name.clone(),
1648 });
1649 }
1650
1651 self._pending_requests.push(PendingRequest {
1652 node_name: node_name.clone(),
1653 label: label.to_string(),
1654 });
1655
1656 NetActionResponse::Success(
1657 NetActionResponseData::None,
1658 vec![NetEvent::RequestCreated(
1659 get_utc_now(),
1660 node_name.clone(),
1661 label.to_string(),
1662 RequestCreatedSource::External,
1663 )],
1664 )
1665 }
1666
1667 fn transport_packet_to_location(
1668 &mut self,
1669 packet_id: &PacketID,
1670 destination: &PacketLocation,
1671 ) -> NetActionResponse {
1672 let packet = if let Some(p) = self._packets.get(packet_id) {
1674 p
1675 } else {
1676 return NetActionResponse::Error(NetActionError::PacketNotFound {
1677 packet_id: *packet_id,
1678 });
1679 };
1680 let current_location = packet.location.clone();
1681
1682 match ¤t_location {
1684 PacketLocation::Node(epoch_id) => {
1685 if let Some(epoch) = self._epochs.get(epoch_id)
1686 && epoch.state == EpochState::Running
1687 {
1688 return NetActionResponse::Error(
1689 NetActionError::CannotMovePacketFromRunningEpoch {
1690 packet_id: *packet_id,
1691 epoch_id: *epoch_id,
1692 },
1693 );
1694 }
1695 }
1696 PacketLocation::OutputPort(epoch_id, _) => {
1697 if let Some(epoch) = self._epochs.get(epoch_id)
1698 && epoch.state == EpochState::Running
1699 {
1700 return NetActionResponse::Error(
1701 NetActionError::CannotMovePacketFromRunningEpoch {
1702 packet_id: *packet_id,
1703 epoch_id: *epoch_id,
1704 },
1705 );
1706 }
1707 }
1708 _ => {}
1709 }
1710
1711 match destination {
1713 PacketLocation::Node(epoch_id) => {
1714 if let Some(epoch) = self._epochs.get(epoch_id) {
1715 if epoch.state == EpochState::Running {
1716 return NetActionResponse::Error(
1717 NetActionError::CannotMovePacketIntoRunningEpoch {
1718 packet_id: *packet_id,
1719 epoch_id: *epoch_id,
1720 },
1721 );
1722 }
1723 } else {
1724 return NetActionResponse::Error(NetActionError::EpochNotFound {
1725 epoch_id: *epoch_id,
1726 });
1727 }
1728 }
1729 PacketLocation::OutputPort(epoch_id, port_name) => {
1730 if let Some(epoch) = self._epochs.get(epoch_id) {
1731 if epoch.state == EpochState::Running {
1732 return NetActionResponse::Error(
1733 NetActionError::CannotMovePacketIntoRunningEpoch {
1734 packet_id: *packet_id,
1735 epoch_id: *epoch_id,
1736 },
1737 );
1738 }
1739 let node = self
1741 .graph
1742 .nodes()
1743 .get(&epoch.node_name)
1744 .expect("Node associated with epoch could not be found.");
1745 if !node.out_ports.contains_key(port_name) {
1746 return NetActionResponse::Error(NetActionError::OutputPortNotFound {
1747 port_name: port_name.clone(),
1748 epoch_id: *epoch_id,
1749 });
1750 }
1751 } else {
1752 return NetActionResponse::Error(NetActionError::EpochNotFound {
1753 epoch_id: *epoch_id,
1754 });
1755 }
1756 }
1757 PacketLocation::InputPort(node_name, port_name) => {
1758 let node = if let Some(n) = self.graph.nodes().get(node_name) {
1760 n
1761 } else {
1762 return NetActionResponse::Error(NetActionError::NodeNotFound {
1763 node_name: node_name.clone(),
1764 });
1765 };
1766 let port = if let Some(p) = node.in_ports.get(port_name) {
1768 p
1769 } else {
1770 return NetActionResponse::Error(NetActionError::InputPortNotFound {
1771 port_name: port_name.clone(),
1772 node_name: node_name.clone(),
1773 });
1774 };
1775 let current_count = self
1777 ._packets_by_location
1778 .get(destination)
1779 .map(|s| s.len())
1780 .unwrap_or(0);
1781 let is_full = match &port.slots_spec {
1782 PortSlotSpec::Infinite => false,
1783 PortSlotSpec::Finite(capacity) => current_count >= *capacity as usize,
1784 };
1785 if is_full {
1786 return NetActionResponse::Error(NetActionError::InputPortFull {
1787 port_name: port_name.clone(),
1788 node_name: node_name.clone(),
1789 });
1790 }
1791 }
1792 PacketLocation::Edge(edge) => {
1793 if !self.graph.edges().contains(edge) {
1795 return NetActionResponse::Error(NetActionError::EdgeNotFound {
1796 edge: edge.clone(),
1797 });
1798 }
1799 }
1800 PacketLocation::OutsideNet => {
1801 }
1803 }
1804
1805 let from_index = self
1807 ._packets_by_location
1808 .get(¤t_location)
1809 .and_then(|packets| packets.get_index_of(packet_id))
1810 .expect("Packet should exist at current_location");
1811
1812 self.move_packet(packet_id, destination.clone());
1814
1815 NetActionResponse::Success(
1816 NetActionResponseData::None,
1817 vec![NetEvent::PacketMoved(
1818 get_utc_now(),
1819 *packet_id,
1820 current_location,
1821 destination.clone(),
1822 from_index,
1823 )],
1824 )
1825 }
1826
1827 pub fn do_action(&mut self, action: &NetAction) -> NetActionResponse {
1861 match action {
1862 NetAction::RunStep => self.run_step(),
1863 NetAction::CreatePacket(maybe_epoch_id) => self.create_packet(maybe_epoch_id),
1864 NetAction::ConsumePacket(packet_id) => self.consume_packet(packet_id),
1865 NetAction::DestroyPacket(packet_id) => self.destroy_packet(packet_id),
1866 NetAction::StartEpoch(epoch_id) => self.start_epoch(epoch_id),
1867 NetAction::FinishEpoch(epoch_id) => self.finish_epoch(epoch_id),
1868 NetAction::CancelEpoch(epoch_id) => self.cancel_epoch(epoch_id),
1869 NetAction::CreateEpoch(node_name, salvo) => self.create_epoch(node_name, salvo),
1870 NetAction::LoadPacketIntoOutputPort(packet_id, port_name) => {
1871 self.load_packet_into_output_port(packet_id, port_name)
1872 }
1873 NetAction::SendOutputSalvo(epoch_id, salvo_condition_name) => {
1874 self.send_output_salvo(epoch_id, salvo_condition_name)
1875 }
1876 NetAction::TransportPacketToLocation(packet_id, location) => {
1877 self.transport_packet_to_location(packet_id, location)
1878 }
1879 NetAction::CreateRequest(node_name, label) => self.create_request(node_name, label),
1880 }
1881 }
1882
1883 pub fn packet_count_at(&self, location: &PacketLocation) -> usize {
1887 self._packets_by_location
1888 .get(location)
1889 .map(|s| s.len())
1890 .unwrap_or(0)
1891 }
1892
1893 pub fn get_packets_at_location(&self, location: &PacketLocation) -> Vec<PacketID> {
1895 self._packets_by_location
1896 .get(location)
1897 .map(|s| s.iter().cloned().collect())
1898 .unwrap_or_default()
1899 }
1900
    /// Looks up an epoch by its ID.
    ///
    /// Returns `None` when `epoch_id` is not present in the epoch table.
    pub fn get_epoch(&self, epoch_id: &EpochID) -> Option<&Epoch> {
        self._epochs.get(epoch_id)
    }
1905
1906 pub fn get_startable_epochs(&self) -> Vec<EpochID> {
1908 self._startable_epochs.iter().cloned().collect()
1909 }
1910
    /// Looks up a packet by its ID.
    ///
    /// Returns `None` when `packet_id` is not present in the packet table.
    pub fn get_packet(&self, packet_id: &PacketID) -> Option<&Packet> {
        self._packets.get(packet_id)
    }
1915
1916 pub fn run_until_blocked(&mut self) -> Vec<NetEvent> {
1926 let mut all_events = Vec::new();
1927 while !self.is_blocked() {
1928 if let NetActionResponse::Success(_, events) = self.do_action(&NetAction::RunStep) {
1929 all_events.extend(events);
1930 }
1931 }
1932 all_events
1933 }
1934
1935 pub fn is_blocked(&self) -> bool {
1941 for (location, packets) in &self._packets_by_location {
1943 if let PacketLocation::Edge(edge_ref) = location {
1944 if packets.is_empty() {
1945 continue;
1946 }
1947
1948 let target_node_name = &edge_ref.target.node_name;
1949 let target_port_name = &edge_ref.target.port_name;
1950
1951 let node = match self.graph.nodes().get(target_node_name) {
1952 Some(n) => n,
1953 None => continue,
1954 };
1955 let port = match node.in_ports.get(target_port_name) {
1956 Some(p) => p,
1957 None => continue,
1958 };
1959
1960 let input_port_location =
1961 PacketLocation::InputPort(target_node_name.clone(), target_port_name.clone());
1962 let current_count = self
1963 ._packets_by_location
1964 .get(&input_port_location)
1965 .map(|p| p.len() as u64)
1966 .unwrap_or(0);
1967
1968 let can_move = match port.slots_spec {
1969 PortSlotSpec::Infinite => true,
1970 PortSlotSpec::Finite(max_slots) => current_count < max_slots,
1971 };
1972
1973 if can_move {
1974 return false; }
1976 }
1977 }
1978
1979 for (location, packets) in &self._packets_by_location {
1981 if let PacketLocation::InputPort(node_name, _) = location {
1982 if packets.is_empty() {
1983 continue;
1984 }
1985
1986 if self.can_trigger_input_salvo(node_name) {
1988 return false; }
1990 }
1991 }
1992
1993 if !self._pending_requests.is_empty() {
1995 return false;
1996 }
1997 if !self._startup_requests_sent {
1998 let has_startup = self.graph.nodes().values().any(|node| {
2000 node.dependency_request_config
2001 .as_ref()
2002 .is_some_and(|c| c.triggers.contains(&DependencyRequestTrigger::OnStartup))
2003 });
2004 if has_startup {
2005 return false;
2006 }
2007 }
2008
2009 true }
2011
2012 fn can_trigger_input_salvo(&self, node_name: &NodeName) -> bool {
2014 let node = match self.graph.nodes().get(node_name) {
2015 Some(n) => n,
2016 None => return false,
2017 };
2018
2019 let in_port_names: Vec<PortName> = node.in_ports.keys().cloned().collect();
2020
2021 let port_packet_counts: HashMap<PortName, u64> = in_port_names
2023 .iter()
2024 .map(|port_name| {
2025 let count = self
2026 ._packets_by_location
2027 .get(&PacketLocation::InputPort(
2028 node_name.clone(),
2029 port_name.clone(),
2030 ))
2031 .map(|packets| packets.len() as u64)
2032 .unwrap_or(0);
2033 (port_name.clone(), count)
2034 })
2035 .collect();
2036
2037 for cond in node.in_salvo_conditions.values() {
2039 if evaluate_salvo_condition(&cond.term, &port_packet_counts, &node.in_ports) {
2040 return true;
2041 }
2042 }
2043
2044 false
2045 }
2046
2047 pub fn undo_action(
2068 &mut self,
2069 action: &NetAction,
2070 events: &[NetEvent],
2071 ) -> Result<(), UndoError> {
2072 for event in events.iter().rev() {
2074 self.undo_event(action, event)?;
2075 }
2076 Ok(())
2077 }
2078
2079 fn undo_event(&mut self, action: &NetAction, event: &NetEvent) -> Result<(), UndoError> {
2081 match event {
2082 NetEvent::PacketCreated(_, packet_id) => self.undo_packet_created(packet_id),
2083 NetEvent::PacketConsumed(_, packet_id, location) => {
2084 self.undo_packet_consumed(packet_id, location)
2085 }
2086 NetEvent::PacketDestroyed(_, packet_id, location) => {
2087 self.undo_packet_destroyed(packet_id, location)
2088 }
2089 NetEvent::EpochCreated(_, epoch_id) => self.undo_epoch_created(epoch_id),
2090 NetEvent::EpochStarted(_, epoch_id) => self.undo_epoch_started(epoch_id),
2091 NetEvent::EpochFinished(_, epoch) => self.undo_epoch_finished(epoch),
2092 NetEvent::EpochCancelled(_, epoch) => self.undo_epoch_cancelled(epoch),
2093 NetEvent::PacketMoved(_, packet_id, from, to, from_index) => {
2094 self.undo_packet_moved(packet_id, from, to, *from_index)
2095 }
2096 NetEvent::InputSalvoTriggered(_, _, _) => {
2097 Ok(())
2099 }
2100 NetEvent::OutputSalvoTriggered(_, epoch_id, _) => {
2101 self.undo_output_salvo_triggered(epoch_id, action)
2103 }
2104 NetEvent::PacketOrphaned(_, packet_id, epoch_id, _, port_name, _) => {
2105 self.undo_packet_orphaned(packet_id, epoch_id, port_name)
2107 }
2108 NetEvent::RequestCreated(_, node_name, label, source) => {
2109 match source {
2110 RequestCreatedSource::External => {
2111 if let Some(pos) = self
2113 ._pending_requests
2114 .iter()
2115 .rposition(|r| r.node_name == *node_name && r.label == *label)
2116 {
2117 self._pending_requests.remove(pos);
2118 }
2119 Ok(())
2120 }
2121 RequestCreatedSource::OnStartup => {
2122 self._startup_requests_sent = false;
2123 Ok(())
2124 }
2125 RequestCreatedSource::OnNoSalvoTriggered => {
2126 self._request_tokens.insert(node_name.clone(), true);
2128 Ok(())
2129 }
2130 }
2131 }
2132 NetEvent::RequestCascadeResolved(_, _, _)
2133 | NetEvent::RequestEpochCreated(_, _, _, _) => {
2134 Ok(())
2137 }
2138 }
2139 }
2140
2141 fn undo_packet_created(&mut self, packet_id: &PacketID) -> Result<(), UndoError> {
2143 let location = match self._packets.get(packet_id) {
2145 Some(p) => p.location.clone(),
2146 None => {
2147 return Err(UndoError::NotFound(format!(
2148 "packet {} not found",
2149 packet_id
2150 )));
2151 }
2152 };
2153
2154 if let Some(packets) = self._packets_by_location.get_mut(&location) {
2156 packets.shift_remove(packet_id);
2157 }
2158
2159 self._packets.remove(packet_id);
2161
2162 Ok(())
2163 }
2164
    /// Reverts a `PacketConsumed` event by re-creating the packet at `location`.
    ///
    /// Delegates to `recreate_packet` and returns its result.
    fn undo_packet_consumed(
        &mut self,
        packet_id: &PacketID,
        location: &PacketLocation,
    ) -> Result<(), UndoError> {
        self.recreate_packet(packet_id, location)
    }
2173
    /// Reverts a `PacketDestroyed` event by re-creating the packet at `location`.
    ///
    /// Delegates to `recreate_packet` and returns its result.
    fn undo_packet_destroyed(
        &mut self,
        packet_id: &PacketID,
        location: &PacketLocation,
    ) -> Result<(), UndoError> {
        self.recreate_packet(packet_id, location)
    }
2182
2183 fn recreate_packet(
2185 &mut self,
2186 packet_id: &PacketID,
2187 location: &PacketLocation,
2188 ) -> Result<(), UndoError> {
2189 if self._packets.contains_key(packet_id) {
2191 return Err(UndoError::StateMismatch(format!(
2192 "packet {} already exists",
2193 packet_id
2194 )));
2195 }
2196
2197 let packet = Packet {
2199 id: *packet_id,
2200 location: location.clone(),
2201 };
2202 self._packets.insert(*packet_id, packet);
2203
2204 self._packets_by_location
2206 .entry(location.clone())
2207 .or_default()
2208 .insert(*packet_id);
2209
2210 Ok(())
2211 }
2212
2213 fn undo_epoch_created(&mut self, epoch_id: &EpochID) -> Result<(), UndoError> {
2215 let epoch = match self._epochs.get(epoch_id) {
2217 Some(e) => e.clone(),
2218 None => {
2219 return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
2220 }
2221 };
2222
2223 self._epochs.remove(epoch_id);
2225
2226 self._startable_epochs.remove(epoch_id);
2228
2229 if let Some(epoch_ids) = self._node_to_epochs.get_mut(&epoch.node_name) {
2231 epoch_ids.retain(|id| id != epoch_id);
2232 if epoch_ids.is_empty() {
2234 self._node_to_epochs.remove(&epoch.node_name);
2235 }
2236 }
2237
2238 let epoch_location = PacketLocation::Node(*epoch_id);
2240 self._packets_by_location.remove(&epoch_location);
2241
2242 if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
2244 for port_name in node.out_ports.keys() {
2245 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
2246 self._packets_by_location.remove(&output_port_location);
2247 }
2248 }
2249
2250 Ok(())
2251 }
2252
2253 fn undo_epoch_started(&mut self, epoch_id: &EpochID) -> Result<(), UndoError> {
2255 let epoch = match self._epochs.get_mut(epoch_id) {
2256 Some(e) => e,
2257 None => {
2258 return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
2259 }
2260 };
2261
2262 if epoch.state != EpochState::Running {
2264 return Err(UndoError::StateMismatch(format!(
2265 "epoch {} is not in Running state, cannot undo start",
2266 epoch_id
2267 )));
2268 }
2269
2270 epoch.state = EpochState::Startable;
2272
2273 self._startable_epochs.insert(*epoch_id);
2275
2276 Ok(())
2277 }
2278
2279 fn undo_epoch_finished(&mut self, epoch: &Epoch) -> Result<(), UndoError> {
2281 let epoch_id = epoch.id;
2282
2283 if self._epochs.contains_key(&epoch_id) {
2285 return Err(UndoError::StateMismatch(format!(
2286 "epoch {} already exists",
2287 epoch_id
2288 )));
2289 }
2290
2291 self._epochs.insert(epoch_id, epoch.clone());
2294
2295 let epoch_location = PacketLocation::Node(epoch_id);
2297 self._packets_by_location
2298 .insert(epoch_location, IndexSet::new());
2299
2300 if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
2302 for port_name in node.out_ports.keys() {
2303 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
2304 self._packets_by_location
2305 .insert(output_port_location, IndexSet::new());
2306 }
2307 }
2308
2309 self._node_to_epochs
2311 .entry(epoch.node_name.clone())
2312 .or_default()
2313 .push(epoch_id);
2314
2315 self.set_request_token(&epoch.node_name, false);
2317
2318 Ok(())
2319 }
2320
2321 fn undo_epoch_cancelled(&mut self, epoch: &Epoch) -> Result<(), UndoError> {
2324 let epoch_id = epoch.id;
2325
2326 if self._epochs.contains_key(&epoch_id) {
2328 return Err(UndoError::StateMismatch(format!(
2329 "epoch {} already exists",
2330 epoch_id
2331 )));
2332 }
2333
2334 self._epochs.insert(epoch_id, epoch.clone());
2336
2337 let epoch_location = PacketLocation::Node(epoch_id);
2339 self._packets_by_location
2340 .insert(epoch_location, IndexSet::new());
2341
2342 if let Some(node) = self.graph.nodes().get(&epoch.node_name) {
2344 for port_name in node.out_ports.keys() {
2345 let output_port_location = PacketLocation::OutputPort(epoch_id, port_name.clone());
2346 self._packets_by_location
2347 .insert(output_port_location, IndexSet::new());
2348 }
2349 }
2350
2351 self._node_to_epochs
2353 .entry(epoch.node_name.clone())
2354 .or_default()
2355 .push(epoch_id);
2356
2357 if epoch.state == EpochState::Startable {
2359 self._startable_epochs.insert(epoch_id);
2360 }
2361
2362 self.set_request_token(&epoch.node_name, false);
2364
2365 Ok(())
2366 }
2367
2368 fn undo_packet_moved(
2370 &mut self,
2371 packet_id: &PacketID,
2372 from: &PacketLocation,
2373 to: &PacketLocation,
2374 from_index: usize,
2375 ) -> Result<(), UndoError> {
2376 let packet = match self._packets.get(packet_id) {
2378 Some(p) => p,
2379 None => {
2380 return Err(UndoError::NotFound(format!(
2381 "packet {} not found",
2382 packet_id
2383 )));
2384 }
2385 };
2386
2387 if packet.location != *to {
2388 return Err(UndoError::StateMismatch(format!(
2389 "packet {} is not at expected location {:?}, found at {:?}",
2390 packet_id, to, packet.location
2391 )));
2392 }
2393
2394 if let Some(packets) = self._packets_by_location.get_mut(to) {
2396 packets.shift_remove(packet_id);
2397 }
2398
2399 let packets_at_from = self._packets_by_location.entry(from.clone()).or_default();
2401 packets_at_from.shift_insert(from_index, *packet_id);
2402
2403 self._packets.get_mut(packet_id).unwrap().location = from.clone();
2405
2406 Ok(())
2407 }
2408
2409 fn undo_output_salvo_triggered(
2411 &mut self,
2412 epoch_id: &EpochID,
2413 action: &NetAction,
2414 ) -> Result<(), UndoError> {
2415 if !matches!(action, NetAction::SendOutputSalvo(_, _)) {
2418 return Ok(());
2419 }
2420
2421 let epoch = match self._epochs.get_mut(epoch_id) {
2422 Some(e) => e,
2423 None => {
2424 return Err(UndoError::NotFound(format!("epoch {} not found", epoch_id)));
2425 }
2426 };
2427
2428 if epoch.out_salvos.pop().is_none() {
2430 return Err(UndoError::StateMismatch(format!(
2431 "epoch {} has no out_salvos to pop",
2432 epoch_id
2433 )));
2434 }
2435
2436 Ok(())
2439 }
2440
2441 fn undo_packet_orphaned(
2443 &mut self,
2444 packet_id: &PacketID,
2445 epoch_id: &EpochID,
2446 port_name: &PortName,
2447 ) -> Result<(), UndoError> {
2448 let packet = match self._packets.get(packet_id) {
2450 Some(p) => p,
2451 None => {
2452 return Err(UndoError::NotFound(format!(
2453 "packet {} not found",
2454 packet_id
2455 )));
2456 }
2457 };
2458
2459 if packet.location != PacketLocation::OutsideNet {
2460 return Err(UndoError::StateMismatch(format!(
2461 "packet {} is not at OutsideNet, found at {:?}",
2462 packet_id, packet.location
2463 )));
2464 }
2465
2466 if let Some(packets) = self
2468 ._packets_by_location
2469 .get_mut(&PacketLocation::OutsideNet)
2470 {
2471 packets.shift_remove(packet_id);
2472 }
2473
2474 let output_port_location = PacketLocation::OutputPort(*epoch_id, port_name.clone());
2476 self._packets_by_location
2477 .entry(output_port_location.clone())
2478 .or_default()
2479 .insert(*packet_id);
2480
2481 self._packets.get_mut(packet_id).unwrap().location = output_port_location;
2483
2484 if let Some(epoch) = self._epochs.get_mut(epoch_id) {
2486 epoch
2487 .orphaned_packets
2488 .retain(|info| info.packet_id != *packet_id);
2489 }
2490
2491 Ok(())
2492 }
2493
    /// Test-only alias for `get_startable_epochs`; compiled only under `cfg(test)`.
    #[cfg(test)]
    pub fn startable_epoch_ids(&self) -> Vec<EpochID> {
        self.get_startable_epochs()
    }
2500}
2501
// Unit tests for this module live in the sibling file `net_tests.rs` and are only
// compiled for test builds.
#[cfg(test)]
#[path = "net_tests.rs"]
mod tests;