Skip to main content

diamond_flow/
diamond_flow.rs

1//! Example: Diamond flow pattern with branching and merging
2//!
3//! Graph structure:
4//!        A
5//!       / \
6//!      B   C
7//!       \ /
8//!        D
9//!
10//! This example demonstrates:
11//! - Branching: A sends packets to both B and C
12//! - Merging: D waits for packets from both B and C
13//! - Synchronization: D's epoch only triggers when both inputs are present
14
15use netrun_sim::graph::{
16    Edge, Graph, MaxSalvos, Node, PacketCount, Port, PortRef, PortSlotSpec, PortState, PortType,
17    SalvoCondition, SalvoConditionTerm,
18};
19use netrun_sim::net::{
20    NetAction, NetActionResponse, NetActionResponseData, NetSim, PacketLocation,
21};
22use std::collections::HashMap;
23
24fn main() {
25    // Create a diamond graph
26    let graph = create_diamond_graph();
27    println!("Created diamond graph: A -> B,C -> D");
28    println!("D requires inputs from BOTH B and C\n");
29
30    let mut net = NetSim::new(graph);
31
32    // Create two packets and place them on edges from A
33    let packet1 = create_packet(&mut net);
34    let packet2 = create_packet(&mut net);
35    println!("Created packets: {} and {}", packet1, packet2);
36
37    // Place packet1 on edge A -> B
38    let edge_a_b = edge_location("A", "out1", "B", "in");
39    net.do_action(&NetAction::TransportPacketToLocation(
40        packet1.clone(),
41        edge_a_b,
42    ));
43    println!("Placed packet1 on edge A -> B");
44
45    // Place packet2 on edge A -> C
46    let edge_a_c = edge_location("A", "out2", "C", "in");
47    net.do_action(&NetAction::TransportPacketToLocation(
48        packet2.clone(),
49        edge_a_c,
50    ));
51    println!("Placed packet2 on edge A -> C");
52
53    // Run network - packets move to B and C, triggering epochs
54    net.run_until_blocked();
55
56    let startable = net.get_startable_epochs();
57    println!(
58        "\nAfter first run: {} startable epochs (B and C)",
59        startable.len()
60    );
61
62    // Process B and C, sending outputs to D
63    for epoch_id in startable {
64        let epoch = net.get_epoch(&epoch_id).unwrap();
65        let node_name = epoch.node_name.clone();
66        println!("\nProcessing node {}", node_name);
67
68        // Start the epoch
69        let started = match net.do_action(&NetAction::StartEpoch(epoch_id.clone())) {
70            NetActionResponse::Success(NetActionResponseData::StartedEpoch(e), _) => e,
71            _ => panic!("Failed to start epoch"),
72        };
73
74        // Find and consume the input packet
75        let input_packet = started.in_salvo.packets[0].1.clone();
76        net.do_action(&NetAction::ConsumePacket(input_packet));
77
78        // Create output packet
79        let output = create_packet_in_epoch(&mut net, &started.id);
80
81        // Load into output port and send
82        net.do_action(&NetAction::LoadPacketIntoOutputPort(
83            output,
84            "out".to_string(),
85        ));
86        net.do_action(&NetAction::SendOutputSalvo(
87            started.id.clone(),
88            "default".to_string(),
89        ));
90
91        // Finish epoch
92        net.do_action(&NetAction::FinishEpoch(started.id));
93        println!("  Finished {} - sent packet to D", node_name);
94    }
95
96    // Run network - packets move from B->D and C->D edges to D's input ports
97    net.run_until_blocked();
98
99    // Check D's input ports
100    let d_in1 = PacketLocation::InputPort("D".to_string(), "in1".to_string());
101    let d_in2 = PacketLocation::InputPort("D".to_string(), "in2".to_string());
102    println!("\nD's input ports:");
103    println!("  in1 (from B): {} packets", net.packet_count_at(&d_in1));
104    println!("  in2 (from C): {} packets", net.packet_count_at(&d_in2));
105
106    // D should now have a startable epoch (both inputs present)
107    let startable_d = net.get_startable_epochs();
108    println!("\nStartable epochs at D: {}", startable_d.len());
109
110    if let Some(d_epoch_id) = startable_d.first() {
111        let d_epoch = net.get_epoch(d_epoch_id).unwrap();
112        println!(
113            "D's epoch received {} packets from both branches!",
114            d_epoch.in_salvo.packets.len()
115        );
116    }
117
118    println!("\nDiamond flow example complete!");
119}
120
121fn create_diamond_graph() -> Graph {
122    // Node A: source with two outputs
123    let node_a = Node {
124        name: "A".to_string(),
125        in_ports: HashMap::new(),
126        out_ports: [
127            (
128                "out1".to_string(),
129                Port {
130                    slots_spec: PortSlotSpec::Infinite,
131                },
132            ),
133            (
134                "out2".to_string(),
135                Port {
136                    slots_spec: PortSlotSpec::Infinite,
137                },
138            ),
139        ]
140        .into(),
141        in_salvo_conditions: HashMap::new(),
142        out_salvo_conditions: HashMap::new(),
143    };
144
145    // Node B: one input, one output
146    let node_b = create_simple_node("B");
147
148    // Node C: one input, one output
149    let node_c = create_simple_node("C");
150
151    // Node D: TWO inputs (requires both), no outputs
152    let node_d = Node {
153        name: "D".to_string(),
154        in_ports: [
155            (
156                "in1".to_string(),
157                Port {
158                    slots_spec: PortSlotSpec::Infinite,
159                },
160            ),
161            (
162                "in2".to_string(),
163                Port {
164                    slots_spec: PortSlotSpec::Infinite,
165                },
166            ),
167        ]
168        .into(),
169        out_ports: HashMap::new(),
170        in_salvo_conditions: [(
171            "default".to_string(),
172            SalvoCondition {
173                max_salvos: MaxSalvos::Finite(1),
174                ports: [
175                    ("in1".to_string(), PacketCount::All),
176                    ("in2".to_string(), PacketCount::All),
177                ]
178                .into_iter()
179                .collect(),
180                // Require BOTH inputs to be non-empty
181                term: SalvoConditionTerm::And(vec![
182                    SalvoConditionTerm::Port {
183                        port_name: "in1".to_string(),
184                        state: PortState::NonEmpty,
185                    },
186                    SalvoConditionTerm::Port {
187                        port_name: "in2".to_string(),
188                        state: PortState::NonEmpty,
189                    },
190                ]),
191            },
192        )]
193        .into(),
194        out_salvo_conditions: HashMap::new(),
195    };
196
197    let edges = vec![
198        create_edge("A", "out1", "B", "in"),
199        create_edge("A", "out2", "C", "in"),
200        create_edge("B", "out", "D", "in1"),
201        create_edge("C", "out", "D", "in2"),
202    ];
203
204    let graph = Graph::new(vec![node_a, node_b, node_c, node_d], edges);
205    assert!(graph.validate().is_empty(), "Graph validation failed");
206    graph
207}
208
209fn create_simple_node(name: &str) -> Node {
210    Node {
211        name: name.to_string(),
212        in_ports: [(
213            "in".to_string(),
214            Port {
215                slots_spec: PortSlotSpec::Infinite,
216            },
217        )]
218        .into(),
219        out_ports: [(
220            "out".to_string(),
221            Port {
222                slots_spec: PortSlotSpec::Infinite,
223            },
224        )]
225        .into(),
226        in_salvo_conditions: [(
227            "default".to_string(),
228            SalvoCondition {
229                max_salvos: MaxSalvos::Finite(1),
230                ports: [("in".to_string(), PacketCount::All)].into_iter().collect(),
231                term: SalvoConditionTerm::Port {
232                    port_name: "in".to_string(),
233                    state: PortState::NonEmpty,
234                },
235            },
236        )]
237        .into(),
238        out_salvo_conditions: [(
239            "default".to_string(),
240            SalvoCondition {
241                max_salvos: MaxSalvos::Infinite,
242                ports: [("out".to_string(), PacketCount::All)]
243                    .into_iter()
244                    .collect(),
245                term: SalvoConditionTerm::Port {
246                    port_name: "out".to_string(),
247                    state: PortState::NonEmpty,
248                },
249            },
250        )]
251        .into(),
252    }
253}
254
255fn create_edge(src_node: &str, src_port: &str, tgt_node: &str, tgt_port: &str) -> Edge {
256    Edge {
257        source: PortRef {
258            node_name: src_node.to_string(),
259            port_type: PortType::Output,
260            port_name: src_port.to_string(),
261        },
262        target: PortRef {
263            node_name: tgt_node.to_string(),
264            port_type: PortType::Input,
265            port_name: tgt_port.to_string(),
266        },
267    }
268}
269
270fn edge_location(src_node: &str, src_port: &str, tgt_node: &str, tgt_port: &str) -> PacketLocation {
271    PacketLocation::Edge(Edge {
272        source: PortRef {
273            node_name: src_node.to_string(),
274            port_type: PortType::Output,
275            port_name: src_port.to_string(),
276        },
277        target: PortRef {
278            node_name: tgt_node.to_string(),
279            port_type: PortType::Input,
280            port_name: tgt_port.to_string(),
281        },
282    })
283}
284
285fn create_packet(net: &mut NetSim) -> ulid::Ulid {
286    match net.do_action(&NetAction::CreatePacket(None)) {
287        NetActionResponse::Success(NetActionResponseData::Packet(id), _) => id,
288        _ => panic!("Failed to create packet"),
289    }
290}
291
292fn create_packet_in_epoch(net: &mut NetSim, epoch_id: &ulid::Ulid) -> ulid::Ulid {
293    match net.do_action(&NetAction::CreatePacket(Some(epoch_id.clone()))) {
294        NetActionResponse::Success(NetActionResponseData::Packet(id), _) => id,
295        _ => panic!("Failed to create packet in epoch"),
296    }
297}