1use bytes::Bytes;
2use futures::{
3 Future, FutureExt, future, ready,
4 sink::Sink,
5 task::{Context, Poll},
6};
7use std::{
8 error::Error,
9 fmt,
10 mem::size_of,
11 pin::Pin,
12 sync::{
13 Arc, Weak,
14 atomic::{AtomicBool, Ordering},
15 },
16};
17use tokio::sync::{Mutex, mpsc, oneshot};
18use tokio_util::sync::ReusableBoxFuture;
19
20use super::{
21 AnyStorage, Connect, ConnectError, PortAllocator, PortReq,
22 client::ConnectResponse,
23 credit::{AssignedCredits, CreditUser},
24 mux::PortEvt,
25};
26use crate::exec;
27
/// Error returned when sending over a multiplexed channel fails.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum SendError {
    /// The multiplexer terminated.
    ChMux,
    /// The remote endpoint closed the channel.
    Closed {
        /// Whether the remote endpoint still processes messages that were already sent.
        gracefully: bool,
    },
}

impl SendError {
    /// Returns `true` only for [`SendError::Closed`] with `gracefully: true`.
    ///
    /// NOTE(review): a non-graceful close (`gracefully: false`) reports `false`
    /// here; confirm that this is the intended meaning of "closed".
    pub fn is_closed(&self) -> bool {
        match self {
            Self::Closed { gracefully } => *gracefully,
            Self::ChMux => false,
        }
    }

    /// Always returns `true`; kept only for backwards compatibility.
    #[deprecated = "a chmux::SendError is always due to disconnection"]
    pub fn is_disconnected(&self) -> bool {
        true
    }

    /// Always returns `true`; kept only for backwards compatibility.
    #[deprecated = "a remoc::chmux::SendError is always final"]
    pub fn is_final(&self) -> bool {
        true
    }
}

impl fmt::Display for SendError {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let gracefully = match self {
            Self::ChMux => return f.write_str("multiplexer terminated"),
            Self::Closed { gracefully } => *gracefully,
        };

        // A graceful close gets an explanatory suffix, an abrupt one does not.
        let suffix = if gracefully { " but still processes sent messages" } else { "" };
        write!(f, "remote endpoint closed channel{}", suffix)
    }
}

