s2n_quic_transport/stream/
stream_impl.rs1use crate::{
5 contexts::{OnTransmitError, WriteContext},
6 stream::{
7 incoming_connection_flow_controller::IncomingConnectionFlowController,
8 outgoing_connection_flow_controller::OutgoingConnectionFlowController,
9 receive_stream::ReceiveStream,
10 send_stream::SendStream,
11 stream_events::StreamEvents,
12 stream_interests::{StreamInterestProvider, StreamInterests},
13 StreamError,
14 },
15};
16use core::{task::Context, time::Duration};
17use s2n_quic_core::{
18 ack, endpoint,
19 frame::{stream::StreamRef, MaxStreamData, ResetStream, StopSending, StreamDataBlocked},
20 stream::{ops, StreamId},
21 time::{timer, Timestamp},
22 transport,
23 varint::VarInt,
24};
25
26#[derive(Debug)]
28pub struct StreamConfig {
29 pub stream_id: StreamId,
31 pub local_endpoint_type: endpoint::Type,
33 pub incoming_connection_flow_controller: IncomingConnectionFlowController,
35 pub outgoing_connection_flow_controller: OutgoingConnectionFlowController,
37 pub initial_receive_window: VarInt,
39 pub desired_flow_control_window: u32,
41 pub initial_send_window: VarInt,
43 pub max_send_buffer_size: u32,
45}
46
47pub trait StreamTrait: StreamInterestProvider + timer::Provider + core::fmt::Debug {
49 fn new(config: StreamConfig) -> Self;
51
52 fn stream_id(&self) -> StreamId;
54
55 fn on_data(
60 &mut self,
61 frame: &StreamRef,
62 events: &mut StreamEvents,
63 ) -> Result<(), transport::Error>;
64
65 fn on_stream_data_blocked(
68 &mut self,
69 frame: &StreamDataBlocked,
70 events: &mut StreamEvents,
71 ) -> Result<(), transport::Error>;
72
73 fn on_reset(
76 &mut self,
77 frame: &ResetStream,
78 events: &mut StreamEvents,
79 ) -> Result<(), transport::Error>;
80
81 fn on_max_stream_data(
84 &mut self,
85 frame: &MaxStreamData,
86 events: &mut StreamEvents,
87 ) -> Result<(), transport::Error>;
88
89 fn on_stop_sending(
92 &mut self,
93 frame: &StopSending,
94 events: &mut StreamEvents,
95 ) -> Result<(), transport::Error>;
96
97 fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A, events: &mut StreamEvents);
99
100 fn on_packet_loss<A: ack::Set>(&mut self, ack_set: &A, events: &mut StreamEvents);
102
103 fn update_blocked_sync_period(&mut self, blocked_sync_period: Duration);
106
107 fn on_timeout(&mut self, now: Timestamp);
109
110 fn on_internal_reset(&mut self, error: StreamError, events: &mut StreamEvents);
113
114 fn on_flush(&mut self, error: StreamError, events: &mut StreamEvents);
118
119 fn on_transmit<W: WriteContext>(&mut self, context: &mut W) -> Result<(), OnTransmitError>;
121
122 fn on_connection_window_available(&mut self);
124
125 fn poll_request(
128 &mut self,
129 request: &mut ops::Request,
130 context: Option<&Context>,
131 ) -> Result<ops::Response, StreamError>;
132}
133
134#[derive(Debug)]
137pub struct StreamImpl {
138 pub(super) stream_id: StreamId,
140 pub(super) receive_stream: ReceiveStream,
142 has_send: bool,
144 pub(super) send_stream: SendStream,
146}
147
148impl StreamImpl {
149 fn poll_request_impl(
150 &mut self,
151 request: &mut ops::Request,
152 context: Option<&Context>,
153 ) -> Result<ops::Response, StreamError> {
154 let mut response = ops::Response::default();
155 if let Some(rx) = request.rx.as_mut() {
156 match self.receive_stream.poll_request(rx, context) {
157 Ok(rx) => response.rx = Some(rx),
158 Err(err) => {
159 if response.tx.is_none() {
161 return Err(err);
162 } else {
163 response.rx = Some(ops::rx::Response {
164 status: ops::Status::Reset(err),
165 ..Default::default()
166 });
167 }
168 }
169 }
170 }
171
172 if let Some(tx) = request.tx.as_mut() {
173 match self.send_stream.poll_request(tx, context) {
174 Ok(tx) => response.tx = Some(tx),
175 Err(err) => {
176 if response.rx.is_none() {
178 return Err(err);
179 } else {
180 response.tx = Some(ops::tx::Response {
181 status: ops::Status::Reset(err),
182 ..Default::default()
183 });
184 }
185 }
186 }
187 }
188
189 Ok(response)
190 }
191}
192
193impl StreamTrait for StreamImpl {
194 fn new(config: StreamConfig) -> StreamImpl {
195 let receive_is_closed = config.stream_id.stream_type().is_unidirectional()
196 && config.stream_id.initiator() == config.local_endpoint_type;
197 let send_is_closed = config.stream_id.stream_type().is_unidirectional()
198 && config.stream_id.initiator() != config.local_endpoint_type;
199
200 StreamImpl {
201 stream_id: config.stream_id,
202 receive_stream: ReceiveStream::new(
203 receive_is_closed,
204 config.incoming_connection_flow_controller,
205 config.initial_receive_window,
206 config.desired_flow_control_window,
207 ),
208 has_send: !send_is_closed,
209 send_stream: SendStream::new(
210 config.outgoing_connection_flow_controller,
211 send_is_closed,
212 config.initial_send_window,
213 config.max_send_buffer_size,
214 ),
215 }
216 }
217
218 #[inline]
219 fn stream_id(&self) -> StreamId {
220 self.stream_id
221 }
222
223 #[inline]
226 fn on_data(
227 &mut self,
228 frame: &StreamRef,
229 events: &mut StreamEvents,
230 ) -> Result<(), transport::Error> {
231 self.receive_stream.on_data(frame, events)
232 }
233
234 #[inline]
235 fn on_stream_data_blocked(
236 &mut self,
237 frame: &StreamDataBlocked,
238 events: &mut StreamEvents,
239 ) -> Result<(), transport::Error> {
240 self.receive_stream.on_stream_data_blocked(frame, events)
241 }
242
243 #[inline]
244 fn on_reset(
245 &mut self,
246 frame: &ResetStream,
247 events: &mut StreamEvents,
248 ) -> Result<(), transport::Error> {
249 self.receive_stream.on_reset(frame, events)
250 }
251
252 #[inline]
253 fn on_max_stream_data(
254 &mut self,
255 frame: &MaxStreamData,
256 events: &mut StreamEvents,
257 ) -> Result<(), transport::Error> {
258 if !self.has_send {
263 return Err(transport::Error::STREAM_STATE_ERROR
264 .with_reason("MAX_STREAM_DATA sent on receive-only stream"));
265 }
266
267 self.send_stream.on_max_stream_data(frame, events)
268 }
269
270 #[inline]
271 fn on_stop_sending(
272 &mut self,
273 frame: &StopSending,
274 events: &mut StreamEvents,
275 ) -> Result<(), transport::Error> {
276 self.send_stream.on_stop_sending(frame, events)
277 }
278
279 #[inline]
280 fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A, events: &mut StreamEvents) {
281 self.receive_stream.on_packet_ack(ack_set);
282 self.send_stream.on_packet_ack(ack_set, events);
283 }
284
285 #[inline]
286 fn on_packet_loss<A: ack::Set>(&mut self, ack_set: &A, _events: &mut StreamEvents) {
287 self.receive_stream.on_packet_loss(ack_set);
288 self.send_stream.on_packet_loss(ack_set);
289 }
290
291 #[inline]
292 fn update_blocked_sync_period(&mut self, blocked_sync_period: Duration) {
293 self.send_stream
294 .update_blocked_sync_period(blocked_sync_period);
295 }
296
297 #[inline]
298 fn on_timeout(&mut self, now: Timestamp) {
299 self.send_stream.on_timeout(now)
300 }
301
302 #[inline]
303 fn on_internal_reset(&mut self, error: StreamError, events: &mut StreamEvents) {
304 self.receive_stream.on_internal_reset(error, events);
305 self.send_stream.on_internal_reset(error, events);
306 }
307
308 #[inline]
309 fn on_flush(&mut self, error: StreamError, events: &mut StreamEvents) {
310 self.receive_stream.on_internal_reset(error, events);
312
313 self.send_stream.on_flush(error, events);
315 }
316
317 #[inline]
318 fn on_transmit<W: WriteContext>(&mut self, context: &mut W) -> Result<(), OnTransmitError> {
319 self.receive_stream.on_transmit(self.stream_id, context)?;
321 self.send_stream.on_transmit(self.stream_id, context)
323 }
324
325 #[inline]
326 fn on_connection_window_available(&mut self) {
327 self.send_stream.on_connection_window_available()
328 }
329
330 fn poll_request(
333 &mut self,
334 request: &mut ops::Request,
335 context: Option<&Context>,
336 ) -> Result<ops::Response, StreamError> {
337 #[cfg(debug_assertions)]
338 let contract: crate::stream::contract::Request = (&*request).into();
339
340 let result = self.poll_request_impl(request, context);
341
342 #[cfg(debug_assertions)]
343 contract.validate_response(request, result.as_ref(), context);
344
345 result
346 }
347}
348
349impl timer::Provider for StreamImpl {
350 #[inline]
351 fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result {
352 self.send_stream.timers(query)?;
353 Ok(())
354 }
355}
356
357impl StreamInterestProvider for StreamImpl {
358 #[inline]
359 fn stream_interests(&self, interests: &mut StreamInterests) {
360 self.send_stream.stream_interests(interests);
361 self.receive_stream.stream_interests(interests);
362 }
363}