std_modrpc/role_impls/
multi_stream_sender.rs

1use crate::proto::{
2    MultiStreamId, MultiStreamInitState, MultiStreamItem, MultiStreamItemGen,
3    MultiStreamSenderConfig,
4};
5use core::cell::Cell;
6use modrpc::RoleSetup;
7
8pub struct SendMultiStream<T> {
9    stream_id: MultiStreamId,
10    next_seq: Cell<u64>,
11    item_tx: modrpc::EventTx<MultiStreamItem<T>>,
12}
13
14pub struct MultiStreamSender<T> {
15    hooks: crate::MultiStreamSenderHooks<T>,
16}
17
18pub struct MultiStreamSenderBuilder<T> {
19    hooks: crate::MultiStreamSenderHooks<T>,
20}
21
22impl<T: mproto::Owned> MultiStreamSenderBuilder<T> {
23    pub fn new(
24        _name: &'static str,
25        hooks: crate::MultiStreamSenderHooks<T>,
26        _stubs: crate::MultiStreamSenderStubs<T>,
27        _config: &MultiStreamSenderConfig,
28        _init: MultiStreamInitState,
29    ) -> Self {
30        Self { hooks }
31    }
32
33    pub fn create_handle(&self, _setup: &RoleSetup) -> MultiStreamSender<T> {
34        MultiStreamSender {
35            hooks: self.hooks.clone(),
36        }
37    }
38
39    pub fn build(self, _setup: &RoleSetup) {}
40}
41
42impl<T: mproto::Owned> MultiStreamSender<T> {
43    pub fn new_stream(&self, stream_id: MultiStreamId) -> SendMultiStream<T> {
44        SendMultiStream {
45            stream_id,
46            next_seq: Cell::new(0),
47            item_tx: self.hooks.item.clone(),
48        }
49    }
50}
51
52impl<T> Clone for MultiStreamSender<T> {
53    fn clone(&self) -> Self {
54        Self {
55            hooks: self.hooks.clone(),
56        }
57    }
58}
59
60impl<T: mproto::Owned> SendMultiStream<T> {
61    pub fn stream_id(&self) -> MultiStreamId {
62        self.stream_id.clone()
63    }
64
65    pub fn try_send(&self, input: impl mproto::Encode + mproto::Compatible<T>) -> bool {
66        let seq = self.next_seq.replace(self.next_seq.get() + 1);
67
68        self.item_tx.try_send(MultiStreamItemGen {
69            stream_id: self.stream_id.clone(),
70            seq,
71            payload: Some(input),
72        })
73    }
74
75    pub async fn send(&self, input: impl mproto::Encode + mproto::Compatible<T>) {
76        let seq = self.next_seq.replace(self.next_seq.get() + 1);
77
78        self.item_tx
79            .send(MultiStreamItemGen {
80                stream_id: self.stream_id.clone(),
81                seq,
82                payload: Some(input),
83            })
84            .await;
85    }
86
87    pub async fn end(self) {
88        let seq = self.next_seq.replace(self.next_seq.get() + 1);
89
90        self.item_tx
91            .send(MultiStreamItemGen {
92                stream_id: self.stream_id.clone(),
93                seq,
94                payload: None::<T>,
95            })
96            .await;
97    }
98}
99
100// Helpers to play nice with type inference for the fairly common situation where the item type
101// is a `Result`.
102impl<O: mproto::Owned, E: mproto::Owned> SendMultiStream<Result<O, E>> {
103    pub async fn send_ok(&mut self, input: impl mproto::Encode + mproto::Compatible<O>) {
104        let seq = self.next_seq.replace(self.next_seq.get() + 1);
105
106        self.item_tx
107            .send(MultiStreamItemGen {
108                stream_id: self.stream_id.clone(),
109                seq,
110                payload: Some(Ok::<_, E>(input)),
111            })
112            .await;
113    }
114
115    pub async fn send_err(&mut self, input: impl mproto::Encode + mproto::Compatible<E>) {
116        let seq = self.next_seq.replace(self.next_seq.get() + 1);
117
118        self.item_tx
119            .send(MultiStreamItemGen {
120                stream_id: self.stream_id.clone(),
121                seq,
122                payload: Some(Err::<O, _>(input)),
123            })
124            .await;
125    }
126}