actori_http/h1/
payload.rs1use std::cell::RefCell;
3use std::collections::VecDeque;
4use std::pin::Pin;
5use std::rc::{Rc, Weak};
6use std::task::{Context, Poll};
7
8use actori_utils::task::LocalWaker;
9use bytes::Bytes;
10use futures_core::Stream;
11
12use crate::error::PayloadError;
13
/// Buffered-byte threshold (32 KiB): `Inner` keeps `need_read` set only while
/// fewer than this many bytes are queued.
pub(crate) const MAX_BUFFER_SIZE: usize = 32 * 1024;
16
/// Outcome of [`PayloadSender::need_read`], telling the producer whether it
/// should keep feeding data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PayloadStatus {
    /// The payload's `need_read` flag is set: more data may be fed.
    Read,
    /// The payload's `need_read` flag is clear; the caller's waker has been
    /// registered so it is notified when the consumer reads.
    Pause,
    /// The `Payload` half no longer exists (the weak reference could not be
    /// upgraded).
    Dropped,
}
23
24#[derive(Debug)]
32pub struct Payload {
33 inner: Rc<RefCell<Inner>>,
34}
35
36impl Payload {
37 pub fn create(eof: bool) -> (PayloadSender, Payload) {
46 let shared = Rc::new(RefCell::new(Inner::new(eof)));
47
48 (
49 PayloadSender {
50 inner: Rc::downgrade(&shared),
51 },
52 Payload { inner: shared },
53 )
54 }
55
56 #[doc(hidden)]
58 pub fn empty() -> Payload {
59 Payload {
60 inner: Rc::new(RefCell::new(Inner::new(true))),
61 }
62 }
63
64 #[cfg(test)]
66 pub fn len(&self) -> usize {
67 self.inner.borrow().len()
68 }
69
70 #[cfg(test)]
72 pub fn is_empty(&self) -> bool {
73 self.inner.borrow().len() == 0
74 }
75
76 #[inline]
78 pub fn unread_data(&mut self, data: Bytes) {
79 self.inner.borrow_mut().unread_data(data);
80 }
81
82 #[inline]
83 pub fn readany(
84 &mut self,
85 cx: &mut Context<'_>,
86 ) -> Poll<Option<Result<Bytes, PayloadError>>> {
87 self.inner.borrow_mut().readany(cx)
88 }
89}
90
91impl Stream for Payload {
92 type Item = Result<Bytes, PayloadError>;
93
94 fn poll_next(
95 self: Pin<&mut Self>,
96 cx: &mut Context<'_>,
97 ) -> Poll<Option<Result<Bytes, PayloadError>>> {
98 self.inner.borrow_mut().readany(cx)
99 }
100}
101
102pub struct PayloadSender {
104 inner: Weak<RefCell<Inner>>,
105}
106
107impl PayloadSender {
108 #[inline]
109 pub fn set_error(&mut self, err: PayloadError) {
110 if let Some(shared) = self.inner.upgrade() {
111 shared.borrow_mut().set_error(err)
112 }
113 }
114
115 #[inline]
116 pub fn feed_eof(&mut self) {
117 if let Some(shared) = self.inner.upgrade() {
118 shared.borrow_mut().feed_eof()
119 }
120 }
121
122 #[inline]
123 pub fn feed_data(&mut self, data: Bytes) {
124 if let Some(shared) = self.inner.upgrade() {
125 shared.borrow_mut().feed_data(data)
126 }
127 }
128
129 #[inline]
130 pub fn need_read(&self, cx: &mut Context<'_>) -> PayloadStatus {
131 if let Some(shared) = self.inner.upgrade() {
134 if shared.borrow().need_read {
135 PayloadStatus::Read
136 } else {
137 shared.borrow_mut().io_task.register(cx.waker());
138 PayloadStatus::Pause
139 }
140 } else {
141 PayloadStatus::Dropped
142 }
143 }
144}
145
146#[derive(Debug)]
147struct Inner {
148 len: usize,
149 eof: bool,
150 err: Option<PayloadError>,
151 need_read: bool,
152 items: VecDeque<Bytes>,
153 task: LocalWaker,
154 io_task: LocalWaker,
155}
156
157impl Inner {
158 fn new(eof: bool) -> Self {
159 Inner {
160 eof,
161 len: 0,
162 err: None,
163 items: VecDeque::new(),
164 need_read: true,
165 task: LocalWaker::new(),
166 io_task: LocalWaker::new(),
167 }
168 }
169
170 #[inline]
171 fn set_error(&mut self, err: PayloadError) {
172 self.err = Some(err);
173 }
174
175 #[inline]
176 fn feed_eof(&mut self) {
177 self.eof = true;
178 }
179
180 #[inline]
181 fn feed_data(&mut self, data: Bytes) {
182 self.len += data.len();
183 self.items.push_back(data);
184 self.need_read = self.len < MAX_BUFFER_SIZE;
185 if let Some(task) = self.task.take() {
186 task.wake()
187 }
188 }
189
190 #[cfg(test)]
191 fn len(&self) -> usize {
192 self.len
193 }
194
195 fn readany(
196 &mut self,
197 cx: &mut Context<'_>,
198 ) -> Poll<Option<Result<Bytes, PayloadError>>> {
199 if let Some(data) = self.items.pop_front() {
200 self.len -= data.len();
201 self.need_read = self.len < MAX_BUFFER_SIZE;
202
203 if self.need_read && !self.eof {
204 self.task.register(cx.waker());
205 }
206 self.io_task.wake();
207 Poll::Ready(Some(Ok(data)))
208 } else if let Some(err) = self.err.take() {
209 Poll::Ready(Some(Err(err)))
210 } else if self.eof {
211 Poll::Ready(None)
212 } else {
213 self.need_read = true;
214 self.task.register(cx.waker());
215 self.io_task.wake();
216 Poll::Pending
217 }
218 }
219
220 fn unread_data(&mut self, data: Bytes) {
221 self.len += data.len();
222 self.items.push_front(data);
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::future::poll_fn;

    /// Data pushed back with `unread_data` is counted in the buffer length
    /// and is the next chunk yielded by `readany`.
    #[actori_rt::test]
    async fn test_unread_data() {
        let (_, mut payload) = Payload::create(false);
        let chunk = Bytes::from("data");

        payload.unread_data(chunk.clone());
        assert!(!payload.is_empty());
        assert_eq!(payload.len(), 4);

        let item = poll_fn(|cx| payload.readany(cx)).await;
        assert_eq!(chunk, item.unwrap().unwrap());
    }
}