Skip to main content

laminar_core/dag/
routing.rs

1//! Pre-computed routing table for O(1) hot path dispatch.
2//!
3//! The [`RoutingTable`] is built once during DAG finalization (Ring 2) and
4//! provides O(1) lookups during event processing (Ring 0). Each
5//! [`RoutingEntry`] is cache-line aligned (64 bytes) to avoid false sharing.
6//!
7//! # Index Formula
8//!
9//! ```text
10//! index = node_id * MAX_PORTS + port
11//! ```
12//!
13//! For most nodes, `port = 0` (single output). Multi-output operators
14//! use separate ports for distinct output streams.
15
16use super::topology::{NodeId, StreamingDag, MAX_FAN_OUT};
17
18/// Maximum output ports per node.
19///
20/// Each port can route to a different set of targets.
21/// In practice, most nodes use only port 0.
22pub const MAX_PORTS: usize = MAX_FAN_OUT;
23
24/// A cache-line aligned routing entry for a `(node, port)` pair.
25///
26/// Contains the target node IDs and metadata for dispatch.
27/// Sized at exactly 64 bytes to align with CPU cache lines,
28/// preventing false sharing between entries accessed by different cores.
29///
30/// # Layout (64 bytes)
31///
32/// ```text
33/// [  targets: [u32; 8]  ][count][mcast][   padding: [u8; 30]   ]
34/// |       32 bytes       | 1B   | 1B   |       30 bytes         |
35/// ```
36#[repr(C, align(64))]
37#[derive(Clone, Copy)]
38pub struct RoutingEntry {
39    /// Target node IDs (raw `u32` values from [`NodeId`]).
40    pub targets: [u32; MAX_FAN_OUT],
41    /// Number of active targets in the `targets` array.
42    pub target_count: u8,
43    /// Whether this entry routes to multiple consumers (multicast).
44    pub is_multicast: bool,
45    /// Padding to fill the 64-byte cache line.
46    _padding: [u8; 30],
47}
48
49impl RoutingEntry {
50    /// Creates an empty routing entry (no targets).
51    #[must_use]
52    const fn empty() -> Self {
53        Self {
54            targets: [0; MAX_FAN_OUT],
55            target_count: 0,
56            is_multicast: false,
57            _padding: [0; 30],
58        }
59    }
60
61    /// Returns the active target node IDs as a slice.
62    #[inline]
63    #[must_use]
64    pub fn target_ids(&self) -> &[u32] {
65        &self.targets[..self.target_count as usize]
66    }
67
68    /// Returns whether this entry has no targets (terminal/sink node).
69    #[inline]
70    #[must_use]
71    pub fn is_terminal(&self) -> bool {
72        self.target_count == 0
73    }
74}
75
76impl std::fmt::Debug for RoutingEntry {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("RoutingEntry")
79            .field("targets", &self.target_ids())
80            .field("target_count", &self.target_count)
81            .field("is_multicast", &self.is_multicast)
82            .finish_non_exhaustive()
83    }
84}
85
86/// Pre-computed routing table for O(1) DAG event dispatch.
87///
88/// Built from a finalized [`StreamingDag`] during topology construction
89/// (Ring 2). Provides cache-aligned O(1) lookups indexed by `(node_id, port)`.
90///
91/// # Performance Target
92///
93/// Route lookup: < 50ns (single cache-line read).
94#[derive(Debug)]
95pub struct RoutingTable {
96    /// Flat array of routing entries, indexed by `node_id * MAX_PORTS + port`.
97    routes: Box<[RoutingEntry]>,
98    /// Maximum node ID in the table (for bounds checking).
99    max_node_id: u32,
100}
101
102impl RoutingTable {
103    /// Builds a routing table from a finalized [`StreamingDag`].
104    ///
105    /// Scans all nodes and their outgoing edges to populate routing entries.
106    /// All targets for a node are collected into a single entry at port 0.
107    ///
108    /// # Arguments
109    ///
110    /// * `dag` - A finalized `StreamingDag` topology
111    #[must_use]
112    pub fn from_dag(dag: &StreamingDag) -> Self {
113        let max_node_id = dag.nodes().keys().map(|n| n.0).max().unwrap_or(0);
114        let table_size = (max_node_id as usize + 1) * MAX_PORTS;
115        let mut routes = vec![RoutingEntry::empty(); table_size];
116
117        for node in dag.nodes().values() {
118            if node.outputs.is_empty() {
119                continue; // Sink nodes have no routing entries
120            }
121
122            // Collect all downstream targets for this node.
123            let mut targets = [0u32; MAX_FAN_OUT];
124            let mut count = 0u8;
125
126            for &edge_id in &node.outputs {
127                if let Some(edge) = dag.edge(edge_id) {
128                    if (count as usize) < MAX_FAN_OUT {
129                        targets[count as usize] = edge.target.0;
130                        count += 1;
131                    }
132                }
133            }
134
135            // Store at port 0 (primary output).
136            let idx = node.id.0 as usize * MAX_PORTS;
137            routes[idx] = RoutingEntry {
138                targets,
139                target_count: count,
140                is_multicast: count > 1,
141                _padding: [0; 30],
142            };
143        }
144
145        Self {
146            routes: routes.into_boxed_slice(),
147            max_node_id,
148        }
149    }
150
151    /// Looks up the routing entry for a `(source, port)` pair.
152    ///
153    /// # Arguments
154    ///
155    /// * `source` - The source node ID
156    /// * `port` - The output port (typically 0)
157    ///
158    /// # Panics
159    ///
160    /// Panics if the computed index is out of bounds.
161    #[inline]
162    #[must_use]
163    pub fn route(&self, source: NodeId, port: u8) -> &RoutingEntry {
164        let idx = source.0 as usize * MAX_PORTS + port as usize;
165        &self.routes[idx]
166    }
167
168    /// Convenience method to get targets for a node's primary output (port 0).
169    #[inline]
170    #[must_use]
171    pub fn node_targets(&self, source: NodeId) -> &RoutingEntry {
172        self.route(source, 0)
173    }
174
175    /// Returns the total number of routing entries in the table.
176    #[inline]
177    #[must_use]
178    pub fn entry_count(&self) -> usize {
179        self.routes.len()
180    }
181
182    /// Returns the maximum node ID covered by the table.
183    #[inline]
184    #[must_use]
185    pub fn max_node_id(&self) -> u32 {
186        self.max_node_id
187    }
188}