impl Error for SendError {}
74
75impl<T> From<mpsc::error::SendError<T>> for SendError {
76 fn from(_err: mpsc::error::SendError<T>) -> Self {
77 Self::ChMux
78 }
79}
80
81impl From<SendError> for std::io::Error {
82 fn from(err: SendError) -> Self {
83 use std::io::ErrorKind;
84 match err {
85 SendError::ChMux => Self::new(ErrorKind::ConnectionReset, err.to_string()),
86 SendError::Closed { gracefully: false } => Self::new(ErrorKind::ConnectionReset, err.to_string()),
87 SendError::Closed { gracefully: true } => Self::new(ErrorKind::ConnectionAborted, err.to_string()),
88 }
89 }
90}
91
92#[derive(Debug)]
94pub enum TrySendError {
95 Full,
99 Send(SendError),
101}
102
103impl TrySendError {
104 pub fn is_closed(&self) -> bool {
106 match self {
107 Self::Full => false,
108 Self::Send(err) => err.is_closed(),
109 }
110 }
111
112 pub fn is_final(&self) -> bool {
114 match self {
115 Self::Full => false,
116 Self::Send(_) => true,
117 }
118 }
119}
120
121impl fmt::Display for TrySendError {
122 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
123 match self {
124 Self::Full => write!(f, "channel queue is full"),
125 Self::Send(err) => write!(f, "{err}"),
126 }
127 }
128}
129
130impl From<SendError> for TrySendError {
131 fn from(err: SendError) -> Self {
132 Self::Send(err)
133 }
134}
135
136impl From<mpsc::error::TrySendError<PortEvt>> for TrySendError {
137 fn from(err: mpsc::error::TrySendError<PortEvt>) -> Self {
138 match err {
139 mpsc::error::TrySendError::Full(_) => Self::Full,
140 mpsc::error::TrySendError::Closed(_) => Self::Send(SendError::ChMux),
141 }
142 }
143}
144
// Marker impl only: no `source()` is provided; the `Display` impl already
// forwards the message of a wrapped `SendError`.
impl Error for TrySendError {}
146
147pub struct Closed {
152 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
153}
154
155impl fmt::Debug for Closed {
156 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157 f.debug_tuple("Closed").finish()
158 }
159}
160
161impl Closed {
162 fn new(hangup_notify: &Weak<std::sync::Mutex<Option<Vec<oneshot::Sender<()>>>>>) -> Self {
163 match hangup_notify.upgrade() {
164 Some(hangup_notify) => {
165 if let Some(notifiers) = hangup_notify.lock().unwrap().as_mut() {
166 let (tx, rx) = oneshot::channel();
167 notifiers.push(tx);
168 Self {
169 fut: async move {
170 let _ = rx.await;
171 }
172 .boxed(),
173 }
174 } else {
175 Self { fut: future::ready(()).boxed() }
176 }
177 }
178 _ => Self { fut: future::ready(()).boxed() },
179 }
180 }
181}
182
183impl Future for Closed {
184 type Output = ();
185 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
186 self.fut.as_mut().poll(cx)
187 }
188}
189
/// Sending half of a multiplexed channel.
///
/// Outgoing data and port requests are turned into `PortEvt` values and
/// queued on `tx`; credits are obtained from `credits` before data is queued.
pub struct Sender {
    /// Local port number; reported in `PortEvt::SenderDropped` after the sender is dropped.
    local_port: u32,
    /// Remote port number; attached to every `SendData`/`SendPorts` event.
    remote_port: u32,
    /// Upper bound for the length of a single queued data chunk.
    chunk_size: usize,
    /// Value reported by `max_data_size()`; not otherwise used in this file.
    max_data_size: usize,
    /// Queue on which port events are sent.
    tx: mpsc::Sender<PortEvt>,
    /// Source of the credits that are taken for every chunk that is queued.
    credits: CreditUser,
    /// Flag read by `is_closed()`; held weakly.
    hangup_recved: Weak<AtomicBool>,
    /// Shared list of one-shot notifiers that `closed()` registers with; held weakly.
    hangup_notify: Weak<std::sync::Mutex<Option<Vec<oneshot::Sender<()>>>>>,
    /// Port allocator handed out (cloned) by `port_allocator()`.
    port_allocator: PortAllocator,
    /// Storage handed out (cloned) by `storage()`.
    storage: AnyStorage,
    /// Never sent on: dropping it wakes the task spawned in `new`, which then
    /// queues `PortEvt::SenderDropped`.
    _drop_tx: oneshot::Sender<()>,
}
204
205impl fmt::Debug for Sender {
206 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
207 f.debug_struct("Sender")
208 .field("local_port", &self.local_port)
209 .field("remote_port", &self.remote_port)
210 .field("chunk_size", &self.chunk_size)
211 .field("max_data_size", &self.max_data_size)
212 .field("is_closed", &self.is_closed())
213 .finish()
214 }
215}
216
217impl Sender {
218 #[allow(clippy::too_many_arguments)]
220 pub(crate) fn new(
221 local_port: u32, remote_port: u32, chunk_size: usize, max_data_size: usize, tx: mpsc::Sender<PortEvt>,
222 credits: CreditUser, hangup_recved: Weak<AtomicBool>,
223 hangup_notify: Weak<std::sync::Mutex<Option<Vec<oneshot::Sender<()>>>>>, port_allocator: PortAllocator,
224 storage: AnyStorage,
225 ) -> Self {
226 let (_drop_tx, drop_rx) = oneshot::channel();
227 let tx_drop = tx.clone();
228 exec::spawn(async move {
229 let _ = drop_rx.await;
230 let _ = tx_drop.send(PortEvt::SenderDropped { local_port }).await;
231 });
232
233 Self {
234 local_port,
235 remote_port,
236 chunk_size,
237 max_data_size,
238 tx,
239 credits,
240 hangup_recved,
241 hangup_notify,
242 port_allocator,
243 storage,
244 _drop_tx,
245 }
246 }
247
    /// Returns the local port number of this sender.
    pub fn local_port(&self) -> u32 {
        self.local_port
    }
252
    /// Returns the remote port number that outgoing events are addressed to.
    pub fn remote_port(&self) -> u32 {
        self.remote_port
    }
257
    /// Returns the maximum length of a single chunk.
    ///
    /// `send`, `try_send` and `ChunkSender` split larger data into pieces of
    /// at most this many bytes.
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }
264
    /// Returns the maximum data size this sender was created with.
    ///
    /// The value is only stored and reported here; nothing in this file enforces it.
    pub fn max_data_size(&self) -> usize {
        self.max_data_size
    }
