httpbis/common/
stream.rs

1use std::cmp;
2
3use bytes::Bytes;
4
5use crate::error;
6
7use crate::solicit::end_stream::EndStream;
8use crate::solicit::header::Headers;
9use crate::solicit::session::StreamState;
10use crate::solicit::window_size::NonNegativeWindowSize;
11use crate::solicit::window_size::WindowSize;
12
13use super::types::Types;
14
15use super::stream_queue::StreamQueue;
16use super::window_size;
17use crate::common::stream_handler::StreamHandlerInternal;
18use crate::data_or_headers::DataOrHeaders;
19use crate::data_or_headers_with_flag::DataOrHeadersWithFlag;
20use crate::ErrorCode;
21
/// A single outgoing action for a stream, as produced by `HttpStreamCommon::pop_outg`.
pub enum HttpStreamCommand {
    /// Send a block of headers; the `EndStream` flag says whether this ends the stream.
    Headers(Headers, EndStream),
    /// Send a chunk of data; the `EndStream` flag says whether this ends the stream.
    Data(Bytes, EndStream),
    /// Reset the stream with the given error code.
    Rst(ErrorCode),
}
27
28impl HttpStreamCommand {
29    pub fn from(part: DataOrHeadersWithFlag) -> HttpStreamCommand {
30        let end_stream = match part.last {
31            true => EndStream::Yes,
32            false => EndStream::No,
33        };
34        match part.content {
35            DataOrHeaders::Data(data) => HttpStreamCommand::Data(data, end_stream),
36            DataOrHeaders::Headers(headers) => HttpStreamCommand::Headers(headers, end_stream),
37        }
38    }
39}
40
/// Amount of queued outgoing data reported when a stream is reset
/// (returned by `HttpStreamCommon::rst_recvd`).
///
/// Marked `#[must_use]` so that callers do not silently ignore the value.
#[must_use]
pub struct DroppedData {
    /// Value of `StreamQueue::data_size()` at the time of the reset.
    pub size: usize,
}
45
/// Plain-data copy of the interesting parts of a stream's state,
/// produced by `HttpStreamCommon::snapshot` (used e.g. in assertion messages).
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct HttpStreamStateSnapshot {
    /// Stream state at the time of the snapshot.
    pub state: StreamState,
    /// Current value of the stream's outgoing window.
    pub out_window_size: i32,
    /// Current value of the stream's incoming window.
    pub in_window_size: i32,
    /// Value reported by the stream's `pump_out_window` sender.
    pub pump_out_window_size: isize,
    /// Size of the data held in the outgoing queue.
    pub queued_out_data_size: usize,
    /// NOTE(review): `snapshot` fills this from the same getter as
    /// `queued_out_data_size` -- confirm whether a distinct value was intended.
    pub out_data_size: usize,
}
55
/// Stage of the incoming message on a stream
/// (stored in `HttpStreamCommon::in_message_stage`).
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum InMessageStage {
    /// Presumably: nothing has been received yet -- TODO confirm against the code that updates it.
    Initial,
    /// Presumably: the initial headers have been received.
    AfterInitialHeaders,
    /// Presumably: trailing headers have been received.
    AfterTrailingHeaders,
}
62
/// All HTTP/2 stream state.
/// Note the state must be kept in sync with other fields,
/// thus sometimes this object must be manipulated with `HttpStreamRef`.
pub(crate) struct HttpStreamCommon<T: Types> {
    /// Additional per-stream data whose type is chosen by the `Types` implementation.
    pub specific: T::HttpStreamSpecific,
    /// Current stream state; updated by `close_local` and `close_remote`.
    pub state: StreamState,
    /// Outgoing window of this stream; gates and is decreased by data sent in `pop_outg`.
    pub out_window_size: WindowSize,
    /// Incoming window of this stream.
    pub in_window_size: NonNegativeWindowSize,
    /// Queue of headers and data waiting to be sent, plus the end-of-stream marker.
    pub outgoing: StreamQueue,
    /// Handler that is told about received data, resets and errors;
    /// `None` until attached and after it has been consumed.
    pub peer_tx: Option<T::StreamHandlerHolder>,
    /// Sender used to notify the task that is waiting for a window increase.
    pub pump_out_window: window_size::StreamOutWindowSender,
    /// Remaining incoming content-length, if one is being tracked.
    pub in_rem_content_length: Option<u64>,
    /// Stage of the incoming message.
    pub in_message_stage: InMessageStage,
}
79
80impl<T: Types> HttpStreamCommon<T> {
81    pub fn new(
82        in_window_size: u32,
83        out_window_size: u32,
84        pump_out_window: window_size::StreamOutWindowSender,
85        in_rem_content_length: Option<u64>,
86        in_message_stage: InMessageStage,
87        specific: T::HttpStreamSpecific,
88    ) -> HttpStreamCommon<T> {
89        HttpStreamCommon {
90            specific,
91            state: StreamState::Open,
92            in_window_size: NonNegativeWindowSize::new(in_window_size as i32),
93            out_window_size: WindowSize::new(out_window_size as i32),
94            outgoing: StreamQueue::new(),
95            peer_tx: None,
96            pump_out_window,
97            in_rem_content_length,
98            in_message_stage,
99        }
100    }
101
102    pub fn snapshot(&self) -> HttpStreamStateSnapshot {
103        HttpStreamStateSnapshot {
104            state: self.state,
105            out_window_size: self.out_window_size.size(),
106            in_window_size: self.in_window_size.size(),
107            pump_out_window_size: self.pump_out_window.get(),
108            queued_out_data_size: self.outgoing.data_size(),
109            out_data_size: self.outgoing.data_size(),
110        }
111    }
112
113    pub fn close_local(&mut self) {
114        trace!("close local");
115        self.state = match self.state {
116            StreamState::Closed | StreamState::HalfClosedRemote => StreamState::Closed,
117            _ => StreamState::HalfClosedLocal,
118        };
119    }
120
121    pub fn close_remote(&mut self) {
122        trace!("close remote");
123        self.state = match self.state {
124            StreamState::Closed | StreamState::HalfClosedLocal => StreamState::Closed,
125            _ => StreamState::HalfClosedRemote,
126        };
127    }
128
129    pub fn conn_died(mut self, error: error::Error) {
130        if let Some(handler) = self.peer_tx.take() {
131            drop(handler.error(error));
132        }
133    }
134
135    /// Must be kept in sync with `pop_outg`.
136    pub fn is_writable(&self) -> bool {
137        match self.outgoing.front() {
138            Some(front) => match front {
139                DataOrHeaders::Headers(..) => true,
140                DataOrHeaders::Data(data) => data.len() == 0 || self.out_window_size.size() > 0,
141            },
142            None => {
143                if let Some(_error_code) = self.outgoing.end() {
144                    if !self.state.is_closed_local() {
145                        return true;
146                    }
147                };
148
149                false
150            }
151        }
152    }
153
154    #[cfg(debug_assertions)]
155    pub fn pop_outg(&mut self, conn_out_window_size: &mut WindowSize) -> Option<HttpStreamCommand> {
156        let writable = self.is_writable();
157        let conn_out_window_size_before = conn_out_window_size.size();
158
159        let command = self.pop_outg_impl(conn_out_window_size);
160        if command.is_some() {
161            assert!(writable);
162        } else {
163            assert!(
164                !writable || conn_out_window_size_before == 0,
165                "popped nothing but writable: {}, conn out window size before: {}, {:?}",
166                writable,
167                conn_out_window_size_before,
168                self.snapshot()
169            );
170        }
171        command
172    }
173
    /// Release-build variant of `pop_outg`: pops the next command via `pop_outg_impl`
    /// without the consistency assertions performed in debug builds.
    #[cfg(not(debug_assertions))]
    pub fn pop_outg(&mut self, conn_out_window_size: &mut WindowSize) -> Option<HttpStreamCommand> {
        self.pop_outg_impl(conn_out_window_size)
    }
178
179    fn pop_outg_impl(
180        &mut self,
181        conn_out_window_size: &mut WindowSize,
182    ) -> Option<HttpStreamCommand> {
183        if self.outgoing.is_empty() {
184            return if let Some(error_code) = self.outgoing.end() {
185                if self.state.is_closed_local() {
186                    None
187                } else {
188                    self.close_local();
189                    Some(match error_code {
190                        ErrorCode::NoError => HttpStreamCommand::Data(Bytes::new(), EndStream::Yes),
191                        error_code => HttpStreamCommand::Rst(error_code),
192                    })
193                }
194            } else {
195                None
196            };
197        }
198
199        let pop_headers = if let &DataOrHeaders::Headers(..) = self.outgoing.front().unwrap() {
200            true
201        } else {
202            false
203        };
204        if pop_headers {
205            let r = self.outgoing.pop_front().unwrap();
206            let last = self.outgoing.end() == Some(ErrorCode::NoError);
207            if last {
208                self.close_local();
209            }
210            return Some(HttpStreamCommand::from(DataOrHeadersWithFlag {
211                content: r,
212                last: last,
213            }));
214        }
215
216        if self.out_window_size.size() <= 0 || conn_out_window_size.size() <= 0 {
217            return None;
218        }
219
220        let mut data = if let Some(DataOrHeaders::Data(data)) = self.outgoing.pop_front() {
221            data
222        } else {
223            unreachable!()
224        };
225
226        // Min of connection and stream window size
227        let max_window = cmp::min(self.out_window_size.size(), conn_out_window_size.size());
228
229        if data.len() as usize > max_window as usize {
230            trace!("truncating data of len {} to {}", data.len(), max_window);
231            let size = max_window as usize;
232            let rem = data.split_off(size);
233            self.outgoing.push_front(DataOrHeaders::Data(rem));
234        };
235
236        self.out_window_size
237            .try_decrease_to_non_negative(data.len() as i32)
238            .unwrap();
239        conn_out_window_size
240            .try_decrease_to_non_negative(data.len() as i32)
241            .unwrap();
242
243        let last = self.outgoing.end() == Some(ErrorCode::NoError);
244        if last {
245            self.close_local();
246        }
247
248        Some(HttpStreamCommand::from(DataOrHeadersWithFlag {
249            content: DataOrHeaders::Data(data),
250            last: last,
251        }))
252    }
253
254    pub fn data_recvd(&mut self, data: Bytes, last: bool) {
255        if let Some(ref mut response_handler) = self.peer_tx {
256            // TODO: reset stream if rx is dead
257            drop(response_handler.data_frame(data, last));
258        }
259    }
260
261    pub fn rst_recvd(&mut self, error_code: ErrorCode) -> DroppedData {
262        if let Some(response_handler) = self.peer_tx.take() {
263            drop(response_handler.rst(error_code));
264        }
265        DroppedData {
266            size: self.outgoing.data_size(),
267        }
268    }
269
270    pub fn goaway_recvd(&mut self, _raw_error_code: u32) {
271        if let Some(response_handler) = self.peer_tx.take() {
272            // it is OK to ignore error: handler may be already dead
273            drop(response_handler.error(error::Error::GoawayReceived));
274        }
275    }
276}
277
/// Marker trait (it declares no methods) whose implementors must be `Send + 'static`.
/// Presumably implemented by the per-role stream data types -- confirm against implementors.
pub(crate) trait HttpStreamDataSpecific: Send + 'static {}
279
/// Associates an implementor with a `Types` implementation.
pub(crate) trait HttpStreamData {
    /// The `Types` implementation this stream data belongs to.
    type Types: Types;
}