std_modrpc/role_impls/
multi_stream_sender.rs

1use core::cell::Cell;
2use crate::proto::{
3    MultiStreamId,
4    MultiStreamInitState,
5    MultiStreamItem,
6    MultiStreamItemGen,
7    MultiStreamSenderConfig,
8};
9use modrpc::RoleSetup;
10
11pub struct SendMultiStream<T> {
12    stream_id: MultiStreamId,
13    next_seq: Cell<u64>,
14    item_tx: modrpc::EventTx<MultiStreamItem<T>>,
15}
16
17pub struct MultiStreamSender<T> {
18    hooks: crate::MultiStreamSenderHooks<T>,
19}
20
21pub struct MultiStreamSenderBuilder<T> {
22    hooks: crate::MultiStreamSenderHooks<T>,
23}
24
25impl<T: mproto::Owned> MultiStreamSenderBuilder<T> {
26    pub fn new(
27        _name: &'static str,
28        hooks: crate::MultiStreamSenderHooks<T>,
29        _stubs: crate::MultiStreamSenderStubs<T>,
30        _config: &MultiStreamSenderConfig,
31        _init: MultiStreamInitState,
32    ) -> Self {
33        Self { hooks }
34    }
35
36    pub fn create_handle(
37        &self,
38        _setup: &RoleSetup,
39    ) -> MultiStreamSender<T> {
40        MultiStreamSender {
41            hooks: self.hooks.clone(),
42        }
43    }
44
45    pub fn build(
46        self,
47        _setup: &RoleSetup,
48    ) {
49    }
50}
51
52impl<T: mproto::Owned> MultiStreamSender<T> {
53    pub fn new_stream(&self, stream_id: MultiStreamId) -> SendMultiStream<T> {
54        SendMultiStream {
55            stream_id,
56            next_seq: Cell::new(0),
57            item_tx: self.hooks.item.clone(),
58        }
59    }
60}
61
62impl<T> Clone for MultiStreamSender<T> {
63    fn clone(&self) -> Self {
64        Self {
65            hooks: self.hooks.clone(),
66        }
67    }
68}
69
70impl<T: mproto::Owned> SendMultiStream<T> {
71    pub fn stream_id(&self) -> MultiStreamId {
72        self.stream_id.clone()
73    }
74
75    pub fn try_send(
76        &self,
77        input: impl mproto::Encode + mproto::Compatible<T>,
78    ) -> bool {
79        let seq = self.next_seq.replace(self.next_seq.get() + 1);
80
81        self.item_tx.try_send(MultiStreamItemGen {
82            stream_id: self.stream_id.clone(),
83            seq,
84            payload: Some(input),
85        })
86    }
87
88    pub async fn send(
89        &self,
90        input: impl mproto::Encode + mproto::Compatible<T>,
91    ) {
92        let seq = self.next_seq.replace(self.next_seq.get() + 1);
93
94        self.item_tx.send(MultiStreamItemGen {
95            stream_id: self.stream_id.clone(),
96            seq,
97            payload: Some(input),
98        })
99        .await;
100    }
101
102    pub async fn end(self) {
103        let seq = self.next_seq.replace(self.next_seq.get() + 1);
104
105        self.item_tx.send(MultiStreamItemGen {
106            stream_id: self.stream_id.clone(),
107            seq,
108            payload: None::<T>,
109        })
110        .await;
111    }
112}
113
114// Helpers to play nice with type inference for the fairly common situation where the item type
115// is a `Result`.
116impl<O: mproto::Owned, E: mproto::Owned> SendMultiStream<Result<O, E>> {
117    pub async fn send_ok(&mut self, input: impl mproto::Encode + mproto::Compatible<O>) {
118        let seq = self.next_seq.replace(self.next_seq.get() + 1);
119
120        self.item_tx.send(MultiStreamItemGen {
121            stream_id: self.stream_id.clone(),
122            seq,
123            payload: Some(Ok::<_, E>(input)),
124        })
125        .await;
126    }
127
128    pub async fn send_err(&mut self, input: impl mproto::Encode + mproto::Compatible<E>) {
129        let seq = self.next_seq.replace(self.next_seq.get() + 1);
130
131        self.item_tx.send(MultiStreamItemGen {
132            stream_id: self.stream_id.clone(),
133            seq,
134            payload: Some(Err::<O, _>(input)),
135        })
136        .await;
137    }
138}