std_modrpc/role_impls/
multi_stream_sender.rs1use core::cell::Cell;
2use crate::proto::{
3 MultiStreamId,
4 MultiStreamInitState,
5 MultiStreamItem,
6 MultiStreamItemGen,
7 MultiStreamSenderConfig,
8};
9use modrpc::RoleSetup;
10
11pub struct SendMultiStream<T> {
12 stream_id: MultiStreamId,
13 next_seq: Cell<u64>,
14 item_tx: modrpc::EventTx<MultiStreamItem<T>>,
15}
16
17pub struct MultiStreamSender<T> {
18 hooks: crate::MultiStreamSenderHooks<T>,
19}
20
21pub struct MultiStreamSenderBuilder<T> {
22 hooks: crate::MultiStreamSenderHooks<T>,
23}
24
25impl<T: mproto::Owned> MultiStreamSenderBuilder<T> {
26 pub fn new(
27 _name: &'static str,
28 hooks: crate::MultiStreamSenderHooks<T>,
29 _stubs: crate::MultiStreamSenderStubs<T>,
30 _config: &MultiStreamSenderConfig,
31 _init: MultiStreamInitState,
32 ) -> Self {
33 Self { hooks }
34 }
35
36 pub fn create_handle(
37 &self,
38 _setup: &RoleSetup,
39 ) -> MultiStreamSender<T> {
40 MultiStreamSender {
41 hooks: self.hooks.clone(),
42 }
43 }
44
45 pub fn build(
46 self,
47 _setup: &RoleSetup,
48 ) {
49 }
50}
51
52impl<T: mproto::Owned> MultiStreamSender<T> {
53 pub fn new_stream(&self, stream_id: MultiStreamId) -> SendMultiStream<T> {
54 SendMultiStream {
55 stream_id,
56 next_seq: Cell::new(0),
57 item_tx: self.hooks.item.clone(),
58 }
59 }
60}
61
62impl<T> Clone for MultiStreamSender<T> {
63 fn clone(&self) -> Self {
64 Self {
65 hooks: self.hooks.clone(),
66 }
67 }
68}
69
70impl<T: mproto::Owned> SendMultiStream<T> {
71 pub fn stream_id(&self) -> MultiStreamId {
72 self.stream_id.clone()
73 }
74
75 pub fn try_send(
76 &self,
77 input: impl mproto::Encode + mproto::Compatible<T>,
78 ) -> bool {
79 let seq = self.next_seq.replace(self.next_seq.get() + 1);
80
81 self.item_tx.try_send(MultiStreamItemGen {
82 stream_id: self.stream_id.clone(),
83 seq,
84 payload: Some(input),
85 })
86 }
87
88 pub async fn send(
89 &self,
90 input: impl mproto::Encode + mproto::Compatible<T>,
91 ) {
92 let seq = self.next_seq.replace(self.next_seq.get() + 1);
93
94 self.item_tx.send(MultiStreamItemGen {
95 stream_id: self.stream_id.clone(),
96 seq,
97 payload: Some(input),
98 })
99 .await;
100 }
101
102 pub async fn end(self) {
103 let seq = self.next_seq.replace(self.next_seq.get() + 1);
104
105 self.item_tx.send(MultiStreamItemGen {
106 stream_id: self.stream_id.clone(),
107 seq,
108 payload: None::<T>,
109 })
110 .await;
111 }
112}
113
114impl<O: mproto::Owned, E: mproto::Owned> SendMultiStream<Result<O, E>> {
117 pub async fn send_ok(&mut self, input: impl mproto::Encode + mproto::Compatible<O>) {
118 let seq = self.next_seq.replace(self.next_seq.get() + 1);
119
120 self.item_tx.send(MultiStreamItemGen {
121 stream_id: self.stream_id.clone(),
122 seq,
123 payload: Some(Ok::<_, E>(input)),
124 })
125 .await;
126 }
127
128 pub async fn send_err(&mut self, input: impl mproto::Encode + mproto::Compatible<E>) {
129 let seq = self.next_seq.replace(self.next_seq.get() + 1);
130
131 self.item_tx.send(MultiStreamItemGen {
132 stream_id: self.stream_id.clone(),
133 seq,
134 payload: Some(Err::<O, _>(input)),
135 })
136 .await;
137 }
138}