std_modrpc/role_impls/
stream_receiver.rs

1use crate::proto::{StreamInitState, StreamItemLazy, StreamReceiverConfig};
2use std::collections::BinaryHeap;
3use modrpc::RoleSetup;
4
5#[derive(Clone)]
6pub struct StreamReceiver<T> {
7    _phantom: core::marker::PhantomData<T>,
8}
9
10pub struct StreamReceiverBuilder<T> {
11    stubs: crate::StreamReceiverStubs<T>,
12}
13
14impl<T: mproto::Owned> StreamReceiverBuilder<T> {
15    pub fn new(
16        _name: &'static str,
17        _hooks: crate::StreamReceiverHooks<T>,
18        stubs: crate::StreamReceiverStubs<T>,
19        _config: &StreamReceiverConfig,
20        _init: StreamInitState,
21    ) -> Self {
22        Self { stubs }
23    }
24
25    pub fn create_handle(
26        &self,
27        _setup: &RoleSetup,
28    ) -> crate::StreamReceiver<T> {
29        crate::StreamReceiver {
30            _phantom: core::marker::PhantomData,
31        }
32    }
33
34    pub fn build<H>(
35        self,
36        setup: &RoleSetup,
37        mut handler: H,
38    )
39        where
40            // TODO constrain handler payload type to be compatible with stream's payload type.
41            for<'a> H: modrpc::AsyncHandler<Context<'a> = modrpc::EndpointAddr> + 'static,
42            for<'a> H::Input<'a>: mproto::Lazy<'a>,
43    {
44        use std::cmp::Reverse;
45        use mproto::BaseLen;
46
47        #[allow(type_alias_bounds)]
48        type HandlerInputOwned<'a, H: modrpc::AsyncHandler> =
49            <H::Input<'a> as mproto::Lazy<'a>>::Owned;
50
51        // Wrapper for StreamItem that is Eq + PartialEq + Ord + PartialOrd
52        struct Item { seq: u64, packet: modrpc::Packet }
53        impl Item { fn sort_key(&self) -> u64 { self.seq} }
54        impl PartialEq for Item {
55            fn eq(&self, other: &Self) -> bool { self.sort_key().eq(&other.sort_key()) }
56        }
57        impl Eq for Item { }
58        impl PartialOrd for Item {
59            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { self.sort_key().partial_cmp(&other.sort_key()) }
60        }
61        impl Ord for Item {
62            fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.sort_key().cmp(&other.sort_key()) }
63        }
64
65        let mut heap = BinaryHeap::new();
66        // TODO configurable capacity
67        let (local_queue_tx, mut local_queue_rx) = localq::mpsc::channel::<modrpc::Packet>(64);
68
69        setup.role_spawner().spawn(async move {
70            loop {
71                let Ok(packet) = local_queue_rx.recv().await else {
72                    break;
73                };
74
75                // TODO decode error handling - at least log it?
76
77                // Decode packet header
78                let Ok(packet_header) =
79                    mproto::decode_value::<modrpc::TransmitPacket>(packet.as_ref())
80                else { continue; };
81
82                let stream_item_offset = modrpc::TransmitPacket::BASE_LEN;
83                let source = packet_header.source;
84
85                // Decode payload
86                let Ok(stream_item) =
87                    mproto::decode_value::<StreamItemLazy<HandlerInputOwned<H>>>(
88                        &packet.as_ref()[stream_item_offset..]
89                    )
90                else { continue; };
91
92                let Ok(payload) = stream_item.payload() else { continue; };
93
94                // Handle event
95                handler.call(source.clone(), payload).await;
96            }
97        });
98
99        self.stubs.item
100            .inline_untyped(setup, move |_source, packet| {
101                let seq = {
102                    let Ok(stream_item) =
103                        mproto::decode_value::<StreamItemLazy<HandlerInputOwned<H>>>(&packet)
104                    else {
105                        return;
106                    };
107
108                    let Ok(seq) = stream_item.seq() else {
109                        return;
110                    };
111
112                    seq
113                };
114
115                // Reverse order so that heap produces item with smallest seq.
116                heap.push(Reverse(Item { seq, packet: packet.clone() }));
117
118                let mut next_seq = 0;
119                while let Some(Reverse(stream_item)) = heap.peek() {
120                    if stream_item.seq != next_seq { break; }
121                    next_seq += 1;
122
123                    // Unwrap guaranteed to succeed.
124                    let Reverse(stream_item) = heap.pop().unwrap();
125
126                    let _ = local_queue_tx.try_send(stream_item.packet);
127                }
128            })
129            // TODO allow caller to specify load-balance vs subscribe?
130            .subscribe();
131    }
132}