Skip to main content

hreq_h2/proto/streams/
prioritize.rs

1use super::store::Resolve;
2use super::*;
3
4use crate::frame::{Reason, StreamId};
5
6use crate::codec::UserError;
7use crate::codec::UserError::*;
8
9use bytes::buf::ext::{BufExt, Take};
10use std::io;
11use std::task::{Context, Poll, Waker};
12use std::{cmp, fmt, mem};
13
/// Tracks which streams are waiting to send a frame, waiting for window
/// capacity, or waiting to be opened, together with the connection-level
/// send flow control.
///
/// # Warning
///
/// Queued streams are ordered by stream ID, as we need to ensure that
/// lower-numbered streams are sent headers before higher-numbered ones.
/// This is because "idle" stream IDs – those which have been initiated but
/// have yet to receive frames – will be implicitly closed on receipt of a
/// frame on a higher stream ID. If these queues were not ordered by stream
/// ID, some mechanism would be necessary to ensure that the lowest-numbered
/// idle stream is opened first.
#[derive(Debug)]
pub(super) struct Prioritize {
    /// Queue of streams waiting for socket capacity to send a frame.
    pending_send: store::Queue<stream::NextSend>,

    /// Queue of streams waiting for window capacity to produce data.
    pending_capacity: store::Queue<stream::NextSendCapacity>,

    /// Streams waiting for capacity due to max concurrency
    ///
    /// The `SendRequest` handle is `Clone`. This enables initiating requests
    /// from many tasks. However, offering this capability while supporting
    /// backpressure at some level is tricky. If there are many `SendRequest`
    /// handles and a single stream becomes available, which handle gets
    /// assigned that stream? Maybe that handle is no longer ready to send a
    /// request.
    ///
    /// The strategy used is to allow each `SendRequest` handle one buffered
    /// request. A `SendRequest` handle is ready to send a request if it has no
    /// associated buffered requests. This is the same strategy as `mpsc` in the
    /// futures library.
    pending_open: store::Queue<stream::NextOpen>,

    /// Connection level flow control governing sent data
    flow: FlowControl,

    /// Stream ID of the last stream opened.
    ///
    /// Only updated by `pop_frame` when debug assertions are enabled.
    last_opened_id: StreamId,

    /// What `DATA` frame is currently being sent in the codec.
    in_flight_data_frame: InFlightData,
}
55
/// Records which stream, if any, owns the `DATA` frame that has been handed
/// to the codec and not yet reclaimed by `reclaim_frame`.
#[derive(Debug, Eq, PartialEq)]
enum InFlightData {
    /// There is no `DATA` frame in flight.
    Nothing,
    /// There is a `DATA` frame in flight belonging to the given stream.
    DataFrame(store::Key),
    /// There was a `DATA` frame, but the stream's queue was since cleared.
    ///
    /// Set by `clear_queue`; `reclaim_frame` then discards the frame instead
    /// of pushing it back onto the stream.
    Drop,
}
65
/// Payload of a `DATA` frame as handed to the codec by `pop_frame`.
///
/// The wrapped buffer is limited to the number of bytes selected for this
/// frame, so only that much of the user's buffer is written.
pub(crate) struct Prioritized<B> {
    /// The buffer, limited to the length chosen for this frame.
    inner: Take<B>,

    /// Whether the frame this payload was taken from had the end-of-stream
    /// flag set; restored by `reclaim_frame` if data remains unsent.
    end_of_stream: bool,

