ecksport_net/
channel.rs

1//! Channel handles.
2
3use std::sync::Arc;
4
5use tokio::sync::mpsc;
6
7use ecksport_core::{
8    frame::{self, MsgFlags},
9    peer::PeerData,
10    topic,
11};
12
13use crate::{
14    errors::Error,
15    event::{PushFlags, WorkerCommand},
16    shared_state::ChanSharedState,
17};
18
19pub struct InbMsg {
20    flags: PushFlags,
21    payload: Vec<u8>,
22}
23
24impl InbMsg {
25    pub(super) fn new(flags: PushFlags, payload: Vec<u8>) -> Self {
26        Self { flags, payload }
27    }
28
29    pub fn flags(&self) -> &PushFlags {
30        &self.flags
31    }
32
33    pub fn payload(&self) -> &[u8] {
34        &self.payload
35    }
36
37    pub fn into_payload(self) -> Vec<u8> {
38        self.payload
39    }
40}
41
42pub struct OutbMsg {
43    id: u32,
44    payload: Vec<u8>,
45    flags: frame::MsgFlags,
46}
47
48impl OutbMsg {
49    fn new(id: u32, payload: Vec<u8>, flags: MsgFlags) -> Self {
50        Self { id, payload, flags }
51    }
52
53    pub fn id(&self) -> u32 {
54        self.id
55    }
56
57    pub fn payload(&self) -> &[u8] {
58        &self.payload
59    }
60
61    pub fn into_payload(self) -> Vec<u8> {
62        self.payload
63    }
64
65    pub fn flags(&self) -> &MsgFlags {
66        &self.flags
67    }
68
69    pub fn close(&self) -> bool {
70        self.flags.close
71    }
72}
73
74/// Exclusive handle to a channel.
75pub struct ChannelHandle {
76    /// Channel ID within the connection.
77    id: u32,
78
79    /// Whatever data we know about the peer.
80    peer_data: PeerData,
81
82    /// State shared between the channel and the corresponding worker.
83    shared: Arc<ChanSharedState>,
84
85    /// Inbound messages.
86    ch_inb_rx: mpsc::Receiver<Result<InbMsg, Error>>,
87
88    /// Outbound messages, dropped to close the channel.
89    worker_cmd_tx: Option<mpsc::Sender<WorkerCommand>>,
90}
91
92impl ChannelHandle {
93    pub(crate) fn new(
94        id: u32,
95        peer_data: PeerData,
96        shared: Arc<ChanSharedState>,
97        ch_inb_rx: mpsc::Receiver<Result<InbMsg, Error>>,
98        worker_cmd_tx: mpsc::Sender<WorkerCommand>,
99    ) -> Self {
100        Self {
101            id,
102            peer_data,
103            shared,
104            ch_inb_rx,
105            worker_cmd_tx: Some(worker_cmd_tx),
106        }
107    }
108
109    pub fn chan_id(&self) -> u32 {
110        self.id
111    }
112
113    pub fn peer_data(&self) -> &PeerData {
114        &self.peer_data
115    }
116
117    pub fn protocol(&self) -> topic::Topic {
118        self.shared.protocol()
119    }
120
121    pub fn topic(&self) -> topic::Topic {
122        self.shared.topic()
123    }
124
125    /// Receives an inbound message.
126    pub async fn recv_msg(&mut self) -> Result<Option<InbMsg>, Error> {
127        self.ch_inb_rx.recv().await.transpose()
128    }
129
130    async fn submit_command(&self, cmd: WorkerCommand) -> Result<(), Error> {
131        if let Some(tx) = &self.worker_cmd_tx {
132            if tx.send(cmd).await.is_err() {
133                return Err(Error::ConnWorkerExit);
134            }
135
136            Ok(())
137        } else {
138            Err(Error::SendOnClosedChan(self.id))
139        }
140    }
141
142    /// Sends an exact payload and optionally closes the channel.
143    pub async fn send_payload(&mut self, payload: Vec<u8>, flags: MsgFlags) -> Result<(), Error> {
144        let msg = OutbMsg::new(self.id, payload, flags);
145        self.submit_command(WorkerCommand::SendMsg(msg)).await?;
146
147        if flags.close {
148            self.worker_cmd_tx = None;
149        }
150
151        Ok(())
152    }
153
154    /// Sends a message.
155    pub async fn send(&mut self, payload: Vec<u8>) -> Result<(), Error> {
156        self.send_payload(payload, MsgFlags::none()).await
157    }
158
159    /// Sends a message and closes the local side of the channel.  New messages
160    /// may still be received until the remote side closes their side too.
161    pub async fn send_and_close(&mut self, payload: Vec<u8>) -> Result<(), Error> {
162        self.send_payload(payload, MsgFlags::close()).await
163    }
164
165    /// Sends an err message and close, new messages may still be received as
166    /// with `send_and_close`.
167    pub async fn close_with_err(&mut self, payload: Vec<u8>) -> Result<(), Error> {
168        let mut flags = MsgFlags::close();
169        flags.err = true;
170        self.send_payload(payload, flags).await
171    }
172
173    /// Closes the local side of the channel.  New messages may still be
174    /// received until the remote side closes their side too.
175    pub async fn close(&mut self) -> Result<(), Error> {
176        self.submit_command(WorkerCommand::CloseChannel(self.id))
177            .await?;
178        Ok(())
179    }
180}
181
182impl Drop for ChannelHandle {
183    fn drop(&mut self) {
184        self.shared.set_dropped();
185    }
186}