1use indexmap::IndexMap;
16use netrun_sim::graph::{
17 Edge, Graph, MaxSalvos, Node, PacketCount, Port, PortRef, PortSlotSpec, PortState, PortType,
18 SalvoCondition, SalvoConditionTerm,
19};
20use netrun_sim::net::{
21 NetAction, NetActionResponse, NetActionResponseData, NetSim, PacketLocation,
22};
23use std::collections::HashMap;
24
25fn main() {
26 let graph = create_diamond_graph();
28 println!("Created diamond graph: A -> B,C -> D");
29 println!("D requires inputs from BOTH B and C\n");
30
31 let mut net = NetSim::new(graph);
32
33 let packet1 = create_packet(&mut net);
35 let packet2 = create_packet(&mut net);
36 println!("Created packets: {} and {}", packet1, packet2);
37
38 let edge_a_b = edge_location("A", "out1", "B", "in");
40 net.do_action(&NetAction::TransportPacketToLocation(
41 packet1.clone(),
42 edge_a_b,
43 ));
44 println!("Placed packet1 on edge A -> B");
45
46 let edge_a_c = edge_location("A", "out2", "C", "in");
48 net.do_action(&NetAction::TransportPacketToLocation(
49 packet2.clone(),
50 edge_a_c,
51 ));
52 println!("Placed packet2 on edge A -> C");
53
54 net.run_until_blocked();
56
57 let startable = net.get_startable_epochs();
58 println!(
59 "\nAfter first run: {} startable epochs (B and C)",
60 startable.len()
61 );
62
63 for epoch_id in startable {
65 let epoch = net.get_epoch(&epoch_id).unwrap();
66 let node_name = epoch.node_name.clone();
67 println!("\nProcessing node {}", node_name);
68
69 let started = match net.do_action(&NetAction::StartEpoch(epoch_id.clone())) {
71 NetActionResponse::Success(NetActionResponseData::StartedEpoch(e), _) => e,
72 _ => panic!("Failed to start epoch"),
73 };
74
75 let input_packet = started.in_salvo.packets[0].1.clone();
77 net.do_action(&NetAction::ConsumePacket(input_packet));
78
79 let output = create_packet_in_epoch(&mut net, &started.id);
81
82 net.do_action(&NetAction::LoadPacketIntoOutputPort(
84 output,
85 "out".to_string(),
86 ));
87 net.do_action(&NetAction::SendOutputSalvo(
88 started.id.clone(),
89 "default".to_string(),
90 ));
91
92 net.do_action(&NetAction::FinishEpoch(started.id));
94 println!(" Finished {} - sent packet to D", node_name);
95 }
96
97 net.run_until_blocked();
99
100 let d_in1 = PacketLocation::InputPort("D".to_string(), "in1".to_string());
102 let d_in2 = PacketLocation::InputPort("D".to_string(), "in2".to_string());
103 println!("\nD's input ports:");
104 println!(" in1 (from B): {} packets", net.packet_count_at(&d_in1));
105 println!(" in2 (from C): {} packets", net.packet_count_at(&d_in2));
106
107 let startable_d = net.get_startable_epochs();
109 println!("\nStartable epochs at D: {}", startable_d.len());
110
111 if let Some(d_epoch_id) = startable_d.first() {
112 let d_epoch = net.get_epoch(d_epoch_id).unwrap();
113 println!(
114 "D's epoch received {} packets from both branches!",
115 d_epoch.in_salvo.packets.len()
116 );
117 }
118
119 println!("\nDiamond flow example complete!");
120}
121
122fn create_diamond_graph() -> Graph {
123 let node_a = Node {
125 name: "A".to_string(),
126 in_ports: HashMap::new(),
127 out_ports: [
128 (
129 "out1".to_string(),
130 Port {
131 slots_spec: PortSlotSpec::Infinite,
132 },
133 ),
134 (
135 "out2".to_string(),
136 Port {
137 slots_spec: PortSlotSpec::Infinite,
138 },
139 ),
140 ]
141 .into(),
142 in_salvo_conditions: IndexMap::new(),
143 out_salvo_conditions: IndexMap::new(),
144 dependency_request_config: None,
145 };
146
147 let node_b = create_simple_node("B");
149
150 let node_c = create_simple_node("C");
152
153 let node_d = Node {
155 name: "D".to_string(),
156 in_ports: [
157 (
158 "in1".to_string(),
159 Port {
160 slots_spec: PortSlotSpec::Infinite,
161 },
162 ),
163 (
164 "in2".to_string(),
165 Port {
166 slots_spec: PortSlotSpec::Infinite,
167 },
168 ),
169 ]
170 .into(),
171 out_ports: HashMap::new(),
172 in_salvo_conditions: IndexMap::from([(
173 "default".to_string(),
174 SalvoCondition {
175 max_salvos: MaxSalvos::Finite(1),
176 ports: [
177 ("in1".to_string(), PacketCount::All),
178 ("in2".to_string(), PacketCount::All),
179 ]
180 .into_iter()
181 .collect(),
182 term: SalvoConditionTerm::And(vec![
184 SalvoConditionTerm::Port {
185 port_name: "in1".to_string(),
186 state: PortState::NonEmpty,
187 },
188 SalvoConditionTerm::Port {
189 port_name: "in2".to_string(),
190 state: PortState::NonEmpty,
191 },
192 ]),
193 },
194 )]),
195 out_salvo_conditions: IndexMap::new(),
196 dependency_request_config: None,
197 };
198
199 let edges = vec![
200 create_edge("A", "out1", "B", "in"),
201 create_edge("A", "out2", "C", "in"),
202 create_edge("B", "out", "D", "in1"),
203 create_edge("C", "out", "D", "in2"),
204 ];
205
206 let graph = Graph::new(vec![node_a, node_b, node_c, node_d], edges);
207 assert!(graph.validate().is_empty(), "Graph validation failed");
208 graph
209}
210
211fn create_simple_node(name: &str) -> Node {
212 Node {
213 name: name.to_string(),
214 in_ports: [(
215 "in".to_string(),
216 Port {
217 slots_spec: PortSlotSpec::Infinite,
218 },
219 )]
220 .into(),
221 out_ports: [(
222 "out".to_string(),
223 Port {
224 slots_spec: PortSlotSpec::Infinite,
225 },
226 )]
227 .into(),
228 in_salvo_conditions: IndexMap::from([(
229 "default".to_string(),
230 SalvoCondition {
231 max_salvos: MaxSalvos::Finite(1),
232 ports: [("in".to_string(), PacketCount::All)].into_iter().collect(),
233 term: SalvoConditionTerm::Port {
234 port_name: "in".to_string(),
235 state: PortState::NonEmpty,
236 },
237 },
238 )]),
239 out_salvo_conditions: IndexMap::from([(
240 "default".to_string(),
241 SalvoCondition {
242 max_salvos: MaxSalvos::Infinite,
243 ports: [("out".to_string(), PacketCount::All)]
244 .into_iter()
245 .collect(),
246 term: SalvoConditionTerm::Port {
247 port_name: "out".to_string(),
248 state: PortState::NonEmpty,
249 },
250 },
251 )]),
252 dependency_request_config: None,
253 }
254}
255
256fn create_edge(src_node: &str, src_port: &str, tgt_node: &str, tgt_port: &str) -> Edge {
257 Edge {
258 source: PortRef {
259 node_name: src_node.to_string(),
260 port_type: PortType::Output,
261 port_name: src_port.to_string(),
262 },
263 target: PortRef {
264 node_name: tgt_node.to_string(),
265 port_type: PortType::Input,
266 port_name: tgt_port.to_string(),
267 },
268 }
269}
270
271fn edge_location(src_node: &str, src_port: &str, tgt_node: &str, tgt_port: &str) -> PacketLocation {
272 PacketLocation::Edge(Edge {
273 source: PortRef {
274 node_name: src_node.to_string(),
275 port_type: PortType::Output,
276 port_name: src_port.to_string(),
277 },
278 target: PortRef {
279 node_name: tgt_node.to_string(),
280 port_type: PortType::Input,
281 port_name: tgt_port.to_string(),
282 },
283 })
284}
285
286fn create_packet(net: &mut NetSim) -> ulid::Ulid {
287 match net.do_action(&NetAction::CreatePacket(None)) {
288 NetActionResponse::Success(NetActionResponseData::Packet(id), _) => id,
289 _ => panic!("Failed to create packet"),
290 }
291}
292
293fn create_packet_in_epoch(net: &mut NetSim, epoch_id: &ulid::Ulid) -> ulid::Ulid {
294 match net.do_action(&NetAction::CreatePacket(Some(epoch_id.clone()))) {
295 NetActionResponse::Success(NetActionResponseData::Packet(id), _) => id,
296 _ => panic!("Failed to create packet in epoch"),
297 }
298}