    /// The stream that this is associated with
    stream: store::Key,
}
75
76// ===== impl Prioritize =====
77
78impl Prioritize {
79    pub fn new(config: &Config) -> Prioritize {
80        let mut flow = FlowControl::new();
81
82        flow.inc_window(config.remote_init_window_sz)
83            .expect("invalid initial window size");
84
85        flow.assign_capacity(config.remote_init_window_sz);
86
87        log::trace!("Prioritize::new; flow={:?}", flow);
88
89        Prioritize {
90            pending_send: store::Queue::new(),
91            pending_capacity: store::Queue::new(),
92            pending_open: store::Queue::new(),
93            flow,
94            last_opened_id: StreamId::ZERO,
95            in_flight_data_frame: InFlightData::Nothing,
96        }
97    }
98
99    /// Queue a frame to be sent to the remote
100    pub fn queue_frame<B>(
101        &mut self,
102        frame: Frame<B>,
103        buffer: &mut Buffer<Frame<B>>,
104        stream: &mut store::Ptr,
105        task: &mut Option<Waker>,
106    ) {
107        // Queue the frame in the buffer
108        stream.pending_send.push_back(buffer, frame);
109        self.schedule_send(stream, task);
110    }
111
112    pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
113        // If the stream is waiting to be opened, nothing more to do.
114        if stream.is_send_ready() {
115            log::trace!("schedule_send; {:?}", stream.id);
116            // Queue the stream
117            self.pending_send.push(stream);
118
119            // Notify the connection.
120            if let Some(task) = task.take() {
121                task.wake();
122            }
123        }
124    }
125
126    pub fn queue_open(&mut self, stream: &mut store::Ptr) {
127        self.pending_open.push(stream);
128    }
129
130    /// Send a data frame
131    pub fn send_data<B>(
132        &mut self,
133        frame: frame::Data<B>,
134        buffer: &mut Buffer<Frame<B>>,
135        stream: &mut store::Ptr,
136        counts: &mut Counts,
137        task: &mut Option<Waker>,
138    ) -> Result<(), UserError>
139    where
140        B: Buf,
141    {
142        let sz = frame.payload().remaining();
143
144        if sz > MAX_WINDOW_SIZE as usize {
145            return Err(UserError::PayloadTooBig);
146        }
147
148        let sz = sz as WindowSize;
149
150        if !stream.state.is_send_streaming() {
151            if stream.state.is_closed() {
152                return Err(InactiveStreamId);
153            } else {
154                return Err(UnexpectedFrameType);
155            }
156        }
157
158        // Update the buffered data counter
159        stream.buffered_send_data += sz;
160
161        log::trace!(
162            "send_data; sz={}; buffered={}; requested={}",
163            sz,
164            stream.buffered_send_data,
165            stream.requested_send_capacity
166        );
167
168        // Implicitly request more send capacity if not enough has been
169        // requested yet.
170        if stream.requested_send_capacity < stream.buffered_send_data {
171            // Update the target requested capacity
172            stream.requested_send_capacity = stream.buffered_send_data;
173
174            self.try_assign_capacity(stream);
175        }
176
177        if frame.is_end_stream() {
178            stream.state.send_close();
179            self.reserve_capacity(0, stream, counts);
180        }
181
182        log::trace!(
183            "send_data (2); available={}; buffered={}",
184            stream.send_flow.available(),
185            stream.buffered_send_data
186        );
187
188        // The `stream.buffered_send_data == 0` check is here so that, if a zero
189        // length data frame is queued to the front (there is no previously
190        // queued data), it gets sent out immediately even if there is no
191        // available send window.
192        //
193        // Sending out zero length data frames can be done to signal
194        // end-of-stream.
195        //
196        if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
197            // The stream currently has capacity to send the data frame, so
198            // queue it up and notify the connection task.
199            self.queue_frame(frame.into(), buffer, stream, task);
200        } else {
201            // The stream has no capacity to send the frame now, save it but
202            // don't notify the connection task. Once additional capacity
203            // becomes available, the frame will be flushed.
204            stream.pending_send.push_back(buffer, frame.into());
205        }
206
207        Ok(())
208    }
209
210    /// Request capacity to send data
211    pub fn reserve_capacity(
212        &mut self,
213        capacity: WindowSize,
214        stream: &mut store::Ptr,
215        counts: &mut Counts,
216    ) {
217        log::trace!(
218            "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
219            stream.id,
220            capacity,
221            capacity + stream.buffered_send_data,
222            stream.requested_send_capacity
223        );
224
225        // Actual capacity is `capacity` + the current amount of buffered data.
226        // If it were less, then we could never send out the buffered data.
227        let capacity = capacity + stream.buffered_send_data;
228
229        if capacity == stream.requested_send_capacity {
230            // Nothing to do
231        } else if capacity < stream.requested_send_capacity {
232            // Update the target requested capacity
233            stream.requested_send_capacity = capacity;
234
235            // Currently available capacity assigned to the stream
236            let available = stream.send_flow.available().as_size();
237
238            // If the stream has more assigned capacity than requested, reclaim
239            // some for the connection
240            if available > capacity {
241                let diff = available - capacity;
242
243                stream.send_flow.claim_capacity(diff);
244
245                self.assign_connection_capacity(diff, stream, counts);
246            }
247        } else {
248            // If trying to *add* capacity, but the stream send side is closed,
249            // there's nothing to be done.
250            if stream.state.is_send_closed() {
251                return;
252            }
253
254            // Update the target requested capacity
255            stream.requested_send_capacity = capacity;
256
257            // Try to assign additional capacity to the stream. If none is
258            // currently available, the stream will be queued to receive some
259            // when more becomes available.
260            self.try_assign_capacity(stream);
261        }
262    }
263
264    pub fn recv_stream_window_update(
265        &mut self,
266        inc: WindowSize,
267        stream: &mut store::Ptr,
268    ) -> Result<(), Reason> {
269        log::trace!(
270            "recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
271            stream.id,
272            stream.state,
273            inc,
274            flow = stream.send_flow
275        );
276
277        if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
278            // We can't send any data, so don't bother doing anything else.
279            return Ok(());
280        }
281
282        // Update the stream level flow control.
283        stream.send_flow.inc_window(inc)?;
284
285        // If the stream is waiting on additional capacity, then this will
286        // assign it (if available on the connection) and notify the producer
287        self.try_assign_capacity(stream);
288
289        Ok(())
290    }
291
292    pub fn recv_connection_window_update(
293        &mut self,
294        inc: WindowSize,
295        store: &mut Store,
296        counts: &mut Counts,
297    ) -> Result<(), Reason> {
298        // Update the connection's window
299        self.flow.inc_window(inc)?;
300
301        self.assign_connection_capacity(inc, store, counts);
302        Ok(())
303    }
304
305    /// Reclaim all capacity assigned to the stream and re-assign it to the
306    /// connection
307    pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
308        let available = stream.send_flow.available().as_size();
309        stream.send_flow.claim_capacity(available);
310        // Re-assign all capacity to the connection
311        self.assign_connection_capacity(available, stream, counts);
312    }
313
314    /// Reclaim just reserved capacity, not buffered capacity, and re-assign
315    /// it to the connection
316    pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
317        // only reclaim requested capacity that isn't already buffered
318        if stream.requested_send_capacity > stream.buffered_send_data {
319            let reserved = stream.requested_send_capacity - stream.buffered_send_data;
320
321            stream.send_flow.claim_capacity(reserved);
322            self.assign_connection_capacity(reserved, stream, counts);
323        }
324    }
325
326    pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
327        while let Some(stream) = self.pending_capacity.pop(store) {
328            counts.transition(stream, |_, stream| {
329                log::trace!("clear_pending_capacity; stream={:?}", stream.id);
330            })
331        }
332    }
333
334    pub fn assign_connection_capacity<R>(
335        &mut self,
336        inc: WindowSize,
337        store: &mut R,
338        counts: &mut Counts,
339    ) where
340        R: Resolve,
341    {
342        log::trace!("assign_connection_capacity; inc={}", inc);
343
344        self.flow.assign_capacity(inc);
345
346        // Assign newly acquired capacity to streams pending capacity.
347        while self.flow.available() > 0 {
348            let stream = match self.pending_capacity.pop(store) {
349                Some(stream) => stream,
350                None => return,
351            };
352
353            // Streams pending capacity may have been reset before capacity
354            // became available. In that case, the stream won't want any
355            // capacity, and so we shouldn't "transition" on it, but just evict
356            // it and continue the loop.
357            if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
358                continue;
359            }
360
361            counts.transition(stream, |_, mut stream| {
362                // Try to assign capacity to the stream. This will also re-queue the
363                // stream if there isn't enough connection level capacity to fulfill
364                // the capacity request.
365                self.try_assign_capacity(&mut stream);
366            })
367        }
368    }
369
370    /// Request capacity to send data
371    fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
372        let total_requested = stream.requested_send_capacity;
373
374        // Total requested should never go below actual assigned
375        // (Note: the window size can go lower than assigned)
376        debug_assert!(total_requested >= stream.send_flow.available());
377
378        // The amount of additional capacity that the stream requests.
379        // Don't assign more than the window has available!
380        let additional = cmp::min(
381            total_requested - stream.send_flow.available().as_size(),
382            // Can't assign more than what is available
383            stream.send_flow.window_size() - stream.send_flow.available().as_size(),
384        );
385
386        log::trace!(
387            "try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}",
388            stream.id,
389            total_requested,
390            additional,
391            buffered = stream.buffered_send_data,
392            window = stream.send_flow.window_size(),
393            conn = self.flow.available()
394        );
395
396        if additional == 0 {
397            // Nothing more to do
398            return;
399        }
400
401        // If the stream has requested capacity, then it must be in the
402        // streaming state (more data could be sent) or there is buffered data
403        // waiting to be sent.
404        debug_assert!(
405            stream.state.is_send_streaming() || stream.buffered_send_data > 0,
406            "state={:?}",
407            stream.state
408        );
409
410        // The amount of currently available capacity on the connection
411        let conn_available = self.flow.available().as_size();
412
413        // First check if capacity is immediately available
414        if conn_available > 0 {
415            // The amount of capacity to assign to the stream
416            // TODO: Should prioritization factor into this?
417            let assign = cmp::min(conn_available, additional);
418
419            log::trace!("  assigning; stream={:?}, capacity={}", stream.id, assign,);
420
421            // Assign the capacity to the stream
422            stream.assign_capacity(assign);
423
424            // Claim the capacity from the connection
425            self.flow.claim_capacity(assign);
426        }
427
428        log::trace!(
429            "try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}",
430            stream.send_flow.available(),
431            stream.requested_send_capacity,
432            stream.buffered_send_data,
433            stream.send_flow.has_unavailable()
434        );
435
436        if stream.send_flow.available() < stream.requested_send_capacity
437            && stream.send_flow.has_unavailable()
438        {
439            // The stream requires additional capacity and the stream's
440            // window has available capacity, but the connection window
441            // does not.
442            //
443            // In this case, the stream needs to be queued up for when the
444            // connection has more capacity.
445            self.pending_capacity.push(stream);
446        }
447
448        // If data is buffered and the stream is send ready, then
449        // schedule the stream for execution
450        if stream.buffered_send_data > 0 && stream.is_send_ready() {
451            // TODO: This assertion isn't *exactly* correct. There can still be
452            // buffered send data while the stream's pending send queue is
453            // empty. This can happen when a large data frame is in the process
454            // of being **partially** sent. Once the window has been sent, the
455            // data frame will be returned to the prioritization layer to be
456            // re-scheduled.
457            //
458            // That said, it would be nice to figure out how to make this
459            // assertion correctly.
460            //
461            // debug_assert!(!stream.pending_send.is_empty());
462
463            self.pending_send.push(stream);
464        }
465    }
466
467    pub fn poll_complete<T, B>(
468        &mut self,
469        cx: &mut Context,
470        buffer: &mut Buffer<Frame<B>>,
471        store: &mut Store,
472        counts: &mut Counts,
473        dst: &mut Codec<T, Prioritized<B>>,
474    ) -> Poll<io::Result<()>>
475    where
476        T: AsyncWrite + Unpin,
477        B: Buf,
478    {
479        // Ensure codec is ready
480        ready!(dst.poll_ready(cx))?;
481
482        // Reclaim any frame that has previously been written
483        self.reclaim_frame(buffer, store, dst);
484
485        // The max frame length
486        let max_frame_len = dst.max_send_frame_size();
487
488        log::trace!("poll_complete");
489
490        loop {
491            self.schedule_pending_open(store, counts);
492
493            match self.pop_frame(buffer, store, max_frame_len, counts) {
494                Some(frame) => {
495                    log::trace!("writing frame={:?}", frame);
496
497                    debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
498                    if let Frame::Data(ref frame) = frame {
499                        self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
500                    }
501                    dst.buffer(frame).expect("invalid frame");
502
503                    // Ensure the codec is ready to try the loop again.
504                    ready!(dst.poll_ready(cx))?;
505
506                    // Because, always try to reclaim...
507                    self.reclaim_frame(buffer, store, dst);
508                }
509                None => {
510                    // Try to flush the codec.
511                    ready!(dst.flush(cx))?;
512
513                    // This might release a data frame...
514                    if !self.reclaim_frame(buffer, store, dst) {
515                        return Poll::Ready(Ok(()));
516                    }
517
518                    // No need to poll ready as poll_complete() does this for
519                    // us...
520                }
521            }
522        }
523    }
524
525    /// Tries to reclaim a pending data frame from the codec.
526    ///
527    /// Returns true if a frame was reclaimed.
528    ///
529    /// When a data frame is written to the codec, it may not be written in its
530    /// entirety (large chunks are split up into potentially many data frames).
531    /// In this case, the stream needs to be reprioritized.
532    fn reclaim_frame<T, B>(
533        &mut self,
534        buffer: &mut Buffer<Frame<B>>,
535        store: &mut Store,
536        dst: &mut Codec<T, Prioritized<B>>,
537    ) -> bool
538    where
539        B: Buf,
540    {
541        log::trace!("try reclaim frame");
542
543        // First check if there are any data chunks to take back
544        if let Some(frame) = dst.take_last_data_frame() {
545            log::trace!(
546                "  -> reclaimed; frame={:?}; sz={}",
547                frame,
548                frame.payload().inner.get_ref().remaining()
549            );
550
551            let mut eos = false;
552            let key = frame.payload().stream;
553
554            match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
555                InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
556                InFlightData::Drop => {
557                    log::trace!("not reclaiming frame for cancelled stream");
558                    return false;
559                }
560                InFlightData::DataFrame(k) => {
561                    debug_assert_eq!(k, key);
562                }
563            }
564
565            let mut frame = frame.map(|prioritized| {
566                // TODO: Ensure fully written
567                eos = prioritized.end_of_stream;
568                prioritized.inner.into_inner()
569            });
570
571            if frame.payload().has_remaining() {
572                let mut stream = store.resolve(key);
573
574                if eos {
575                    frame.set_end_stream(true);
576                }
577
578                self.push_back_frame(frame.into(), buffer, &mut stream);
579
580                return true;
581            }
582        }
583
584        false
585    }
586
587    /// Push the frame to the front of the stream's deque, scheduling the
588    /// stream if needed.
589    fn push_back_frame<B>(
590        &mut self,
591        frame: Frame<B>,
592        buffer: &mut Buffer<Frame<B>>,
593        stream: &mut store::Ptr,
594    ) {
595        // Push the frame to the front of the stream's deque
596        stream.pending_send.push_front(buffer, frame);
597
598        // If needed, schedule the sender
599        if stream.send_flow.available() > 0 {
600            debug_assert!(!stream.pending_send.is_empty());
601            self.pending_send.push(stream);
602        }
603    }
604
605    pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
606        log::trace!("clear_queue; stream={:?}", stream.id);
607
608        // TODO: make this more efficient?
609        while let Some(frame) = stream.pending_send.pop_front(buffer) {
610            log::trace!("dropping; frame={:?}", frame);
611        }
612
613        stream.buffered_send_data = 0;
614        stream.requested_send_capacity = 0;
615        if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
616            if stream.key() == key {
617                // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
618                self.in_flight_data_frame = InFlightData::Drop;
619            }
620        }
621    }
622
623    pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
624        while let Some(stream) = self.pending_send.pop(store) {
625            let is_pending_reset = stream.is_pending_reset_expiration();
626            counts.transition_after(stream, is_pending_reset);
627        }
628    }
629
630    pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
631        while let Some(stream) = self.pending_open.pop(store) {
632            let is_pending_reset = stream.is_pending_reset_expiration();
633            counts.transition_after(stream, is_pending_reset);
634        }
635    }
636
637    fn pop_frame<B>(
638        &mut self,
639        buffer: &mut Buffer<Frame<B>>,
640        store: &mut Store,
641        max_len: usize,
642        counts: &mut Counts,
643    ) -> Option<Frame<Prioritized<B>>>
644    where
645        B: Buf,
646    {
647        log::trace!("pop_frame");
648
649        loop {
650            match self.pending_send.pop(store) {
651                Some(mut stream) => {
652                    log::trace!(
653                        "pop_frame; stream={:?}; stream.state={:?}",
654                        stream.id,
655                        stream.state
656                    );
657
658                    // It's possible that this stream, besides having data to send,
659                    // is also queued to send a reset, and thus is already in the queue
660                    // to wait for "some time" after a reset.
661                    //
662                    // To be safe, we just always ask the stream.
663                    let is_pending_reset = stream.is_pending_reset_expiration();
664
665                    log::trace!(
666                        " --> stream={:?}; is_pending_reset={:?};",
667                        stream.id,
668                        is_pending_reset
669                    );
670
671                    let frame = match stream.pending_send.pop_front(buffer) {
672                        Some(Frame::Data(mut frame)) => {
673                            // Get the amount of capacity remaining for stream's
674                            // window.
675                            let stream_capacity = stream.send_flow.available();
676                            let sz = frame.payload().remaining();
677
678                            log::trace!(
679                                " --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \
680                                 available={}; requested={}; buffered={};",
681                                frame.stream_id(),
682                                sz,
683                                eos = frame.is_end_stream(),
684                                window = stream_capacity,
685                                available = stream.send_flow.available(),
686                                requested = stream.requested_send_capacity,
687                                buffered = stream.buffered_send_data,
688                            );
689
690                            // Zero length data frames always have capacity to
691                            // be sent.
692                            if sz > 0 && stream_capacity == 0 {
693                                log::trace!(
694                                    " --> stream capacity is 0; requested={}",
695                                    stream.requested_send_capacity
696                                );
697
698                                // Ensure that the stream is waiting for
699                                // connection level capacity
700                                //
701                                // TODO: uncomment
702                                // debug_assert!(stream.is_pending_send_capacity);
703
704                                // The stream has no more capacity, this can
705                                // happen if the remote reduced the stream
706                                // window. In this case, we need to buffer the
707                                // frame and wait for a window update...
708                                stream.pending_send.push_front(buffer, frame.into());
709
710                                continue;
711                            }
712
713                            // Only send up to the max frame length
714                            let len = cmp::min(sz, max_len);
715
716                            // Only send up to the stream's window capacity
717                            let len =
718                                cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
719
720                            // There *must* be be enough connection level
721                            // capacity at this point.
722                            debug_assert!(len <= self.flow.window_size());
723
724                            log::trace!(" --> sending data frame; len={}", len);
725
726                            // Update the flow control
727                            log::trace!(" -- updating stream flow --");
728                            stream.send_flow.send_data(len);
729
730                            // Decrement the stream's buffered data counter
731                            debug_assert!(stream.buffered_send_data >= len);
732                            stream.buffered_send_data -= len;
733                            stream.requested_send_capacity -= len;
734
735                            // Assign the capacity back to the connection that
736                            // was just consumed from the stream in the previous
737                            // line.
738                            self.flow.assign_capacity(len);
739
740                            log::trace!(" -- updating connection flow --");
741                            self.flow.send_data(len);
742
743                            // Wrap the frame's data payload to ensure that the
744                            // correct amount of data gets written.
745
746                            let eos = frame.is_end_stream();
747                            let len = len as usize;
748
749                            if frame.payload().remaining() > len {
750                                frame.set_end_stream(false);
751                            }
752
753                            Frame::Data(frame.map(|buf| Prioritized {
754                                inner: buf.take(len),
755                                end_of_stream: eos,
756                                stream: stream.key(),
757                            }))
758                        }
759                        Some(Frame::PushPromise(pp)) => {
760                            let mut pushed =
761                                stream.store_mut().find_mut(&pp.promised_id()).unwrap();
762                            pushed.is_pending_push = false;
763                            // Transition stream from pending_push to pending_open
764                            // if possible
765                            if !pushed.pending_send.is_empty() {
766                                if counts.can_inc_num_send_streams() {
767                                    counts.inc_num_send_streams(&mut pushed);
768                                    self.pending_send.push(&mut pushed);
769                                } else {
770                                    self.queue_open(&mut pushed);
771                                }
772                            }
773                            Frame::PushPromise(pp)
774                        }
775                        Some(frame) => frame.map(|_| {
776                            unreachable!(
777                                "Frame::map closure will only be called \
778                                 on DATA frames."
779                            )
780                        }),
781                        None => {
782                            if let Some(reason) = stream.state.get_scheduled_reset() {
783                                stream.state.set_reset(reason);
784
785                                let frame = frame::Reset::new(stream.id, reason);
786                                Frame::Reset(frame)
787                            } else {
788                                // If the stream receives a RESET from the peer, it may have
789                                // had data buffered to be sent, but all the frames are cleared
790                                // in clear_queue(). Instead of doing O(N) traversal through queue
791                                // to remove, lets just ignore the stream here.
792                                log::trace!("removing dangling stream from pending_send");
793                                // Since this should only happen as a consequence of `clear_queue`,
794                                // we must be in a closed state of some kind.
795                                debug_assert!(stream.state.is_closed());
796                                counts.transition_after(stream, is_pending_reset);
797                                continue;
798                            }
799                        }
800                    };
801
802                    log::trace!("pop_frame; frame={:?}", frame);
803
804                    if cfg!(debug_assertions) && stream.state.is_idle() {
805                        debug_assert!(stream.id > self.last_opened_id);
806                        self.last_opened_id = stream.id;
807                    }
808
809                    if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
810                        // TODO: Only requeue the sender IF it is ready to send
811                        // the next frame. i.e. don't requeue it if the next
812                        // frame is a data frame and the stream does not have
813                        // any more capacity.
814                        self.pending_send.push(&mut stream);
815                    }
816
817                    counts.transition_after(stream, is_pending_reset);
818
819                    return Some(frame);
820                }
821                None => return None,
822            }
823        }
824    }
825
826    fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
827        log::trace!("schedule_pending_open");
828        // check for any pending open streams
829        while counts.can_inc_num_send_streams() {
830            if let Some(mut stream) = self.pending_open.pop(store) {
831                log::trace!("schedule_pending_open; stream={:?}", stream.id);
832
833                counts.inc_num_send_streams(&mut stream);
834                self.pending_send.push(&mut stream);
835                stream.notify_send();
836            } else {
837                return;
838            }
839        }
840    }
841}
842
843// ===== impl Prioritized =====
844
845impl<B> Buf for Prioritized<B>
846where
847    B: Buf,
848{
849    fn remaining(&self) -> usize {
850        self.inner.remaining()
851    }
852
853    fn bytes(&self) -> &[u8] {
854        self.inner.bytes()
855    }
856
857    fn advance(&mut self, cnt: usize) {
858        self.inner.advance(cnt)
859    }
860}
861
862impl<B: Buf> fmt::Debug for Prioritized<B> {
863    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
864        fmt.debug_struct("Prioritized")
865            .field("remaining", &self.inner.get_ref().remaining())
866            .field("end_of_stream", &self.end_of_stream)
867            .field("stream", &self.stream)
868            .finish()
869    }
870}