1use std::cmp;
2
3use bytes::Bytes;
4
5use crate::error;
6
7use crate::solicit::end_stream::EndStream;
8use crate::solicit::header::Headers;
9use crate::solicit::session::StreamState;
10use crate::solicit::window_size::NonNegativeWindowSize;
11use crate::solicit::window_size::WindowSize;
12
13use super::types::Types;
14
15use super::stream_queue::StreamQueue;
16use super::window_size;
17use crate::common::stream_handler::StreamHandlerInternal;
18use crate::data_or_headers::DataOrHeaders;
19use crate::data_or_headers_with_flag::DataOrHeadersWithFlag;
20use crate::ErrorCode;
21
/// One outgoing action for a stream, as returned by
/// `HttpStreamCommon::pop_outg`.
pub enum HttpStreamCommand {
    /// A header block, together with a flag saying whether it ends the stream.
    Headers(Headers, EndStream),
    /// A chunk of body bytes, together with a flag saying whether it ends the
    /// stream.
    Data(Bytes, EndStream),
    /// A stream reset carrying the given error code.
    Rst(ErrorCode),
}
27
28impl HttpStreamCommand {
29 pub fn from(part: DataOrHeadersWithFlag) -> HttpStreamCommand {
30 let end_stream = match part.last {
31 true => EndStream::Yes,
32 false => EndStream::No,
33 };
34 match part.content {
35 DataOrHeaders::Data(data) => HttpStreamCommand::Data(data, end_stream),
36 DataOrHeaders::Headers(headers) => HttpStreamCommand::Headers(headers, end_stream),
37 }
38 }
39}
40
/// Amount of queued outgoing data reported when a stream is reset.
///
/// `HttpStreamCommon::rst_recvd` returns this with `size` set to the data size
/// of the stream's outgoing queue. The type is `#[must_use]` so that callers
/// cannot silently ignore the reported amount.
#[must_use]
#[derive(Debug, PartialEq, Eq)]
pub struct DroppedData {
    /// Number of bytes that were still queued.
    pub size: usize,
}
45
46#[derive(Debug, PartialEq, Eq, Clone)]
47pub struct HttpStreamStateSnapshot {
48 pub state: StreamState,
49 pub out_window_size: i32,
50 pub in_window_size: i32,
51 pub pump_out_window_size: isize,
52 pub queued_out_data_size: usize,
53 pub out_data_size: usize,
54}
55
/// Stage of the incoming message on a stream.
///
/// NOTE(review): the variant names suggest this tracks which header blocks
/// have already been received; the transitions are driven by code outside
/// this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InMessageStage {
    /// Nothing of the message has been processed yet.
    Initial,
    /// The initial header block has been seen.
    AfterInitialHeaders,
    /// The trailing header block has been seen.
    AfterTrailingHeaders,
}
62
/// Per-stream state shared by every stream flavour selected through `Types`.
pub(crate) struct HttpStreamCommon<T: Types> {
    /// Flavour-specific part of the stream state.
    pub specific: T::HttpStreamSpecific,
    /// Current open / half-closed / closed state of the stream.
    pub state: StreamState,
    /// Outgoing flow-control window of this stream.
    pub out_window_size: WindowSize,
    /// Incoming flow-control window of this stream.
    pub in_window_size: NonNegativeWindowSize,
    /// Headers and data chunks queued for sending, plus the end-of-stream
    /// marker.
    pub outgoing: StreamQueue,
    /// Handler that receives incoming data, resets and errors for this
    /// stream; `None` until one is attached or after it has been consumed.
    pub peer_tx: Option<T::StreamHandlerHolder>,
    /// Sender side of the "pump" out-window; its current value is reported
    /// in snapshots.
    pub pump_out_window: window_size::StreamOutWindowSender,
    /// NOTE(review): presumably the number of body bytes still expected
    /// according to a `content-length` header, if one was given — confirm
    /// against the code that updates it.
    pub in_rem_content_length: Option<u64>,
    /// Stage of the incoming message.
    pub in_message_stage: InMessageStage,
}
79
80impl<T: Types> HttpStreamCommon<T> {
81 pub fn new(
82 in_window_size: u32,
83 out_window_size: u32,
84 pump_out_window: window_size::StreamOutWindowSender,
85 in_rem_content_length: Option<u64>,
86 in_message_stage: InMessageStage,
87 specific: T::HttpStreamSpecific,
88 ) -> HttpStreamCommon<T> {
89 HttpStreamCommon {
90 specific,
91 state: StreamState::Open,
92 in_window_size: NonNegativeWindowSize::new(in_window_size as i32),
93 out_window_size: WindowSize::new(out_window_size as i32),
94 outgoing: StreamQueue::new(),
95 peer_tx: None,
96 pump_out_window,
97 in_rem_content_length,
98 in_message_stage,
99 }
100 }
101
102 pub fn snapshot(&self) -> HttpStreamStateSnapshot {
103 HttpStreamStateSnapshot {
104 state: self.state,
105 out_window_size: self.out_window_size.size(),
106 in_window_size: self.in_window_size.size(),
107 pump_out_window_size: self.pump_out_window.get(),
108 queued_out_data_size: self.outgoing.data_size(),
109 out_data_size: self.outgoing.data_size(),
110 }
111 }
112
113 pub fn close_local(&mut self) {
114 trace!("close local");
115 self.state = match self.state {
116 StreamState::Closed | StreamState::HalfClosedRemote => StreamState::Closed,
117 _ => StreamState::HalfClosedLocal,
118 };
119 }
120
121 pub fn close_remote(&mut self) {
122 trace!("close remote");
123 self.state = match self.state {
124 StreamState::Closed | StreamState::HalfClosedLocal => StreamState::Closed,
125 _ => StreamState::HalfClosedRemote,
126 };
127 }
128
129 pub fn conn_died(mut self, error: error::Error) {
130 if let Some(handler) = self.peer_tx.take() {
131 drop(handler.error(error));
132 }
133 }
134
135 pub fn is_writable(&self) -> bool {
137 match self.outgoing.front() {
138 Some(front) => match front {
139 DataOrHeaders::Headers(..) => true,
140 DataOrHeaders::Data(data) => data.len() == 0 || self.out_window_size.size() > 0,
141 },
142 None => {
143 if let Some(_error_code) = self.outgoing.end() {
144 if !self.state.is_closed_local() {
145 return true;
146 }
147 };
148
149 false
150 }
151 }
152 }
153
154 #[cfg(debug_assertions)]
155 pub fn pop_outg(&mut self, conn_out_window_size: &mut WindowSize) -> Option<HttpStreamCommand> {
156 let writable = self.is_writable();
157 let conn_out_window_size_before = conn_out_window_size.size();
158
159 let command = self.pop_outg_impl(conn_out_window_size);
160 if command.is_some() {
161 assert!(writable);
162 } else {
163 assert!(
164 !writable || conn_out_window_size_before == 0,
165 "popped nothing but writable: {}, conn out window size before: {}, {:?}",
166 writable,
167 conn_out_window_size_before,
168 self.snapshot()
169 );
170 }
171 command
172 }
173
174 #[cfg(not(debug_assertions))]
175 pub fn pop_outg(&mut self, conn_out_window_size: &mut WindowSize) -> Option<HttpStreamCommand> {
176 self.pop_outg_impl(conn_out_window_size)
177 }
178
179 fn pop_outg_impl(
180 &mut self,
181 conn_out_window_size: &mut WindowSize,
182 ) -> Option<HttpStreamCommand> {
183 if self.outgoing.is_empty() {
184 return if let Some(error_code) = self.outgoing.end() {
185 if self.state.is_closed_local() {
186 None
187 } else {
188 self.close_local();
189 Some(match error_code {
190 ErrorCode::NoError => HttpStreamCommand::Data(Bytes::new(), EndStream::Yes),
191 error_code => HttpStreamCommand::Rst(error_code),
192 })
193 }
194 } else {
195 None
196 };
197 }
198
199 let pop_headers = if let &DataOrHeaders::Headers(..) = self.outgoing.front().unwrap() {
200 true
201 } else {
202 false
203 };
204 if pop_headers {
205 let r = self.outgoing.pop_front().unwrap();
206 let last = self.outgoing.end() == Some(ErrorCode::NoError);
207 if last {
208 self.close_local();
209 }
210 return Some(HttpStreamCommand::from(DataOrHeadersWithFlag {
211 content: r,
212 last: last,
213 }));
214 }
215
216 if self.out_window_size.size() <= 0 || conn_out_window_size.size() <= 0 {
217 return None;
218 }
219
220 let mut data = if let Some(DataOrHeaders::Data(data)) = self.outgoing.pop_front() {
221 data
222 } else {
223 unreachable!()
224 };
225
226 let max_window = cmp::min(self.out_window_size.size(), conn_out_window_size.size());
228
229 if data.len() as usize > max_window as usize {
230 trace!("truncating data of len {} to {}", data.len(), max_window);
231 let size = max_window as usize;
232 let rem = data.split_off(size);
233 self.outgoing.push_front(DataOrHeaders::Data(rem));
234 };
235
236 self.out_window_size
237 .try_decrease_to_non_negative(data.len() as i32)
238 .unwrap();
239 conn_out_window_size
240 .try_decrease_to_non_negative(data.len() as i32)
241 .unwrap();
242
243 let last = self.outgoing.end() == Some(ErrorCode::NoError);
244 if last {
245 self.close_local();
246 }
247
248 Some(HttpStreamCommand::from(DataOrHeadersWithFlag {
249 content: DataOrHeaders::Data(data),
250 last: last,
251 }))
252 }
253
254 pub fn data_recvd(&mut self, data: Bytes, last: bool) {
255 if let Some(ref mut response_handler) = self.peer_tx {
256 drop(response_handler.data_frame(data, last));
258 }
259 }
260
261 pub fn rst_recvd(&mut self, error_code: ErrorCode) -> DroppedData {
262 if let Some(response_handler) = self.peer_tx.take() {
263 drop(response_handler.rst(error_code));
264 }
265 DroppedData {
266 size: self.outgoing.data_size(),
267 }
268 }
269
270 pub fn goaway_recvd(&mut self, _raw_error_code: u32) {
271 if let Some(response_handler) = self.peer_tx.take() {
272 drop(response_handler.error(error::Error::GoawayReceived));
274 }
275 }
276}
277
/// Marker trait with no methods; implementors must be `Send + 'static`.
///
/// NOTE(review): presumably implemented by the flavour-specific stream state
/// types — confirm against the implementors.
pub(crate) trait HttpStreamDataSpecific: Send + 'static {}
279
/// Associates an implementor with the `Types` family it belongs to.
pub(crate) trait HttpStreamData {
    /// The `Types` implementation this stream data is tied to.
    type Types: Types;
}