iroh_blobs/downloader.rs

//! Handle downloading blobs and collections concurrently and from nodes.
//!
//! The [`Downloader`] interacts with four main components to this end.
//! - `DialerT`: Used to queue opening connections to the nodes needed to perform a download.
//! - [`Getter`]: Used to perform a single download from a specific node.
//! - `ProviderMap`: Where the downloader obtains information about nodes that could be
//!   used to perform a download.
//! - [`Store`]: Where data is stored.
//!
//! Once a download request is received, the logic is as follows:
//! 1. The `ProviderMap` is queried for nodes. From these nodes, some are selected, prioritizing
//!    connected nodes with a lower number of active requests. If no useful node is connected, or
//!    the useful connected nodes have no capacity to perform the request, a connection attempt is
//!    started using the `DialerT`.
//! 2. The download is queued for processing at a later time. Downloads are not performed right
//!    away. Instead, they are initially delayed to allow the node to obtain the data itself, and
//!    to wait for the new connection to be established if necessary.
//! 3. Once a request is ready to be sent after a delay (initial or for a retry), the preferred
//!    node is used if available. The request is now considered active.
//!
//! Concurrency is limited in different ways:
//! - *Total number of active requests:* This is a way to prevent a self DoS by overwhelming our
//!   own bandwidth capacity. This is a best-effort heuristic since it doesn't take into account
//!   how much data we are actually requesting or receiving.
//! - *Total number of connected nodes:* Peer connections are kept for longer than they are
//!   strictly needed, since it's likely they will be useful again soon.
//! - *Requests per node:* To avoid overwhelming nodes with requests, the number of concurrent
//!   requests to a single node is also limited.
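//!
//! As a usage sketch (added for illustration, not from the original docs): `store`, `endpoint`
//! and `pool` are assumed to be an existing [`Store`] implementation, an [`Endpoint`] and a
//! `LocalPoolHandle`, and `hash`/`addr` identify the blob and a candidate provider.
//!
//! ```ignore
//! let downloader = Downloader::new(store, endpoint, pool);
//! // Request a raw blob from a single candidate provider.
//! let request = DownloadRequest::new(HashAndFormat::raw(hash), vec![addr]);
//! let handle = downloader.queue(request).await;
//! // The handle resolves once the download completes or fails.
//! let stats = handle.await?;
//! ```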

use std::{
    collections::{
        hash_map::{self, Entry},
        HashMap, HashSet,
    },
    fmt,
    future::Future,
    num::NonZeroUsize,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    task::Poll,
    time::Duration,
};

use anyhow::anyhow;
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
use hashlink::LinkedHashSet;
use iroh::{endpoint, Endpoint, NodeAddr, NodeId};
use iroh_metrics::inc;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinSet,
};
use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
use tracing::{debug, error, error_span, trace, warn, Instrument};

use crate::{
    get::{db::DownloadProgress, Stats},
    metrics::Metrics,
    store::Store,
    util::{local_pool::LocalPoolHandle, progress::ProgressSender},
    BlobFormat, Hash, HashAndFormat,
};

mod get;
mod invariants;
mod progress;
mod test;

use self::progress::{BroadcastProgressSender, ProgressSubscriber, ProgressTracker};

/// Duration for which we keep nodes connected after they were last useful to us.
const IDLE_PEER_TIMEOUT: Duration = Duration::from_secs(10);
/// Capacity of the channel used to communicate between the [`Downloader`] and the [`Service`].
const SERVICE_CHANNEL_CAPACITY: usize = 128;

/// Identifier for a download intent.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, derive_more::Display)]
pub struct IntentId(pub u64);

/// Trait modelling a dialer. This allows for IO-less testing.
trait DialerT: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin {
    /// Type of connections returned by the Dialer.
    type Connection: Clone + 'static;
    /// Dial a node.
    fn queue_dial(&mut self, node_id: NodeId);
    /// Get the number of dialing nodes.
    fn pending_count(&self) -> usize;
    /// Check if a node is being dialed.
    fn is_pending(&self, node: NodeId) -> bool;
    /// Get the node id of our node.
    fn node_id(&self) -> NodeId;
}

/// Signals what should be done with the request when it fails.
#[derive(Debug)]
pub enum FailureAction {
    /// The request was cancelled by us.
    AllIntentsDropped,
    /// An error occurred that prevents the request from being retried at all.
    AbortRequest(anyhow::Error),
    /// An error occurred that suggests the node should not be used in general.
    DropPeer(anyhow::Error),
    /// An error occurred in which neither the node nor the request are at fault.
    RetryLater(anyhow::Error),
}

/// Future of a get request, for the checking stage.
type GetStartFut<N> = BoxedLocal<Result<GetOutput<N>, FailureAction>>;
/// Future of a get request, for the downloading stage.
type GetProceedFut = BoxedLocal<InternalDownloadResult>;

/// Trait modelling performing a single request over a connection. This allows for IO-less testing.
pub trait Getter {
    /// Type of connections the Getter requires to perform a download.
    type Connection: 'static;
    /// Type of the intermediary state returned from [`Self::get`] if a connection is needed.
    type NeedsConn: NeedsConn<Self::Connection>;
    /// Returns a future that checks the local store if the request is already complete, returning
    /// a struct implementing [`NeedsConn`] if we need a network connection to proceed.
    fn get(
        &mut self,
        kind: DownloadKind,
        progress_sender: BroadcastProgressSender,
    ) -> GetStartFut<Self::NeedsConn>;
}

/// Trait modelling the intermediary state when a connection is needed to proceed.
pub trait NeedsConn<C>: std::fmt::Debug + 'static {
    /// Proceeds the download with the given connection.
    fn proceed(self, conn: C) -> GetProceedFut;
}

/// Output returned from [`Getter::get`].
#[derive(Debug)]
pub enum GetOutput<N> {
    /// The request is already complete in the local store.
    Complete(Stats),
    /// The request needs a connection to continue.
    NeedsConn(N),
}

/// Concurrency limits for the [`Downloader`].
#[derive(Debug)]
pub struct ConcurrencyLimits {
    /// Maximum number of requests the service performs concurrently.
    pub max_concurrent_requests: usize,
    /// Maximum number of concurrent requests to a single node.
    pub max_concurrent_requests_per_node: usize,
    /// Maximum number of open connections the service maintains.
    pub max_open_connections: usize,
    /// Maximum number of nodes to dial concurrently for a single request.
    pub max_concurrent_dials_per_hash: usize,
}

impl Default for ConcurrencyLimits {
    fn default() -> Self {
        // these numbers should be checked against a running node and might depend on platform
        ConcurrencyLimits {
            max_concurrent_requests: 50,
            max_concurrent_requests_per_node: 4,
            max_open_connections: 25,
            max_concurrent_dials_per_hash: 5,
        }
    }
}

impl ConcurrencyLimits {
    /// Checks if the maximum number of concurrent requests has been reached.
    fn at_requests_capacity(&self, active_requests: usize) -> bool {
        active_requests >= self.max_concurrent_requests
    }

    /// Checks if the maximum number of concurrent requests per node has been reached.
    fn node_at_request_capacity(&self, active_node_requests: usize) -> bool {
        active_node_requests >= self.max_concurrent_requests_per_node
    }

    /// Checks if the maximum number of connections has been reached.
    fn at_connections_capacity(&self, active_connections: usize) -> bool {
        active_connections >= self.max_open_connections
    }

    /// Checks if the maximum number of concurrent dials per hash has been reached.
    ///
    /// Note that this limit is not strictly enforced, and not checked in
    /// [`Service::check_invariants`]. A certain hash can exceed this limit in a valid way if some
    /// of its providers are dialed for another hash. However, once the limit is reached,
    /// no new dials will be initiated for the hash.
    fn at_dials_per_hash_capacity(&self, concurrent_dials: usize) -> bool {
        concurrent_dials >= self.max_concurrent_dials_per_hash
    }
}

/// Configuration for retry behavior of the [`Downloader`].
#[derive(Debug)]
pub struct RetryConfig {
    /// Maximum number of retry attempts for a node that failed to dial or failed with IO errors.
    pub max_retries_per_node: u32,
    /// The initial delay to wait before retrying a node. On subsequent failures, the retry delay
    /// will be multiplied by the number of failed retries, so with the default of 500ms the
    /// delays are 500ms, 1s, 1.5s, and so on.
    pub initial_retry_delay: Duration,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries_per_node: 6,
            initial_retry_delay: Duration::from_millis(500),
        }
    }
}

/// A download request.
#[derive(Debug, Clone)]
pub struct DownloadRequest {
    kind: DownloadKind,
    nodes: Vec<NodeAddr>,
    progress: Option<ProgressSubscriber>,
}

impl DownloadRequest {
    /// Create a new download request.
    ///
    /// It is the responsibility of the caller to ensure that the data is tagged either with a
    /// temp tag or with a persistent tag to make sure the data is not garbage collected during
    /// the download.
    ///
    /// If this is not done, the download will proceed as normal, but there is no guarantee
    /// that the data is still available when the download is complete.
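    ///
    /// A sketch of building a request, added for illustration; `hash` is an assumed [`Hash`]
    /// and `addr` an assumed [`NodeAddr`] of a provider:
    ///
    /// ```ignore
    /// let request = DownloadRequest::new(HashAndFormat::raw(hash), vec![addr]);
    /// ```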
    pub fn new(
        resource: impl Into<DownloadKind>,
        nodes: impl IntoIterator<Item = impl Into<NodeAddr>>,
    ) -> Self {
        Self {
            kind: resource.into(),
            nodes: nodes.into_iter().map(|n| n.into()).collect(),
            progress: None,
        }
    }

    /// Pass a progress sender to receive progress updates.
    pub fn progress_sender(mut self, sender: ProgressSubscriber) -> Self {
        self.progress = Some(sender);
        self
    }
}

/// The kind of resource to download.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy, derive_more::From, derive_more::Into)]
pub struct DownloadKind(HashAndFormat);

impl DownloadKind {
    /// Get the hash of this download.
    pub const fn hash(&self) -> Hash {
        self.0.hash
    }

    /// Get the format of this download.
    pub const fn format(&self) -> BlobFormat {
        self.0.format
    }

    /// Get the [`HashAndFormat`] pair of this download.
    pub const fn hash_and_format(&self) -> HashAndFormat {
        self.0
    }
}

impl fmt::Display for DownloadKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}:{:?}", self.0.hash.fmt_short(), self.0.format)
    }
}

/// The result of a download request, as returned to the application code.
type ExternalDownloadResult = Result<Stats, DownloadError>;

/// The result of a download request, as used in this module.
type InternalDownloadResult = Result<Stats, FailureAction>;

/// Error returned when a download could not be completed.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DownloadError {
    /// Failed to download from any provider
    #[error("Failed to complete download")]
    DownloadFailed,
    /// The download was cancelled by us
    #[error("Download cancelled by us")]
    Cancelled,
    /// No provider nodes found
    #[error("No provider nodes found")]
    NoProviders,
    /// Failed to receive response from service.
    #[error("Failed to receive response from download service")]
    ActorClosed,
}

/// Handle to interact with a download request.
#[derive(Debug)]
pub struct DownloadHandle {
    /// Id used to identify the request in the [`Downloader`].
    id: IntentId,
    /// Kind of download.
    kind: DownloadKind,
    /// Receiver to retrieve the return value of this download.
    receiver: oneshot::Receiver<ExternalDownloadResult>,
}

impl Future for DownloadHandle {
    type Output = ExternalDownloadResult;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        use std::task::Poll::*;
        // make it easier on holders of the handle to poll the result, removing the receiver error
        // from the middle
        match std::pin::Pin::new(&mut self.receiver).poll(cx) {
            Ready(Ok(result)) => Ready(result),
            Ready(Err(_recv_err)) => Ready(Err(DownloadError::ActorClosed)),
            Pending => Pending,
        }
    }
}

/// Handle for the download service.
#[derive(Clone, Debug)]
pub struct Downloader {
    /// Next id to use for a download intent.
    next_id: Arc<AtomicU64>,
    /// Channel to communicate with the service.
    msg_tx: mpsc::Sender<Message>,
}

impl Downloader {
    /// Create a new Downloader with the default [`ConcurrencyLimits`] and [`RetryConfig`].
    pub fn new<S>(store: S, endpoint: Endpoint, rt: LocalPoolHandle) -> Self
    where
        S: Store,
    {
        Self::with_config(store, endpoint, rt, Default::default(), Default::default())
    }

    /// Create a new Downloader with custom [`ConcurrencyLimits`] and [`RetryConfig`].
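    ///
    /// A sketch with tighter limits than the defaults, added for illustration (the values are
    /// arbitrary; `store`, `endpoint` and `pool` are assumed to exist):
    ///
    /// ```ignore
    /// let limits = ConcurrencyLimits {
    ///     max_concurrent_requests: 10,
    ///     max_concurrent_requests_per_node: 2,
    ///     max_open_connections: 8,
    ///     max_concurrent_dials_per_hash: 2,
    /// };
    /// let retry = RetryConfig {
    ///     max_retries_per_node: 3,
    ///     initial_retry_delay: Duration::from_millis(250),
    /// };
    /// let downloader = Downloader::with_config(store, endpoint, pool, limits, retry);
    /// ```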
    pub fn with_config<S>(
        store: S,
        endpoint: Endpoint,
        rt: LocalPoolHandle,
        concurrency_limits: ConcurrencyLimits,
        retry_config: RetryConfig,
    ) -> Self
    where
        S: Store,
    {
        let me = endpoint.node_id().fmt_short();
        let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
        let dialer = Dialer::new(endpoint);

        let create_future = move || {
            let getter = get::IoGetter {
                store: store.clone(),
            };

            let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx);

            service.run().instrument(error_span!("downloader", %me))
        };
        rt.spawn_detached(create_future);
        Self {
            next_id: Arc::new(AtomicU64::new(0)),
            msg_tx,
        }
    }

    /// Queue a download.
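    ///
    /// The returned [`DownloadHandle`] resolves once the download completes or fails. A sketch,
    /// assuming a `downloader` and a `request` built as in the examples above:
    ///
    /// ```ignore
    /// let handle = downloader.queue(request).await;
    /// let stats = handle.await?;
    /// ```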
    pub async fn queue(&self, request: DownloadRequest) -> DownloadHandle {
        let kind = request.kind;
        let intent_id = IntentId(self.next_id.fetch_add(1, Ordering::SeqCst));
        let (sender, receiver) = oneshot::channel();
        let handle = DownloadHandle {
            id: intent_id,
            kind,
            receiver,
        };
        let msg = Message::Queue {
            on_finish: sender,
            request,
            intent_id,
        };
        // if this fails polling the handle will fail as well since the sender side of the oneshot
        // will be dropped
        if let Err(send_err) = self.msg_tx.send(msg).await {
            let msg = send_err.0;
            debug!(?msg, "download not sent");
        }
        handle
    }

    /// Cancel a download.
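    ///
    /// A sketch, added for illustration; cancelling consumes the handle returned by
    /// [`Self::queue`]:
    ///
    /// ```ignore
    /// let handle = downloader.queue(request).await;
    /// downloader.cancel(handle).await;
    /// ```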
    // NOTE: receiving the handle ensures an intent can't be cancelled twice
    pub async fn cancel(&self, handle: DownloadHandle) {
        let DownloadHandle {
            id,
            kind,
            receiver: _,
        } = handle;
        let msg = Message::CancelIntent { id, kind };
        if let Err(send_err) = self.msg_tx.send(msg).await {
            let msg = send_err.0;
            debug!(?msg, "cancel not sent");
        }
    }

    /// Declare that certain nodes can be used to download a hash.
    ///
    /// Note that this does not start a download, but only provides new nodes to already queued
    /// downloads. Use [`Self::queue`] to queue a download.
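    ///
    /// A sketch, added for illustration (`hash` and `node_id` are assumed values):
    ///
    /// ```ignore
    /// downloader.nodes_have(hash, vec![node_id]).await;
    /// ```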
    pub async fn nodes_have(&mut self, hash: Hash, nodes: Vec<NodeId>) {
        let msg = Message::NodesHave { hash, nodes };
        if let Err(send_err) = self.msg_tx.send(msg).await {
            let msg = send_err.0;
            debug!(?msg, "nodes have not been sent");
        }
    }
}

/// Messages the service can receive.
#[derive(derive_more::Debug)]
enum Message {
    /// Queue a download intent.
    Queue {
        request: DownloadRequest,
        #[debug(skip)]
        on_finish: oneshot::Sender<ExternalDownloadResult>,
        intent_id: IntentId,
    },
    /// Declare that nodes have a certain hash and can be used for downloading.
    NodesHave { hash: Hash, nodes: Vec<NodeId> },
    /// Cancel an intent. The associated request will be cancelled when the last intent is
    /// cancelled.
    CancelIntent { id: IntentId, kind: DownloadKind },
}

#[derive(derive_more::Debug)]
struct IntentHandlers {
    #[debug("oneshot::Sender<DownloadResult>")]
    on_finish: oneshot::Sender<ExternalDownloadResult>,
    on_progress: Option<ProgressSubscriber>,
}

/// Information about a request.
#[derive(Debug)]
struct RequestInfo<NC> {
    /// Registered intents with progress senders and result callbacks.
    intents: HashMap<IntentId, IntentHandlers>,
    progress_sender: BroadcastProgressSender,
    get_state: Option<NC>,
}

/// Information about a request in progress.
#[derive(derive_more::Debug)]
struct ActiveRequestInfo {
    /// Token used to cancel the future doing the request.
    #[debug(skip)]
    cancellation: CancellationToken,
    /// Peer doing this request attempt.
    node: NodeId,
}

#[derive(Debug, Default)]
struct RetryState {
    /// How many times did we retry this node?
    retry_count: u32,
    /// Whether the node is currently queued for retry.
    retry_is_queued: bool,
}

/// State of the connection to this node.
#[derive(derive_more::Debug)]
struct ConnectionInfo<Conn> {
    /// Connection to this node.
    #[debug(skip)]
    conn: Conn,
    /// State of this node.
    state: ConnectedState,
}

impl<Conn> ConnectionInfo<Conn> {
    /// Create a new idle node.
    fn new_idle(connection: Conn, drop_key: delay_queue::Key) -> Self {
        ConnectionInfo {
            conn: connection,
            state: ConnectedState::Idle { drop_key },
        }
    }

    /// Count of active requests for the node.
    fn active_requests(&self) -> usize {
        match self.state {
            ConnectedState::Busy { active_requests } => active_requests.get(),
            ConnectedState::Idle { .. } => 0,
        }
    }

    /// Returns `true` if the node is currently idle.
    fn is_idle(&self) -> bool {
        matches!(self.state, ConnectedState::Idle { .. })
    }
}

/// State of a connected node.
#[derive(derive_more::Debug)]
enum ConnectedState {
    /// Peer is handling at least one request.
    Busy {
        #[debug("{}", active_requests.get())]
        active_requests: NonZeroUsize,
    },
    /// Peer is idle.
    Idle {
        #[debug(skip)]
        drop_key: delay_queue::Key,
    },
}

#[derive(Debug)]
enum NodeState<'a, Conn> {
    Connected(&'a ConnectionInfo<Conn>),
    Dialing,
    WaitForRetry,
    Disconnected,
}

#[derive(Debug)]
struct Service<G: Getter, D: DialerT> {
    /// The getter performs individual requests.
    getter: G,
    /// Map to query for nodes that we believe have the data we are looking for.
    providers: ProviderMap,
    /// Dialer to get connections for required nodes.
    dialer: D,
    /// Limits to concurrent tasks handled by the service.
    concurrency_limits: ConcurrencyLimits,
    /// Configuration for retry behavior.
    retry_config: RetryConfig,
    /// Channel to receive messages from the service's handle.
    msg_rx: mpsc::Receiver<Message>,
    /// Nodes to which we have an active or idle connection.
    connected_nodes: HashMap<NodeId, ConnectionInfo<D::Connection>>,
    /// We track a retry state for nodes which failed to dial or failed in a transfer.
    retry_node_state: HashMap<NodeId, RetryState>,
    /// Delay queue for retrying failed nodes.
    retry_nodes_queue: delay_queue::DelayQueue<NodeId>,
    /// Delay queue for dropping idle nodes.
    goodbye_nodes_queue: delay_queue::DelayQueue<NodeId>,
    /// Queue of pending downloads.
    queue: Queue,
    /// Information about pending and active requests.
    requests: HashMap<DownloadKind, RequestInfo<G::NeedsConn>>,
    /// State of running downloads.
    active_requests: HashMap<DownloadKind, ActiveRequestInfo>,
    /// Tasks for currently running downloads.
    in_progress_downloads: JoinSet<(DownloadKind, InternalDownloadResult)>,
    /// Progress tracker.
    progress_tracker: ProgressTracker,
}

impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
    fn new(
        getter: G,
        dialer: D,
        concurrency_limits: ConcurrencyLimits,
        retry_config: RetryConfig,
        msg_rx: mpsc::Receiver<Message>,
    ) -> Self {
        Service {
            getter,
            dialer,
            msg_rx,
            concurrency_limits,
            retry_config,
            connected_nodes: Default::default(),
            retry_node_state: Default::default(),
            providers: Default::default(),
            requests: Default::default(),
            retry_nodes_queue: delay_queue::DelayQueue::default(),
            goodbye_nodes_queue: delay_queue::DelayQueue::default(),
            active_requests: Default::default(),
            in_progress_downloads: Default::default(),
            progress_tracker: ProgressTracker::new(),
            queue: Default::default(),
        }
    }

    /// Main loop for the service.
    async fn run(mut self) {
        loop {
            trace!("wait for tick");
            inc!(Metrics, downloader_tick_main);
            tokio::select! {
                Some((node, conn_result)) = self.dialer.next() => {
                    trace!(node=%node.fmt_short(), "tick: connection ready");
                    inc!(Metrics, downloader_tick_connection_ready);
                    self.on_connection_ready(node, conn_result);
                }
                maybe_msg = self.msg_rx.recv() => {
                    trace!(msg=?maybe_msg, "tick: message received");
                    inc!(Metrics, downloader_tick_message_received);
                    match maybe_msg {
                        Some(msg) => self.handle_message(msg).await,
                        None => return self.shutdown().await,
                    }
                }
                Some(res) = self.in_progress_downloads.join_next(), if !self.in_progress_downloads.is_empty() => {
                    match res {
                        Ok((kind, result)) => {
                            trace!(%kind, "tick: transfer completed");
                            inc!(Metrics, downloader_tick_transfer_completed);
                            self.on_download_completed(kind, result);
                        }
                        Err(err) => {
                            warn!(?err, "transfer task panicked");
                            inc!(Metrics, downloader_tick_transfer_failed);
                        }
                    }
                }
                Some(expired) = self.retry_nodes_queue.next() => {
                    let node = expired.into_inner();
                    trace!(node=%node.fmt_short(), "tick: retry node");
                    inc!(Metrics, downloader_tick_retry_node);
                    self.on_retry_wait_elapsed(node);
                }
                Some(expired) = self.goodbye_nodes_queue.next() => {
                    let node = expired.into_inner();
                    trace!(node=%node.fmt_short(), "tick: goodbye node");
                    inc!(Metrics, downloader_tick_goodbye_node);
                    self.disconnect_idle_node(node, "idle expired");
                }
            }

            self.process_head();

            #[cfg(any(test, debug_assertions))]
            self.check_invariants();
        }
    }

    /// Handle receiving a [`Message`].
    // This is called in the actor loop, and only async because subscribing to an existing transfer
    // sends the initial state.
    async fn handle_message(&mut self, msg: Message) {
        match msg {
            Message::Queue {
                request,
                on_finish,
                intent_id,
            } => {
                self.handle_queue_new_download(request, intent_id, on_finish)
                    .await
            }
            Message::CancelIntent { id, kind } => self.handle_cancel_download(id, kind).await,
            Message::NodesHave { hash, nodes } => {
                let updated = self
                    .providers
                    .add_nodes_if_hash_exists(hash, nodes.iter().cloned());
                if updated {
                    self.queue.unpark_hash(hash);
                }
            }
        }
    }

    /// Handle a [`Message::Queue`].
    ///
    /// If this intent maps to a request that already exists, it will be registered with it. If the
    /// request is new it will be scheduled.
    async fn handle_queue_new_download(
        &mut self,
        request: DownloadRequest,
        intent_id: IntentId,
        on_finish: oneshot::Sender<ExternalDownloadResult>,
    ) {
        let DownloadRequest {
            kind,
            nodes,
            progress,
        } = request;
        debug!(%kind, nodes=?nodes.iter().map(|n| n.node_id.fmt_short()).collect::<Vec<_>>(), "queue intent");

        // store the download intent
        let intent_handlers = IntentHandlers {
            on_finish,
            on_progress: progress,
        };

        // add the nodes to the provider map
        // (skip the node id of our own node - we should never attempt to download from ourselves)
        let node_ids = nodes
            .iter()
            .map(|n| n.node_id)
            .filter(|node_id| *node_id != self.dialer.node_id());
        let updated = self.providers.add_hash_with_nodes(kind.hash(), node_ids);

        // queue the transfer (if not running) or attach to transfer progress (if already running)
        match self.requests.entry(kind) {
            hash_map::Entry::Occupied(mut entry) => {
                if let Some(on_progress) = &intent_handlers.on_progress {
                    // this is async because it sends the current state over the progress channel
                    if let Err(err) = self
                        .progress_tracker
                        .subscribe(kind, on_progress.clone())
                        .await
                    {
                        debug!(?err, %kind, "failed to subscribe progress sender to transfer");
                    }
                }
                entry.get_mut().intents.insert(intent_id, intent_handlers);
            }
            hash_map::Entry::Vacant(entry) => {
                let progress_sender = self.progress_tracker.track(
                    kind,
                    intent_handlers
                        .on_progress
                        .clone()
                        .into_iter()
                        .collect::<Vec<_>>(),
                );

                let get_state = match self.getter.get(kind, progress_sender.clone()).await {
                    Err(err) => {
                        // This prints a "FailureAction" which is somewhat weird, but that's all we get here.
                        tracing::error!(?err, "failed queuing new download");
                        self.finalize_download(
                            kind,
                            [(intent_id, intent_handlers)].into(),
                            // TODO: add better error variant? this is only triggered if the local
                            // store failed with local IO.
                            Err(DownloadError::DownloadFailed),
                        );
                        return;
                    }
                    Ok(GetOutput::Complete(stats)) => {
                        self.finalize_download(
                            kind,
                            [(intent_id, intent_handlers)].into(),
                            Ok(stats),
                        );
                        return;
                    }
                    Ok(GetOutput::NeedsConn(state)) => {
                        // early exit if no providers.
                        if self.providers.get_candidates(&kind.hash()).next().is_none() {
                            self.finalize_download(
                                kind,
                                [(intent_id, intent_handlers)].into(),
                                Err(DownloadError::NoProviders),
                            );
                            return;
                        }
                        state
                    }
                };
                entry.insert(RequestInfo {
                    intents: [(intent_id, intent_handlers)].into_iter().collect(),
                    progress_sender,
                    get_state: Some(get_state),
                });
                self.queue.insert(kind);
            }
        }

        if updated && self.queue.is_parked(&kind) {
            // the transfer is on hold for pending retries, and we added new nodes, so move back to queue.
            self.queue.unpark(&kind);
        }
    }

    /// Cancels a download intent.
    ///
    /// This removes the intent from the list of intents for the `kind`. If the removed intent was
    /// the last one for the `kind`, this means that the download is no longer needed. In this
    /// case, the `kind` will be removed from the list of pending downloads - and, if the download was
    /// already started, the download task will be cancelled.
    ///
    /// The method is async because it will send a final abort event on the progress sender.
    async fn handle_cancel_download(&mut self, intent_id: IntentId, kind: DownloadKind) {
        let Entry::Occupied(mut occupied_entry) = self.requests.entry(kind) else {
            warn!(%kind, %intent_id, "cancel download called for unknown download");
            return;
        };

        let request_info = occupied_entry.get_mut();
        if let Some(handlers) = request_info.intents.remove(&intent_id) {
            handlers.on_finish.send(Err(DownloadError::Cancelled)).ok();

            if let Some(sender) = handlers.on_progress {
                self.progress_tracker.unsubscribe(&kind, &sender);
                sender
                    .send(DownloadProgress::Abort(serde_error::Error::new(
                        &*anyhow::Error::from(DownloadError::Cancelled),
                    )))
                    .await
                    .ok();
            }
        }

        if request_info.intents.is_empty() {
            occupied_entry.remove();
            if let Entry::Occupied(occupied_entry) = self.active_requests.entry(kind) {
                occupied_entry.remove().cancellation.cancel();
            } else {
                self.queue.remove(&kind);
            }
            self.remove_hash_if_not_queued(&kind.hash());
        }
    }

    /// Handle receiving a new connection.
    fn on_connection_ready(&mut self, node: NodeId, result: anyhow::Result<D::Connection>) {
        debug_assert!(
            !self.connected_nodes.contains_key(&node),
            "newly connected node is not yet connected"
        );
        match result {
            Ok(connection) => {
                trace!(node=%node.fmt_short(), "connected to node");
                let drop_key = self.goodbye_nodes_queue.insert(node, IDLE_PEER_TIMEOUT);
                self.connected_nodes
                    .insert(node, ConnectionInfo::new_idle(connection, drop_key));
            }
            Err(err) => {
                debug!(%node, %err, "connection to node failed");
                self.disconnect_and_retry(node);
            }
        }
    }

    fn on_download_completed(&mut self, kind: DownloadKind, result: InternalDownloadResult) {
        // first remove the request
        let active_request_info = self
            .active_requests
            .remove(&kind)
            .expect("request was active");

        // get general request info
        let request_info = self.requests.remove(&kind).expect("request was active");

        let ActiveRequestInfo { node, .. } = active_request_info;

        // get node info
        let node_info = self
            .connected_nodes
            .get_mut(&node)
            .expect("node exists in the mapping");

        // update node busy/idle state
        node_info.state = match NonZeroUsize::new(node_info.active_requests() - 1) {
            None => {
                // last request of the node was this one, switch to idle
                let drop_key = self.goodbye_nodes_queue.insert(node, IDLE_PEER_TIMEOUT);
                ConnectedState::Idle { drop_key }
            }
            Some(active_requests) => ConnectedState::Busy { active_requests },
        };

        match &result {
            Ok(_) => {
                debug!(%kind, node=%node.fmt_short(), "download successful");
                // clear retry state if operation was successful
                self.retry_node_state.remove(&node);
            }
            Err(FailureAction::AllIntentsDropped) => {
                debug!(%kind, node=%node.fmt_short(), "download cancelled");
            }
            Err(FailureAction::AbortRequest(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: abort request");
                // do not try to download the hash from this node again
                self.providers.remove_hash_from_node(&kind.hash(), &node);
            }
            Err(FailureAction::DropPeer(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: drop node");
                if node_info.is_idle() {
                    // remove the node
                    self.remove_node(node, "explicit drop");
                } else {
                    // do not try to download the hash from this node again
                    self.providers.remove_hash_from_node(&kind.hash(), &node);
                }
            }
            Err(FailureAction::RetryLater(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: retry later");
                if node_info.is_idle() {
                    self.disconnect_and_retry(node);
                }
            }
        };

        // we finalize the download if either the download was successful,
        // or if it should never proceed because all intents were dropped,
        // or if we don't have any candidates to proceed with anymore.
        let finalize = match &result {
            Ok(_) | Err(FailureAction::AllIntentsDropped) => true,
            _ => !self.providers.has_candidates(&kind.hash()),
        };

        if finalize {
            let result = result.map_err(|_| DownloadError::DownloadFailed);
            self.finalize_download(kind, request_info.intents, result);
        } else {
            // reinsert the download at the front of the queue to try from the next node
            self.requests.insert(kind, request_info);
            self.queue.insert_front(kind);
        }
    }

    /// Finalize a download.
    ///
    /// This triggers the intent return channels, and removes the download from the progress tracker
    /// and provider map.
    fn finalize_download(
        &mut self,
        kind: DownloadKind,
        intents: HashMap<IntentId, IntentHandlers>,
        result: ExternalDownloadResult,
    ) {
        self.progress_tracker.remove(&kind);
        self.remove_hash_if_not_queued(&kind.hash());
        for (_id, handlers) in intents.into_iter() {
            handlers.on_finish.send(result.clone()).ok();
        }
    }

    fn on_retry_wait_elapsed(&mut self, node: NodeId) {
        // check if the node is still needed
        let Some(hashes) = self.providers.node_hash.get(&node) else {
            self.retry_node_state.remove(&node);
            return;
        };
        let Some(state) = self.retry_node_state.get_mut(&node) else {
            warn!(node=%node.fmt_short(), "missing retry state for node ready for retry");
            return;
        };
        state.retry_is_queued = false;
        for hash in hashes {
            self.queue.unpark_hash(*hash);
        }
    }

    /// Start the next downloads, or dial nodes, if limits permit and the queue is non-empty.
    ///
    /// This is called after all actions. If there is nothing to do, it will return cheaply.
    /// Otherwise, we will check the next hash in the queue, and:
    /// * start the transfer if we are connected to a provider and limits are ok
    /// * or, connect to a provider, if there is one we are not dialing yet and limits are ok
    /// * or, disconnect an idle node if it would allow us to connect to a provider,
    /// * or, if all providers are waiting for retry, park the download
    /// * or, if our limits are reached, do nothing for now
    ///
    /// The download requests will only be popped from the queue once we either start the transfer
    /// from a connected node [`NextStep::StartTransfer`], or if we abort the download on
    /// [`NextStep::OutOfProviders`]. In all other cases, the request is kept at the top of the
    /// queue, so the next call to [`Self::process_head`] will evaluate the situation again - and
    /// so forth, until either [`NextStep::StartTransfer`] or [`NextStep::OutOfProviders`] is
    /// reached.
    fn process_head(&mut self) {
        // start as many queued downloads as allowed by the request limits.
        loop {
            let Some(kind) = self.queue.front().cloned() else {
                break;
            };

            let next_step = self.next_step(&kind);
            trace!(%kind, ?next_step, "process_head");

            match next_step {
                NextStep::Wait => break,
                NextStep::StartTransfer(node) => {
                    let _ = self.queue.pop_front();
                    debug!(%kind, node=%node.fmt_short(), "start transfer");
                    self.start_download(kind, node);
                }
                NextStep::Dial(node) => {
                    debug!(%kind, node=%node.fmt_short(), "dial node");
                    self.dialer.queue_dial(node);
                }
                NextStep::DialQueuedDisconnect(node, key) => {
                    let idle_node = self.goodbye_nodes_queue.remove(&key).into_inner();
                    self.disconnect_idle_node(idle_node, "drop idle for new dial");
                    debug!(%kind, node=%node.fmt_short(), idle_node=%idle_node.fmt_short(), "dial node, disconnect idle node");
                    self.dialer.queue_dial(node);
                }
                NextStep::Park => {
                    debug!(%kind, "park download: all providers waiting for retry");
                    self.queue.park_front();
                }
                NextStep::OutOfProviders => {
                    debug!(%kind, "abort download: out of providers");
                    let _ = self.queue.pop_front();
                    let info = self.requests.remove(&kind).expect("queued downloads exist");
                    self.finalize_download(kind, info.intents, Err(DownloadError::NoProviders));
                }
            }
        }
    }

    /// Drop the connection to a node and insert it into the retry queue.
    fn disconnect_and_retry(&mut self, node: NodeId) {
        self.disconnect_idle_node(node, "queue retry");
        let retry_state = self.retry_node_state.entry(node).or_default();
        retry_state.retry_count += 1;
        if retry_state.retry_count <= self.retry_config.max_retries_per_node {
            // node can be retried
            debug!(node=%node.fmt_short(), retry_count=retry_state.retry_count, "queue retry");
            let timeout = self.retry_config.initial_retry_delay * retry_state.retry_count;
            self.retry_nodes_queue.insert(node, timeout);
            retry_state.retry_is_queued = true;
        } else {
            // node is dead
            self.remove_node(node, "retries exceeded");
        }
    }

    /// Calculate the next step needed to proceed the download for `kind`.
    ///
    /// This is called once `kind` has reached the head of the queue, see [`Self::process_head`].
    /// It can be called repeatedly; it does nothing by itself, it only calculates what *should*
    /// be done next.
    ///
    /// See [`NextStep`] for details on the potential next steps returned from this method.
    fn next_step(&self, kind: &DownloadKind) -> NextStep {
        // If the total requests capacity is reached, we have to wait until an active request
        // completes.
        if self
            .concurrency_limits
            .at_requests_capacity(self.active_requests.len())
        {
            return NextStep::Wait;
        };

        let mut candidates = self.providers.get_candidates(&kind.hash()).peekable();
        // If we have no provider candidates for this download, there's nothing else we can do.
        if candidates.peek().is_none() {
            return NextStep::OutOfProviders;
        }

        // Track whether there is a provider node to which we are connected and which is not at its
        // request capacity. If there is more than one, take the one with the fewest running transfers.
        let mut best_connected: Option<(NodeId, usize)> = None;
        // Track if there is a disconnected provider node to which we can potentially connect.
        let mut next_to_dial = None;
        // Track the number of provider nodes that are currently being dialed.
        let mut currently_dialing = 0;
        // Track if we have at least one provider node which is currently at its request capacity.
        // If this is the case, we will never return [`NextStep::OutOfProviders`] but [`NextStep::Wait`]
        // instead, because we can still try that node once it has finished its work.
        let mut has_exhausted_provider = false;
        // Track if we have at least one provider node that is currently in the retry queue.
        let mut has_retrying_provider = false;

        for node in candidates {
            match self.node_state(node) {
                NodeState::Connected(info) => {
                    let active_requests = info.active_requests();
                    if self
                        .concurrency_limits
                        .node_at_request_capacity(active_requests)
                    {
                        has_exhausted_provider = true;
                    } else {
                        best_connected = Some(match best_connected.take() {
                            Some(old) if old.1 <= active_requests => old,
                            _ => (node, active_requests),
                        });
                    }
                }
                NodeState::Dialing => {
                    currently_dialing += 1;
                }
                NodeState::WaitForRetry => {
                    has_retrying_provider = true;
                }
                NodeState::Disconnected => {
                    if next_to_dial.is_none() {
                        next_to_dial = Some(node);
                    }
                }
            }
        }

        let has_dialing = currently_dialing > 0;

        // If we have a connected provider node with free slots, use it!
        if let Some((node, _active_requests)) = best_connected {
            NextStep::StartTransfer(node)
        }
        // If we have a node which could be dialed: Check capacity and act accordingly.
        else if let Some(node) = next_to_dial {
            // We check if the dial capacity for this hash is exceeded: We only start new dials for
            // the hash if we are below the limit.
            //
            // If other requests trigger dials for providers of this hash, the limit may be
            // exceeded, but then we just don't start further dials and wait until one completes.
            let at_dial_capacity = has_dialing
                && self
                    .concurrency_limits
                    .at_dials_per_hash_capacity(currently_dialing);
            // Check if we reached the global connection limit.
            let at_connections_capacity = self.at_connections_capacity();

            // All slots are free: We can dial our candidate.
            if !at_connections_capacity && !at_dial_capacity {
                NextStep::Dial(node)
            }
            // The hash has free dial capacity, but the global connection capacity is reached.
            // But if we have idle nodes, we will disconnect the longest idling node, and then dial our
            // candidate.
            else if at_connections_capacity
                && !at_dial_capacity
                && !self.goodbye_nodes_queue.is_empty()
            {
                let key = self.goodbye_nodes_queue.peek().expect("just checked");
                NextStep::DialQueuedDisconnect(node, key)
            }
            // No dial capacity, and no idling nodes: We have to wait until capacity is freed up.
            else {
                NextStep::Wait
            }
        }
        // If we have pending dials to candidates, or connected candidates which are busy
        // with other work: Wait for one of these to become available.
        else if has_exhausted_provider || has_dialing {
            NextStep::Wait
        }
        // All providers are in the retry queue: Park this request until they can be tried again.
        else if has_retrying_provider {
            NextStep::Park
        }
        // We have no candidates left: Nothing more to do.
        else {
            NextStep::OutOfProviders
        }
    }

    /// Start downloading from the given node.
    ///
    /// Panics if the hash is not in `self.requests` or the node is not in `self.connected_nodes`.
    fn start_download(&mut self, kind: DownloadKind, node: NodeId) {
        let node_info = self.connected_nodes.get_mut(&node).expect("node exists");
        let request_info = self.requests.get_mut(&kind).expect("request exists");
        let progress = request_info.progress_sender.clone();

        // create the active request state
        let cancellation = CancellationToken::new();
        let state = ActiveRequestInfo {
            cancellation: cancellation.clone(),
            node,
        };
        let conn = node_info.conn.clone();

        // If this is the first provider node we try, we have an initial state
        // from starting the generator in Self::handle_queue_new_download.
        // If this is not the first provider node we try, we have to recreate the generator, because
1165        // we can only resume it once.
1166        let get_state = match request_info.get_state.take() {
1167            Some(state) => Either::Left(async move { Ok(GetOutput::NeedsConn(state)) }),
1168            None => Either::Right(self.getter.get(kind, progress)),
1169        };
1170        let fut = async move {
1171            // NOTE: it's an open question if we should do timeouts at this point. Considerations from @Frando:
1172            // > at this stage we do not know the size of the download, so the timeout would have
1173            // > to be so large that it won't be useful for non-huge downloads. At the same time,
1174            // > this means that a super slow node would block a download from succeeding for a long
1175            // > time, while faster nodes could be readily available.
1176            // As a conclusion, timeouts should be added only after downloads are known to be bounded
1177            let fut = async move {
1178                match get_state.await? {
1179                    GetOutput::Complete(stats) => Ok(stats),
1180                    GetOutput::NeedsConn(state) => state.proceed(conn).await,
1181                }
1182            };
1183            tokio::pin!(fut);
1184            let res = tokio::select! {
1185                _ = cancellation.cancelled() => Err(FailureAction::AllIntentsDropped),
1186                res = &mut fut => res
1187            };
1188            trace!("transfer finished");
1189
1190            (kind, res)
1191        }
1192        .instrument(error_span!("transfer", %kind, node=%node.fmt_short()));
1193        node_info.state = match &node_info.state {
1194            ConnectedState::Busy { active_requests } => ConnectedState::Busy {
1195                active_requests: active_requests.saturating_add(1),
1196            },
1197            ConnectedState::Idle { drop_key } => {
1198                self.goodbye_nodes_queue.remove(drop_key);
1199                ConnectedState::Busy {
1200                    active_requests: NonZeroUsize::new(1).expect("clearly non zero"),
1201                }
1202            }
1203        };
1204        self.active_requests.insert(kind, state);
1205        self.in_progress_downloads.spawn_local(fut);
1206    }
1207
1208    fn disconnect_idle_node(&mut self, node: NodeId, reason: &'static str) -> bool {
1209        if let Some(info) = self.connected_nodes.remove(&node) {
1210            match info.state {
1211                ConnectedState::Idle { drop_key } => {
1212                    self.goodbye_nodes_queue.try_remove(&drop_key);
1213                    true
1214                }
1215                ConnectedState::Busy { .. } => {
1216                    warn!("expected removed node to be idle, but is busy (removal reason: {reason:?})");
1217                    self.connected_nodes.insert(node, info);
1218                    false
1219                }
1220            }
1221        } else {
1222            true
1223        }
1224    }
1225
1226    fn remove_node(&mut self, node: NodeId, reason: &'static str) {
1227        debug!(node = %node.fmt_short(), %reason, "remove node");
1228        if self.disconnect_idle_node(node, reason) {
1229            self.providers.remove_node(&node);
1230            self.retry_node_state.remove(&node);
1231        }
1232    }
1233
1234    fn node_state(&self, node: NodeId) -> NodeState<'_, D::Connection> {
1235        if let Some(info) = self.connected_nodes.get(&node) {
1236            NodeState::Connected(info)
1237        } else if self.dialer.is_pending(node) {
1238            NodeState::Dialing
1239        } else {
1240            match self.retry_node_state.get(&node) {
1241                Some(state) if state.retry_is_queued => NodeState::WaitForRetry,
1242                _ => NodeState::Disconnected,
1243            }
1244        }
1245    }
1246
1247    /// Check if we have maxed our connection capacity.
1248    fn at_connections_capacity(&self) -> bool {
1249        self.concurrency_limits
1250            .at_connections_capacity(self.connections_count())
1251    }
1252
1253    /// Get the total number of connected and dialing nodes.
1254    fn connections_count(&self) -> usize {
1255        let connected_nodes = self.connected_nodes.values().count();
1256        let dialing_nodes = self.dialer.pending_count();
1257        connected_nodes + dialing_nodes
1258    }
1259
1260    /// Remove a `hash` from the [`ProviderMap`], but only if [`Self::queue`] does not contain the
1261    /// hash at all, even with the other [`BlobFormat`].
1262    fn remove_hash_if_not_queued(&mut self, hash: &Hash) {
1263        if !self.queue.contains_hash(*hash) {
1264            self.providers.remove_hash(hash);
1265        }
1266    }
1267
1268    #[allow(clippy::unused_async)]
1269    async fn shutdown(self) {
1270        debug!("shutting down");
1271        // TODO(@divma): how to make sure the download futures end gracefully?
1272    }
1273}
1274
1275/// The next step needed to continue a download.
1276///
1277/// See [`Service::next_step`] for details.
1278#[derive(Debug)]
1279enum NextStep {
1280    /// Provider connection is ready, initiate the transfer.
1281    StartTransfer(NodeId),
1282    /// Start to dial `NodeId`.
1283    ///
1284    /// This means: We have no non-exhausted connection to a provider node, but a free connection slot
1285    /// and a provider node we are not yet connected to.
1286    Dial(NodeId),
1287    /// Start to dial `NodeId`, but first disconnect the idle node behind [`delay_queue::Key`] in
1288    /// [`Service::goodbye_nodes_queue`] to free up a connection slot.
1289    DialQueuedDisconnect(NodeId, delay_queue::Key),
1290    /// Resource limits are exhausted, do nothing for now and wait until a slot frees up.
1291    Wait,
1292    /// All providers are currently in a retry timeout. Park the download aside, and move
1293    /// to the next download in the queue.
1294    Park,
1295    /// We have tried all available providers. There is nothing else to do.
1296    OutOfProviders,
1297}

/// Map of potential providers for a hash.
#[derive(Default, Debug)]
struct ProviderMap {
    hash_node: HashMap<Hash, HashSet<NodeId>>,
    node_hash: HashMap<NodeId, HashSet<Hash>>,
}

impl ProviderMap {
    /// Get candidates to download this hash.
    pub fn get_candidates<'a>(&'a self, hash: &Hash) -> impl Iterator<Item = NodeId> + 'a {
        self.hash_node
            .get(hash)
            .map(|nodes| nodes.iter())
            .into_iter()
            .flatten()
            .copied()
    }

    /// Whether we have any candidates to download this hash.
    pub fn has_candidates(&self, hash: &Hash) -> bool {
        self.hash_node
            .get(hash)
            .map(|nodes| !nodes.is_empty())
            .unwrap_or(false)
    }

    /// Register nodes for a hash. Should only be done for hashes we care to download.
    ///
    /// Returns `true` if new providers were added.
    fn add_hash_with_nodes(&mut self, hash: Hash, nodes: impl Iterator<Item = NodeId>) -> bool {
        let mut updated = false;
        let hash_entry = self.hash_node.entry(hash).or_default();
        for node in nodes {
            updated |= hash_entry.insert(node);
            let node_entry = self.node_hash.entry(node).or_default();
            node_entry.insert(hash);
        }
        updated
    }

    /// Register nodes for a hash, but only if the hash is already in our queue.
    ///
    /// Returns `true` if a new node was added.
    fn add_nodes_if_hash_exists(
        &mut self,
        hash: Hash,
        nodes: impl Iterator<Item = NodeId>,
    ) -> bool {
        let mut updated = false;
        if let Some(hash_entry) = self.hash_node.get_mut(&hash) {
            for node in nodes {
                updated |= hash_entry.insert(node);
                let node_entry = self.node_hash.entry(node).or_default();
                node_entry.insert(hash);
            }
        }
        updated
    }

    /// Signal the registry that this hash is no longer of interest.
    fn remove_hash(&mut self, hash: &Hash) {
        if let Some(nodes) = self.hash_node.remove(hash) {
            for node in nodes {
                if let Some(hashes) = self.node_hash.get_mut(&node) {
                    hashes.remove(hash);
                    if hashes.is_empty() {
                        self.node_hash.remove(&node);
                    }
                }
            }
        }
    }

    fn remove_node(&mut self, node: &NodeId) {
        if let Some(hashes) = self.node_hash.remove(node) {
            for hash in hashes {
                if let Some(nodes) = self.hash_node.get_mut(&hash) {
                    nodes.remove(node);
                    if nodes.is_empty() {
                        self.hash_node.remove(&hash);
                    }
                }
            }
        }
    }

    fn remove_hash_from_node(&mut self, hash: &Hash, node: &NodeId) {
        if let Some(nodes) = self.hash_node.get_mut(hash) {
            nodes.remove(node);
            if nodes.is_empty() {
                self.remove_hash(hash);
            }
        }
        if let Some(hashes) = self.node_hash.get_mut(node) {
            hashes.remove(hash);
            if hashes.is_empty() {
                self.remove_node(node);
            }
        }
    }
}
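
// A hedged example (not part of the original file) of the `ProviderMap`
// invariant: `hash_node` and `node_hash` mirror each other, so removing a
// hash also cleans up the reverse index. It assumes `iroh::SecretKey::from_bytes`
// and `Hash::new` are available as in current iroh / iroh-blobs.
#[cfg(test)]
mod provider_map_example {
    use super::*;

    /// Derive a deterministic test `NodeId` from a single byte.
    fn node(i: u8) -> NodeId {
        iroh::SecretKey::from_bytes(&[i; 32]).public()
    }

    #[test]
    fn bidirectional_index_stays_in_sync() {
        let mut map = ProviderMap::default();
        let hash = Hash::new(b"example blob");
        let (a, b) = (node(1), node(2));

        // Registering two new providers reports an update.
        assert!(map.add_hash_with_nodes(hash, [a, b].into_iter()));
        assert_eq!(map.get_candidates(&hash).count(), 2);

        // Dropping the hash empties both directions of the index.
        map.remove_hash(&hash);
        assert!(!map.has_candidates(&hash));
        assert!(map.node_hash.is_empty());
    }
}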

/// The queue of requested downloads.
///
/// This manages two data structures:
/// * The main queue, a FIFO queue where each item can only appear once.
///   New downloads are pushed to the back of the queue, and the next download to process is popped
///   from the front.
/// * The parked set, a hash set. Items can be moved from the main queue into the parked set.
///   Parked items will not be popped unless they are moved back into the main queue.
#[derive(Debug, Default)]
struct Queue {
    main: LinkedHashSet<DownloadKind>,
    parked: HashSet<DownloadKind>,
}

impl Queue {
    /// Peek at the front element of the main queue.
    pub fn front(&self) -> Option<&DownloadKind> {
        self.main.front()
    }

    /// Iterate over the downloads in the parked set.
    #[cfg(any(test, debug_assertions))]
    pub fn iter_parked(&self) -> impl Iterator<Item = &DownloadKind> {
        self.parked.iter()
    }

    /// Iterate over all downloads, in both the main queue and the parked set.
    #[cfg(any(test, debug_assertions))]
    pub fn iter(&self) -> impl Iterator<Item = &DownloadKind> {
        self.main.iter().chain(self.parked.iter())
    }

    /// Returns `true` if either the main queue or the parked set contains a download.
    pub fn contains(&self, kind: &DownloadKind) -> bool {
        self.main.contains(kind) || self.parked.contains(kind)
    }

    /// Returns `true` if either the main queue or the parked set contains a download for a hash.
    pub fn contains_hash(&self, hash: Hash) -> bool {
        let as_raw = HashAndFormat::raw(hash).into();
        let as_hash_seq = HashAndFormat::hash_seq(hash).into();
        self.contains(&as_raw) || self.contains(&as_hash_seq)
    }

    /// Returns `true` if a download is in the parked set.
    pub fn is_parked(&self, kind: &DownloadKind) -> bool {
        self.parked.contains(kind)
    }

    /// Insert an element at the back of the main queue.
    pub fn insert(&mut self, kind: DownloadKind) {
        if !self.main.contains(&kind) {
            self.main.insert(kind);
        }
    }

    /// Insert an element at the front of the main queue.
    pub fn insert_front(&mut self, kind: DownloadKind) {
        if !self.main.contains(&kind) {
            self.main.insert(kind);
        }
        self.main.to_front(&kind);
    }

    /// Dequeue the first download of the main queue.
    pub fn pop_front(&mut self) -> Option<DownloadKind> {
        self.main.pop_front()
    }

    /// Move the front item of the main queue into the parked set.
    pub fn park_front(&mut self) {
        if let Some(item) = self.pop_front() {
            self.parked.insert(item);
        }
    }

    /// Move a download from the parked set to the front of the main queue.
    pub fn unpark(&mut self, kind: &DownloadKind) {
        if self.parked.remove(kind) {
            self.main.insert(*kind);
            self.main.to_front(kind);
        }
    }

    /// Move any download for a hash from the parked set to the main queue.
    pub fn unpark_hash(&mut self, hash: Hash) {
        let as_raw = HashAndFormat::raw(hash).into();
        let as_hash_seq = HashAndFormat::hash_seq(hash).into();
        self.unpark(&as_raw);
        self.unpark(&as_hash_seq);
    }

    /// Remove a download from both the main queue and the parked set.
    pub fn remove(&mut self, kind: &DownloadKind) -> bool {
        self.main.remove(kind) || self.parked.remove(kind)
    }
}
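
// A hedged example (not part of the original file) of the FIFO-plus-parking
// behavior of `Queue`. It relies on `DownloadKind` implementing
// `From<HashAndFormat>`, `Copy`, `Eq`, and `Debug`, as its usage elsewhere in
// this module suggests.
#[cfg(test)]
mod queue_example {
    use super::*;

    #[test]
    fn park_and_unpark() {
        let mut queue = Queue::default();
        let a: DownloadKind = HashAndFormat::raw(Hash::new(b"a")).into();
        let b: DownloadKind = HashAndFormat::raw(Hash::new(b"b")).into();
        queue.insert(a);
        queue.insert(b);

        // Parking removes the front item (`a`) from the FIFO order...
        queue.park_front();
        assert!(queue.is_parked(&a));
        // ...so `b` is now first in line, while `a` still counts as queued.
        assert_eq!(queue.front(), Some(&b));
        assert!(queue.contains(&a));

        // Unparking puts the item back at the *front* of the main queue.
        queue.unpark(&a);
        assert_eq!(queue.pop_front(), Some(a));
        assert_eq!(queue.pop_front(), Some(b));
    }
}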

impl DialerT for Dialer {
    type Connection = endpoint::Connection;

    fn queue_dial(&mut self, node_id: NodeId) {
        self.queue_dial(node_id, crate::protocol::ALPN)
    }

    fn pending_count(&self) -> usize {
        self.pending_count()
    }

    fn is_pending(&self, node: NodeId) -> bool {
        self.is_pending(node)
    }

    fn node_id(&self) -> NodeId {
        self.endpoint().node_id()
    }
}

/// Dials nodes and maintains a queue of pending dials.
///
/// The [`Dialer`] wraps an [`Endpoint`], connects to nodes through the endpoint, stores the
/// pending connect futures, and emits finished connect results.
///
/// The [`Dialer`] also implements [`Stream`] to retrieve the dialled connections.
#[derive(Debug)]
struct Dialer {
    endpoint: Endpoint,
    pending: JoinSet<(NodeId, anyhow::Result<endpoint::Connection>)>,
    pending_dials: HashMap<NodeId, CancellationToken>,
}

impl Dialer {
    /// Create a new dialer for an [`Endpoint`].
    fn new(endpoint: Endpoint) -> Self {
        Self {
            endpoint,
            pending: Default::default(),
            pending_dials: Default::default(),
        }
    }

    /// Starts to dial a node by [`NodeId`].
    fn queue_dial(&mut self, node_id: NodeId, alpn: &'static [u8]) {
        if self.is_pending(node_id) {
            return;
        }
        let cancel = CancellationToken::new();
        self.pending_dials.insert(node_id, cancel.clone());
        let endpoint = self.endpoint.clone();
        self.pending.spawn(async move {
            let res = tokio::select! {
                biased;
                _ = cancel.cancelled() => Err(anyhow!("Cancelled")),
                res = endpoint.connect(node_id, alpn) => res
            };
            (node_id, res)
        });
    }

    /// Checks if a node is currently being dialed.
    fn is_pending(&self, node: NodeId) -> bool {
        self.pending_dials.contains_key(&node)
    }

    /// Number of pending connections to be opened.
    fn pending_count(&self) -> usize {
        self.pending_dials.len()
    }

    /// Returns a reference to the endpoint used in this dialer.
    fn endpoint(&self) -> &Endpoint {
        &self.endpoint
    }
}

impl Stream for Dialer {
    type Item = (NodeId, anyhow::Result<endpoint::Connection>);

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Loop so that a failed join (a panicked or aborted dial task) does not
        // leave us returning `Poll::Pending` without a scheduled wakeup.
        loop {
            match self.pending.poll_join_next(cx) {
                Poll::Ready(Some(Ok((node_id, result)))) => {
                    self.pending_dials.remove(&node_id);
                    return Poll::Ready(Some((node_id, result)));
                }
                Poll::Ready(Some(Err(e))) => {
                    error!("dialer error: {:?}", e);
                }
                _ => return Poll::Pending,
            }
        }
    }
}
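
// A hedged sketch (not part of the original file) of consuming the `Dialer`
// as a stream inside a service loop, using `StreamExt::next` from
// `futures_lite` (imported above). The `on_connection_*` handler names are
// illustrative, not actual `Service` methods:
//
// loop {
//     tokio::select! {
//         Some((node, res)) = self.dialer.next() => match res {
//             Ok(connection) => self.on_connection_ready(node, connection),
//             Err(err) => self.on_connection_failed(node, err),
//         },
//         // ... other service events (requests, progress, timeouts) ...
//     }
// }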