request_response/
throttled.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Limit the number of requests peers can send to each other.
22//!
23//! Each peer is assigned a budget for sending and a budget for receiving
24//! requests. Initially a peer assumes it has a send budget of 1. When its
25//! budget has been used up its remote peer will send a credit message which
26//! informs it how many more requests it can send before it needs to wait for
27//! the next credit message. Credit messages which error or time out are
28//! retried until they have reached the peer which is assumed once a
29//! corresponding ack or a new request has been received from the peer.
30//!
31//! The `Throttled` behaviour wraps an existing `RequestResponse` behaviour
32//! and uses a codec implementation that sends ordinary requests and responses
33//! as well as a special credit message to which an ack message is expected
34//! as a response. It does so by putting a small CBOR encoded header in front
35//! of each message the inner codec produces.
36
37mod codec;
38
39use codec::{Codec, Message, ProtocolWrapper, Type};
40use crate::handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent};
41use futures::ready;
42use tetsy_libp2p_core::{ConnectedPoint, connection::ConnectionId, Multiaddr, PeerId};
43use tetsy_libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
44use lru::LruCache;
45use std::{collections::{HashMap, HashSet, VecDeque}, task::{Context, Poll}};
46use std::{cmp::max, num::NonZeroU16};
47use super::{
48    ProtocolSupport,
49    RequestId,
50    RequestResponse,
51    RequestResponseCodec,
52    RequestResponseConfig,
53    RequestResponseEvent,
54    RequestResponseMessage,
55};
56
57pub type ResponseChannel<R> = super::ResponseChannel<Message<R>>;
58
59/// A wrapper around [`RequestResponse`] which adds request limits per peer.
60pub struct Throttled<C>
61where
62    C: RequestResponseCodec + Send,
63    C::Protocol: Sync
64{
65    /// A random id used for logging.
66    id: u32,
67    /// The wrapped behaviour.
68    behaviour: RequestResponse<Codec<C>>,
69    /// Information per peer.
70    peer_info: HashMap<PeerId, PeerInfo>,
71    /// Information about previously connected peers.
72    offline_peer_info: LruCache<PeerId, PeerInfo>,
73    /// The default limit applies to all peers unless overriden.
74    default_limit: Limit,
75    /// Permanent limit overrides per peer.
76    limit_overrides: HashMap<PeerId, Limit>,
77    /// Pending events to report in `Throttled::poll`.
78    events: VecDeque<Event<C::Request, C::Response, Message<C::Response>>>,
79    /// The current credit ID.
80    next_grant_id: u64
81}
82
83/// Information about a credit grant that is sent to remote peers.
84#[derive(Clone, Copy, Debug)]
85struct Grant {
86    /// The grant ID. Used to deduplicate retransmitted credit grants.
87    id: GrantId,
88    /// The ID of the outbound credit grant message.
89    request: RequestId,
90    /// The credit given in this grant, i.e. the number of additional
91    /// requests the remote is allowed to send.
92    credit: u16
93}
94
95/// Max. number of inbound requests that can be received.
96#[derive(Clone, Copy, Debug)]
97struct Limit {
98    /// The current receive limit.
99    max_recv: NonZeroU16,
100    /// The next receive limit which becomes active after
101    /// the current limit has been reached.
102    next_max: NonZeroU16
103}
104
105impl Limit {
106    /// Create a new limit.
107    fn new(max: NonZeroU16) -> Self {
108        // The max. limit provided will be effective after the initial request
109        // from a peer which is always allowed has been answered. Values greater
110        // than 1 would prevent sending the credit grant, leading to a stalling
111        // sender so we must not use `max` right away.
112        Limit {
113            max_recv: NonZeroU16::new(1).expect("1 > 0"),
114            next_max: max
115        }
116    }
117
118    /// Set a new limit.
119    ///
120    /// The new limit becomes effective when all current inbound
121    /// requests have been processed and replied to.
122    fn set(&mut self, next: NonZeroU16) {
123        self.next_max = next
124    }
125
126    /// Activate the new limit.
127    fn switch(&mut self) -> u16 {
128        self.max_recv = self.next_max;
129        self.max_recv.get()
130    }
131}
132
133type GrantId = u64;
134
135/// Information related to the current send budget with a peer.
136#[derive(Clone, Debug)]
137struct SendBudget {
138    /// The last received credit grant.
139    grant: Option<GrantId>,
140    /// The remaining credit for requests to send.
141    remaining: u16,
142    /// Credit grant requests received and acknowledged where the outcome
143    /// of the acknowledgement (i.e. response sent) is still undetermined.
144    /// Used to avoid emitting events for successful (`ResponseSent`) or failed
145    /// acknowledgements.
146    received: HashSet<RequestId>,
147}
148
149/// Information related to the current receive budget with a peer.
150#[derive(Clone, Debug)]
151struct RecvBudget {
152    /// The grant currently given to the remote but yet to be acknowledged.
153    ///
154    /// Set to `Some` when a new grant is sent to the remote, followed
155    /// by `None` when an acknowledgment or a request is received. The
156    /// latter is seen as an implicit acknowledgement.
157    grant: Option<Grant>,
158    /// The limit for new credit grants when the `remaining` credit is
159    /// exhausted.
160    limit: Limit,
161    /// The remaining credit for requests to receive.
162    remaining: u16,
163    /// Credit grants sent whose outcome is still undetermined.
164    /// Used to avoid emitting events for failed credit grants.
165    ///
166    /// > **Note**: While receiving an inbound request is an implicit
167    /// > acknowledgement for the last sent `grant`, the outcome of
168    /// > the outbound request remains undetermined until a success or
169    /// > failure event is received for that request or the corresponding
170    /// > connection closes.
171    sent: HashSet<RequestId>,
172}
173
174/// Budget information about a peer.
175#[derive(Clone, Debug)]
176struct PeerInfo {
177    send_budget: SendBudget,
178    recv_budget: RecvBudget,
179}
180
181impl PeerInfo {
182    fn new(recv_limit: Limit) -> Self {
183        PeerInfo {
184            send_budget: SendBudget {
185                grant: None,
186                remaining: 1,
187                received: HashSet::new(),
188            },
189            recv_budget: RecvBudget {
190                grant: None,
191                limit: recv_limit,
192                remaining: 1,
193                sent: HashSet::new(),
194            }
195        }
196    }
197
198    fn into_disconnected(mut self) -> Self {
199        self.send_budget.received = HashSet::new();
200        self.send_budget.remaining = 1;
201        self.recv_budget.sent = HashSet::new();
202        self.recv_budget.remaining = max(1, self.recv_budget.remaining);
203        // Since we potentially reset the remaining receive budget,
204        // we forget about the potentially still unacknowledged last grant.
205        self.recv_budget.grant = None;
206        self
207    }
208}
209
210impl<C> Throttled<C>
211where
212    C: RequestResponseCodec + Send + Clone,
213    C::Protocol: Sync
214{
215    /// Create a new throttled request-response behaviour.
216    pub fn new<I>(c: C, protos: I, cfg: RequestResponseConfig) -> Self
217    where
218        I: IntoIterator<Item = (C::Protocol, ProtocolSupport)>,
219        C: Send,
220        C::Protocol: Sync
221    {
222        let protos = protos.into_iter().map(|(p, ps)| (ProtocolWrapper::new(b"/t/1", p), ps));
223        Throttled::from(RequestResponse::new(Codec::new(c, 8192), protos, cfg))
224    }
225
226    /// Wrap an existing `RequestResponse` behaviour and apply send/recv limits.
227    pub fn from(behaviour: RequestResponse<Codec<C>>) -> Self {
228        Throttled {
229            id: rand::random(),
230            behaviour,
231            peer_info: HashMap::new(),
232            offline_peer_info: LruCache::new(8192),
233            default_limit: Limit::new(NonZeroU16::new(1).expect("1 > 0")),
234            limit_overrides: HashMap::new(),
235            events: VecDeque::new(),
236            next_grant_id: 0
237        }
238    }
239
240    /// Set the global default receive limit per peer.
241    pub fn set_receive_limit(&mut self, limit: NonZeroU16) {
242        log::trace!("{:08x}: new default limit: {:?}", self.id, limit);
243        self.default_limit = Limit::new(limit)
244    }
245
246    /// Override the receive limit of a single peer.
247    pub fn override_receive_limit(&mut self, p: &PeerId, limit: NonZeroU16) {
248        log::debug!("{:08x}: override limit for {}: {:?}", self.id, p, limit);
249        if let Some(info) = self.peer_info.get_mut(p) {
250            info.recv_budget.limit.set(limit)
251        } else if let Some(info) = self.offline_peer_info.get_mut(p) {
252            info.recv_budget.limit.set(limit)
253        }
254        self.limit_overrides.insert(*p, Limit::new(limit));
255    }
256
257    /// Remove any limit overrides for the given peer.
258    pub fn remove_override(&mut self, p: &PeerId) {
259        log::trace!("{:08x}: removing limit override for {}", self.id, p);
260        self.limit_overrides.remove(p);
261    }
262
263    /// Has the limit of outbound requests been reached for the given peer?
264    pub fn can_send(&mut self, p: &PeerId) -> bool {
265        self.peer_info.get(p).map(|i| i.send_budget.remaining > 0).unwrap_or(true)
266    }
267
268    /// Send a request to a peer.
269    ///
270    /// If the limit of outbound requests has been reached, the request is
271    /// returned. Sending more outbound requests should only be attempted
272    /// once [`Event::ResumeSending`] has been received from [`NetworkBehaviour::poll`].
273    pub fn send_request(&mut self, p: &PeerId, req: C::Request) -> Result<RequestId, C::Request> {
274        let connected = &mut self.peer_info;
275        let disconnected = &mut self.offline_peer_info;
276        let remaining =
277            if let Some(info) = connected.get_mut(p).or_else(|| disconnected.get_mut(p)) {
278                if info.send_budget.remaining == 0 {
279                    log::trace!("{:08x}: no more budget to send another request to {}", self.id, p);
280                    return Err(req)
281                }
282                info.send_budget.remaining -= 1;
283                info.send_budget.remaining
284            } else {
285                let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit);
286                let mut info = PeerInfo::new(limit);
287                info.send_budget.remaining -= 1;
288                let remaining = info.send_budget.remaining;
289                self.offline_peer_info.put(*p, info);
290                remaining
291            };
292
293        let rid = self.behaviour.send_request(p, Message::request(req));
294
295        log::trace! { "{:08x}: sending request {} to {} (budget remaining = {})",
296            self.id,
297            rid,
298            p,
299            remaining
300        };
301
302        Ok(rid)
303    }
304
305    /// Answer an inbound request with a response.
306    ///
307    /// See [`RequestResponse::send_response`] for details.
308    pub fn send_response(&mut self, ch: ResponseChannel<C::Response>, res: C::Response)
309        -> Result<(), C::Response>
310    {
311        log::trace!("{:08x}: sending response {} to peer {}", self.id, ch.request_id(), &ch.peer);
312        if let Some(info) = self.peer_info.get_mut(&ch.peer) {
313            if info.recv_budget.remaining == 0 { // need to send more credit to the remote peer
314                let crd = info.recv_budget.limit.switch();
315                info.recv_budget.remaining = info.recv_budget.limit.max_recv.get();
316                self.send_credit(&ch.peer, crd);
317            }
318        }
319        match self.behaviour.send_response(ch, Message::response(res)) {
320            Ok(()) => Ok(()),
321            Err(m) => Err(m.into_parts().1.expect("Missing response data.")),
322        }
323    }
324
325    /// Add a known peer address.
326    ///
327    /// See [`RequestResponse::add_address`] for details.
328    pub fn add_address(&mut self, p: &PeerId, a: Multiaddr) {
329        self.behaviour.add_address(p, a)
330    }
331
332    /// Remove a previously added peer address.
333    ///
334    /// See [`RequestResponse::remove_address`] for details.
335    pub fn remove_address(&mut self, p: &PeerId, a: &Multiaddr) {
336        self.behaviour.remove_address(p, a)
337    }
338
339    /// Are we connected to the given peer?
340    ///
341    /// See [`RequestResponse::is_connected`] for details.
342    pub fn is_connected(&self, p: &PeerId) -> bool {
343        self.behaviour.is_connected(p)
344    }
345
346    /// Are we waiting for a response to the given request?
347    ///
348    /// See [`RequestResponse::is_pending_outbound`] for details.
349    pub fn is_pending_outbound(&self, p: &PeerId, r: &RequestId) -> bool {
350        self.behaviour.is_pending_outbound(p, r)
351    }
352
353
354    /// Is the remote waiting for the local node to respond to the given
355    /// request?
356    ///
357    /// See [`RequestResponse::is_pending_inbound`] for details.
358    pub fn is_pending_inbound(&self, p: &PeerId, r: &RequestId) -> bool {
359        self.behaviour.is_pending_inbound(p, r)
360    }
361
362    /// Send a credit grant to the given peer.
363    fn send_credit(&mut self, p: &PeerId, credit: u16) {
364        if let Some(info) = self.peer_info.get_mut(p) {
365            let cid = self.next_grant_id;
366            self.next_grant_id += 1;
367            let rid = self.behaviour.send_request(p, Message::credit(credit, cid));
368            log::trace!("{:08x}: sending {} credit as grant {} to {}", self.id, credit, cid, p);
369            let grant = Grant { id: cid, request: rid, credit };
370            info.recv_budget.grant = Some(grant);
371            info.recv_budget.sent.insert(rid);
372        }
373    }
374}
375
376/// A Wrapper around [`RequestResponseEvent`].
377#[derive(Debug)]
378pub enum Event<Req, Res, CRes = Res> {
379    /// A regular request-response event.
380    Event(RequestResponseEvent<Req, Res, CRes>),
381    /// We received more inbound requests than allowed.
382    TooManyInboundRequests(PeerId),
383    /// When previously reaching the send limit of a peer,
384    /// this event is eventually emitted when sending is
385    /// allowed to resume.
386    ResumeSending(PeerId)
387}
388
389impl<C> NetworkBehaviour for Throttled<C>
390where
391    C: RequestResponseCodec + Send + Clone + 'static,
392    C::Protocol: Sync
393{
394    type ProtocolsHandler = RequestResponseHandler<Codec<C>>;
395    type OutEvent = Event<C::Request, C::Response, Message<C::Response>>;
396
397    fn new_handler(&mut self) -> Self::ProtocolsHandler {
398        self.behaviour.new_handler()
399    }
400
401    fn addresses_of_peer(&mut self, p: &PeerId) -> Vec<Multiaddr> {
402        self.behaviour.addresses_of_peer(p)
403    }
404
405    fn inject_connection_established(&mut self, p: &PeerId, id: &ConnectionId, end: &ConnectedPoint) {
406        self.behaviour.inject_connection_established(p, id, end)
407    }
408
409    fn inject_connection_closed(&mut self, peer: &PeerId, id: &ConnectionId, end: &ConnectedPoint) {
410        self.behaviour.inject_connection_closed(peer, id, end);
411        if let Some(info) = self.peer_info.get_mut(peer) {
412            if let Some(grant) = &mut info.recv_budget.grant {
413                log::debug! { "{:08x}: resending credit grant {} to {} after connection closed",
414                    self.id,
415                    grant.id,
416                    peer
417                };
418                let msg = Message::credit(grant.credit, grant.id);
419                grant.request = self.behaviour.send_request(peer, msg)
420            }
421        }
422    }
423
424    fn inject_connected(&mut self, p: &PeerId) {
425        log::trace!("{:08x}: connected to {}", self.id, p);
426        self.behaviour.inject_connected(p);
427        // The limit may have been added by `Throttled::send_request` already.
428        if !self.peer_info.contains_key(p) {
429            if let Some(info) = self.offline_peer_info.pop(p) {
430                let recv_budget = info.recv_budget.remaining;
431                self.peer_info.insert(*p, info);
432                if recv_budget > 1 {
433                    self.send_credit(p, recv_budget - 1);
434                }
435            } else {
436                let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit);
437                self.peer_info.insert(*p, PeerInfo::new(limit));
438            }
439        }
440    }
441
442    fn inject_disconnected(&mut self, p: &PeerId) {
443        log::trace!("{:08x}: disconnected from {}", self.id, p);
444        if let Some(info) = self.peer_info.remove(p) {
445            self.offline_peer_info.put(*p, info.into_disconnected());
446        }
447        self.behaviour.inject_disconnected(p)
448    }
449
450    fn inject_dial_failure(&mut self, p: &PeerId) {
451        self.behaviour.inject_dial_failure(p)
452    }
453
454    fn inject_event(&mut self, p: PeerId, i: ConnectionId, e: RequestResponseHandlerEvent<Codec<C>>) {
455        self.behaviour.inject_event(p, i, e)
456    }
457
458    fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters)
459        -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<C>>, Self::OutEvent>>
460    {
461        loop {
462            if let Some(ev) = self.events.pop_front() {
463                return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
464            } else if self.events.capacity() > super::EMPTY_QUEUE_SHRINK_THRESHOLD {
465                self.events.shrink_to_fit()
466            }
467
468            let event = match ready!(self.behaviour.poll(cx, params)) {
469                | NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::Message { peer, message }) => {
470                    let message = match message {
471                        | RequestResponseMessage::Response { request_id, response } =>
472                            match &response.header().typ {
473                                | Some(Type::Ack) => {
474                                    if let Some(info) = self.peer_info.get_mut(&peer) {
475                                        if let Some(id) = info.recv_budget.grant.as_ref().map(|c| c.id) {
476                                            if Some(id) == response.header().ident {
477                                                log::trace!("{:08x}: received ack {} from {}", self.id, id, peer);
478                                                info.recv_budget.grant = None;
479                                            }
480                                        }
481                                        info.recv_budget.sent.remove(&request_id);
482                                    }
483                                    continue
484                                }
485                                | Some(Type::Response) => {
486                                    log::trace!("{:08x}: received response {} from {}", self.id, request_id, peer);
487                                    if let Some(rs) = response.into_parts().1 {
488                                        RequestResponseMessage::Response { request_id, response: rs }
489                                    } else {
490                                        log::error! { "{:08x}: missing data for response {} from peer {}",
491                                            self.id,
492                                            request_id,
493                                            peer
494                                        }
495                                        continue
496                                    }
497                                }
498                                | ty => {
499                                    log::trace! {
500                                        "{:08x}: unknown message type: {:?} from {}; expected response or credit",
501                                        self.id,
502                                        ty,
503                                        peer
504                                    };
505                                    continue
506                                }
507                            }
508                        | RequestResponseMessage::Request { request_id, request, channel } =>
509                            match &request.header().typ {
510                                | Some(Type::Credit) => {
511                                    if let Some(info) = self.peer_info.get_mut(&peer) {
512                                        let id = if let Some(n) = request.header().ident {
513                                            n
514                                        } else {
515                                            log::warn! { "{:08x}: missing credit id in message from {}",
516                                                self.id,
517                                                peer
518                                            }
519                                            continue
520                                        };
521                                        let credit = request.header().credit.unwrap_or(0);
522                                        log::trace! { "{:08x}: received {} additional credit {} from {}",
523                                            self.id,
524                                            credit,
525                                            id,
526                                            peer
527                                        };
528                                        if info.send_budget.grant < Some(id) {
529                                            if info.send_budget.remaining == 0 && credit > 0 {
530                                                log::trace!("{:08x}: sending to peer {} can resume", self.id, peer);
531                                                self.events.push_back(Event::ResumeSending(peer))
532                                            }
533                                            info.send_budget.remaining += credit;
534                                            info.send_budget.grant = Some(id);
535                                        }
536                                        // Note: Failing to send a response to a credit grant is
537                                        // handled along with other inbound failures further below.
538                                        let _ = self.behaviour.send_response(channel, Message::ack(id));
539                                        info.send_budget.received.insert(request_id);
540                                    }
541                                    continue
542                                }
543                                | Some(Type::Request) => {
544                                    if let Some(info) = self.peer_info.get_mut(&peer) {
545                                        log::trace! { "{:08x}: received request {} (recv. budget = {})",
546                                            self.id,
547                                            request_id,
548                                            info.recv_budget.remaining
549                                        };
550                                        if info.recv_budget.remaining == 0 {
551                                            log::debug!("{:08x}: peer {} exceeds its budget", self.id, peer);
552                                            self.events.push_back(Event::TooManyInboundRequests(peer));
553                                            continue
554                                        }
555                                        info.recv_budget.remaining -= 1;
556                                        // We consider a request as proof that our credit grant has
557                                        // reached the peer. Usually, an ACK has already been
558                                        // received.
559                                        info.recv_budget.grant = None;
560                                    }
561                                    if let Some(rq) = request.into_parts().1 {
562                                        RequestResponseMessage::Request { request_id, request: rq, channel }
563                                    } else {
564                                        log::error! { "{:08x}: missing data for request {} from peer {}",
565                                            self.id,
566                                            request_id,
567                                            peer
568                                        }
569                                        continue
570                                    }
571                                }
572                                | ty => {
573                                    log::trace! {
574                                        "{:08x}: unknown message type: {:?} from {}; expected request or ack",
575                                        self.id,
576                                        ty,
577                                        peer
578                                    };
579                                    continue
580                                }
581                            }
582                    };
583                    let event = RequestResponseEvent::Message { peer, message };
584                    NetworkBehaviourAction::GenerateEvent(Event::Event(event))
585                }
586                | NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::OutboundFailure {
587                    peer,
588                    request_id,
589                    error
590                }) => {
591                    if let Some(info) = self.peer_info.get_mut(&peer) {
592                        if let Some(grant) = info.recv_budget.grant.as_mut() {
593                            if grant.request == request_id {
594                                log::debug! {
595                                    "{:08x}: failed to send {} as credit {} to {}; retrying...",
596                                    self.id,
597                                    grant.credit,
598                                    grant.id,
599                                    peer
600                                };
601                                let msg = Message::credit(grant.credit, grant.id);
602                                grant.request = self.behaviour.send_request(&peer, msg);
603                            }
604                        }
605
606                        // If the outbound failure was for a credit message, don't report it on
607                        // the public API and retry the sending.
608                        if info.recv_budget.sent.remove(&request_id) {
609                            continue
610                        }
611                    }
612                    let event = RequestResponseEvent::OutboundFailure { peer, request_id, error };
613                    NetworkBehaviourAction::GenerateEvent(Event::Event(event))
614                }
615                | NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::InboundFailure {
616                    peer,
617                    request_id,
618                    error
619                }) => {
620                    // If the inbound failure occurred in the context of responding to a
621                    // credit grant, don't report it on the public API.
622                    if let Some(info) = self.peer_info.get_mut(&peer) {
623                        if info.send_budget.received.remove(&request_id) {
624                            log::debug! {
625                                "{:08}: failed to acknowledge credit grant from {}: {:?}",
626                                self.id, peer, error
627                            };
628                            continue
629                        }
630                    }
631                    let event = RequestResponseEvent::InboundFailure { peer, request_id, error };
632                    NetworkBehaviourAction::GenerateEvent(Event::Event(event))
633                }
634                | NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::ResponseSent {
635                    peer,
636                    request_id
637                }) => {
638                    // If this event is for an ACK response that was sent for
639                    // the last received credit grant, skip it.
640                    if let Some(info) = self.peer_info.get_mut(&peer) {
641                        if info.send_budget.received.remove(&request_id) {
642                            log::trace! {
643                                "{:08}: successfully sent ACK for credit grant {:?}.",
644                                self.id,
645                                info.send_budget.grant,
646                            }
647                            continue
648                        }
649                    }
650                    NetworkBehaviourAction::GenerateEvent(Event::Event(
651                        RequestResponseEvent::ResponseSent { peer, request_id }))
652                }
653                | NetworkBehaviourAction::DialAddress { address } =>
654                    NetworkBehaviourAction::DialAddress { address },
655                | NetworkBehaviourAction::DialPeer { peer_id, condition } =>
656                    NetworkBehaviourAction::DialPeer { peer_id, condition },
657                | NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
658                    NetworkBehaviourAction::NotifyHandler { peer_id, handler, event },
659                | NetworkBehaviourAction::ReportObservedAddr { address, score } =>
660                    NetworkBehaviourAction::ReportObservedAddr { address, score }
661            };
662
663            return Poll::Ready(event)
664        }
665    }
666}