272
273 pub async fn send(&mut self, mut data: Bytes) -> Result<(), SendError> {
281 if data.is_empty() {
282 let mut credits = self.credits.request(1, 1).await?;
283 credits.take(1);
284
285 let msg = PortEvt::SendData { remote_port: self.remote_port, data, first: true, last: true };
286 self.tx.send(msg).await?;
287 } else {
288 let mut first = true;
289 let mut credits = AssignedCredits::default();
290
291 while !data.is_empty() {
292 if credits.is_empty() {
293 credits = self.credits.request(data.len().min(u32::MAX as usize) as u32, 1).await?;
294 }
295
296 let at = data.len().min(self.chunk_size).min(credits.available() as usize);
297 let chunk = data.split_to(at);
298
299 credits.take(chunk.len() as u32);
300
301 let msg = PortEvt::SendData {
302 remote_port: self.remote_port,
303 data: chunk,
304 first,
305 last: data.is_empty(),
306 };
307 self.tx.send(msg).await?;
308
309 first = false;
310 }
311 }
312
313 Ok(())
314 }
315
    /// Starts sending a message chunk by chunk.
    ///
    /// The returned [`ChunkSender`] mutably borrows this sender, starts without
    /// any assigned credits and flags the next chunk as the first one.
    pub fn send_chunks(&mut self) -> ChunkSender<'_> {
        ChunkSender { sender: self, credits: AssignedCredits::default(), first: true }
    }
320
321 pub fn try_send(&mut self, data: &Bytes) -> Result<(), TrySendError> {
327 let mut data = data.clone();
328
329 if data.is_empty() {
330 match self.credits.try_request(1)? {
331 Some(mut credits) => {
332 credits.take(1);
333 let msg = PortEvt::SendData { remote_port: self.remote_port, data, first: true, last: true };
334 self.tx.try_send(msg)?;
335 Ok(())
336 }
337 None => Err(TrySendError::Full),
338 }
339 } else {
340 match self.credits.try_request(data.len().min(u32::MAX as usize) as u32)? {
341 Some(mut credits) => {
342 let mut first = true;
343 while !data.is_empty() {
344 let at = data.len().min(self.chunk_size);
345 let chunk = data.split_to(at);
346
347 credits.take(chunk.len() as u32);
348
349 let msg = PortEvt::SendData {
350 remote_port: self.remote_port,
351 data: chunk,
352 first,
353 last: data.is_empty(),
354 };
355 self.tx.try_send(msg)?;
356
357 first = false;
358 }
359 Ok(())
360 }
361 None => Err(TrySendError::Full),
362 }
363 }
364 }
365
    /// Sends port connection requests to the remote endpoint.
    ///
    /// For every entry of `ports` a [`Connect`] is returned (in the same order)
    /// whose `response` task resolves to the accepted sender/receiver pair or to
    /// a `ConnectError`. The requests are queued as one or more
    /// `PortEvt::SendPorts` events, each limited by the chunk size and the
    /// assigned credits; `wait` is forwarded unchanged in every event.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] if requesting credits fails or the event queue
    /// is closed.
    pub async fn connect(&mut self, ports: Vec<PortReq>, wait: bool) -> Result<Vec<Connect>, SendError> {
        let mut ports_response = Vec::new();
        // These senders are only collected, never sent on. They are dropped when
        // this function returns (success or error), which closes every `sent_rx`.
        // Presumably `Connect` uses that to learn the request was handed off; verify.
        let mut sent_txs = Vec::new();
        let mut connects = Vec::new();

        for port in ports {
            let (response_tx, response_rx) = oneshot::channel();
            ports_response.push((port, response_tx));

            // Translate the raw response into the result exposed through `Connect`.
            let response = exec::spawn(async move {
                match response_rx.await {
                    Ok(ConnectResponse::Accepted(sender, receiver)) => Ok((sender, receiver)),
                    Ok(ConnectResponse::Rejected { no_ports }) => {
                        if no_ports {
                            Err(ConnectError::RemotePortsExhausted)
                        } else {
                            Err(ConnectError::Rejected)
                        }
                    }
                    // The response sender was dropped without answering.
                    Err(_) => Err(ConnectError::ChMux),
                }
            });

            let (sent_tx, sent_rx) = mpsc::channel(1);
            sent_txs.push(sent_tx);

            connects.push(Connect { sent_rx, response });
        }

        let mut first = true;
        let mut credits = AssignedCredits::default();

        while !ports_response.is_empty() {
            if credits.is_empty() {
                // Each port is accounted as `size_of::<u32>()` bytes of credit.
                let data_len = ports_response.len() * size_of::<u32>();
                credits =
                    self.credits.request(data_len.min(u32::MAX as usize) as u32, size_of::<u32>() as u32).await?;
            }

            // NOTE(review): if a grant leaves a non-zero remainder smaller than
            // `size_of::<u32>()` (or the chunk size is that small), `max_ports`
            // is 0 while `credits` is not empty, so nothing refreshes it. It looks
            // like the loop would then keep queueing empty `SendPorts` events.
            // Confirm that `CreditUser::request` cannot leave such a remainder.
            let max_ports = self.chunk_size.min(credits.available() as usize) / size_of::<u32>();
            // Keep the first `max_ports` requests for this event, defer the rest.
            let next =
                if ports_response.len() > max_ports { ports_response.split_off(max_ports) } else { Vec::new() };

            credits.take((ports_response.len() * size_of::<u32>()) as u32);

            let msg = PortEvt::SendPorts {
                remote_port: self.remote_port,
                first,
                last: next.is_empty(),
                wait,
                ports: ports_response,
            };
            self.tx.send(msg).await?;

            ports_response = next;
            first = false;
        }

        Ok(connects)
    }
