ntex_mqtt/
payload.rs

1use std::{cell::Cell, fmt, io, mem};
2
3use ntex_bytes::{Bytes, BytesMut};
4use ntex_util::{channel::bstream, future::Either};
5
6pub(crate) use ntex_util::channel::bstream::Status as PayloadStatus;
7
8use crate::error::PayloadError;
9
/// Receiving half of the payload byte stream; reading it yields payload
/// chunks or a `PayloadError`.
type PlStream = bstream::Receiver<PayloadError>;
/// Sending half of the payload byte stream, used to feed chunks into a
/// streaming `Payload`.
pub(crate) type PlSender = bstream::Sender<PayloadError>;
12
/// Payload for Publish packet
///
/// A payload is either *fixed* (at most one in-memory `Bytes` chunk) or
/// *streaming* (chunks are pulled from a `bstream` receiver).
pub struct Payload {
    // `Left`: fixed payload; the `Cell` lets readers holding only `&self`
    // take the chunk out, after which the payload is empty.
    // `Right`: streaming payload, read chunk by chunk.
    pl: Either<Cell<Option<Bytes>>, PlStream>,
}
17
/// Client payload streaming
///
/// Wraps the sending half (`PlSender`) of a payload byte stream.
pub struct PayloadSender {
    // Sending half of the `bstream` channel.
    tx: PlSender,
}
22
23impl Default for Payload {
24    fn default() -> Self {
25        Payload { pl: Either::Left(Cell::new(None)) }
26    }
27}
28
29impl Payload {
30    pub fn from_bytes(buf: Bytes) -> Payload {
31        Payload { pl: Either::Left(Cell::new(Some(buf))) }
32    }
33
34    pub(crate) fn from_stream(buf: Bytes) -> (Payload, PlSender) {
35        let (tx, rx) = bstream::channel();
36        if !buf.is_empty() {
37            tx.feed_data(buf);
38        }
39        (Payload { pl: Either::Right(rx) }, tx)
40    }
41
42    /// Check if payload is fixed
43    pub fn is_fixed(&self) -> bool {
44        self.pl.is_left()
45    }
46
47    /// Read next chunk
48    pub async fn read(&self) -> Result<Option<Bytes>, PayloadError> {
49        match &self.pl {
50            Either::Left(pl) => Ok(pl.take()),
51            Either::Right(pl) => {
52                pl.read().await.map_or(Ok(None), |res| res.map(|val| Some(val)))
53            }
54        }
55    }
56
57    pub async fn read_all(&self) -> Result<Option<Bytes>, PayloadError> {
58        match &self.pl {
59            Either::Left(pl) => Ok(pl.take()),
60            Either::Right(pl) => {
61                let mut chunk = if let Some(item) = pl.read().await {
62                    Some(item?)
63                } else {
64                    return Ok(None);
65                };
66
67                let mut buf = BytesMut::new();
68                loop {
69                    return match pl.read().await {
70                        Some(Ok(b)) => {
71                            if let Some(chunk) = chunk.take() {
72                                buf.reserve(b.len() + chunk.len());
73                                buf.extend_from_slice(&chunk);
74                            }
75                            buf.extend_from_slice(&b);
76                            continue;
77                        }
78                        None => Ok(Some(chunk.unwrap_or_else(|| buf.freeze()))),
79                        Some(Err(err)) => Err(err),
80                    };
81                }
82            }
83        }
84    }
85
86    pub fn take(&mut self) -> Payload {
87        Payload { pl: mem::replace(&mut self.pl, Either::Left(Cell::new(None))) }
88    }
89}
90
91impl fmt::Debug for Payload {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        if self.pl.is_left() {
94            f.debug_struct("FixedPayload").finish()
95        } else {
96            f.debug_struct("StreamingPayload").finish()
97        }
98    }
99}