1mod codec;
38
39use codec::{Codec, Message, ProtocolWrapper, Type};
40use crate::handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent};
41use futures::ready;
42use tetsy_libp2p_core::{ConnectedPoint, connection::ConnectionId, Multiaddr, PeerId};
43use tetsy_libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
44use lru::LruCache;
45use std::{collections::{HashMap, HashSet, VecDeque}, task::{Context, Poll}};
46use std::{cmp::max, num::NonZeroU16};
47use super::{
48 ProtocolSupport,
49 RequestId,
50 RequestResponse,
51 RequestResponseCodec,
52 RequestResponseConfig,
53 RequestResponseEvent,
54 RequestResponseMessage,
55};
56
57pub type ResponseChannel<R> = super::ResponseChannel<Message<R>>;
58
59pub struct Throttled<C>
61where
62 C: RequestResponseCodec + Send,
63 C::Protocol: Sync
64{
65 id: u32,
67 behaviour: RequestResponse<Codec<C>>,
69 peer_info: HashMap<PeerId, PeerInfo>,
71 offline_peer_info: LruCache<PeerId, PeerInfo>,
73 default_limit: Limit,
75 limit_overrides: HashMap<PeerId, Limit>,
77 events: VecDeque<Event<C::Request, C::Response, Message<C::Response>>>,
79 next_grant_id: u64
81}
82
83#[derive(Clone, Copy, Debug)]
85struct Grant {
86 id: GrantId,
88 request: RequestId,
90 credit: u16
93}
94
/// Upper bound on the number of requests accepted from a peer.
///
/// The limit is two-staged: `max_recv` is the value currently in effect and
/// `next_max` is the value that becomes effective on the next [`Limit::switch`].
#[derive(Clone, Copy, Debug)]
struct Limit {
    /// The limit that is currently in effect.
    max_recv: NonZeroU16,
    /// The limit that takes effect once `switch` is called.
    next_max: NonZeroU16,
}

impl Limit {
    /// Create a new limit.
    ///
    /// The effective limit always starts at 1; the requested `max` is only
    /// staged and becomes effective with the first call to `switch`.
    fn new(max: NonZeroU16) -> Self {
        let initial = NonZeroU16::new(1).expect("1 > 0");
        Self { max_recv: initial, next_max: max }
    }

    /// Stage `next` as the limit to use after the next `switch`.
    ///
    /// The currently effective limit is left untouched.
    fn set(&mut self, next: NonZeroU16) {
        self.next_max = next;
    }