430
    /// Returns the current value of the shared hang-up flag.
    ///
    /// Returns `false` when the flag no longer exists (the `Weak` cannot be
    /// upgraded).
    ///
    /// NOTE(review): in that same situation the future from `closed()` resolves
    /// immediately; confirm that reporting `false` here is intended.
    pub fn is_closed(&self) -> bool {
        self.hangup_recved.upgrade().map(|hr| hr.load(Ordering::Relaxed)).unwrap_or_default()
    }
435
    /// Returns a [`Closed`] future registered with the shared hang-up notifier list.
    ///
    /// The future is ready immediately if the list no longer exists or has
    /// already been taken.
    pub fn closed(&self) -> Closed {
        Closed::new(&self.hangup_notify)
    }
440
    /// Returns the `override_graceful_close` setting stored in the credit user.
    ///
    /// NOTE(review): presumably this lets sending continue after a graceful
    /// close by the remote endpoint; the effect is implemented outside this file.
    pub fn is_graceful_close_overridden(&self) -> bool {
        self.credits.override_graceful_close
    }
450
    /// Sets the `override_graceful_close` setting stored in the credit user.
    ///
    /// The current value can be read back with `is_graceful_close_overridden()`.
    pub fn set_override_graceful_close(&mut self, override_graceful_close: bool) {
        self.credits.override_graceful_close = override_graceful_close;
    }
458
    /// Consumes the sender and wraps it in a [`SenderSink`], which implements
    /// `Sink<Bytes>`.
    pub fn into_sink(self) -> SenderSink {
        SenderSink::new(self)
    }
463
    /// Returns a clone of the port allocator this sender was created with.
    pub fn port_allocator(&self) -> PortAllocator {
        self.port_allocator.clone()
    }
468
    /// Returns a clone of the storage this sender was created with.
    pub fn storage(&self) -> AnyStorage {
        self.storage.clone()
    }
