s2n_quic_transport/stream/
stream_impl.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    contexts::{OnTransmitError, WriteContext},
6    stream::{
7        incoming_connection_flow_controller::IncomingConnectionFlowController,
8        outgoing_connection_flow_controller::OutgoingConnectionFlowController,
9        receive_stream::ReceiveStream,
10        send_stream::SendStream,
11        stream_events::StreamEvents,
12        stream_interests::{StreamInterestProvider, StreamInterests},
13        StreamError,
14    },
15};
16use core::{task::Context, time::Duration};
17use s2n_quic_core::{
18    ack, endpoint,
19    frame::{stream::StreamRef, MaxStreamData, ResetStream, StopSending, StreamDataBlocked},
20    stream::{ops, StreamId},
21    time::{timer, Timestamp},
22    transport,
23    varint::VarInt,
24};
25
26/// Configuration values for a Stream
27#[derive(Debug)]
28pub struct StreamConfig {
29    /// The [`Stream`]s identifier
30    pub stream_id: StreamId,
31    /// The type of the local endpoint
32    pub local_endpoint_type: endpoint::Type,
33    /// The connection-wide flow controller for receiving data
34    pub incoming_connection_flow_controller: IncomingConnectionFlowController,
35    /// The connection-wide flow controller for sending data
36    pub outgoing_connection_flow_controller: OutgoingConnectionFlowController,
37    /// The initial flow control window for receiving data
38    pub initial_receive_window: VarInt,
39    /// The desired flow control window that we want to maintain on the receiving side
40    pub desired_flow_control_window: u32,
41    /// The initial flow control window for sending data
42    pub initial_send_window: VarInt,
43    /// The maximum buffered amount of data on the sending side
44    pub max_send_buffer_size: u32,
45}
46
47/// A trait which represents an internally used `Stream`
48pub trait StreamTrait: StreamInterestProvider + timer::Provider + core::fmt::Debug {
49    /// Creates a new `Stream` instance with the given configuration
50    fn new(config: StreamConfig) -> Self;
51
52    /// Returns the Streams ID
53    fn stream_id(&self) -> StreamId;
54
55    // These functions are called from the packet delivery thread
56
57    /// This is called when a `STREAM_DATA` frame had been received for
58    /// this stream
59    fn on_data(
60        &mut self,
61        frame: &StreamRef,
62        events: &mut StreamEvents,
63    ) -> Result<(), transport::Error>;
64
65    /// This is called when a `STREAM_DATA_BLOCKED` frame had been received for
66    /// this stream
67    fn on_stream_data_blocked(
68        &mut self,
69        frame: &StreamDataBlocked,
70        events: &mut StreamEvents,
71    ) -> Result<(), transport::Error>;
72
73    /// This is called when a `RESET_STREAM` frame had been received for
74    /// this stream
75    fn on_reset(
76        &mut self,
77        frame: &ResetStream,
78        events: &mut StreamEvents,
79    ) -> Result<(), transport::Error>;
80
81    /// This is called when a `MAX_STREAM_DATA` frame had been received for
82    /// this stream
83    fn on_max_stream_data(
84        &mut self,
85        frame: &MaxStreamData,
86        events: &mut StreamEvents,
87    ) -> Result<(), transport::Error>;
88
89    /// This is called when a `STOP_SENDING` frame had been received for
90    /// this stream
91    fn on_stop_sending(
92        &mut self,
93        frame: &StopSending,
94        events: &mut StreamEvents,
95    ) -> Result<(), transport::Error>;
96
97    /// This method gets called when a packet delivery got acknowledged
98    fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A, events: &mut StreamEvents);
99
100    /// This method gets called when a packet loss is reported
101    fn on_packet_loss<A: ack::Set>(&mut self, ack_set: &A, events: &mut StreamEvents);
102
103    /// Updates the period at which `STREAM_DATA_BLOCKED` frames are sent to the peer
104    /// if the application is blocked by peer limits.
105    fn update_blocked_sync_period(&mut self, blocked_sync_period: Duration);
106
107    /// Called when the connection timer expires
108    fn on_timeout(&mut self, now: Timestamp);
109
110    /// This method gets called when a stream gets reset due to a reason that is
111    /// not related to a frame. E.g. due to a connection failure.
112    fn on_internal_reset(&mut self, error: StreamError, events: &mut StreamEvents);
113
114    /// This method is called when the application drops the connection
115    ///
116    /// The stream should finish any pending operations and close
117    fn on_flush(&mut self, error: StreamError, events: &mut StreamEvents);
118
119    /// Queries the component for any outgoing frames that need to get sent
120    fn on_transmit<W: WriteContext>(&mut self, context: &mut W) -> Result<(), OnTransmitError>;
121
122    /// This method is called when a connection window is available
123    fn on_connection_window_available(&mut self);
124
125    // These functions are called from the client API
126
127    fn poll_request(
128        &mut self,
129        request: &mut ops::Request,
130        context: Option<&Context>,
131    ) -> Result<ops::Response, StreamError>;
132}
133
134/// The implementation of a `Stream`.
135/// This is mostly a facade over the reading and writing half of the `Stream`.
136#[derive(Debug)]
137pub struct StreamImpl {
138    /// The stream ID
139    pub(super) stream_id: StreamId,
140    /// Manages the receiving side of the stream
141    pub(super) receive_stream: ReceiveStream,
142    /// Set to `true` when this stream has a sending side
143    has_send: bool,
144    /// Manages the sending side of the stream
145    pub(super) send_stream: SendStream,
146}
147
148impl StreamImpl {
149    fn poll_request_impl(
150        &mut self,
151        request: &mut ops::Request,
152        context: Option<&Context>,
153    ) -> Result<ops::Response, StreamError> {
154        let mut response = ops::Response::default();
155        if let Some(rx) = request.rx.as_mut() {
156            match self.receive_stream.poll_request(rx, context) {
157                Ok(rx) => response.rx = Some(rx),
158                Err(err) => {
159                    // only fail the request if tx is not set
160                    if response.tx.is_none() {
161                        return Err(err);
162                    } else {
163                        response.rx = Some(ops::rx::Response {
164                            status: ops::Status::Reset(err),
165                            ..Default::default()
166                        });
167                    }
168                }
169            }
170        }
171
172        if let Some(tx) = request.tx.as_mut() {
173            match self.send_stream.poll_request(tx, context) {
174                Ok(tx) => response.tx = Some(tx),
175                Err(err) => {
176                    // only fail the request if rx is not set
177                    if response.rx.is_none() {
178                        return Err(err);
179                    } else {
180                        response.tx = Some(ops::tx::Response {
181                            status: ops::Status::Reset(err),
182                            ..Default::default()
183                        });
184                    }
185                }
186            }
187        }
188
189        Ok(response)
190    }
191}
192
193impl StreamTrait for StreamImpl {
194    fn new(config: StreamConfig) -> StreamImpl {
195        let receive_is_closed = config.stream_id.stream_type().is_unidirectional()
196            && config.stream_id.initiator() == config.local_endpoint_type;
197        let send_is_closed = config.stream_id.stream_type().is_unidirectional()
198            && config.stream_id.initiator() != config.local_endpoint_type;
199
200        StreamImpl {
201            stream_id: config.stream_id,
202            receive_stream: ReceiveStream::new(
203                receive_is_closed,
204                config.incoming_connection_flow_controller,
205                config.initial_receive_window,
206                config.desired_flow_control_window,
207            ),
208            has_send: !send_is_closed,
209            send_stream: SendStream::new(
210                config.outgoing_connection_flow_controller,
211                send_is_closed,
212                config.initial_send_window,
213                config.max_send_buffer_size,
214            ),
215        }
216    }
217
218    #[inline]
219    fn stream_id(&self) -> StreamId {
220        self.stream_id
221    }
222
223    // These functions are called from the packet delivery thread
224
225    #[inline]
226    fn on_data(
227        &mut self,
228        frame: &StreamRef,
229        events: &mut StreamEvents,
230    ) -> Result<(), transport::Error> {
231        self.receive_stream.on_data(frame, events)
232    }
233
234    #[inline]
235    fn on_stream_data_blocked(
236        &mut self,
237        frame: &StreamDataBlocked,
238        events: &mut StreamEvents,
239    ) -> Result<(), transport::Error> {
240        self.receive_stream.on_stream_data_blocked(frame, events)
241    }
242
243    #[inline]
244    fn on_reset(
245        &mut self,
246        frame: &ResetStream,
247        events: &mut StreamEvents,
248    ) -> Result<(), transport::Error> {
249        self.receive_stream.on_reset(frame, events)
250    }
251
252    #[inline]
253    fn on_max_stream_data(
254        &mut self,
255        frame: &MaxStreamData,
256        events: &mut StreamEvents,
257    ) -> Result<(), transport::Error> {
258        //= https://www.rfc-editor.org/rfc/rfc9000#section-19.10
259        //# An endpoint that
260        //# receives a MAX_STREAM_DATA frame for a receive-only stream MUST
261        //# terminate the connection with error STREAM_STATE_ERROR.
262        if !self.has_send {
263            return Err(transport::Error::STREAM_STATE_ERROR
264                .with_reason("MAX_STREAM_DATA sent on receive-only stream"));
265        }
266
267        self.send_stream.on_max_stream_data(frame, events)
268    }
269
270    #[inline]
271    fn on_stop_sending(
272        &mut self,
273        frame: &StopSending,
274        events: &mut StreamEvents,
275    ) -> Result<(), transport::Error> {
276        self.send_stream.on_stop_sending(frame, events)
277    }
278
279    #[inline]
280    fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A, events: &mut StreamEvents) {
281        self.receive_stream.on_packet_ack(ack_set);
282        self.send_stream.on_packet_ack(ack_set, events);
283    }
284
285    #[inline]
286    fn on_packet_loss<A: ack::Set>(&mut self, ack_set: &A, _events: &mut StreamEvents) {
287        self.receive_stream.on_packet_loss(ack_set);
288        self.send_stream.on_packet_loss(ack_set);
289    }
290
291    #[inline]
292    fn update_blocked_sync_period(&mut self, blocked_sync_period: Duration) {
293        self.send_stream
294            .update_blocked_sync_period(blocked_sync_period);
295    }
296
297    #[inline]
298    fn on_timeout(&mut self, now: Timestamp) {
299        self.send_stream.on_timeout(now)
300    }
301
302    #[inline]
303    fn on_internal_reset(&mut self, error: StreamError, events: &mut StreamEvents) {
304        self.receive_stream.on_internal_reset(error, events);
305        self.send_stream.on_internal_reset(error, events);
306    }
307
308    #[inline]
309    fn on_flush(&mut self, error: StreamError, events: &mut StreamEvents) {
310        // flushing a receive stream is the same as resetting it
311        self.receive_stream.on_internal_reset(error, events);
312
313        // tell the send stream to flush any pending data
314        self.send_stream.on_flush(error, events);
315    }
316
317    #[inline]
318    fn on_transmit<W: WriteContext>(&mut self, context: &mut W) -> Result<(), OnTransmitError> {
319        // Query the receiving side for outgoing data
320        self.receive_stream.on_transmit(self.stream_id, context)?;
321        // And the sending side
322        self.send_stream.on_transmit(self.stream_id, context)
323    }
324
325    #[inline]
326    fn on_connection_window_available(&mut self) {
327        self.send_stream.on_connection_window_available()
328    }
329
330    // These functions are called from the client API
331
332    fn poll_request(
333        &mut self,
334        request: &mut ops::Request,
335        context: Option<&Context>,
336    ) -> Result<ops::Response, StreamError> {
337        #[cfg(debug_assertions)]
338        let contract: crate::stream::contract::Request = (&*request).into();
339
340        let result = self.poll_request_impl(request, context);
341
342        #[cfg(debug_assertions)]
343        contract.validate_response(request, result.as_ref(), context);
344
345        result
346    }
347}
348
349impl timer::Provider for StreamImpl {
350    #[inline]
351    fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result {
352        self.send_stream.timers(query)?;
353        Ok(())
354    }
355}
356
357impl StreamInterestProvider for StreamImpl {
358    #[inline]
359    fn stream_interests(&self, interests: &mut StreamInterests) {
360        self.send_stream.stream_interests(interests);
361        self.receive_stream.stream_interests(interests);
362    }
363}