1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
    contexts::{OnTransmitError, WriteContext},
    stream::{
        incoming_connection_flow_controller::IncomingConnectionFlowController,
        outgoing_connection_flow_controller::OutgoingConnectionFlowController,
        receive_stream::ReceiveStream,
        send_stream::SendStream,
        stream_events::StreamEvents,
        stream_interests::{StreamInterestProvider, StreamInterests},
        StreamError,
    },
};
use core::{task::Context, time::Duration};
use s2n_quic_core::{
    ack, endpoint,
    frame::{stream::StreamRef, MaxStreamData, ResetStream, StopSending, StreamDataBlocked},
    stream::{ops, StreamId},
    time::{timer, Timestamp},
    transport,
    varint::VarInt,
};

/// Configuration values for a Stream
///
/// A value of this type is consumed by [`StreamTrait::new`] in order to
/// construct a stream.
#[derive(Debug)]
pub struct StreamConfig {
    /// The identifier of the stream
    pub stream_id: StreamId,
    /// The type of the local endpoint
    ///
    /// `StreamImpl::new` compares this against the initiator of `stream_id`
    /// to decide which half of a unidirectional stream starts out closed.
    pub local_endpoint_type: endpoint::Type,
    /// The connection-wide flow controller for receiving data
    pub incoming_connection_flow_controller: IncomingConnectionFlowController,
    /// The connection-wide flow controller for sending data
    pub outgoing_connection_flow_controller: OutgoingConnectionFlowController,
    /// The initial flow control window for receiving data
    pub initial_receive_window: VarInt,
    /// The desired flow control window that we want to maintain on the receiving side
    pub desired_flow_control_window: u32,
    /// The initial flow control window for sending data
    pub initial_send_window: VarInt,
    /// The maximum buffered amount of data on the sending side
    pub max_send_buffer_size: u32,
}

/// A trait which represents an internally used `Stream`
///
/// Implementors must also report their stream interests
/// ([`StreamInterestProvider`]) and their timers (`timer::Provider`).
pub trait StreamTrait: StreamInterestProvider + timer::Provider + core::fmt::Debug {
    /// Creates a new `Stream` instance with the given configuration
    fn new(config: StreamConfig) -> Self;

    /// Returns the Stream's ID
    fn stream_id(&self) -> StreamId;

    // These functions are called from the packet delivery thread

    /// This is called when a `STREAM` frame carrying data has been received
    /// for this stream
    ///
    /// Returning an error reports a transport-level failure to the caller.
    fn on_data(
        &mut self,
        frame: &StreamRef,
        events: &mut StreamEvents,
    ) -> Result<(), transport::Error>;

    /// This is called when a `STREAM_DATA_BLOCKED` frame has been received for
    /// this stream
    fn on_stream_data_blocked(
        &mut self,
        frame: &StreamDataBlocked,
        events: &mut StreamEvents,
    ) -> Result<(), transport::Error>;

    /// This is called when a `RESET_STREAM` frame has been received for
    /// this stream
    fn on_reset(
        &mut self,
        frame: &ResetStream,
        events: &mut StreamEvents,
    ) -> Result<(), transport::Error>;

    /// This is called when a `MAX_STREAM_DATA` frame has been received for
    /// this stream
    fn on_max_stream_data(
        &mut self,
        frame: &MaxStreamData,
        events: &mut StreamEvents,
    ) -> Result<(), transport::Error>;

    /// This is called when a `STOP_SENDING` frame has been received for
    /// this stream
    fn on_stop_sending(
        &mut self,
        frame: &StopSending,
        events: &mut StreamEvents,
    ) -> Result<(), transport::Error>;

