Skip to main content

moonpool_transport/rpc/
endpoint_map.rs

1//! EndpointMap: Token → receiver routing (FDB pattern).
2//!
3//! Routes incoming packets by token to registered receivers.
4//! Uses hybrid lookup: O(1) array for well-known tokens, HashMap for dynamic.
5//!
6//! # FDB Reference
7//! From FlowTransport.actor.cpp:80-220
8
9use std::collections::HashMap;
10use std::rc::Rc;
11
12use crate::{UID, WELL_KNOWN_RESERVED_COUNT, WellKnownToken};
13use moonpool_sim::sometimes_assert;
14
15use crate::error::MessagingError;
16
17/// Trait for receiving deserialized messages from the transport layer.
18///
19/// Implementors handle incoming packets dispatched by EndpointMap.
20/// The `receive` method is called synchronously during packet processing.
21pub trait MessageReceiver {
22    /// Process an incoming message payload.
23    ///
24    /// # Arguments
25    /// * `payload` - Raw bytes to be deserialized by the receiver
26    fn receive(&self, payload: &[u8]);
27
28    /// Whether this receiver handles a stream of messages (default: true).
29    ///
30    /// Stream receivers can receive multiple messages over their lifetime.
31    /// Non-stream receivers (promises) expect exactly one message.
32    fn is_stream(&self) -> bool {
33        true
34    }
35}
36
37/// Maps endpoint tokens to message receivers.
38///
39/// # Design
40///
41/// - **Well-known endpoints**: O(1) array access via token index (0-63)
42/// - **Dynamic endpoints**: HashMap lookup by full UID
43///
44/// Well-known endpoints are checked first for hot-path performance.
45///
46/// # FDB Reference
47/// From FlowTransport.actor.cpp:80-220 (EndpointMap class)
48pub struct EndpointMap {
49    /// Well-known receivers indexed by token.second (0-63).
50    /// These use O(1) array access for hot-path performance.
51    well_known: [Option<Rc<dyn MessageReceiver>>; WELL_KNOWN_RESERVED_COUNT],
52
53    /// Dynamic receivers keyed by full UID.
54    /// Used for endpoints allocated at runtime.
55    dynamic: HashMap<UID, Rc<dyn MessageReceiver>>,
56
57    /// Counter for metrics and debugging.
58    registration_count: u64,
59    deregistration_count: u64,
60}
61
62impl Default for EndpointMap {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68impl EndpointMap {
69    /// Create a new empty endpoint map.
70    pub fn new() -> Self {
71        Self {
72            well_known: std::array::from_fn(|_| None),
73            dynamic: HashMap::new(),
74            registration_count: 0,
75            deregistration_count: 0,
76        }
77    }
78
79    /// Register a well-known endpoint.
80    ///
81    /// Well-known endpoints have deterministic tokens and use O(1) array lookup.
82    ///
83    /// # Errors
84    ///
85    /// Returns error if the token index is out of range.
86    ///
87    /// # Example
88    ///
89    /// ```ignore
90    /// let mut map = EndpointMap::new();
91    /// let receiver = Rc::new(MyReceiver::new());
92    /// map.insert_well_known(WellKnownToken::Ping, receiver)?;
93    /// ```
94    pub fn insert_well_known(
95        &mut self,
96        token: WellKnownToken,
97        receiver: Rc<dyn MessageReceiver>,
98    ) -> Result<(), MessagingError> {
99        let index = token.as_u32() as usize;
100        if index >= WELL_KNOWN_RESERVED_COUNT {
101            return Err(MessagingError::InvalidWellKnownToken {
102                index: token.as_u32(),
103                max: WELL_KNOWN_RESERVED_COUNT,
104            });
105        }
106        self.well_known[index] = Some(receiver);
107        self.registration_count += 1;
108        sometimes_assert!(
109            well_known_registered,
110            true,
111            "Well-known endpoint registered successfully"
112        );
113        Ok(())
114    }
115
116    /// Register a dynamic endpoint with the given UID.
117    ///
118    /// Dynamic endpoints use HashMap lookup. Call this when you need
119    /// a specific UID (e.g., for request-response correlation).
120    ///
121    /// # Arguments
122    ///
123    /// * `token` - The UID for this endpoint
124    /// * `receiver` - The receiver to handle incoming messages
125    pub fn insert(&mut self, token: UID, receiver: Rc<dyn MessageReceiver>) {
126        self.dynamic.insert(token, receiver);
127        self.registration_count += 1;
128    }
129
130    /// Look up a receiver by token.
131    ///
132    /// Checks well-known endpoints first (O(1)), then dynamic (O(1) amortized).
133    ///
134    /// # Returns
135    ///
136    /// The receiver if found, or None if no endpoint is registered for this token.
137    pub fn get(&self, token: &UID) -> Option<Rc<dyn MessageReceiver>> {
138        // Check well-known first (hot path)
139        if token.is_well_known() {
140            let index = token.second as usize;
141            if index < WELL_KNOWN_RESERVED_COUNT
142                && let Some(receiver) = &self.well_known[index]
143            {
144                return Some(Rc::clone(receiver));
145            }
146            return None;
147        }
148
149        // Fall back to dynamic lookup
150        let result = self.dynamic.get(token).cloned();
151        sometimes_assert!(
152            dynamic_lookup_found,
153            result.is_some(),
154            "Dynamic endpoint lookup succeeds"
155        );
156        result
157    }
158
159    /// Remove a dynamic endpoint.
160    ///
161    /// Note: Well-known endpoints cannot be removed.
162    ///
163    /// # Returns
164    ///
165    /// The removed receiver if it existed.
166    pub fn remove(&mut self, token: &UID) -> Option<Rc<dyn MessageReceiver>> {
167        if token.is_well_known() {
168            // Well-known endpoints cannot be removed
169            sometimes_assert!(
170                well_known_removal_rejected,
171                true,
172                "Well-known endpoint removal correctly rejected"
173            );
174            return None;
175        }
176
177        let result = self.dynamic.remove(token);
178        if result.is_some() {
179            self.deregistration_count += 1;
180            sometimes_assert!(
181                endpoint_deregistered,
182                true,
183                "Dynamic endpoint deregistered successfully"
184            );
185        }
186        result
187    }
188
189    /// Get the number of registered well-known endpoints.
190    pub fn well_known_count(&self) -> usize {
191        self.well_known.iter().filter(|e| e.is_some()).count()
192    }
193
194    /// Get the number of registered dynamic endpoints.
195    pub fn dynamic_count(&self) -> usize {
196        self.dynamic.len()
197    }
198
199    /// Get total registration count (for metrics).
200    pub fn registration_count(&self) -> u64 {
201        self.registration_count
202    }
203
204    /// Get total deregistration count (for metrics).
205    pub fn deregistration_count(&self) -> u64 {
206        self.deregistration_count
207    }
208}
209
210#[cfg(test)]
211mod tests {
212    use std::cell::RefCell;
213
214    use super::*;
215
216    /// Mock receiver for testing.
217    struct MockReceiver {
218        received: RefCell<Vec<Vec<u8>>>,
219    }
220
221    impl MockReceiver {
222        fn new() -> Self {
223            Self {
224                received: RefCell::new(Vec::new()),
225            }
226        }
227
228        fn received_count(&self) -> usize {
229            self.received.borrow().len()
230        }
231
232        fn last_received(&self) -> Option<Vec<u8>> {
233            self.received.borrow().last().cloned()
234        }
235    }
236
237    impl MessageReceiver for MockReceiver {
238        fn receive(&self, payload: &[u8]) {
239            self.received.borrow_mut().push(payload.to_vec());
240        }
241    }
242
243    #[test]
244    fn test_new_endpoint_map_is_empty() {
245        let map = EndpointMap::new();
246        assert_eq!(map.well_known_count(), 0);
247        assert_eq!(map.dynamic_count(), 0);
248        assert_eq!(map.registration_count(), 0);
249        assert_eq!(map.deregistration_count(), 0);
250    }
251
252    #[test]
253    fn test_insert_well_known() {
254        let mut map = EndpointMap::new();
255        let receiver = Rc::new(MockReceiver::new());
256
257        map.insert_well_known(WellKnownToken::Ping, receiver.clone())
258            .expect("insert should succeed");
259
260        assert_eq!(map.well_known_count(), 1);
261        assert_eq!(map.registration_count(), 1);
262    }
263
264    #[test]
265    fn test_get_well_known() {
266        let mut map = EndpointMap::new();
267        let receiver = Rc::new(MockReceiver::new());
268
269        map.insert_well_known(WellKnownToken::Ping, receiver.clone())
270            .expect("insert should succeed");
271
272        // Look up by well-known UID
273        let token = WellKnownToken::Ping.uid();
274        let found = map.get(&token);
275        assert!(found.is_some());
276    }
277
278    #[test]
279    fn test_get_well_known_not_registered() {
280        let map = EndpointMap::new();
281
282        // Look up unregistered well-known token
283        let token = WellKnownToken::Ping.uid();
284        let found = map.get(&token);
285        assert!(found.is_none());
286    }
287
288    #[test]
289    fn test_insert_dynamic() {
290        let mut map = EndpointMap::new();
291        let receiver = Rc::new(MockReceiver::new());
292        let token = UID::new(0x1234, 0x5678);
293
294        map.insert(token, receiver);
295
296        assert_eq!(map.dynamic_count(), 1);
297        assert_eq!(map.registration_count(), 1);
298    }
299
300    #[test]
301    fn test_get_dynamic() {
302        let mut map = EndpointMap::new();
303        let receiver = Rc::new(MockReceiver::new());
304        let token = UID::new(0x1234, 0x5678);
305
306        map.insert(token, receiver.clone());
307
308        let found = map.get(&token);
309        assert!(found.is_some());
310    }
311
312    #[test]
313    fn test_get_dynamic_not_registered() {
314        let map = EndpointMap::new();
315        let token = UID::new(0x1234, 0x5678);
316
317        let found = map.get(&token);
318        assert!(found.is_none());
319    }
320
321    #[test]
322    fn test_remove_dynamic() {
323        let mut map = EndpointMap::new();
324        let receiver = Rc::new(MockReceiver::new());
325        let token = UID::new(0x1234, 0x5678);
326
327        map.insert(token, receiver);
328        assert_eq!(map.dynamic_count(), 1);
329
330        let removed = map.remove(&token);
331        assert!(removed.is_some());
332        assert_eq!(map.dynamic_count(), 0);
333        assert_eq!(map.deregistration_count(), 1);
334
335        // Should not find it anymore
336        assert!(map.get(&token).is_none());
337    }
338
339    #[test]
340    fn test_remove_well_known_not_allowed() {
341        let mut map = EndpointMap::new();
342        let receiver = Rc::new(MockReceiver::new());
343
344        map.insert_well_known(WellKnownToken::Ping, receiver)
345            .expect("insert should succeed");
346
347        // Attempt to remove well-known endpoint should fail
348        let token = WellKnownToken::Ping.uid();
349        let removed = map.remove(&token);
350        assert!(removed.is_none());
351
352        // Should still be registered
353        assert_eq!(map.well_known_count(), 1);
354    }
355
356    #[test]
357    fn test_receiver_receives_payload() {
358        let mut map = EndpointMap::new();
359        let receiver = Rc::new(MockReceiver::new());
360        let token = UID::new(0x1234, 0x5678);
361
362        map.insert(token, receiver.clone());
363
364        // Dispatch a message
365        if let Some(r) = map.get(&token) {
366            r.receive(b"hello world");
367        }
368
369        assert_eq!(receiver.received_count(), 1);
370        assert_eq!(receiver.last_received(), Some(b"hello world".to_vec()));
371    }
372
373    #[test]
374    fn test_o1_well_known_lookup() {
375        // This test verifies the design intention (O(1) array access)
376        // by checking that well-known tokens use array indexing
377        let mut map = EndpointMap::new();
378        let receiver = Rc::new(MockReceiver::new());
379
380        // Register all well-known tokens
381        for i in 0..4 {
382            // EndpointNotFound, Ping, UnauthorizedEndpoint, FirstAvailable
383            let token = match i {
384                0 => WellKnownToken::EndpointNotFound,
385                1 => WellKnownToken::Ping,
386                2 => WellKnownToken::UnauthorizedEndpoint,
387                3 => WellKnownToken::FirstAvailable,
388                _ => unreachable!(),
389            };
390            map.insert_well_known(token, receiver.clone())
391                .expect("insert should succeed");
392        }
393
394        // All should be found via O(1) array access
395        for i in 0..4u32 {
396            let uid = UID::well_known(i);
397            assert!(
398                map.get(&uid).is_some(),
399                "well-known token {i} should be found"
400            );
401        }
402    }
403}