    /// Promote the staged limit to the effective one and return its value.
    fn switch(&mut self) -> u16 {
        let promoted = self.next_max;
        self.max_recv = promoted;
        promoted.get()
    }
}
132
133type GrantId = u64;
134
135#[derive(Clone, Debug)]
137struct SendBudget {
138 grant: Option<GrantId>,
140 remaining: u16,
142 received: HashSet<RequestId>,
147}
148
149#[derive(Clone, Debug)]
151struct RecvBudget {
152 grant: Option<Grant>,
158 limit: Limit,
161 remaining: u16,
163 sent: HashSet<RequestId>,
172}
173
174#[derive(Clone, Debug)]
176struct PeerInfo {
177 send_budget: SendBudget,
178 recv_budget: RecvBudget,
179}
180
181impl PeerInfo {
182 fn new(recv_limit: Limit) -> Self {
183 PeerInfo {
184 send_budget: SendBudget {
185 grant: None,
186 remaining: 1,
187 received: HashSet::new(),
188 },
189 recv_budget: RecvBudget {
190 grant: None,
191 limit: recv_limit,
192 remaining: 1,
193 sent: HashSet::new(),
194 }
195 }
196 }
197
198 fn into_disconnected(mut self) -> Self {
199 self.send_budget.received = HashSet::new();
200 self.send_budget.remaining = 1;
201 self.recv_budget.sent = HashSet::new();
202 self.recv_budget.remaining = max(1, self.recv_budget.remaining);
203 self.recv_budget.grant = None;
206 self
207 }
208}
209
210impl<C> Throttled<C>
211where
212 C: RequestResponseCodec + Send + Clone,
213 C::Protocol: Sync
214{
215 pub fn new<I>(c: C, protos: I, cfg: RequestResponseConfig) -> Self
217 where
218 I: IntoIterator<Item = (C::Protocol, ProtocolSupport)>,
219 C: Send,
220 C::Protocol: Sync
221 {
222 let protos = protos.into_iter().map(|(p, ps)| (ProtocolWrapper::new(b"/t/1", p), ps));
223 Throttled::from(RequestResponse::new(Codec::new(c, 8192), protos, cfg))
224 }
225
226 pub fn from(behaviour: RequestResponse<Codec<C>>) -> Self {
228 Throttled {
229 id: rand::random(),
230 behaviour,
231 peer_info: HashMap::new(),
232 offline_peer_info: LruCache::new(8192),
233 default_limit: Limit::new(NonZeroU16::new(1).expect("1 > 0")),
234 limit_overrides: HashMap::new(),
235 events: VecDeque::new(),
236 next_grant_id: 0
237 }
238 }
239
240 pub fn set_receive_limit(&mut self, limit: NonZeroU16) {
242 log::trace!("{:08x}: new default limit: {:?}", self.id, limit);
243 self.default_limit = Limit::new(limit)
244 }
245
246 pub fn override_receive_limit(&mut self, p: &PeerId, limit: NonZeroU16) {
248 log::debug!("{:08x}: override limit for {}: {:?}", self.id, p, limit);
249 if let Some(info) = self.peer_info.get_mut(p) {
250 info.recv_budget.limit.set(limit)
251 } else if let Some(info) = self.offline_peer_info.get_mut(p) {
252 info.recv_budget.limit.set(limit)
253 }
254 self.limit_overrides.insert(*p, Limit::new(limit));
255 }
256
257 pub fn remove_override(&mut self, p: &PeerId) {
259 log::trace!("{:08x}: removing limit override for {}", self.id, p);
260 self.limit_overrides.remove(p);
261 }
262
263 pub fn can_send(&mut self, p: &PeerId) -> bool {
265 self.peer_info.get(p).map(|i| i.send_budget.remaining > 0).unwrap_or(true)
266 }
267
268 pub fn send_request(&mut self, p: &PeerId, req: C::Request) -> Result<RequestId, C::Request> {
274 let connected = &mut self.peer_info;
275 let disconnected = &mut self.offline_peer_info;
276 let remaining =
277 if let Some(info) = connected.get_mut(p).or_else(|| disconnected.get_mut(p)) {
278 if info.send_budget.remaining == 0 {
279 log::trace!("{:08x}: no more budget to send another request to {}", self.id, p);
280 return Err(req)
281 }
282 info.send_budget.remaining -= 1;
283 info.send_budget.remaining
284 } else {
285 let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit);
286 let mut info = PeerInfo::new(limit);
287 info.send_budget.remaining -= 1;
288 let remaining = info.send_budget.remaining;
289 self.offline_peer_info.put(*p, info);
290 remaining
291 };
292
293 let rid = self.behaviour.send_request(p, Message::request(req));
294
295 log::trace! { "{:08x}: sending request {} to {} (budget remaining = {})",
296 self.id,
297 rid,
298 p,
299 remaining
300 };
301
302 Ok(rid)
303 }
304
305 pub fn send_response(&mut self, ch: ResponseChannel<C::Response>, res: C::Response)
309 -> Result<(), C::Response>
310 {
311 log::trace!("{:08x}: sending response {} to peer {}", self.id, ch.request_id(), &ch.peer);
312 if let Some(info) = self.peer_info.get_mut(&ch.peer) {
313 if info.recv_budget.remaining == 0 { let crd = info.recv_budget.limit.switch();
315 info.recv_budget.remaining = info.recv_budget.limit.max_recv.get();
316 self.send_credit(&ch.peer, crd);
317 }
318 }
319 match self.behaviour.send_response(ch, Message::response(res)) {
320 Ok(()) => Ok(()),
321 Err(m) => Err(m.into_parts().1.expect("Missing response data.")),
322 }
323 }
324
325 pub fn add_address(&mut self, p: &PeerId, a: Multiaddr) {
329 self.behaviour.add_address(p, a)
330 }
331
332 pub fn remove_address(&mut self, p: &PeerId, a: &Multiaddr) {
336 self.behaviour.remove_address(p, a)
337 }
338
339 pub fn is_connected(&self, p: &PeerId) -> bool {
343 self.behaviour.is_connected(p)
344 }
345
346 pub fn is_pending_outbound(&self, p: &PeerId, r: &RequestId) -> bool {
350 self.behaviour.is_pending_outbound(p, r)
351 }
352
353
354 pub fn is_pending_inbound(&self, p: &PeerId, r: &RequestId) -> bool {
359 self.behaviour.is_pending_inbound(p, r)
360 }
361
362 fn send_credit(&mut self, p: &PeerId, credit: u16) {
364 if let Some(info) = self.peer_info.get_mut(p) {
365 let cid = self.next_grant_id;
366 self.next_grant_id += 1;
367 let rid = self.behaviour.send_request(p, Message::credit(credit, cid));
368 log::trace!("{:08x}: sending {} credit as grant {} to {}", self.id, credit, cid, p);
369 let grant = Grant { id: cid, request: rid, credit };
370 info.recv_budget.grant = Some(grant);
371 info.recv_budget.sent.insert(rid);
372 }
373 }
374}
375
376#[derive(Debug)]
378pub enum Event<Req, Res, CRes = Res> {
379 Event(RequestResponseEvent<Req, Res, CRes>),
381 TooManyInboundRequests(PeerId),
383 ResumeSending(PeerId)
387}
388
389impl<C> NetworkBehaviour for Throttled<C>
390where
391 C: RequestResponseCodec + Send + Clone + 'static,
392 C::Protocol: Sync
393{
394 type ProtocolsHandler = RequestResponseHandler<Codec<C>>;
395 type OutEvent = Event<C::Request, C::Response, Message<C::Response>>;
396
397 fn new_handler(&mut self) -> Self::ProtocolsHandler {
398 self.behaviour.new_handler()
399 }
400
401 fn addresses_of_peer(&mut self, p: &PeerId) -> Vec<Multiaddr> {
402 self.behaviour.addresses_of_peer(p)
403 }
404
405 fn inject_connection_established(&mut self, p: &PeerId, id: &ConnectionId, end: &ConnectedPoint) {
406 self.behaviour.inject_connection_established(p, id, end)
407 }
408
409 fn inject_connection_closed(&mut self, peer: &PeerId, id: &ConnectionId, end: &ConnectedPoint) {
410 self.behaviour.inject_connection_closed(peer, id, end);
411 if let Some(info) = self.peer_info.get_mut(peer) {
412 if let Some(grant) = &mut info.recv_budget.grant {
413 log::debug! { "{:08x}: resending credit grant {} to {} after connection closed",
414 self.id,
415 grant.id,
416 peer
417 };
418 let msg = Message::credit(grant.credit, grant.id);
419 grant.request = self.behaviour.send_request(peer, msg)
420 }
421 }
422 }
423
424 fn inject_connected(&mut self, p: &PeerId) {
425 log::trace!("{:08x}: connected to {}", self.id, p);
426 self.behaviour.inject_connected(p);
427 if !self.peer_info.contains_key(p) {
429 if let Some(info) = self.offline_peer_info.pop(p) {
430 let recv_budget = info.recv_budget.remaining;
431 self.peer_info.insert(*p, info);
432 if recv_budget > 1 {
433 self.send_credit(p, recv_budget - 1);
434 }
435 } else {
436 let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit);
437 self.peer_info.insert(*p, PeerInfo::new(limit));
438 }
439 }
440 }
441
442 fn inject_disconnected(&mut self, p: &PeerId) {
443 log::trace!("{:08x}: disconnected from {}", self.id, p);
444 if let Some(info) = self.peer_info.remove(p) {
445 self.offline_peer_info.put(*p, info.into_disconnected());
446 }
447 self.behaviour.inject_disconnected(p)
448 }
449
450 fn inject_dial_failure(&mut self, p: &PeerId) {
451 self.behaviour.inject_dial_failure(p)
452 }
453
454 fn inject_event(&mut self, p: PeerId, i: ConnectionId, e: RequestResponseHandlerEvent<Codec<C>>) {
455 self.behaviour.inject_event(p, i, e)
456 }
457
458 fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters)
459 -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<C>>, Self::OutEvent>>
460 {
461 loop {
462 if let Some(ev) = self.events.pop_front() {
463 return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
464 } else if self.events.capacity() > super::EMPTY_QUEUE_SHRINK_THRESHOLD {
465 self.events.shrink_to_fit()
466 }
467
468 let event = match ready!(self.behaviour.poll(cx, params)) {
469 | NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::Message { peer, message }) => {
470 let message = match message {
471 | RequestResponseMessage::Response { request_id, response } =>
472 match &response.header().typ {
473 | Some(Type::Ack) => {
474 if let Some(info) = self.peer_info.get_mut(&peer) {
475 if let Some(id) = info.recv_budget.grant.as_ref().map(|c| c.id) {
476 if Some(id) == response.header().ident {
477 log::trace!("{:08x}: received ack {} from {}", self.id, id, peer);
478 info.recv_budget.grant = None;
479 }
480 }
481 info.recv_budget.sent.remove(&request_id);
482 }
483 continue
484 }
485 | Some(Type::Response) => {
486 log::trace!("{:08x}: received response {} from {}", self.id, request_id, peer);
487 if let Some(rs) = response.into_parts().1 {
488 RequestResponseMessage::Response { request_id, response: rs }
489 } else {
490 log::error! { "{:08x}: missing data for response {} from peer {}",
491 self.id,
492 request_id,
493 peer
494 }
495 continue
496 }
497 }
498 | ty => {
499 log::trace! {
500 "{:08x}: unknown message type: {:?} from {}; expected response or credit",
501 self.id,
502 ty,
503 peer
504 };
505 continue
506 }
507 }
508 | RequestResponseMessage::Request { request_id, request, channel } =>
509 match &request.header().typ {
510 | Some(Type::Credit) => {
511 if let Some(info) = self.peer_info.get_mut(&peer) {
512 let id = if let Some(n) = request.header().ident {
513 n
514 } else {
515 log::warn! { "{:08x}: missing credit id in message from {}",
516 self.id,
517 peer
518 }
519 continue
520 };
521 let credit = request.header().credit.unwrap_or(0);
522 log::trace! { "{:08x}: received {} additional credit {} from {}",
523 self.id,
524 credit,
525 id,
526 peer
527 };
528 if info.send_budget.grant < Some(id) {
529 if info.send_budget.remaining == 0 && credit > 0 {
530 log::trace!("{:08x}: sending to peer {} can resume", self.id, peer);
531 self.events.push_back(Event::ResumeSending(peer))
532 }
533 info.send_budget.remaining += credit;
534 info.send_budget.grant = Some(id);
535 }
536 let _ = self.behaviour.send_response(channel, Message::ack(id));
539 info.send_budget.received.insert(request_id);
540 }
541 continue
542 }
543 | Some(Type::Request) => {
544 if let Some(info) = self.peer_info.get_mut(&peer) {
545 log::trace! { "{:08x}: received request {} (recv. budget = {})",
546 self.id,
547 request_id,
548 info.recv_budget.remaining
549 };
550 if info.recv_budget.remaining == 0 {
551 log::debug!("{:08x}: peer {} exceeds its budget", self.id, peer);
552 self.events.push_back(Event::TooManyInboundRequests(peer));
553 continue
554 }
555 info.recv_budget.remaining -= 1;
556 info.recv_budget.grant = None;
560 }
561 if let Some(rq) = request.into_parts().1 {
562 RequestResponseMessage::Request { request_id, request: rq, channel }
563 } else {
564 log::error! { "{:08x}: missing data for request {} from peer {}",
565 self.id,
566 request_id,
567 peer
568 }
569 continue
570 }
571 }
572 | ty => {
573 log::trace! {
574 "{:08x}: unknown message type: {:?} from {}; expected request or ack",
575 self.id,
576 ty,
577 peer
578 };
579 continue
580 }
581 }
582 };
583 let event = RequestResponseEvent::Message { peer, message };
584 NetworkBehaviourAction::GenerateEvent(Event::Event(event))
585 }
586 | NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::OutboundFailure {
587 peer,
588 request_id,
589 error
590 }) => {
591 if let Some(info) = self.peer_info.get_mut(&peer) {
592 if let Some(grant) = info.recv_budget.grant.as_mut() {
593 if grant.request == request_id {
594 log::debug! {
595 "{:08x}: failed to send {} as credit {} to {}; retrying...",
596 self.id,
597 grant.credit,
598 grant.id,
599 peer
600 };
601 let msg = Message::credit(grant.credit, grant.id);
602 grant.request = self.behaviour.send_request(&peer, msg);
603 }
604 }
605
606 if info.recv_budget.sent.remove(&request_id) {
609 continue
610 }
611 }
612 let event = RequestResponseEvent::OutboundFailure { peer, request_id, error };
613 NetworkBehaviourAction::GenerateEvent(Event::Event(event))
614 }
615 | NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::InboundFailure {
616 peer,
617 request_id,
618 error
619 }) => {
620 if let Some(info) = self.peer_info.get_mut(&peer) {
623 if info.send_budget.received.remove(&request_id) {
624 log::debug! {
625 "{:08}: failed to acknowledge credit grant from {}: {:?}",
626 self.id, peer, error
627 };
628 continue
629 }
630 }
631 let event = RequestResponseEvent::InboundFailure { peer, request_id, error };
632 NetworkBehaviourAction::GenerateEvent(Event::Event(event))
633 }
634 | NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::ResponseSent {
635 peer,
636 request_id
637 }) => {
638 if let Some(info) = self.peer_info.get_mut(&peer) {
641 if info.send_budget.received.remove(&request_id) {
642 log::trace! {
643 "{:08}: successfully sent ACK for credit grant {:?}.",
644 self.id,
645 info.send_budget.grant,
646 }
647 continue
648 }
649 }
650 NetworkBehaviourAction::GenerateEvent(Event::Event(
651 RequestResponseEvent::ResponseSent { peer, request_id }))
652 }
653 | NetworkBehaviourAction::DialAddress { address } =>
654 NetworkBehaviourAction::DialAddress { address },
655 | NetworkBehaviourAction::DialPeer { peer_id, condition } =>
656 NetworkBehaviourAction::DialPeer { peer_id, condition },
657 | NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
658 NetworkBehaviourAction::NotifyHandler { peer_id, handler, event },
659 | NetworkBehaviourAction::ReportObservedAddr { address, score } =>
660 NetworkBehaviourAction::ReportObservedAddr { address, score }
661 };
662
663 return Poll::Ready(event)
664 }
665 }
666}