    /// This method gets called when a packet delivery got acknowledged
    fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A, events: &mut StreamEvents);

    /// This method gets called when a packet loss is reported
    fn on_packet_loss<A: ack::Set>(&mut self, ack_set: &A, events: &mut StreamEvents);

    /// Updates the period at which `STREAM_DATA_BLOCKED` frames are sent to the peer
    /// if the application is blocked by peer limits.
    fn update_blocked_sync_period(&mut self, blocked_sync_period: Duration);

    /// Called when the connection timer expires
    fn on_timeout(&mut self, now: Timestamp);

    /// This method gets called when a stream gets reset due to a reason that is
    /// not related to a frame. E.g. due to a connection failure.
    fn on_internal_reset(&mut self, error: StreamError, events: &mut StreamEvents);

    /// This method is called when the application drops the connection
    ///
    /// The stream should finish any pending operations and close
    fn on_flush(&mut self, error: StreamError, events: &mut StreamEvents);

    /// Queries the component for any outgoing frames that need to get sent
    fn on_transmit<W: WriteContext>(&mut self, context: &mut W) -> Result<(), OnTransmitError>;

    /// This method is called when a connection window is available
    fn on_connection_window_available(&mut self);

    // These functions are called from the client API

    /// Processes a `request` issued through the application-facing API and
    /// returns the matching response, or a [`StreamError`] if the request
    /// failed.
    ///
    /// NOTE(review): `context` is optional; presumably it carries the waker
    /// to notify once a request that could not complete becomes ready -
    /// confirm against the implementations.
    fn poll_request(
        &mut self,
        request: &mut ops::Request,
        context: Option<&Context>,
    ) -> Result<ops::Response, StreamError>;
}

/// The implementation of a `Stream`.
/// This is mostly a facade over the reading and writing half of the `Stream`.
///
/// Both halves are always present as fields, even for unidirectional streams.
#[derive(Debug)]
pub struct StreamImpl {
    /// The stream ID
    pub(super) stream_id: StreamId,
    /// Manages the receiving side of the stream
    pub(super) receive_stream: ReceiveStream,
    /// Set to `true` when this stream has a sending side
    ///
    /// Checked when a `MAX_STREAM_DATA` frame arrives: such a frame is
    /// rejected with `STREAM_STATE_ERROR` if the stream has no sending side.
    has_send: bool,
    /// Manages the sending side of the stream
    pub(super) send_stream: SendStream,
}

impl StreamImpl {
    /// Dispatches the receiving and sending parts of `request` to the
    /// corresponding stream halves and collects their responses.
    ///
    /// The receiving part (if any) is always polled before the sending part.
    ///
    /// # Errors
    ///
    /// An error from one half only fails the whole request when the other
    /// half was not part of the request. When both halves were requested, the
    /// failure is instead reported in-band as `ops::Status::Reset` on the
    /// failing half's response, so the other half is still polled and its
    /// outcome is not lost.
    fn poll_request_impl(
        &mut self,
        request: &mut ops::Request,
        context: Option<&Context>,
    ) -> Result<ops::Response, StreamError> {
        let mut response = ops::Response::default();
        if let Some(rx) = request.rx.as_mut() {
            match self.receive_stream.poll_request(rx, context) {
                Ok(rx) => response.rx = Some(rx),
                Err(err) => {
                    // only fail the request if tx is not set
                    //
                    // This has to look at the *request*: the tx half has not
                    // been polled yet, so `response.tx` is always `None` here
                    // and cannot tell us whether tx was requested.
                    if request.tx.is_none() {
                        return Err(err);
                    } else {
                        response.rx = Some(ops::rx::Response {
                            status: ops::Status::Reset(err),
                            ..Default::default()
                        });
                    }
                }
            }
        }

        if let Some(tx) = request.tx.as_mut() {
            match self.send_stream.poll_request(tx, context) {
                Ok(tx) => response.tx = Some(tx),
                Err(err) => {
                    // only fail the request if rx is not set
                    if request.rx.is_none() {
                        return Err(err);
                    } else {
                        response.tx = Some(ops::tx::Response {
                            status: ops::Status::Reset(err),
                            ..Default::default()
                        });
                    }
                }
            }
        }

        Ok(response)
    }
}

impl StreamTrait for StreamImpl {
    fn new(config: StreamConfig) -> StreamImpl {
        let receive_is_closed = config.stream_id.stream_type().is_unidirectional()
            && config.stream_id.initiator() == config.local_endpoint_type;
        let send_is_closed = config.stream_id.stream_type().is_unidirectional()
            && config.stream_id.initiator() != config.local_endpoint_type;

        StreamImpl {
            stream_id: config.stream_id,
            receive_stream: ReceiveStream::new(
                receive_is_closed,
                config.incoming_connection_flow_controller,
                config.initial_receive_window,
                config.desired_flow_control_window,
            ),
            has_send: !send_is_closed,
            send_stream: SendStream::new(
                config.outgoing_connection_flow_controller,
                send_is_closed,
                config.initial_send_window,
                config.max_send_buffer_size,
            ),
        }
    }

    #[inline]
    fn stream_id(&self) -> StreamId {
        self.stream_id
    }

