laminar_core/dag/routing.rs
1//! Pre-computed routing table for O(1) hot path dispatch.
2//!
3//! The [`RoutingTable`] is built once during DAG finalization (Ring 2) and
4//! provides O(1) lookups during event processing (Ring 0). Each
5//! [`RoutingEntry`] is cache-line aligned (64 bytes) to avoid false sharing.
6//!
7//! # Index Formula
8//!
9//! ```text
10//! index = node_id * MAX_PORTS + port
11//! ```
12//!
13//! For most nodes, `port = 0` (single output). Multi-output operators
14//! use separate ports for distinct output streams.
15
16use super::topology::{NodeId, StreamingDag, MAX_FAN_OUT};
17
18/// Maximum output ports per node.
19///
20/// Each port can route to a different set of targets.
21/// In practice, most nodes use only port 0.
22pub const MAX_PORTS: usize = MAX_FAN_OUT;
23
24/// A cache-line aligned routing entry for a `(node, port)` pair.
25///
26/// Contains the target node IDs and metadata for dispatch.
27/// Sized at exactly 64 bytes to align with CPU cache lines,
28/// preventing false sharing between entries accessed by different cores.
29///
30/// # Layout (64 bytes)
31///
32/// ```text
33/// [ targets: [u32; 8] ][count][mcast][ padding: [u8; 30] ]
34/// | 32 bytes | 1B | 1B | 30 bytes |
35/// ```
36#[repr(C, align(64))]
37#[derive(Clone, Copy)]
38pub struct RoutingEntry {
39 /// Target node IDs (raw `u32` values from [`NodeId`]).
40 pub targets: [u32; MAX_FAN_OUT],
41 /// Number of active targets in the `targets` array.
42 pub target_count: u8,
43 /// Whether this entry routes to multiple consumers (multicast).
44 pub is_multicast: bool,
45 /// Padding to fill the 64-byte cache line.
46 _padding: [u8; 30],
47}
48
49impl RoutingEntry {
50 /// Creates an empty routing entry (no targets).
51 #[must_use]
52 const fn empty() -> Self {
53 Self {
54 targets: [0; MAX_FAN_OUT],
55 target_count: 0,
56 is_multicast: false,
57 _padding: [0; 30],
58 }
59 }
60
61 /// Returns the active target node IDs as a slice.
62 #[inline]
63 #[must_use]
64 pub fn target_ids(&self) -> &[u32] {
65 &self.targets[..self.target_count as usize]
66 }
67
68 /// Returns whether this entry has no targets (terminal/sink node).
69 #[inline]
70 #[must_use]
71 pub fn is_terminal(&self) -> bool {
72 self.target_count == 0
73 }
74}
75
76impl std::fmt::Debug for RoutingEntry {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("RoutingEntry")
79 .field("targets", &self.target_ids())
80 .field("target_count", &self.target_count)
81 .field("is_multicast", &self.is_multicast)
82 .finish_non_exhaustive()
83 }
84}
85
86/// Pre-computed routing table for O(1) DAG event dispatch.
87///
88/// Built from a finalized [`StreamingDag`] during topology construction
89/// (Ring 2). Provides cache-aligned O(1) lookups indexed by `(node_id, port)`.
90///
91/// # Performance Target
92///
93/// Route lookup: < 50ns (single cache-line read).
94#[derive(Debug)]
95pub struct RoutingTable {
96 /// Flat array of routing entries, indexed by `node_id * MAX_PORTS + port`.
97 routes: Box<[RoutingEntry]>,
98 /// Maximum node ID in the table (for bounds checking).
99 max_node_id: u32,
100}
101
102impl RoutingTable {
103 /// Builds a routing table from a finalized [`StreamingDag`].
104 ///
105 /// Scans all nodes and their outgoing edges to populate routing entries.
106 /// All targets for a node are collected into a single entry at port 0.
107 ///
108 /// # Arguments
109 ///
110 /// * `dag` - A finalized `StreamingDag` topology
111 #[must_use]
112 pub fn from_dag(dag: &StreamingDag) -> Self {
113 let max_node_id = dag.nodes().keys().map(|n| n.0).max().unwrap_or(0);
114 let table_size = (max_node_id as usize + 1) * MAX_PORTS;
115 let mut routes = vec![RoutingEntry::empty(); table_size];
116
117 for node in dag.nodes().values() {
118 if node.outputs.is_empty() {
119 continue; // Sink nodes have no routing entries
120 }
121
122 // Collect all downstream targets for this node.
123 let mut targets = [0u32; MAX_FAN_OUT];
124 let mut count = 0u8;
125
126 for &edge_id in &node.outputs {
127 if let Some(edge) = dag.edge(edge_id) {
128 if (count as usize) < MAX_FAN_OUT {
129 targets[count as usize] = edge.target.0;
130 count += 1;
131 }
132 }
133 }
134
135 // Store at port 0 (primary output).
136 let idx = node.id.0 as usize * MAX_PORTS;
137 routes[idx] = RoutingEntry {
138 targets,
139 target_count: count,
140 is_multicast: count > 1,
141 _padding: [0; 30],
142 };
143 }
144
145 Self {
146 routes: routes.into_boxed_slice(),
147 max_node_id,
148 }
149 }
150
151 /// Looks up the routing entry for a `(source, port)` pair.
152 ///
153 /// # Arguments
154 ///
155 /// * `source` - The source node ID
156 /// * `port` - The output port (typically 0)
157 ///
158 /// # Panics
159 ///
160 /// Panics if the computed index is out of bounds.
161 #[inline]
162 #[must_use]
163 pub fn route(&self, source: NodeId, port: u8) -> &RoutingEntry {
164 let idx = source.0 as usize * MAX_PORTS + port as usize;
165 &self.routes[idx]
166 }
167
168 /// Convenience method to get targets for a node's primary output (port 0).
169 #[inline]
170 #[must_use]
171 pub fn node_targets(&self, source: NodeId) -> &RoutingEntry {
172 self.route(source, 0)
173 }
174
175 /// Returns the total number of routing entries in the table.
176 #[inline]
177 #[must_use]
178 pub fn entry_count(&self) -> usize {
179 self.routes.len()
180 }
181
182 /// Returns the maximum node ID covered by the table.
183 #[inline]
184 #[must_use]
185 pub fn max_node_id(&self) -> u32 {
186 self.max_node_id
187 }
188}