std_modrpc/role_impls/
multi_stream_sender.rs1use crate::proto::{
2 MultiStreamId, MultiStreamInitState, MultiStreamItem, MultiStreamItemGen,
3 MultiStreamSenderConfig,
4};
5use core::cell::Cell;
6use modrpc::RoleSetup;
7
8pub struct SendMultiStream<T> {
9 stream_id: MultiStreamId,
10 next_seq: Cell<u64>,
11 item_tx: modrpc::EventTx<MultiStreamItem<T>>,
12}
13
14pub struct MultiStreamSender<T> {
15 hooks: crate::MultiStreamSenderHooks<T>,
16}
17
18pub struct MultiStreamSenderBuilder<T> {
19 hooks: crate::MultiStreamSenderHooks<T>,
20}
21
22impl<T: mproto::Owned> MultiStreamSenderBuilder<T> {
23 pub fn new(
24 _name: &'static str,
25 hooks: crate::MultiStreamSenderHooks<T>,
26 _stubs: crate::MultiStreamSenderStubs<T>,
27 _config: &MultiStreamSenderConfig,
28 _init: MultiStreamInitState,
29 ) -> Self {
30 Self { hooks }
31 }
32
33 pub fn create_handle(&self, _setup: &RoleSetup) -> MultiStreamSender<T> {
34 MultiStreamSender {
35 hooks: self.hooks.clone(),
36 }
37 }
38
39 pub fn build(self, _setup: &RoleSetup) {}
40}
41
42impl<T: mproto::Owned> MultiStreamSender<T> {
43 pub fn new_stream(&self, stream_id: MultiStreamId) -> SendMultiStream<T> {
44 SendMultiStream {
45 stream_id,
46 next_seq: Cell::new(0),
47 item_tx: self.hooks.item.clone(),
48 }
49 }
50}
51
52impl<T> Clone for MultiStreamSender<T> {
53 fn clone(&self) -> Self {
54 Self {
55 hooks: self.hooks.clone(),
56 }
57 }
58}
59
60impl<T: mproto::Owned> SendMultiStream<T> {
61 pub fn stream_id(&self) -> MultiStreamId {
62 self.stream_id.clone()
63 }
64
65 pub fn try_send(&self, input: impl mproto::Encode + mproto::Compatible<T>) -> bool {
66 let seq = self.next_seq.replace(self.next_seq.get() + 1);
67
68 self.item_tx.try_send(MultiStreamItemGen {
69 stream_id: self.stream_id.clone(),
70 seq,
71 payload: Some(input),
72 })
73 }
74
75 pub async fn send(&self, input: impl mproto::Encode + mproto::Compatible<T>) {
76 let seq = self.next_seq.replace(self.next_seq.get() + 1);
77
78 self.item_tx
79 .send(MultiStreamItemGen {
80 stream_id: self.stream_id.clone(),
81 seq,
82 payload: Some(input),
83 })
84 .await;
85 }
86
87 pub async fn end(self) {
88 let seq = self.next_seq.replace(self.next_seq.get() + 1);
89
90 self.item_tx
91 .send(MultiStreamItemGen {
92 stream_id: self.stream_id.clone(),
93 seq,
94 payload: None::<T>,
95 })
96 .await;
97 }
98}
99
100impl<O: mproto::Owned, E: mproto::Owned> SendMultiStream<Result<O, E>> {
103 pub async fn send_ok(&mut self, input: impl mproto::Encode + mproto::Compatible<O>) {
104 let seq = self.next_seq.replace(self.next_seq.get() + 1);
105
106 self.item_tx
107 .send(MultiStreamItemGen {
108 stream_id: self.stream_id.clone(),
109 seq,
110 payload: Some(Ok::<_, E>(input)),
111 })
112 .await;
113 }
114
115 pub async fn send_err(&mut self, input: impl mproto::Encode + mproto::Compatible<E>) {
116 let seq = self.next_seq.replace(self.next_seq.get() + 1);
117
118 self.item_tx
119 .send(MultiStreamItemGen {
120 stream_id: self.stream_id.clone(),
121 seq,
122 payload: Some(Err::<O, _>(input)),
123 })
124 .await;
125 }
126}