hreq_h2/proto/streams/prioritize.rs
1use super::store::Resolve;
2use super::*;
3
4use crate::frame::{Reason, StreamId};
5
6use crate::codec::UserError;
7use crate::codec::UserError::*;
8
9use bytes::buf::ext::{BufExt, Take};
10use std::io;
11use std::task::{Context, Poll, Waker};
12use std::{cmp, fmt, mem};
13
14/// # Warning
15///
16/// Queued streams are ordered by stream ID, as we need to ensure that
17/// lower-numbered streams are sent headers before higher-numbered ones.
18/// This is because "idle" stream IDs – those which have been initiated but
19/// have yet to receive frames – will be implicitly closed on receipt of a
20/// frame on a higher stream ID. If these queues was not ordered by stream
21/// IDs, some mechanism would be necessary to ensure that the lowest-numberedh]
22/// idle stream is opened first.
23#[derive(Debug)]
24pub(super) struct Prioritize {
25 /// Queue of streams waiting for socket capacity to send a frame.
26 pending_send: store::Queue<stream::NextSend>,
27
28 /// Queue of streams waiting for window capacity to produce data.
29 pending_capacity: store::Queue<stream::NextSendCapacity>,
30
31 /// Streams waiting for capacity due to max concurrency
32 ///
33 /// The `SendRequest` handle is `Clone`. This enables initiating requests
34 /// from many tasks. However, offering this capability while supporting
35 /// backpressure at some level is tricky. If there are many `SendRequest`
36 /// handles and a single stream becomes available, which handle gets
37 /// assigned that stream? Maybe that handle is no longer ready to send a
38 /// request.
39 ///
40 /// The strategy used is to allow each `SendRequest` handle one buffered
41 /// request. A `SendRequest` handle is ready to send a request if it has no
42 /// associated buffered requests. This is the same strategy as `mpsc` in the
43 /// futures library.
44 pending_open: store::Queue<stream::NextOpen>,
45
46 /// Connection level flow control governing sent data
47 flow: FlowControl,
48
49 /// Stream ID of the last stream opened.
50 last_opened_id: StreamId,
51
52 /// What `DATA` frame is currently being sent in the codec.
53 in_flight_data_frame: InFlightData,
54}
55
56#[derive(Debug, Eq, PartialEq)]
57enum InFlightData {
58 /// There is no `DATA` frame in flight.
59 Nothing,
60 /// There is a `DATA` frame in flight belonging to the given stream.
61 DataFrame(store::Key),
62 /// There was a `DATA` frame, but the stream's queue was since cleared.
63 Drop,
64}
65
66pub(crate) struct Prioritized<B> {
67 // The buffer
68 inner: Take<B>,
69
70 end_of_stream: bool,
71
72 // The stream that this is associated with
73 stream: store::Key,
74}
75
76// ===== impl Prioritize =====
77
78impl Prioritize {
79 pub fn new(config: &Config) -> Prioritize {
80 let mut flow = FlowControl::new();
81
82 flow.inc_window(config.remote_init_window_sz)
83 .expect("invalid initial window size");
84
85 flow.assign_capacity(config.remote_init_window_sz);
86
87 log::trace!("Prioritize::new; flow={:?}", flow);
88
89 Prioritize {
90 pending_send: store::Queue::new(),
91 pending_capacity: store::Queue::new(),
92 pending_open: store::Queue::new(),
93 flow,
94 last_opened_id: StreamId::ZERO,
95 in_flight_data_frame: InFlightData::Nothing,
96 }
97 }
98
99 /// Queue a frame to be sent to the remote
100 pub fn queue_frame<B>(
101 &mut self,
102 frame: Frame<B>,
103 buffer: &mut Buffer<Frame<B>>,
104 stream: &mut store::Ptr,
105 task: &mut Option<Waker>,
106 ) {
107 // Queue the frame in the buffer
108 stream.pending_send.push_back(buffer, frame);
109 self.schedule_send(stream, task);
110 }
111
112 pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
113 // If the stream is waiting to be opened, nothing more to do.
114 if stream.is_send_ready() {
115 log::trace!("schedule_send; {:?}", stream.id);
116 // Queue the stream
117 self.pending_send.push(stream);
118
119 // Notify the connection.
120 if let Some(task) = task.take() {
121 task.wake();
122 }
123 }
124 }
125
126 pub fn queue_open(&mut self, stream: &mut store::Ptr) {
127 self.pending_open.push(stream);
128 }
129
130 /// Send a data frame
131 pub fn send_data<B>(
132 &mut self,
133 frame: frame::Data<B>,
134 buffer: &mut Buffer<Frame<B>>,
135 stream: &mut store::Ptr,
136 counts: &mut Counts,
137 task: &mut Option<Waker>,
138 ) -> Result<(), UserError>
139 where
140 B: Buf,
141 {
142 let sz = frame.payload().remaining();
143
144 if sz > MAX_WINDOW_SIZE as usize {
145 return Err(UserError::PayloadTooBig);
146 }
147
148 let sz = sz as WindowSize;
149
150 if !stream.state.is_send_streaming() {
151 if stream.state.is_closed() {
152 return Err(InactiveStreamId);
153 } else {
154 return Err(UnexpectedFrameType);
155 }
156 }
157
158 // Update the buffered data counter
159 stream.buffered_send_data += sz;
160
161 log::trace!(
162 "send_data; sz={}; buffered={}; requested={}",
163 sz,
164 stream.buffered_send_data,
165 stream.requested_send_capacity
166 );
167
168 // Implicitly request more send capacity if not enough has been
169 // requested yet.
170 if stream.requested_send_capacity < stream.buffered_send_data {
171 // Update the target requested capacity
172 stream.requested_send_capacity = stream.buffered_send_data;
173
174 self.try_assign_capacity(stream);
175 }
176
177 if frame.is_end_stream() {
178 stream.state.send_close();
179 self.reserve_capacity(0, stream, counts);
180 }
181
182 log::trace!(
183 "send_data (2); available={}; buffered={}",
184 stream.send_flow.available(),
185 stream.buffered_send_data
186 );
187
188 // The `stream.buffered_send_data == 0` check is here so that, if a zero
189 // length data frame is queued to the front (there is no previously
190 // queued data), it gets sent out immediately even if there is no
191 // available send window.
192 //
193 // Sending out zero length data frames can be done to signal
194 // end-of-stream.
195 //
196 if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
197 // The stream currently has capacity to send the data frame, so
198 // queue it up and notify the connection task.
199 self.queue_frame(frame.into(), buffer, stream, task);
200 } else {
201 // The stream has no capacity to send the frame now, save it but
202 // don't notify the connection task. Once additional capacity
203 // becomes available, the frame will be flushed.
204 stream.pending_send.push_back(buffer, frame.into());
205 }
206
207 Ok(())
208 }
209
210 /// Request capacity to send data
211 pub fn reserve_capacity(
212 &mut self,
213 capacity: WindowSize,
214 stream: &mut store::Ptr,
215 counts: &mut Counts,
216 ) {
217 log::trace!(
218 "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
219 stream.id,
220 capacity,
221 capacity + stream.buffered_send_data,
222 stream.requested_send_capacity
223 );
224
225 // Actual capacity is `capacity` + the current amount of buffered data.
226 // If it were less, then we could never send out the buffered data.
227 let capacity = capacity + stream.buffered_send_data;
228
229 if capacity == stream.requested_send_capacity {
230 // Nothing to do
231 } else if capacity < stream.requested_send_capacity {
232 // Update the target requested capacity
233 stream.requested_send_capacity = capacity;
234
235 // Currently available capacity assigned to the stream
236 let available = stream.send_flow.available().as_size();
237
238 // If the stream has more assigned capacity than requested, reclaim
239 // some for the connection
240 if available > capacity {
241 let diff = available - capacity;
242
243 stream.send_flow.claim_capacity(diff);
244
245 self.assign_connection_capacity(diff, stream, counts);
246 }
247 } else {
248 // If trying to *add* capacity, but the stream send side is closed,
249 // there's nothing to be done.
250 if stream.state.is_send_closed() {
251 return;
252 }
253
254 // Update the target requested capacity
255 stream.requested_send_capacity = capacity;
256
257 // Try to assign additional capacity to the stream. If none is
258 // currently available, the stream will be queued to receive some
259 // when more becomes available.
260 self.try_assign_capacity(stream);
261 }
262 }
263
264 pub fn recv_stream_window_update(
265 &mut self,
266 inc: WindowSize,
267 stream: &mut store::Ptr,
268 ) -> Result<(), Reason> {
269 log::trace!(
270 "recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
271 stream.id,
272 stream.state,
273 inc,
274 flow = stream.send_flow
275 );
276
277 if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
278 // We can't send any data, so don't bother doing anything else.
279 return Ok(());
280 }
281
282 // Update the stream level flow control.
283 stream.send_flow.inc_window(inc)?;
284
285 // If the stream is waiting on additional capacity, then this will
286 // assign it (if available on the connection) and notify the producer
287 self.try_assign_capacity(stream);
288
289 Ok(())
290 }
291
292 pub fn recv_connection_window_update(
293 &mut self,
294 inc: WindowSize,
295 store: &mut Store,
296 counts: &mut Counts,
297 ) -> Result<(), Reason> {
298 // Update the connection's window
299 self.flow.inc_window(inc)?;
300
301 self.assign_connection_capacity(inc, store, counts);
302 Ok(())
303 }
304
305 /// Reclaim all capacity assigned to the stream and re-assign it to the
306 /// connection
307 pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
308 let available = stream.send_flow.available().as_size();
309 stream.send_flow.claim_capacity(available);
310 // Re-assign all capacity to the connection
311 self.assign_connection_capacity(available, stream, counts);
312 }
313
314 /// Reclaim just reserved capacity, not buffered capacity, and re-assign
315 /// it to the connection
316 pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
317 // only reclaim requested capacity that isn't already buffered
318 if stream.requested_send_capacity > stream.buffered_send_data {
319 let reserved = stream.requested_send_capacity - stream.buffered_send_data;
320
321 stream.send_flow.claim_capacity(reserved);
322 self.assign_connection_capacity(reserved, stream, counts);
323 }
324 }
325
326 pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
327 while let Some(stream) = self.pending_capacity.pop(store) {
328 counts.transition(stream, |_, stream| {
329 log::trace!("clear_pending_capacity; stream={:?}", stream.id);
330 })
331 }
332 }
333
334 pub fn assign_connection_capacity<R>(
335 &mut self,
336 inc: WindowSize,
337 store: &mut R,
338 counts: &mut Counts,
339 ) where
340 R: Resolve,
341 {
342 log::trace!("assign_connection_capacity; inc={}", inc);
343
344 self.flow.assign_capacity(inc);
345
346 // Assign newly acquired capacity to streams pending capacity.
347 while self.flow.available() > 0 {
348 let stream = match self.pending_capacity.pop(store) {
349 Some(stream) => stream,
350 None => return,
351 };
352
353 // Streams pending capacity may have been reset before capacity
354 // became available. In that case, the stream won't want any
355 // capacity, and so we shouldn't "transition" on it, but just evict
356 // it and continue the loop.
357 if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
358 continue;
359 }
360
361 counts.transition(stream, |_, mut stream| {
362 // Try to assign capacity to the stream. This will also re-queue the
363 // stream if there isn't enough connection level capacity to fulfill
364 // the capacity request.
365 self.try_assign_capacity(&mut stream);
366 })
367 }
368 }
369
370 /// Request capacity to send data
371 fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
372 let total_requested = stream.requested_send_capacity;
373
374 // Total requested should never go below actual assigned
375 // (Note: the window size can go lower than assigned)
376 debug_assert!(total_requested >= stream.send_flow.available());
377
378 // The amount of additional capacity that the stream requests.
379 // Don't assign more than the window has available!
380 let additional = cmp::min(
381 total_requested - stream.send_flow.available().as_size(),
382 // Can't assign more than what is available
383 stream.send_flow.window_size() - stream.send_flow.available().as_size(),
384 );
385
386 log::trace!(
387 "try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}",
388 stream.id,
389 total_requested,
390 additional,
391 buffered = stream.buffered_send_data,
392 window = stream.send_flow.window_size(),
393 conn = self.flow.available()
394 );
395
396 if additional == 0 {
397 // Nothing more to do
398 return;
399 }
400
401 // If the stream has requested capacity, then it must be in the
402 // streaming state (more data could be sent) or there is buffered data
403 // waiting to be sent.
404 debug_assert!(
405 stream.state.is_send_streaming() || stream.buffered_send_data > 0,
406 "state={:?}",
407 stream.state
408 );
409
410 // The amount of currently available capacity on the connection
411 let conn_available = self.flow.available().as_size();
412
413 // First check if capacity is immediately available
414 if conn_available > 0 {
415 // The amount of capacity to assign to the stream
416 // TODO: Should prioritization factor into this?
417 let assign = cmp::min(conn_available, additional);
418
419 log::trace!(" assigning; stream={:?}, capacity={}", stream.id, assign,);
420
421 // Assign the capacity to the stream
422 stream.assign_capacity(assign);
423
424 // Claim the capacity from the connection
425 self.flow.claim_capacity(assign);
426 }
427
428 log::trace!(
429 "try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}",
430 stream.send_flow.available(),
431 stream.requested_send_capacity,
432 stream.buffered_send_data,
433 stream.send_flow.has_unavailable()
434 );
435
436 if stream.send_flow.available() < stream.requested_send_capacity
437 && stream.send_flow.has_unavailable()
438 {
439 // The stream requires additional capacity and the stream's
440 // window has available capacity, but the connection window
441 // does not.
442 //
443 // In this case, the stream needs to be queued up for when the
444 // connection has more capacity.
445 self.pending_capacity.push(stream);
446 }
447
448 // If data is buffered and the stream is send ready, then
449 // schedule the stream for execution
450 if stream.buffered_send_data > 0 && stream.is_send_ready() {
451 // TODO: This assertion isn't *exactly* correct. There can still be
452 // buffered send data while the stream's pending send queue is
453 // empty. This can happen when a large data frame is in the process
454 // of being **partially** sent. Once the window has been sent, the
455 // data frame will be returned to the prioritization layer to be
456 // re-scheduled.
457 //
458 // That said, it would be nice to figure out how to make this
459 // assertion correctly.
460 //
461 // debug_assert!(!stream.pending_send.is_empty());
462
463 self.pending_send.push(stream);
464 }
465 }
466
467 pub fn poll_complete<T, B>(
468 &mut self,
469 cx: &mut Context,
470 buffer: &mut Buffer<Frame<B>>,
471 store: &mut Store,
472 counts: &mut Counts,
473 dst: &mut Codec<T, Prioritized<B>>,
474 ) -> Poll<io::Result<()>>
475 where
476 T: AsyncWrite + Unpin,
477 B: Buf,
478 {
479 // Ensure codec is ready
480 ready!(dst.poll_ready(cx))?;
481
482 // Reclaim any frame that has previously been written
483 self.reclaim_frame(buffer, store, dst);
484
485 // The max frame length
486 let max_frame_len = dst.max_send_frame_size();
487
488 log::trace!("poll_complete");
489
490 loop {
491 self.schedule_pending_open(store, counts);
492
493 match self.pop_frame(buffer, store, max_frame_len, counts) {
494 Some(frame) => {
495 log::trace!("writing frame={:?}", frame);
496
497 debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
498 if let Frame::Data(ref frame) = frame {
499 self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
500 }
501 dst.buffer(frame).expect("invalid frame");
502
503 // Ensure the codec is ready to try the loop again.
504 ready!(dst.poll_ready(cx))?;
505
506 // Because, always try to reclaim...
507 self.reclaim_frame(buffer, store, dst);
508 }
509 None => {
510 // Try to flush the codec.
511 ready!(dst.flush(cx))?;
512
513 // This might release a data frame...
514 if !self.reclaim_frame(buffer, store, dst) {
515 return Poll::Ready(Ok(()));
516 }
517
518 // No need to poll ready as poll_complete() does this for
519 // us...
520 }
521 }
522 }
523 }
524
525 /// Tries to reclaim a pending data frame from the codec.
526 ///
527 /// Returns true if a frame was reclaimed.
528 ///
529 /// When a data frame is written to the codec, it may not be written in its
530 /// entirety (large chunks are split up into potentially many data frames).
531 /// In this case, the stream needs to be reprioritized.
532 fn reclaim_frame<T, B>(
533 &mut self,
534 buffer: &mut Buffer<Frame<B>>,
535 store: &mut Store,
536 dst: &mut Codec<T, Prioritized<B>>,
537 ) -> bool
538 where
539 B: Buf,
540 {
541 log::trace!("try reclaim frame");
542
543 // First check if there are any data chunks to take back
544 if let Some(frame) = dst.take_last_data_frame() {
545 log::trace!(
546 " -> reclaimed; frame={:?}; sz={}",
547 frame,
548 frame.payload().inner.get_ref().remaining()
549 );
550
551 let mut eos = false;
552 let key = frame.payload().stream;
553
554 match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
555 InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
556 InFlightData::Drop => {
557 log::trace!("not reclaiming frame for cancelled stream");
558 return false;
559 }
560 InFlightData::DataFrame(k) => {
561 debug_assert_eq!(k, key);
562 }
563 }
564
565 let mut frame = frame.map(|prioritized| {
566 // TODO: Ensure fully written
567 eos = prioritized.end_of_stream;
568 prioritized.inner.into_inner()
569 });
570
571 if frame.payload().has_remaining() {
572 let mut stream = store.resolve(key);
573
574 if eos {
575 frame.set_end_stream(true);
576 }
577
578 self.push_back_frame(frame.into(), buffer, &mut stream);
579
580 return true;
581 }
582 }
583
584 false
585 }
586
587 /// Push the frame to the front of the stream's deque, scheduling the
588 /// stream if needed.
589 fn push_back_frame<B>(
590 &mut self,
591 frame: Frame<B>,
592 buffer: &mut Buffer<Frame<B>>,
593 stream: &mut store::Ptr,
594 ) {
595 // Push the frame to the front of the stream's deque
596 stream.pending_send.push_front(buffer, frame);
597
598 // If needed, schedule the sender
599 if stream.send_flow.available() > 0 {
600 debug_assert!(!stream.pending_send.is_empty());
601 self.pending_send.push(stream);
602 }
603 }
604
605 pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
606 log::trace!("clear_queue; stream={:?}", stream.id);
607
608 // TODO: make this more efficient?
609 while let Some(frame) = stream.pending_send.pop_front(buffer) {
610 log::trace!("dropping; frame={:?}", frame);
611 }
612
613 stream.buffered_send_data = 0;
614 stream.requested_send_capacity = 0;
615 if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
616 if stream.key() == key {
617 // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
618 self.in_flight_data_frame = InFlightData::Drop;
619 }
620 }
621 }
622
623 pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
624 while let Some(stream) = self.pending_send.pop(store) {
625 let is_pending_reset = stream.is_pending_reset_expiration();
626 counts.transition_after(stream, is_pending_reset);
627 }
628 }
629
630 pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
631 while let Some(stream) = self.pending_open.pop(store) {
632 let is_pending_reset = stream.is_pending_reset_expiration();
633 counts.transition_after(stream, is_pending_reset);
634 }
635 }
636
637 fn pop_frame<B>(
638 &mut self,
639 buffer: &mut Buffer<Frame<B>>,
640 store: &mut Store,
641 max_len: usize,
642 counts: &mut Counts,
643 ) -> Option<Frame<Prioritized<B>>>
644 where
645 B: Buf,
646 {
647 log::trace!("pop_frame");
648
649 loop {
650 match self.pending_send.pop(store) {
651 Some(mut stream) => {
652 log::trace!(
653 "pop_frame; stream={:?}; stream.state={:?}",
654 stream.id,
655 stream.state
656 );
657
658 // It's possible that this stream, besides having data to send,
659 // is also queued to send a reset, and thus is already in the queue
660 // to wait for "some time" after a reset.
661 //
662 // To be safe, we just always ask the stream.
663 let is_pending_reset = stream.is_pending_reset_expiration();
664
665 log::trace!(
666 " --> stream={:?}; is_pending_reset={:?};",
667 stream.id,
668 is_pending_reset
669 );
670
671 let frame = match stream.pending_send.pop_front(buffer) {
672 Some(Frame::Data(mut frame)) => {
673 // Get the amount of capacity remaining for stream's
674 // window.
675 let stream_capacity = stream.send_flow.available();
676 let sz = frame.payload().remaining();
677
678 log::trace!(
679 " --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \
680 available={}; requested={}; buffered={};",
681 frame.stream_id(),
682 sz,
683 eos = frame.is_end_stream(),
684 window = stream_capacity,
685 available = stream.send_flow.available(),
686 requested = stream.requested_send_capacity,
687 buffered = stream.buffered_send_data,
688 );
689
690 // Zero length data frames always have capacity to
691 // be sent.
692 if sz > 0 && stream_capacity == 0 {
693 log::trace!(
694 " --> stream capacity is 0; requested={}",
695 stream.requested_send_capacity
696 );
697
698 // Ensure that the stream is waiting for
699 // connection level capacity
700 //
701 // TODO: uncomment
702 // debug_assert!(stream.is_pending_send_capacity);
703
704 // The stream has no more capacity, this can
705 // happen if the remote reduced the stream
706 // window. In this case, we need to buffer the
707 // frame and wait for a window update...
708 stream.pending_send.push_front(buffer, frame.into());
709
710 continue;
711 }
712
713 // Only send up to the max frame length
714 let len = cmp::min(sz, max_len);
715
716 // Only send up to the stream's window capacity
717 let len =
718 cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
719
720 // There *must* be be enough connection level
721 // capacity at this point.
722 debug_assert!(len <= self.flow.window_size());
723
724 log::trace!(" --> sending data frame; len={}", len);
725
726 // Update the flow control
727 log::trace!(" -- updating stream flow --");
728 stream.send_flow.send_data(len);
729
730 // Decrement the stream's buffered data counter
731 debug_assert!(stream.buffered_send_data >= len);
732 stream.buffered_send_data -= len;
733 stream.requested_send_capacity -= len;
734
735 // Assign the capacity back to the connection that
736 // was just consumed from the stream in the previous
737 // line.
738 self.flow.assign_capacity(len);
739
740 log::trace!(" -- updating connection flow --");
741 self.flow.send_data(len);
742
743 // Wrap the frame's data payload to ensure that the
744 // correct amount of data gets written.
745
746 let eos = frame.is_end_stream();
747 let len = len as usize;
748
749 if frame.payload().remaining() > len {
750 frame.set_end_stream(false);
751 }
752
753 Frame::Data(frame.map(|buf| Prioritized {
754 inner: buf.take(len),
755 end_of_stream: eos,
756 stream: stream.key(),
757 }))
758 }
759 Some(Frame::PushPromise(pp)) => {
760 let mut pushed =
761 stream.store_mut().find_mut(&pp.promised_id()).unwrap();
762 pushed.is_pending_push = false;
763 // Transition stream from pending_push to pending_open
764 // if possible
765 if !pushed.pending_send.is_empty() {
766 if counts.can_inc_num_send_streams() {
767 counts.inc_num_send_streams(&mut pushed);
768 self.pending_send.push(&mut pushed);
769 } else {
770 self.queue_open(&mut pushed);
771 }
772 }
773 Frame::PushPromise(pp)
774 }
775 Some(frame) => frame.map(|_| {
776 unreachable!(
777 "Frame::map closure will only be called \
778 on DATA frames."
779 )
780 }),
781 None => {
782 if let Some(reason) = stream.state.get_scheduled_reset() {
783 stream.state.set_reset(reason);
784
785 let frame = frame::Reset::new(stream.id, reason);
786 Frame::Reset(frame)
787 } else {
788 // If the stream receives a RESET from the peer, it may have
789 // had data buffered to be sent, but all the frames are cleared
790 // in clear_queue(). Instead of doing O(N) traversal through queue
791 // to remove, lets just ignore the stream here.
792 log::trace!("removing dangling stream from pending_send");
793 // Since this should only happen as a consequence of `clear_queue`,
794 // we must be in a closed state of some kind.
795 debug_assert!(stream.state.is_closed());
796 counts.transition_after(stream, is_pending_reset);
797 continue;
798 }
799 }
800 };
801
802 log::trace!("pop_frame; frame={:?}", frame);
803
804 if cfg!(debug_assertions) && stream.state.is_idle() {
805 debug_assert!(stream.id > self.last_opened_id);
806 self.last_opened_id = stream.id;
807 }
808
809 if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
810 // TODO: Only requeue the sender IF it is ready to send
811 // the next frame. i.e. don't requeue it if the next
812 // frame is a data frame and the stream does not have
813 // any more capacity.
814 self.pending_send.push(&mut stream);
815 }
816
817 counts.transition_after(stream, is_pending_reset);
818
819 return Some(frame);
820 }
821 None => return None,
822 }
823 }
824 }
825
826 fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
827 log::trace!("schedule_pending_open");
828 // check for any pending open streams
829 while counts.can_inc_num_send_streams() {
830 if let Some(mut stream) = self.pending_open.pop(store) {
831 log::trace!("schedule_pending_open; stream={:?}", stream.id);
832
833 counts.inc_num_send_streams(&mut stream);
834 self.pending_send.push(&mut stream);
835 stream.notify_send();
836 } else {
837 return;
838 }
839 }
840 }
841}
842
843// ===== impl Prioritized =====
844
845impl<B> Buf for Prioritized<B>
846where
847 B: Buf,
848{
849 fn remaining(&self) -> usize {
850 self.inner.remaining()
851 }
852
853 fn bytes(&self) -> &[u8] {
854 self.inner.bytes()
855 }
856
857 fn advance(&mut self, cnt: usize) {
858 self.inner.advance(cnt)
859 }
860}
861
862impl<B: Buf> fmt::Debug for Prioritized<B> {
863 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
864 fmt.debug_struct("Prioritized")
865 .field("remaining", &self.inner.get_ref().remaining())
866 .field("end_of_stream", &self.end_of_stream)
867 .field("stream", &self.stream)
868 .finish()
869 }
870}