ant_quic/connection/streams/
send.rs

1use bytes::Bytes;
2use thiserror::Error;
3
4use crate::{VarInt, connection::send_buffer::SendBuffer, frame};
5
6#[derive(Debug)]
7pub(super) struct Send {
8    pub(super) max_data: u64,
9    pub(super) state: SendState,
10    pub(super) pending: SendBuffer,
11    pub(super) priority: i32,
12    /// Whether a frame containing a FIN bit must be transmitted, even if we don't have any new data
13    pub(super) fin_pending: bool,
14    /// Whether this stream is in the `connection_blocked` list of `Streams`
15    pub(super) connection_blocked: bool,
16    /// The reason the peer wants us to stop, if `STOP_SENDING` was received
17    pub(super) stop_reason: Option<VarInt>,
18}
19
20impl Send {
21    pub(super) fn new(max_data: VarInt) -> Box<Self> {
22        Box::new(Self {
23            max_data: max_data.into(),
24            state: SendState::Ready,
25            pending: SendBuffer::new(),
26            priority: 0,
27            fin_pending: false,
28            connection_blocked: false,
29            stop_reason: None,
30        })
31    }
32
33    /// Whether the stream has been reset
34    pub(super) fn is_reset(&self) -> bool {
35        matches!(self.state, SendState::ResetSent)
36    }
37
38    pub(super) fn finish(&mut self) -> Result<(), FinishError> {
39        if let Some(error_code) = self.stop_reason {
40            Err(FinishError::Stopped(error_code))
41        } else if self.state == SendState::Ready {
42            self.state = SendState::DataSent {
43                finish_acked: false,
44            };
45            self.fin_pending = true;
46            Ok(())
47        } else {
48            Err(FinishError::ClosedStream)
49        }
50    }
51
52    pub(super) fn write<S: BytesSource>(
53        &mut self,
54        source: &mut S,
55        limit: u64,
56    ) -> Result<Written, WriteError> {
57        if !self.is_writable() {
58            return Err(WriteError::ClosedStream);
59        }
60        if let Some(error_code) = self.stop_reason {
61            return Err(WriteError::Stopped(error_code));
62        }
63        let budget = self.max_data - self.pending.offset();
64        if budget == 0 {
65            return Err(WriteError::Blocked);
66        }
67        let mut limit = limit.min(budget) as usize;
68
69        let mut result = Written::default();
70        loop {
71            let (chunk, chunks_consumed) = source.pop_chunk(limit);
72            result.chunks += chunks_consumed;
73            result.bytes += chunk.len();
74
75            if chunk.is_empty() {
76                break;
77            }
78
79            limit -= chunk.len();
80            self.pending.write(chunk);
81        }
82
83        Ok(result)
84    }
85
86    /// Update stream state due to a reset sent by the local application
87    pub(super) fn reset(&mut self) {
88        use SendState::*;
89        if let DataSent { .. } | Ready = self.state {
90            self.state = ResetSent;
91        }
92    }
93
94    /// Handle STOP_SENDING
95    ///
96    /// Returns true if the stream was stopped due to this frame, and false
97    /// if it had been stopped before
98    pub(super) fn try_stop(&mut self, error_code: VarInt) -> bool {
99        if self.stop_reason.is_none() {
100            self.stop_reason = Some(error_code);
101            true
102        } else {
103            false
104        }
105    }
106
107    /// Returns whether the stream has been finished and all data has been acknowledged by the peer
108    pub(super) fn ack(&mut self, frame: frame::StreamMeta) -> bool {
109        self.pending.ack(frame.offsets);
110        match self.state {
111            SendState::DataSent {
112                ref mut finish_acked,
113            } => {
114                *finish_acked |= frame.fin;
115                *finish_acked && self.pending.is_fully_acked()
116            }
117            _ => false,
118        }
119    }
120
121    /// Handle increase to stream-level flow control limit
122    ///
123    /// Returns whether the stream was unblocked
124    pub(super) fn increase_max_data(&mut self, offset: u64) -> bool {
125        if offset <= self.max_data || self.state != SendState::Ready {
126            return false;
127        }
128        let was_blocked = self.pending.offset() == self.max_data;
129        self.max_data = offset;
130        was_blocked
131    }
132
133    pub(super) fn offset(&self) -> u64 {
134        self.pending.offset()
135    }
136
137    pub(super) fn is_pending(&self) -> bool {
138        self.pending.has_unsent_data() || self.fin_pending
139    }
140
141    pub(super) fn is_writable(&self) -> bool {
142        matches!(self.state, SendState::Ready)
143    }
144}
145
146/// A [`BytesSource`] implementation for `&'a mut [Bytes]`
147///
148/// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to
149/// a configured limit.
150pub(crate) struct BytesArray<'a> {
151    /// The wrapped slice of `Bytes`
152    chunks: &'a mut [Bytes],
153    /// The amount of chunks consumed from this source
154    consumed: usize,
155}
156
157impl<'a> BytesArray<'a> {
158    pub(crate) fn from_chunks(chunks: &'a mut [Bytes]) -> Self {
159        Self {
160            chunks,
161            consumed: 0,
162        }
163    }
164}
165
166impl BytesSource for BytesArray<'_> {
167    fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
168        // The loop exists to skip empty chunks while still marking them as
169        // consumed
170        let mut chunks_consumed = 0;
171
172        while self.consumed < self.chunks.len() {
173            let chunk = &mut self.chunks[self.consumed];
174
175            if chunk.len() <= limit {
176                let chunk = std::mem::take(chunk);
177                self.consumed += 1;
178                chunks_consumed += 1;
179                if chunk.is_empty() {
180                    continue;
181                }
182                return (chunk, chunks_consumed);
183            } else if limit > 0 {
184                let chunk = chunk.split_to(limit);
185                return (chunk, chunks_consumed);
186            } else {
187                break;
188            }
189        }
190
191        (Bytes::new(), chunks_consumed)
192    }
193}
194
/// [`BytesSource`] adapter over a `&[u8]`
///
/// The slice is treated as one single chunk. A [`Bytes`] value is only created
/// from the borrowed data when a chunk is actually popped, so the allocation is
/// postponed until the number of bytes to copy is known.
pub(crate) struct ByteSlice<'a> {
    /// The part of the wrapped byte slice that has not been handed out yet
    data: &'a [u8],
}
204
205impl<'a> ByteSlice<'a> {
206    pub(crate) fn from_slice(data: &'a [u8]) -> Self {
207        Self { data }
208    }
209}
210
211impl BytesSource for ByteSlice<'_> {
212    fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
213        let limit = limit.min(self.data.len());
214        if limit == 0 {
215            return (Bytes::new(), 0);
216        }
217
218        let chunk = Bytes::from(self.data[..limit].to_owned());
219        self.data = &self.data[chunk.len()..];
220
221        let chunks_consumed = usize::from(self.data.is_empty());
222        (chunk, chunks_consumed)
223    }
224}
225
226/// A source of one or more buffers which can be converted into `Bytes` buffers on demand
227///
228/// The purpose of this data type is to defer conversion as long as possible,
229/// so that no heap allocation is required in case no data is writable.
230pub(super) trait BytesSource {
231    /// Returns the next chunk from the source of owned chunks.
232    ///
233    /// This method will consume parts of the source.
234    /// Calling it will yield `Bytes` elements up to the configured `limit`.
235    ///
236    /// Returns:
237    /// - A `Bytes` object containing the data (empty if limit is zero or no more data is available)
238    /// - The number of complete chunks consumed from the source
239    fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize);
240}
241
/// Summary of a write operation: how many bytes and chunks were transferred
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct Written {
    /// Number of bytes that were written
    pub bytes: usize,
    /// Number of chunks that were written in full
    ///
    /// A chunk of which only a part was written is not included in this count.
    pub chunks: usize,
}
252
253/// Errors triggered while writing to a send stream
254#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
255pub enum WriteError {
256    /// The peer is not able to accept additional data, or the connection is congested.
257    ///
258    /// If the peer issues additional flow control credit, a [`StreamEvent::Writable`] event will
259    /// be generated, indicating that retrying the write might succeed.
260    ///
261    /// [`StreamEvent::Writable`]: crate::StreamEvent::Writable
262    #[error("unable to accept further writes")]
263    Blocked,
264    /// The peer is no longer accepting data on this stream, and it has been implicitly reset. The
265    /// stream cannot be finished or further written to.
266    ///
267    /// Carries an application-defined error code.
268    ///
269    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
270    #[error("stopped by peer: code {0}")]
271    Stopped(VarInt),
272    /// The stream has not been opened or has already been finished or reset
273    #[error("closed stream")]
274    ClosedStream,
275    /// The connection was closed
276    #[error("connection closed")]
277    ConnectionClosed,
278}
279
280/// Stream sending state
281#[derive(Debug, Copy, Clone, Eq, PartialEq)]
282pub(super) enum SendState {
283    /// Sending new data
284    Ready,
285    /// Stream was finished; now sending retransmits only
286    DataSent { finish_acked: bool },
287    /// Sent RESET
288    ResetSent,
289}
290
291/// Reasons why attempting to finish a stream might fail
292#[derive(Debug, Error, Clone, PartialEq, Eq)]
293pub enum FinishError {
294    /// The peer is no longer accepting data on this stream. No
295    /// [`StreamEvent::Finished`] event will be emitted for this stream.
296    ///
297    /// Carries an application-defined error code.
298    ///
299    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
300    #[error("stopped by peer: code {0}")]
301    Stopped(VarInt),
302    /// The stream has not been opened or was already finished or reset
303    #[error("closed stream")]
304    ClosedStream,
305    /// The connection was closed
306    #[error("connection closed")]
307    ConnectionClosed,
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Drains a `BytesArray` under every possible byte limit and checks both the
    /// returned data and the number of chunks reported as consumed.
    #[test]
    fn bytes_array() {
        let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
        // The range is inclusive so that the full-consumption case (`limit == full.len()`)
        // is exercised as well.
        for limit in 0..=full.len() {
            let mut chunks = [
                Bytes::from_static(b""),
                Bytes::from_static(b"Hello "),
                Bytes::from_static(b"Wo"),
                Bytes::from_static(b""),
                Bytes::from_static(b"r"),
                Bytes::from_static(b"ld"),
                Bytes::from_static(b""),
                Bytes::from_static(b" 12345678"),
                Bytes::from_static(b"9 ABCDE"),
                Bytes::from_static(b"F"),
                Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"),
            ];
            let num_chunks = chunks.len();
            let last_chunk_len = chunks[chunks.len() - 1].len();

            let mut array = BytesArray::from_chunks(&mut chunks);

            let mut buf = Vec::new();
            let mut chunks_popped = 0;
            let mut chunks_consumed = 0;
            let mut remaining = limit;
            loop {
                let (chunk, consumed) = array.pop_chunk(remaining);
                chunks_consumed += consumed;

                if !chunk.is_empty() {
                    buf.extend_from_slice(&chunk);
                    remaining -= chunk.len();
                    chunks_popped += 1;
                } else {
                    break;
                }
            }

            assert_eq!(&buf[..], &full[..limit]);

            if limit == full.len() {
                // Full consumption of the last chunk
                assert_eq!(chunks_consumed, num_chunks);
                // Since there are empty chunks, we consume more than there are popped
                assert_eq!(chunks_consumed, chunks_popped + 3);
            } else if limit > full.len() - last_chunk_len {
                // Partial consumption of the last chunk
                assert_eq!(chunks_consumed, num_chunks - 1);
                assert_eq!(chunks_consumed, chunks_popped + 2);
            }
        }
    }

    /// Drains a `ByteSlice` under every possible byte limit; the slice counts as a
    /// single chunk that is only reported as consumed once it is fully drained.
    #[test]
    fn byte_slice() {
        let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
        // The range is inclusive so that the full-consumption case (`limit == full.len()`)
        // is exercised as well.
        for limit in 0..=full.len() {
            let mut array = ByteSlice::from_slice(&full[..]);

            let mut buf = Vec::new();
            let mut chunks_popped = 0;
            let mut chunks_consumed = 0;
            let mut remaining = limit;
            loop {
                let (chunk, consumed) = array.pop_chunk(remaining);
                chunks_consumed += consumed;

                if !chunk.is_empty() {
                    buf.extend_from_slice(&chunk);
                    remaining -= chunk.len();
                    chunks_popped += 1;
                } else {
                    break;
                }
            }

            assert_eq!(&buf[..], &full[..limit]);
            if limit != 0 {
                assert_eq!(chunks_popped, 1);
            } else {
                assert_eq!(chunks_popped, 0);
            }

            if limit == full.len() {
                assert_eq!(chunks_consumed, 1);
            } else {
                assert_eq!(chunks_consumed, 0);
            }
        }
    }
}
405}