std_modrpc/role_impls/
stream_receiver.rs

1use crate::proto::{StreamInitState, StreamItemLazy, StreamReceiverConfig};
2use modrpc::RoleSetup;
3use std::collections::BinaryHeap;
4
/// Handle for the receiving side of a stream.
///
/// The handle carries no runtime state; `T` is only recorded at the type level
/// through `PhantomData`.
pub struct StreamReceiver<T> {
    _phantom: core::marker::PhantomData<T>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound even though no `T` value is ever stored or cloned.
impl<T> Clone for StreamReceiver<T> {
    fn clone(&self) -> Self {
        Self {
            _phantom: core::marker::PhantomData,
        }
    }
}
9
10pub struct StreamReceiverBuilder<T> {
11    stubs: crate::StreamReceiverStubs<T>,
12}
13
14impl<T: mproto::Owned> StreamReceiverBuilder<T> {
15    pub fn new(
16        _name: &'static str,
17        _hooks: crate::StreamReceiverHooks<T>,
18        stubs: crate::StreamReceiverStubs<T>,
19        _config: &StreamReceiverConfig,
20        _init: StreamInitState,
21    ) -> Self {
22        Self { stubs }
23    }
24
25    pub fn create_handle(&self, _setup: &RoleSetup) -> crate::StreamReceiver<T> {
26        crate::StreamReceiver {
27            _phantom: core::marker::PhantomData,
28        }
29    }
30
31    pub fn build<H>(self, setup: &RoleSetup, mut handler: H)
32    where
33        // TODO constrain handler payload type to be compatible with stream's payload type.
34        for<'a> H: modrpc::AsyncHandler<Context<'a> = modrpc::EndpointAddr> + 'static,
35        for<'a> H::Input<'a>: mproto::Lazy<'a>,
36    {
37        use mproto::BaseLen;
38        use std::cmp::Reverse;
39
40        #[allow(type_alias_bounds)]
41        type HandlerInputOwned<'a, H: modrpc::AsyncHandler> =
42            <H::Input<'a> as mproto::Lazy<'a>>::Owned;
43
44        // Wrapper for StreamItem that is Eq + PartialEq + Ord + PartialOrd
45        struct Item {
46            seq: u64,
47            packet: modrpc::Packet,
48        }
49        impl Item {
50            fn sort_key(&self) -> u64 {
51                self.seq
52            }
53        }
54        impl PartialEq for Item {
55            fn eq(&self, other: &Self) -> bool {
56                self.sort_key().eq(&other.sort_key())
57            }
58        }
59        impl Eq for Item {}
60        impl PartialOrd for Item {
61            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
62                self.sort_key().partial_cmp(&other.sort_key())
63            }
64        }
65        impl Ord for Item {
66            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
67                self.sort_key().cmp(&other.sort_key())
68            }
69        }
70
71        let mut heap = BinaryHeap::new();
72        // TODO configurable capacity
73        let (local_queue_tx, mut local_queue_rx) = localq::mpsc::channel::<modrpc::Packet>(64);
74
75        setup.role_spawner().spawn(async move {
76            loop {
77                let Ok(packet) = local_queue_rx.recv().await else {
78                    break;
79                };
80
81                // TODO decode error handling - at least log it?
82
83                // Decode packet header
84                let Ok(packet_header) =
85                    mproto::decode_value::<modrpc::TransmitPacket>(packet.as_ref())
86                else {
87                    continue;
88                };
89
90                let stream_item_offset = modrpc::TransmitPacket::BASE_LEN;
91                let source = packet_header.source;
92
93                // Decode payload
94                let Ok(stream_item) = mproto::decode_value::<StreamItemLazy<HandlerInputOwned<H>>>(
95                    &packet.as_ref()[stream_item_offset..],
96                ) else {
97                    continue;
98                };
99
100                let Ok(payload) = stream_item.payload() else {
101                    continue;
102                };
103
104                // Handle event
105                handler.call(source.clone(), payload).await;
106            }
107        });
108
109        self.stubs
110            .item
111            .inline_untyped(setup, move |_source, packet| {
112                let seq = {
113                    let Ok(stream_item) =
114                        mproto::decode_value::<StreamItemLazy<HandlerInputOwned<H>>>(&packet)
115                    else {
116                        return;
117                    };
118
119                    let Ok(seq) = stream_item.seq() else {
120                        return;
121                    };
122
123                    seq
124                };
125
126                // Reverse order so that heap produces item with smallest seq.
127                heap.push(Reverse(Item {
128                    seq,
129                    packet: packet.clone(),
130                }));
131
132                let mut next_seq = 0;
133                while let Some(Reverse(stream_item)) = heap.peek() {
134                    if stream_item.seq != next_seq {
135                        break;
136                    }
137                    next_seq += 1;
138
139                    // Unwrap guaranteed to succeed.
140                    let Reverse(stream_item) = heap.pop().unwrap();
141
142                    let _ = local_queue_tx.try_send(stream_item.packet);
143                }
144            })
145            // TODO allow caller to specify load-balance vs subscribe?
146            .subscribe();
147    }
148}