Skip to main content

diamond_flow/
diamond_flow.rs

1//! Example: Diamond flow pattern with branching and merging
2//!
3//! Graph structure:
4//!        A
5//!       / \
6//!      B   C
7//!       \ /
8//!        D
9//!
10//! This example demonstrates:
11//! - Branching: A sends packets to both B and C
12//! - Merging: D waits for packets from both B and C
13//! - Synchronization: D's epoch only triggers when both inputs are present
14
15use indexmap::IndexMap;
16use netrun_sim::graph::{
17    Edge, Graph, MaxSalvos, Node, PacketCount, Port, PortRef, PortSlotSpec, PortState, PortType,
18    SalvoCondition, SalvoConditionTerm,
19};
20use netrun_sim::net::{
21    NetAction, NetActionResponse, NetActionResponseData, NetSim, PacketLocation,
22};
23use std::collections::HashMap;
24
25fn main() {
26    // Create a diamond graph
27    let graph = create_diamond_graph();
28    println!("Created diamond graph: A -> B,C -> D");
29    println!("D requires inputs from BOTH B and C\n");
30
31    let mut net = NetSim::new(graph);
32
33    // Create two packets and place them on edges from A
34    let packet1 = create_packet(&mut net);
35    let packet2 = create_packet(&mut net);
36    println!("Created packets: {} and {}", packet1, packet2);
37
38    // Place packet1 on edge A -> B
39    let edge_a_b = edge_location("A", "out1", "B", "in");
40    net.do_action(&NetAction::TransportPacketToLocation(
41        packet1.clone(),
42        edge_a_b,
43    ));
44    println!("Placed packet1 on edge A -> B");
45
46    // Place packet2 on edge A -> C
47    let edge_a_c = edge_location("A", "out2", "C", "in");
48    net.do_action(&NetAction::TransportPacketToLocation(
49        packet2.clone(),
50        edge_a_c,
51    ));
52    println!("Placed packet2 on edge A -> C");
53
54    // Run network - packets move to B and C, triggering epochs
55    net.run_until_blocked();
56
57    let startable = net.get_startable_epochs();
58    println!(
59        "\nAfter first run: {} startable epochs (B and C)",
60        startable.len()
61    );
62
63    // Process B and C, sending outputs to D
64    for epoch_id in startable {
65        let epoch = net.get_epoch(&epoch_id).unwrap();
66        let node_name = epoch.node_name.clone();
67        println!("\nProcessing node {}", node_name);
68
69        // Start the epoch
70        let started = match net.do_action(&NetAction::StartEpoch(epoch_id.clone())) {
71            NetActionResponse::Success(NetActionResponseData::StartedEpoch(e), _) => e,
72            _ => panic!("Failed to start epoch"),
73        };
74
75        // Find and consume the input packet
76        let input_packet = started.in_salvo.packets[0].1.clone();
77        net.do_action(&NetAction::ConsumePacket(input_packet));
78
79        // Create output packet
80        let output = create_packet_in_epoch(&mut net, &started.id);
81
82        // Load into output port and send
83        net.do_action(&NetAction::LoadPacketIntoOutputPort(
84            output,
85            "out".to_string(),
86        ));
87        net.do_action(&NetAction::SendOutputSalvo(
88            started.id.clone(),
89            "default".to_string(),
90        ));
91
92        // Finish epoch
93        net.do_action(&NetAction::FinishEpoch(started.id));
94        println!("  Finished {} - sent packet to D", node_name);
95    }
96
97    // Run network - packets move from B->D and C->D edges to D's input ports
98    net.run_until_blocked();
99
100    // Check D's input ports
101    let d_in1 = PacketLocation::InputPort("D".to_string(), "in1".to_string());
102    let d_in2 = PacketLocation::InputPort("D".to_string(), "in2".to_string());
103    println!("\nD's input ports:");
104    println!("  in1 (from B): {} packets", net.packet_count_at(&d_in1));
105    println!("  in2 (from C): {} packets", net.packet_count_at(&d_in2));
106
107    // D should now have a startable epoch (both inputs present)
108    let startable_d = net.get_startable_epochs();
109    println!("\nStartable epochs at D: {}", startable_d.len());
110
111    if let Some(d_epoch_id) = startable_d.first() {
112        let d_epoch = net.get_epoch(d_epoch_id).unwrap();
113        println!(
114            "D's epoch received {} packets from both branches!",
115            d_epoch.in_salvo.packets.len()
116        );
117    }
118
119    println!("\nDiamond flow example complete!");
120}
121
122fn create_diamond_graph() -> Graph {
123    // Node A: source with two outputs
124    let node_a = Node {
125        name: "A".to_string(),
126        in_ports: HashMap::new(),
127        out_ports: [
128            (
129                "out1".to_string(),
130                Port {
131                    slots_spec: PortSlotSpec::Infinite,
132                },
133            ),
134            (
135                "out2".to_string(),
136                Port {
137                    slots_spec: PortSlotSpec::Infinite,
138                },
139            ),
140        ]
141        .into(),
142        in_salvo_conditions: IndexMap::new(),
143        out_salvo_conditions: IndexMap::new(),
144        dependency_request_config: None,
145    };
146
147    // Node B: one input, one output
148    let node_b = create_simple_node("B");
149
150    // Node C: one input, one output
151    let node_c = create_simple_node("C");
152
153    // Node D: TWO inputs (requires both), no outputs
154    let node_d = Node {
155        name: "D".to_string(),
156        in_ports: [
157            (
158                "in1".to_string(),
159                Port {
160                    slots_spec: PortSlotSpec::Infinite,
161                },
162            ),
163            (
164                "in2".to_string(),
165                Port {
166                    slots_spec: PortSlotSpec::Infinite,
167                },
168            ),
169        ]
170        .into(),
171        out_ports: HashMap::new(),
172        in_salvo_conditions: IndexMap::from([(
173            "default".to_string(),
174            SalvoCondition {
175                max_salvos: MaxSalvos::Finite(1),
176                ports: [
177                    ("in1".to_string(), PacketCount::All),
178                    ("in2".to_string(), PacketCount::All),
179                ]
180                .into_iter()
181                .collect(),
182                // Require BOTH inputs to be non-empty
183                term: SalvoConditionTerm::And(vec![
184                    SalvoConditionTerm::Port {
185                        port_name: "in1".to_string(),
186                        state: PortState::NonEmpty,
187                    },
188                    SalvoConditionTerm::Port {
189                        port_name: "in2".to_string(),
190                        state: PortState::NonEmpty,
191                    },
192                ]),
193            },
194        )]),
195        out_salvo_conditions: IndexMap::new(),
196        dependency_request_config: None,
197    };
198
199    let edges = vec![
200        create_edge("A", "out1", "B", "in"),
201        create_edge("A", "out2", "C", "in"),
202        create_edge("B", "out", "D", "in1"),
203        create_edge("C", "out", "D", "in2"),
204    ];
205
206    let graph = Graph::new(vec![node_a, node_b, node_c, node_d], edges);
207    assert!(graph.validate().is_empty(), "Graph validation failed");
208    graph
209}
210
211fn create_simple_node(name: &str) -> Node {
212    Node {
213        name: name.to_string(),
214        in_ports: [(
215            "in".to_string(),
216            Port {
217                slots_spec: PortSlotSpec::Infinite,
218            },
219        )]
220        .into(),
221        out_ports: [(
222            "out".to_string(),
223            Port {
224                slots_spec: PortSlotSpec::Infinite,
225            },
226        )]
227        .into(),
228        in_salvo_conditions: IndexMap::from([(
229            "default".to_string(),
230            SalvoCondition {
231                max_salvos: MaxSalvos::Finite(1),
232                ports: [("in".to_string(), PacketCount::All)].into_iter().collect(),
233                term: SalvoConditionTerm::Port {
234                    port_name: "in".to_string(),
235                    state: PortState::NonEmpty,
236                },
237            },
238        )]),
239        out_salvo_conditions: IndexMap::from([(
240            "default".to_string(),
241            SalvoCondition {
242                max_salvos: MaxSalvos::Infinite,
243                ports: [("out".to_string(), PacketCount::All)]
244                    .into_iter()
245                    .collect(),
246                term: SalvoConditionTerm::Port {
247                    port_name: "out".to_string(),
248                    state: PortState::NonEmpty,
249                },
250            },
251        )]),
252        dependency_request_config: None,
253    }
254}
255
256fn create_edge(src_node: &str, src_port: &str, tgt_node: &str, tgt_port: &str) -> Edge {
257    Edge {
258        source: PortRef {
259            node_name: src_node.to_string(),
260            port_type: PortType::Output,
261            port_name: src_port.to_string(),
262        },
263        target: PortRef {
264            node_name: tgt_node.to_string(),
265            port_type: PortType::Input,
266            port_name: tgt_port.to_string(),
267        },
268    }
269}
270
271fn edge_location(src_node: &str, src_port: &str, tgt_node: &str, tgt_port: &str) -> PacketLocation {
272    PacketLocation::Edge(Edge {
273        source: PortRef {
274            node_name: src_node.to_string(),
275            port_type: PortType::Output,
276            port_name: src_port.to_string(),
277        },
278        target: PortRef {
279            node_name: tgt_node.to_string(),
280            port_type: PortType::Input,
281            port_name: tgt_port.to_string(),
282        },
283    })
284}
285
286fn create_packet(net: &mut NetSim) -> ulid::Ulid {
287    match net.do_action(&NetAction::CreatePacket(None)) {
288        NetActionResponse::Success(NetActionResponseData::Packet(id), _) => id,
289        _ => panic!("Failed to create packet"),
290    }
291}
292
293fn create_packet_in_epoch(net: &mut NetSim, epoch_id: &ulid::Ulid) -> ulid::Ulid {
294    match net.do_action(&NetAction::CreatePacket(Some(epoch_id.clone()))) {
295        NetActionResponse::Success(NetActionResponseData::Packet(id), _) => id,
296        _ => panic!("Failed to create packet in epoch"),
297    }
298}