std_modrpc/role_impls/
stream_receiver.rs1use crate::proto::{StreamInitState, StreamItemLazy, StreamReceiverConfig};
2use modrpc::RoleSetup;
3use std::collections::BinaryHeap;
4
5#[derive(Clone)]
6pub struct StreamReceiver<T> {
7 _phantom: core::marker::PhantomData<T>,
8}
9
10pub struct StreamReceiverBuilder<T> {
11 stubs: crate::StreamReceiverStubs<T>,
12}
13
14impl<T: mproto::Owned> StreamReceiverBuilder<T> {
15 pub fn new(
16 _name: &'static str,
17 _hooks: crate::StreamReceiverHooks<T>,
18 stubs: crate::StreamReceiverStubs<T>,
19 _config: &StreamReceiverConfig,
20 _init: StreamInitState,
21 ) -> Self {
22 Self { stubs }
23 }
24
25 pub fn create_handle(&self, _setup: &RoleSetup) -> crate::StreamReceiver<T> {
26 crate::StreamReceiver {
27 _phantom: core::marker::PhantomData,
28 }
29 }
30
31 pub fn build<H>(self, setup: &RoleSetup, mut handler: H)
32 where
33 for<'a> H: modrpc::AsyncHandler<Context<'a> = modrpc::EndpointAddr> + 'static,
35 for<'a> H::Input<'a>: mproto::Lazy<'a>,
36 {
37 use mproto::BaseLen;
38 use std::cmp::Reverse;
39
40 #[allow(type_alias_bounds)]
41 type HandlerInputOwned<'a, H: modrpc::AsyncHandler> =
42 <H::Input<'a> as mproto::Lazy<'a>>::Owned;
43
44 struct Item {
46 seq: u64,
47 packet: modrpc::Packet,
48 }
49 impl Item {
50 fn sort_key(&self) -> u64 {
51 self.seq
52 }
53 }
54 impl PartialEq for Item {
55 fn eq(&self, other: &Self) -> bool {
56 self.sort_key().eq(&other.sort_key())
57 }
58 }
59 impl Eq for Item {}
60 impl PartialOrd for Item {
61 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
62 self.sort_key().partial_cmp(&other.sort_key())
63 }
64 }
65 impl Ord for Item {
66 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
67 self.sort_key().cmp(&other.sort_key())
68 }
69 }
70
71 let mut heap = BinaryHeap::new();
72 let (local_queue_tx, mut local_queue_rx) = localq::mpsc::channel::<modrpc::Packet>(64);
74
75 setup.role_spawner().spawn(async move {
76 loop {
77 let Ok(packet) = local_queue_rx.recv().await else {
78 break;
79 };
80
81 let Ok(packet_header) =
85 mproto::decode_value::<modrpc::TransmitPacket>(packet.as_ref())
86 else {
87 continue;
88 };
89
90 let stream_item_offset = modrpc::TransmitPacket::BASE_LEN;
91 let source = packet_header.source;
92
93 let Ok(stream_item) = mproto::decode_value::<StreamItemLazy<HandlerInputOwned<H>>>(
95 &packet.as_ref()[stream_item_offset..],
96 ) else {
97 continue;
98 };
99
100 let Ok(payload) = stream_item.payload() else {
101 continue;
102 };
103
104 handler.call(source.clone(), payload).await;
106 }
107 });
108
109 self.stubs
110 .item
111 .inline_untyped(setup, move |_source, packet| {
112 let seq = {
113 let Ok(stream_item) =
114 mproto::decode_value::<StreamItemLazy<HandlerInputOwned<H>>>(&packet)
115 else {
116 return;
117 };
118
119 let Ok(seq) = stream_item.seq() else {
120 return;
121 };
122
123 seq
124 };
125
126 heap.push(Reverse(Item {
128 seq,
129 packet: packet.clone(),
130 }));
131
132 let mut next_seq = 0;
133 while let Some(Reverse(stream_item)) = heap.peek() {
134 if stream_item.seq != next_seq {
135 break;
136 }
137 next_seq += 1;
138
139 let Reverse(stream_item) = heap.pop().unwrap();
141
142 let _ = local_queue_tx.try_send(stream_item.packet);
143 }
144 })
145 .subscribe();
147 }
148}