    // These functions are called from the packet delivery thread

    #[inline]
    fn on_data(
        &mut self,
        frame: &StreamRef,
        events: &mut StreamEvents,
    ) -> Result<(), transport::Error> {
        self.receive_stream.on_data(frame, events)
    }

    #[inline]
    fn on_stream_data_blocked(
        &mut self,
        frame: &StreamDataBlocked,
        events: &mut StreamEvents,
    ) -> Result<(), transport::Error> {
        self.receive_stream.on_stream_data_blocked(frame, events)
    }

    #[inline]
    fn on_reset(
        &mut self,
        frame: &ResetStream,
        events: &mut StreamEvents,
    ) -> Result<(), transport::Error> {
        self.receive_stream.on_reset(frame, events)
    }

    #[inline]
    fn on_max_stream_data(
        &mut self,
        frame: &MaxStreamData,
        events: &mut StreamEvents,
    ) -> Result<(), transport::Error> {
        //= https://www.rfc-editor.org/rfc/rfc9000#section-19.10
        //# An endpoint that
        //# receives a MAX_STREAM_DATA frame for a receive-only stream MUST
        //# terminate the connection with error STREAM_STATE_ERROR.
        if !self.has_send {
            return Err(transport::Error::STREAM_STATE_ERROR
                .with_reason("MAX_STREAM_DATA sent on receive-only stream"));
        }

        self.send_stream.on_max_stream_data(frame, events)
    }

    #[inline]
    fn on_stop_sending(
        &mut self,
        frame: &StopSending,
        events: &mut StreamEvents,
    ) -> Result<(), transport::Error> {
        self.send_stream.on_stop_sending(frame, events)
    }

    #[inline]
    fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A, events: &mut StreamEvents) {
        self.receive_stream.on_packet_ack(ack_set);
        self.send_stream.on_packet_ack(ack_set, events);
    }

    #[inline]
    fn on_packet_loss<A: ack::Set>(&mut self, ack_set: &A, _events: &mut StreamEvents) {
        self.receive_stream.on_packet_loss(ack_set);
        self.send_stream.on_packet_loss(ack_set);
    }

    #[inline]
    fn update_blocked_sync_period(&mut self, blocked_sync_period: Duration) {
        self.send_stream
            .update_blocked_sync_period(blocked_sync_period);
    }

    #[inline]
    fn on_timeout(&mut self, now: Timestamp) {
        self.send_stream.on_timeout(now)
    }

    #[inline]
    fn on_internal_reset(&mut self, error: StreamError, events: &mut StreamEvents) {
        self.receive_stream.on_internal_reset(error, events);
        self.send_stream.on_internal_reset(error, events);
    }

    #[inline]
    fn on_flush(&mut self, error: StreamError, events: &mut StreamEvents) {
        // flushing a receive stream is the same as resetting it
        self.receive_stream.on_internal_reset(error, events);

        // tell the send stream to flush any pending data
        self.send_stream.on_flush(error, events);
    }

    #[inline]
    fn on_transmit<W: WriteContext>(&mut self, context: &mut W) -> Result<(), OnTransmitError> {
        // Query the receiving side for outgoing data
        self.receive_stream.on_transmit(self.stream_id, context)?;
        // And the sending side
        self.send_stream.on_transmit(self.stream_id, context)
    }

    #[inline]
    fn on_connection_window_available(&mut self) {
        self.send_stream.on_connection_window_available()
    }

    // These functions are called from the client API

    fn poll_request(
        &mut self,
        request: &mut ops::Request,
        context: Option<&Context>,
    ) -> Result<ops::Response, StreamError> {
        #[cfg(debug_assertions)]
        let contract: crate::stream::contract::Request = (&*request).into();

        let result = self.poll_request_impl(request, context);

        #[cfg(debug_assertions)]
        contract.validate_response(request, result.as_ref(), context);

        result
    }
}

impl timer::Provider for StreamImpl {
    /// Reports the stream's timers to `query`.
    ///
    /// Only the sending half is queried.
    #[inline]
    fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result {
        let sender = &self.send_stream;
        sender.timers(query)?;

        Ok(())
    }
}

impl StreamInterestProvider for StreamImpl {
    #[inline]
    fn stream_interests(&self, interests: &mut StreamInterests) {
        self.send_stream.stream_interests(interests);
        self.receive_stream.stream_interests(interests);
    }
}