Skip to main content

moonpool_transport/rpc/
net_notified_queue.rs

//! NetNotifiedQueue: Typed message queue with async notification.
//!
//! Deserializes incoming bytes into typed messages and provides
//! async waiting for new messages via Waker-based notification.
//!
//! # Pluggable Serialization
//!
//! The queue uses a [`MessageCodec`] for deserialization. The codec is the
//! `C` type parameter and is passed to the constructor explicitly; any codec
//! that implements the trait can be used, for example [`JsonCodec`].
//!
//! ```rust,ignore
//! use moonpool::{NetNotifiedQueue, Endpoint};
//! use moonpool_traits::{JsonCodec, MessageCodec};
//!
//! // JSON codec
//! let queue: NetNotifiedQueue<MyMessage, JsonCodec> =
//!     NetNotifiedQueue::new(endpoint, JsonCodec);
//!
//! // Custom codec
//! let queue: NetNotifiedQueue<MyMessage, MyBincodeCodec> =
//!     NetNotifiedQueue::new(endpoint, MyBincodeCodec);
//! ```
//!
//! # FDB Reference
//! Based on PromiseStream internal queue pattern from fdbrpc.h
24
25use std::cell::RefCell;
26use std::collections::VecDeque;
27use std::future::Future;
28use std::pin::Pin;
29use std::rc::Rc;
30use std::task::{Context, Poll, Waker};
31
32use serde::de::DeserializeOwned;
33
34use crate::{Endpoint, MessageCodec, NetworkAddress, UID};
35use moonpool_sim::sometimes_assert;
36
37use super::endpoint_map::MessageReceiver;
38
39/// Type-safe message queue with async notification.
40///
41/// Receives raw bytes, deserializes them to type `T`, and queues them.
42/// Consumers can async wait for messages using `recv()` or poll with `try_recv()`.
43///
44/// # Design
45///
46/// - Uses `RefCell` for single-threaded runtime (no Mutex overhead)
47/// - Waker-based notification wakes all waiting consumers
48/// - Deserializes on receive (producer side) to fail fast on bad messages
49/// - Pluggable codec via `C: MessageCodec` type parameter
50///
51/// # Type Safety
52///
53/// The type `T` is baked in at compile time. Only messages that deserialize
54/// to `T` will be accepted. Invalid messages log an error and are dropped.
55pub struct NetNotifiedQueue<T, C: MessageCodec> {
56    /// Internal state wrapped in RefCell for interior mutability.
57    inner: RefCell<NetNotifiedQueueInner<T>>,
58
59    /// Endpoint associated with this queue.
60    endpoint: Endpoint,
61
62    /// Codec for deserializing messages.
63    codec: C,
64}
65
/// Mutable state shared by all operations on a `NetNotifiedQueue`.
struct NetNotifiedQueueInner<T> {
    /// Decoded messages in arrival order (pushed at the back, popped at the front).
    queue: VecDeque<T>,

    /// Wakers of consumers that found the queue empty and are waiting.
    wakers: Vec<Waker>,

    /// Set once the queue is closed (no more messages expected).
    closed: bool,