473}
474
impl Drop for Sender {
    fn drop(&mut self) {
        // Intentionally empty. The drop notification is produced when the
        // `_drop_tx` field is dropped after this method returns, which wakes
        // the task spawned in `Sender::new`.
        // NOTE(review): presumably this impl is kept on purpose (an explicit
        // `Drop` impl also prevents moving fields out of a `Sender`); confirm
        // before removing it.
    }
}
480
481pub struct ChunkSender<'a> {
486 sender: &'a mut Sender,
487 credits: AssignedCredits,
488 first: bool,
489}
490
491impl<'a> ChunkSender<'a> {
492 async fn send_int(&mut self, mut data: Bytes, finish: bool) -> Result<(), SendError> {
493 if data.is_empty() {
494 if self.credits.is_empty() {
495 self.credits = self.sender.credits.request(1, 1).await?;
496 }
497 self.credits.take(1);
498
499 let msg =
500 PortEvt::SendData { remote_port: self.sender.remote_port, data, first: self.first, last: finish };
501 self.sender.tx.send(msg).await?;
502
503 self.first = false;
504 } else {
505 while !data.is_empty() {
506 if self.credits.is_empty() {
507 self.credits =
508 self.sender.credits.request(data.len().min(u32::MAX as usize) as u32, 1).await?;
509 }
510
511 let at = data.len().min(self.sender.chunk_size).min(self.credits.available() as usize);
512 let chunk = data.split_to(at);
513
514 self.credits.take(chunk.len() as u32);
515
516 let msg = PortEvt::SendData {
517 remote_port: self.sender.remote_port,
518 data: chunk,
519 first: self.first,
520 last: data.is_empty() && finish,
521 };
522 self.sender.tx.send(msg).await?;
523
524 self.first = false;
525 }
526 }
527
528 Ok(())
529 }
530
531 pub async fn send(mut self, chunk: Bytes) -> Result<ChunkSender<'a>, SendError> {
537 self.send_int(chunk, false).await?;
538 Ok(self)
539 }
540
541 pub async fn send_final(mut self, chunk: Bytes) -> Result<(), SendError> {
546 self.send_int(chunk, true).await
547 }
548
549 pub async fn finish(mut self) -> Result<(), SendError> {
551 self.send_int(Bytes::new(), true).await
552 }
553}
554
555pub struct SenderSink {
557 sender: Option<Arc<Mutex<Sender>>>,
558 send_fut: Option<ReusableBoxFuture<'static, Result<(), SendError>>>,
559}
560
561impl SenderSink {
562 fn new(sender: Sender) -> Self {
563 Self { sender: Some(Arc::new(Mutex::new(sender))), send_fut: None }
564 }
565
566 async fn send(sender: Arc<Mutex<Sender>>, data: Bytes) -> Result<(), SendError> {
567 let mut sender = sender.lock().await;
568 sender.send(data).await
569 }
570
571 fn start_send(&mut self, data: Bytes) -> Result<(), SendError> {
572 if self.send_fut.is_some() {
573 panic!("sink is not ready for sending");
574 }
575
576 match self.sender.clone() {
577 Some(sender) => {
578 self.send_fut = Some(ReusableBoxFuture::new(Self::send(sender, data)));
579 Ok(())
580 }
581 None => panic!("start_send after sink has been closed"),
582 }
583 }
584
585 fn poll_send(&mut self, cx: &mut Context) -> Poll<Result<(), SendError>> {
586 match &mut self.send_fut {
587 Some(fut) => {
588 let res = ready!(fut.poll(cx));
589 self.send_fut = None;
590 Poll::Ready(res)
591 }
592 None => Poll::Ready(Ok(())),
593 }
594 }
595
596 fn close(&mut self) {
597 self.sender = None;
598 }
599}
600
601impl Sink<Bytes> for SenderSink {
602 type Error = SendError;
603
604 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
605 Pin::into_inner(self).poll_send(cx)
606 }
607
608 fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
609 Pin::into_inner(self).start_send(item)
610 }
611
612 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
613 Pin::into_inner(self).poll_send(cx)
614 }
615
616 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
617 ready!(Pin::into_inner(self.as_mut()).poll_send(cx))?;
618 Pin::into_inner(self).close();
619 Poll::Ready(Ok(()))
620 }
621}