1use std::sync::Arc;
4
5use tokio::sync::mpsc;
6
7use ecksport_core::{
8 frame::{self, MsgFlags},
9 peer::PeerData,
10 topic,
11};
12
13use crate::{
14 errors::Error,
15 event::{PushFlags, WorkerCommand},
16 shared_state::ChanSharedState,
17};
18
19pub struct InbMsg {
20 flags: PushFlags,
21 payload: Vec<u8>,
22}
23
24impl InbMsg {
25 pub(super) fn new(flags: PushFlags, payload: Vec<u8>) -> Self {
26 Self { flags, payload }
27 }
28
29 pub fn flags(&self) -> &PushFlags {
30 &self.flags
31 }
32
33 pub fn payload(&self) -> &[u8] {
34 &self.payload
35 }
36
37 pub fn into_payload(self) -> Vec<u8> {
38 self.payload
39 }
40}
41
42pub struct OutbMsg {
43 id: u32,
44 payload: Vec<u8>,
45 flags: frame::MsgFlags,
46}
47
48impl OutbMsg {
49 fn new(id: u32, payload: Vec<u8>, flags: MsgFlags) -> Self {
50 Self { id, payload, flags }
51 }
52
53 pub fn id(&self) -> u32 {
54 self.id
55 }
56
57 pub fn payload(&self) -> &[u8] {
58 &self.payload
59 }
60
61 pub fn into_payload(self) -> Vec<u8> {
62 self.payload
63 }
64
65 pub fn flags(&self) -> &MsgFlags {
66 &self.flags
67 }
68
69 pub fn close(&self) -> bool {
70 self.flags.close
71 }
72}
73
74pub struct ChannelHandle {
76 id: u32,
78
79 peer_data: PeerData,
81
82 shared: Arc<ChanSharedState>,
84
85 ch_inb_rx: mpsc::Receiver<Result<InbMsg, Error>>,
87
88 worker_cmd_tx: Option<mpsc::Sender<WorkerCommand>>,
90}
91
92impl ChannelHandle {
93 pub(crate) fn new(
94 id: u32,
95 peer_data: PeerData,
96 shared: Arc<ChanSharedState>,
97 ch_inb_rx: mpsc::Receiver<Result<InbMsg, Error>>,
98 worker_cmd_tx: mpsc::Sender<WorkerCommand>,
99 ) -> Self {
100 Self {
101 id,
102 peer_data,
103 shared,
104 ch_inb_rx,
105 worker_cmd_tx: Some(worker_cmd_tx),
106 }
107 }
108
109 pub fn chan_id(&self) -> u32 {
110 self.id
111 }
112
113 pub fn peer_data(&self) -> &PeerData {
114 &self.peer_data
115 }
116
117 pub fn protocol(&self) -> topic::Topic {
118 self.shared.protocol()
119 }
120
121 pub fn topic(&self) -> topic::Topic {
122 self.shared.topic()
123 }
124
125 pub async fn recv_msg(&mut self) -> Result<Option<InbMsg>, Error> {
127 self.ch_inb_rx.recv().await.transpose()
128 }
129
130 async fn submit_command(&self, cmd: WorkerCommand) -> Result<(), Error> {
131 if let Some(tx) = &self.worker_cmd_tx {
132 if tx.send(cmd).await.is_err() {
133 return Err(Error::ConnWorkerExit);
134 }
135
136 Ok(())
137 } else {
138 Err(Error::SendOnClosedChan(self.id))
139 }
140 }
141
142 pub async fn send_payload(&mut self, payload: Vec<u8>, flags: MsgFlags) -> Result<(), Error> {
144 let msg = OutbMsg::new(self.id, payload, flags);
145 self.submit_command(WorkerCommand::SendMsg(msg)).await?;
146
147 if flags.close {
148 self.worker_cmd_tx = None;
149 }
150
151 Ok(())
152 }
153
154 pub async fn send(&mut self, payload: Vec<u8>) -> Result<(), Error> {
156 self.send_payload(payload, MsgFlags::none()).await
157 }
158
159 pub async fn send_and_close(&mut self, payload: Vec<u8>) -> Result<(), Error> {
162 self.send_payload(payload, MsgFlags::close()).await
163 }
164
165 pub async fn close_with_err(&mut self, payload: Vec<u8>) -> Result<(), Error> {
168 let mut flags = MsgFlags::close();
169 flags.err = true;
170 self.send_payload(payload, flags).await
171 }
172
173 pub async fn close(&mut self) -> Result<(), Error> {
176 self.submit_command(WorkerCommand::CloseChannel(self.id))
177 .await?;
178 Ok(())
179 }
180}
181
182impl Drop for ChannelHandle {
183 fn drop(&mut self) {
184 self.shared.set_dropped();
185 }
186}