std_modrpc/role_impls/
stream_receiver.rs1use crate::proto::{StreamInitState, StreamItemLazy, StreamReceiverConfig};
2use std::collections::BinaryHeap;
3use modrpc::RoleSetup;
4
5#[derive(Clone)]
6pub struct StreamReceiver<T> {
7 _phantom: core::marker::PhantomData<T>,
8}
9
10pub struct StreamReceiverBuilder<T> {
11 stubs: crate::StreamReceiverStubs<T>,
12}
13
14impl<T: mproto::Owned> StreamReceiverBuilder<T> {
15 pub fn new(
16 _name: &'static str,
17 _hooks: crate::StreamReceiverHooks<T>,
18 stubs: crate::StreamReceiverStubs<T>,
19 _config: &StreamReceiverConfig,
20 _init: StreamInitState,
21 ) -> Self {
22 Self { stubs }
23 }
24
25 pub fn create_handle(
26 &self,
27 _setup: &RoleSetup,
28 ) -> crate::StreamReceiver<T> {
29 crate::StreamReceiver {
30 _phantom: core::marker::PhantomData,
31 }
32 }
33
34 pub fn build<H>(
35 self,
36 setup: &RoleSetup,
37 mut handler: H,
38 )
39 where
40 for<'a> H: modrpc::AsyncHandler<Context<'a> = modrpc::EndpointAddr> + 'static,
42 for<'a> H::Input<'a>: mproto::Lazy<'a>,
43 {
44 use std::cmp::Reverse;
45 use mproto::BaseLen;
46
47 #[allow(type_alias_bounds)]
48 type HandlerInputOwned<'a, H: modrpc::AsyncHandler> =
49 <H::Input<'a> as mproto::Lazy<'a>>::Owned;
50
51 struct Item { seq: u64, packet: modrpc::Packet }
53 impl Item { fn sort_key(&self) -> u64 { self.seq} }
54 impl PartialEq for Item {
55 fn eq(&self, other: &Self) -> bool { self.sort_key().eq(&other.sort_key()) }
56 }
57 impl Eq for Item { }
58 impl PartialOrd for Item {
59 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { self.sort_key().partial_cmp(&other.sort_key()) }
60 }
61 impl Ord for Item {
62 fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.sort_key().cmp(&other.sort_key()) }
63 }
64
65 let mut heap = BinaryHeap::new();
66 let (local_queue_tx, mut local_queue_rx) = localq::mpsc::channel::<modrpc::Packet>(64);
68
69 setup.role_spawner().spawn(async move {
70 loop {
71 let Ok(packet) = local_queue_rx.recv().await else {
72 break;
73 };
74
75 let Ok(packet_header) =
79 mproto::decode_value::<modrpc::TransmitPacket>(packet.as_ref())
80 else { continue; };
81
82 let stream_item_offset = modrpc::TransmitPacket::BASE_LEN;
83 let source = packet_header.source;
84
85 let Ok(stream_item) =
87 mproto::decode_value::<StreamItemLazy<HandlerInputOwned<H>>>(
88 &packet.as_ref()[stream_item_offset..]
89 )
90 else { continue; };
91
92 let Ok(payload) = stream_item.payload() else { continue; };
93
94 handler.call(source.clone(), payload).await;
96 }
97 });
98
99 self.stubs.item
100 .inline_untyped(setup, move |_source, packet| {
101 let seq = {
102 let Ok(stream_item) =
103 mproto::decode_value::<StreamItemLazy<HandlerInputOwned<H>>>(&packet)
104 else {
105 return;
106 };
107
108 let Ok(seq) = stream_item.seq() else {
109 return;
110 };
111
112 seq
113 };
114
115 heap.push(Reverse(Item { seq, packet: packet.clone() }));
117
118 let mut next_seq = 0;
119 while let Some(Reverse(stream_item)) = heap.peek() {
120 if stream_item.seq != next_seq { break; }
121 next_seq += 1;
122
123 let Reverse(stream_item) = heap.pop().unwrap();
125
126 let _ = local_queue_tx.try_send(stream_item.packet);
127 }
128 })
129 .subscribe();
131 }
132}