    /// Number of messages successfully enqueued (debugging statistic).
    messages_received: u64,
    /// Number of payloads discarded because decoding failed (debugging statistic).
    messages_dropped: u64,
}
81
82impl<T> Default for NetNotifiedQueueInner<T> {
83    fn default() -> Self {
84        Self {
85            queue: VecDeque::new(),
86            wakers: Vec::new(),
87            closed: false,
88            messages_received: 0,
89            messages_dropped: 0,
90        }
91    }
92}
93
94impl<T, C: MessageCodec> NetNotifiedQueue<T, C> {
95    /// Create a new queue with the given endpoint and codec.
96    pub fn new(endpoint: Endpoint, codec: C) -> Self {
97        Self {
98            inner: RefCell::new(NetNotifiedQueueInner::default()),
99            endpoint,
100            codec,
101        }
102    }
103
104    /// Create a new queue with a dynamically allocated endpoint.
105    ///
106    /// Uses the provided address with a new random UID.
107    pub fn with_address(address: NetworkAddress, codec: C) -> Self {
108        // In real usage, UID should be generated via RandomProvider for determinism.
109        // For now, use a simple sequential ID.
110        let token = UID::new(0, rand_simple_id());
111        Self::new(Endpoint::new(address, token), codec)
112    }
113
114    /// Get the endpoint for this queue.
115    ///
116    /// Senders use this endpoint to address messages to this queue.
117    pub fn endpoint(&self) -> &Endpoint {
118        &self.endpoint
119    }
120
121    /// Try to receive a message without blocking.
122    ///
123    /// Returns `None` if no message is available.
124    pub fn try_recv(&self) -> Option<T> {
125        self.inner.borrow_mut().queue.pop_front()
126    }
127
128    /// Check if the queue is empty.
129    pub fn is_empty(&self) -> bool {
130        self.inner.borrow().queue.is_empty()
131    }
132
133    /// Get the number of messages currently in the queue.
134    pub fn len(&self) -> usize {
135        self.inner.borrow().queue.len()
136    }
137
138    /// Get the total number of messages received.
139    pub fn messages_received(&self) -> u64 {
140        self.inner.borrow().messages_received
141    }
142
143    /// Get the number of messages dropped due to deserialization errors.
144    pub fn messages_dropped(&self) -> u64 {
145        self.inner.borrow().messages_dropped
146    }
147
148    /// Mark the queue as closed.
149    ///
150    /// After closing, `recv()` will return `None` when the queue is empty
151    /// instead of waiting for more messages.
152    pub fn close(&self) {
153        let mut inner = self.inner.borrow_mut();
154        inner.closed = true;
155        // Wake all waiters to let them see the close
156        for waker in inner.wakers.drain(..) {
157            waker.wake();
158        }
159    }
160
161    /// Check if the queue is closed.
162    pub fn is_closed(&self) -> bool {
163        self.inner.borrow().closed
164    }
165
166    /// Push a pre-deserialized message directly (for testing).
167    #[cfg(test)]
168    fn push(&self, message: T) {
169        let mut inner = self.inner.borrow_mut();
170        inner.queue.push_back(message);
171        inner.messages_received += 1;
172        // Wake all waiters
173        for waker in inner.wakers.drain(..) {
174            waker.wake();
175        }
176    }
177}
178
179impl<T: DeserializeOwned, C: MessageCodec> NetNotifiedQueue<T, C> {
180    /// Async receive - waits for a message.
181    ///
182    /// Returns `None` if the queue is closed and empty.
183    pub fn recv(&self) -> RecvFuture<'_, T, C> {
184        RecvFuture { queue: self }
185    }
186}
187
188impl<T: DeserializeOwned + 'static, C: MessageCodec> MessageReceiver for NetNotifiedQueue<T, C> {
189    fn receive(&self, payload: &[u8]) {
190        // Deserialize the message using the codec
191        match self.codec.decode::<T>(payload) {
192            Ok(message) => {
193                sometimes_assert!(
194                    deserialization_success,
195                    true,
196                    "Message deserialized successfully"
197                );
198                let mut inner = self.inner.borrow_mut();
199                inner.queue.push_back(message);
200                inner.messages_received += 1;
201
202                // Wake all waiters
203                let had_waiters = !inner.wakers.is_empty();
204                for waker in inner.wakers.drain(..) {
205                    waker.wake();
206                }
207                if had_waiters {
208                    sometimes_assert!(waker_notified, true, "Wakers notified on new message");
209                }
210            }
211            Err(e) => {
212                sometimes_assert!(
213                    deserialization_failed,
214                    true,
215                    "Message deserialization failed"
216                );
217                // Log error and drop the message
218                tracing::warn!(
219                    endpoint = %self.endpoint.token,
220                    error = %e,
221                    "failed to deserialize message"
222                );
223                self.inner.borrow_mut().messages_dropped += 1;
224            }
225        }
226    }
227}
228
229/// Future returned by `recv()`.
230pub struct RecvFuture<'a, T, C: MessageCodec> {
231    queue: &'a NetNotifiedQueue<T, C>,
232}
233
234impl<T, C: MessageCodec> Future for RecvFuture<'_, T, C> {
235    type Output = Option<T>;
236
237    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
238        let mut inner = self.queue.inner.borrow_mut();
239
240        // Try to get a message
241        if let Some(message) = inner.queue.pop_front() {
242            sometimes_assert!(message_available, true, "Message available immediately");
243            return Poll::Ready(Some(message));
244        }
245
246        // If closed and empty, return None
247        if inner.closed {
248            sometimes_assert!(queue_closed_empty, true, "Queue closed and empty");
249            return Poll::Ready(None);
250        }
251
252        // Register waker and wait
253        sometimes_assert!(recv_pending, true, "Recv waiting for message");
254        inner.wakers.push(cx.waker().clone());
255        Poll::Pending
256    }
257}
258
259/// Wrapper for Rc<NetNotifiedQueue<T, C>> that can be registered with EndpointMap.
260pub struct SharedNetNotifiedQueue<T: DeserializeOwned + 'static, C: MessageCodec>(
261    pub Rc<NetNotifiedQueue<T, C>>,
262);
263
264impl<T: DeserializeOwned + 'static, C: MessageCodec> MessageReceiver
265    for SharedNetNotifiedQueue<T, C>
266{
267    fn receive(&self, payload: &[u8]) {
268        self.0.receive(payload)
269    }
270}
271
272impl<T: DeserializeOwned + 'static, C: MessageCodec> SharedNetNotifiedQueue<T, C> {
273    /// Create a new shared queue.
274    pub fn new(endpoint: Endpoint, codec: C) -> Self {
275        Self(Rc::new(NetNotifiedQueue::new(endpoint, codec)))
276    }
277
278    /// Get a reference to the inner queue.
279    pub fn inner(&self) -> &NetNotifiedQueue<T, C> {
280        &self.0
281    }
282
283    /// Get a clone of the Rc for registration with EndpointMap.
284    pub fn as_receiver(&self) -> Rc<NetNotifiedQueue<T, C>> {
285        Rc::clone(&self.0)
286    }
287}
288
/// Simple sequential ID generator for testing.
///
/// Despite the name, nothing here is random: each call returns the next value
/// of a process-wide counter that starts at 1.
/// In production, use RandomProvider for deterministic IDs.
fn rand_simple_id() -> u64 {
    static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
    NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
296
/// Unit tests for the queue: synchronous accessors, decoding via `receive`,
/// and the async path including the Pending -> wake -> Ready transition.
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::task::Wake;

    use super::*;
    use crate::JsonCodec;

    /// Fixed endpoint used by every test.
    fn test_endpoint() -> Endpoint {
        let addr = NetworkAddress::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4500);
        Endpoint::new(addr, UID::new(1, 1))
    }

    /// Waker that only counts how many times it has been woken.
    struct CountingWaker {
        wakes: AtomicUsize,
    }

    impl Wake for CountingWaker {
        fn wake(self: Arc<Self>) {
            self.wakes.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Build a counting waker plus the handle used to read its wake count.
    fn counting_waker() -> (Arc<CountingWaker>, Waker) {
        let counter = Arc::new(CountingWaker {
            wakes: AtomicUsize::new(0),
        });
        let waker = Waker::from(Arc::clone(&counter));
        (counter, waker)
    }

    #[test]
    fn test_new_queue_is_empty() {
        let queue: NetNotifiedQueue<String, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);
        assert_eq!(queue.messages_received(), 0);
    }

    #[test]
    fn test_push_and_try_recv() {
        let queue: NetNotifiedQueue<String, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);

        queue.push("hello".to_string());
        assert!(!queue.is_empty());
        assert_eq!(queue.len(), 1);

        let msg = queue.try_recv();
        assert_eq!(msg, Some("hello".to_string()));
        assert!(queue.is_empty());
    }

    #[test]
    fn test_receive_deserializes() {
        let queue: NetNotifiedQueue<String, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);

        // Receive a JSON-encoded string
        let payload = b"\"hello world\"";
        queue.receive(payload);

        assert_eq!(queue.len(), 1);
        assert_eq!(queue.messages_received(), 1);
        assert_eq!(queue.try_recv(), Some("hello world".to_string()));
    }

    #[test]
    fn test_receive_invalid_json_drops() {
        let queue: NetNotifiedQueue<String, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);

        // Receive invalid JSON
        let payload = b"not valid json";
        queue.receive(payload);

        assert!(queue.is_empty());
        assert_eq!(queue.messages_received(), 0);
        assert_eq!(queue.messages_dropped(), 1);
    }

    #[test]
    fn test_fifo_ordering() {
        let queue: NetNotifiedQueue<i32, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);

        queue.push(1);
        queue.push(2);
        queue.push(3);

        assert_eq!(queue.try_recv(), Some(1));
        assert_eq!(queue.try_recv(), Some(2));
        assert_eq!(queue.try_recv(), Some(3));
        assert_eq!(queue.try_recv(), None);
    }

    #[test]
    fn test_close_queue() {
        let queue: NetNotifiedQueue<String, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);

        assert!(!queue.is_closed());
        queue.close();
        assert!(queue.is_closed());
    }

    #[test]
    fn test_endpoint_accessor() {
        let endpoint = test_endpoint();
        let queue: NetNotifiedQueue<String, JsonCodec> =
            NetNotifiedQueue::new(endpoint.clone(), JsonCodec);

        assert_eq!(queue.endpoint().token, endpoint.token);
    }

    #[derive(Debug, PartialEq, serde::Deserialize)]
    struct TestMessage {
        id: u32,
        content: String,
    }

    #[test]
    fn test_receive_complex_type() {
        let queue: NetNotifiedQueue<TestMessage, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);

        let payload = br#"{"id": 42, "content": "hello"}"#;
        queue.receive(payload);

        let msg = queue.try_recv();
        assert_eq!(
            msg,
            Some(TestMessage {
                id: 42,
                content: "hello".to_string()
            })
        );
    }

    #[test]
    fn test_shared_queue() {
        let shared: SharedNetNotifiedQueue<String, JsonCodec> =
            SharedNetNotifiedQueue::new(test_endpoint(), JsonCodec);

        // Receive through the shared wrapper
        shared.receive(b"\"shared message\"");

        assert_eq!(
            shared.inner().try_recv(),
            Some("shared message".to_string())
        );
    }

    #[tokio::test]
    async fn test_recv_async() {
        let queue: NetNotifiedQueue<String, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);

        // Push before recv - should complete immediately
        queue.push("async hello".to_string());

        let result = queue.recv().await;
        assert_eq!(result, Some("async hello".to_string()));
    }

    #[tokio::test]
    async fn test_recv_closed_empty() {
        let queue: NetNotifiedQueue<String, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);

        queue.close();

        let result = queue.recv().await;
        assert_eq!(result, None);
    }

    #[test]
    fn test_recv_pending_is_woken_by_receive() {
        let queue: NetNotifiedQueue<String, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);
        let (counter, waker) = counting_waker();
        let mut cx = Context::from_waker(&waker);

        // Empty and open: the future must park and must not wake itself.
        let mut fut = queue.recv();
        assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending);
        assert_eq!(counter.wakes.load(Ordering::SeqCst), 0);

        // An arriving message wakes the parked consumer exactly once.
        queue.receive(b"\"wake up\"");
        assert_eq!(counter.wakes.load(Ordering::SeqCst), 1);

        assert_eq!(
            Pin::new(&mut fut).poll(&mut cx),
            Poll::Ready(Some("wake up".to_string()))
        );
    }

    #[test]
    fn test_recv_pending_is_woken_by_close() {
        let queue: NetNotifiedQueue<String, JsonCodec> =
            NetNotifiedQueue::new(test_endpoint(), JsonCodec);
        let (counter, waker) = counting_waker();
        let mut cx = Context::from_waker(&waker);

        let mut fut = queue.recv();
        assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending);

        // Closing wakes the parked consumer so it can observe the close.
        queue.close();
        assert_eq!(counter.wakes.load(Ordering::SeqCst), 1);

        assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready(None));
    }
}