1use std::{cell::Cell, fmt, io, mem};
2
3use ntex_bytes::{Bytes, BytesMut};
4use ntex_util::{channel::bstream, future::Either};
5
6pub(crate) use ntex_util::channel::bstream::Status as PayloadStatus;
7
8use crate::error::PayloadError;
9
/// Receiving side of a payload byte stream whose error type is `PayloadError`.
type PlStream = bstream::Receiver<PayloadError>;
/// Sending side of a payload byte stream whose error type is `PayloadError`.
pub(crate) type PlSender = bstream::Sender<PayloadError>;
12
/// A message payload that is either fixed (at most one in-memory buffer) or
/// streaming (chunks pulled from a `PlStream`).
pub struct Payload {
    // `Left`: fixed buffer; the `Cell` lets readers take it through `&self`,
    // leaving `None` behind. `Right`: receiver of a streaming payload.
    pl: Either<Cell<Option<Bytes>>, PlStream>,
}
17
/// Public wrapper around a `PlSender`.
///
/// NOTE(review): no constructor or methods for this type are visible in this
/// part of the file — confirm how it is created and used.
pub struct PayloadSender {
    // Wrapped stream sender.
    tx: PlSender,
}
22
23impl Default for Payload {
24 fn default() -> Self {
25 Payload { pl: Either::Left(Cell::new(None)) }
26 }
27}
28
29impl Payload {
30 pub fn from_bytes(buf: Bytes) -> Payload {
31 Payload { pl: Either::Left(Cell::new(Some(buf))) }
32 }
33
34 pub(crate) fn from_stream(buf: Bytes) -> (Payload, PlSender) {
35 let (tx, rx) = bstream::channel();
36 if !buf.is_empty() {
37 tx.feed_data(buf);
38 }
39 (Payload { pl: Either::Right(rx) }, tx)
40 }
41
42 pub fn is_fixed(&self) -> bool {
44 self.pl.is_left()
45 }
46
47 pub async fn read(&self) -> Result<Option<Bytes>, PayloadError> {
49 match &self.pl {
50 Either::Left(pl) => Ok(pl.take()),
51 Either::Right(pl) => {
52 pl.read().await.map_or(Ok(None), |res| res.map(|val| Some(val)))
53 }
54 }
55 }
56
57 pub async fn read_all(&self) -> Result<Option<Bytes>, PayloadError> {
58 match &self.pl {
59 Either::Left(pl) => Ok(pl.take()),
60 Either::Right(pl) => {
61 let mut chunk = if let Some(item) = pl.read().await {
62 Some(item?)
63 } else {
64 return Ok(None);
65 };
66
67 let mut buf = BytesMut::new();
68 loop {
69 return match pl.read().await {
70 Some(Ok(b)) => {
71 if let Some(chunk) = chunk.take() {
72 buf.reserve(b.len() + chunk.len());
73 buf.extend_from_slice(&chunk);
74 }
75 buf.extend_from_slice(&b);
76 continue;
77 }
78 None => Ok(Some(chunk.unwrap_or_else(|| buf.freeze()))),
79 Some(Err(err)) => Err(err),
80 };
81 }
82 }
83 }
84 }
85
86 pub fn take(&mut self) -> Payload {
87 Payload { pl: mem::replace(&mut self.pl, Either::Left(Cell::new(None))) }
88 }
89}
90
91impl fmt::Debug for Payload {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 if self.pl.is_left() {
94 f.debug_struct("FixedPayload").finish()
95 } else {
96 f.debug_struct("StreamingPayload").finish()
97 }
98 }
99}