actori_http/h1/
payload.rs

1//! Payload stream
2use std::cell::RefCell;
3use std::collections::VecDeque;
4use std::pin::Pin;
5use std::rc::{Rc, Weak};
6use std::task::{Context, Poll};
7
8use actori_utils::task::LocalWaker;
9use bytes::Bytes;
10use futures_core::Stream;
11
12use crate::error::PayloadError;
13
14/// max buffer size 32k
15pub(crate) const MAX_BUFFER_SIZE: usize = 32_768;
16
17#[derive(Debug, PartialEq)]
18pub enum PayloadStatus {
19    Read,
20    Pause,
21    Dropped,
22}
23
24/// Buffered stream of bytes chunks
25///
26/// Payload stores chunks in a vector. First chunk can be received with
27/// `.readany()` method. Payload stream is not thread safe. Payload does not
28/// notify current task when new data is available.
29///
30/// Payload stream can be used as `Response` body stream.
31#[derive(Debug)]
32pub struct Payload {
33    inner: Rc<RefCell<Inner>>,
34}
35
36impl Payload {
37    /// Create payload stream.
38    ///
39    /// This method construct two objects responsible for bytes stream
40    /// generation.
41    ///
42    /// * `PayloadSender` - *Sender* side of the stream
43    ///
44    /// * `Payload` - *Receiver* side of the stream
45    pub fn create(eof: bool) -> (PayloadSender, Payload) {
46        let shared = Rc::new(RefCell::new(Inner::new(eof)));
47
48        (
49            PayloadSender {
50                inner: Rc::downgrade(&shared),
51            },
52            Payload { inner: shared },
53        )
54    }
55
56    /// Create empty payload
57    #[doc(hidden)]
58    pub fn empty() -> Payload {
59        Payload {
60            inner: Rc::new(RefCell::new(Inner::new(true))),
61        }
62    }
63
64    /// Length of the data in this payload
65    #[cfg(test)]
66    pub fn len(&self) -> usize {
67        self.inner.borrow().len()
68    }
69
70    /// Is payload empty
71    #[cfg(test)]
72    pub fn is_empty(&self) -> bool {
73        self.inner.borrow().len() == 0
74    }
75
76    /// Put unused data back to payload
77    #[inline]
78    pub fn unread_data(&mut self, data: Bytes) {
79        self.inner.borrow_mut().unread_data(data);
80    }
81
82    #[inline]
83    pub fn readany(
84        &mut self,
85        cx: &mut Context<'_>,
86    ) -> Poll<Option<Result<Bytes, PayloadError>>> {
87        self.inner.borrow_mut().readany(cx)
88    }
89}
90
91impl Stream for Payload {
92    type Item = Result<Bytes, PayloadError>;
93
94    fn poll_next(
95        self: Pin<&mut Self>,
96        cx: &mut Context<'_>,
97    ) -> Poll<Option<Result<Bytes, PayloadError>>> {
98        self.inner.borrow_mut().readany(cx)
99    }
100}
101
102/// Sender part of the payload stream
103pub struct PayloadSender {
104    inner: Weak<RefCell<Inner>>,
105}
106
107impl PayloadSender {
108    #[inline]
109    pub fn set_error(&mut self, err: PayloadError) {
110        if let Some(shared) = self.inner.upgrade() {
111            shared.borrow_mut().set_error(err)
112        }
113    }
114
115    #[inline]
116    pub fn feed_eof(&mut self) {
117        if let Some(shared) = self.inner.upgrade() {
118            shared.borrow_mut().feed_eof()
119        }
120    }
121
122    #[inline]
123    pub fn feed_data(&mut self, data: Bytes) {
124        if let Some(shared) = self.inner.upgrade() {
125            shared.borrow_mut().feed_data(data)
126        }
127    }
128
129    #[inline]
130    pub fn need_read(&self, cx: &mut Context<'_>) -> PayloadStatus {
131        // we check need_read only if Payload (other side) is alive,
132        // otherwise always return true (consume payload)
133        if let Some(shared) = self.inner.upgrade() {
134            if shared.borrow().need_read {
135                PayloadStatus::Read
136            } else {
137                shared.borrow_mut().io_task.register(cx.waker());
138                PayloadStatus::Pause
139            }
140        } else {
141            PayloadStatus::Dropped
142        }
143    }
144}
145
146#[derive(Debug)]
147struct Inner {
148    len: usize,
149    eof: bool,
150    err: Option<PayloadError>,
151    need_read: bool,
152    items: VecDeque<Bytes>,
153    task: LocalWaker,
154    io_task: LocalWaker,
155}
156
157impl Inner {
158    fn new(eof: bool) -> Self {
159        Inner {
160            eof,
161            len: 0,
162            err: None,
163            items: VecDeque::new(),
164            need_read: true,
165            task: LocalWaker::new(),
166            io_task: LocalWaker::new(),
167        }
168    }
169
170    #[inline]
171    fn set_error(&mut self, err: PayloadError) {
172        self.err = Some(err);
173    }
174
175    #[inline]
176    fn feed_eof(&mut self) {
177        self.eof = true;
178    }
179
180    #[inline]
181    fn feed_data(&mut self, data: Bytes) {
182        self.len += data.len();
183        self.items.push_back(data);
184        self.need_read = self.len < MAX_BUFFER_SIZE;
185        if let Some(task) = self.task.take() {
186            task.wake()
187        }
188    }
189
190    #[cfg(test)]
191    fn len(&self) -> usize {
192        self.len
193    }
194
195    fn readany(
196        &mut self,
197        cx: &mut Context<'_>,
198    ) -> Poll<Option<Result<Bytes, PayloadError>>> {
199        if let Some(data) = self.items.pop_front() {
200            self.len -= data.len();
201            self.need_read = self.len < MAX_BUFFER_SIZE;
202
203            if self.need_read && !self.eof {
204                self.task.register(cx.waker());
205            }
206            self.io_task.wake();
207            Poll::Ready(Some(Ok(data)))
208        } else if let Some(err) = self.err.take() {
209            Poll::Ready(Some(Err(err)))
210        } else if self.eof {
211            Poll::Ready(None)
212        } else {
213            self.need_read = true;
214            self.task.register(cx.waker());
215            self.io_task.wake();
216            Poll::Pending
217        }
218    }
219
220    fn unread_data(&mut self, data: Bytes) {
221        self.len += data.len();
222        self.items.push_front(data);
223    }
224}
225
226#[cfg(test)]
227mod tests {
228    use super::*;
229    use futures_util::future::poll_fn;
230
231    #[actori_rt::test]
232    async fn test_unread_data() {
233        let (_, mut payload) = Payload::create(false);
234
235        payload.unread_data(Bytes::from("data"));
236        assert!(!payload.is_empty());
237        assert_eq!(payload.len(), 4);
238
239        assert_eq!(
240            Bytes::from("data"),
241            poll_fn(|cx| payload.readany(cx)).await.unwrap().unwrap()
242        );
243    }
244}