iroh_blobs/downloader.rs

//! Handle downloading blobs and collections concurrently and from nodes.
//!
//! The [`Downloader`] interacts with the following main components to this end.
//! - `ProviderMap`: Where the downloader obtains information about nodes that could be
//!   used to perform a download.
//! - [`Store`]: Where data is stored.
//!
//! Once a download request is received, the logic is as follows:
//! 1. The `ProviderMap` is queried for nodes. From these nodes, some are selected,
//!    prioritizing connected nodes with a lower number of active requests. If no useful node is
//!    connected, or useful connected nodes have no capacity to perform the request, a connection
//!    attempt is started using the `DialerT`.
//! 2. The download is queued for processing at a later time. Downloads are not performed right
//!    away. Instead, they are initially delayed to allow the node to obtain the data itself, and
//!    to wait for the new connection to be established if necessary.
//! 3. Once a request is ready to be sent after a delay (initial or for a retry), the preferred
//!    node is used if available. The request is now considered active.
//!
//! Concurrency is limited in different ways:
//! - *Total number of active requests:* This is a way to prevent a self-DoS by overwhelming our
//!   own bandwidth capacity. This is a best-effort heuristic since it doesn't take into account
//!   how much data we are actually requesting or receiving.
//! - *Total number of connected nodes:* Peer connections are kept for a longer time than they are
//!   strictly needed since it's likely they will be useful again soon.
//! - *Requests per node:* To avoid overwhelming nodes with requests, the number of concurrent
//!   requests to a single node is also limited.
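//!
//! # Example
//!
//! A minimal usage sketch. The setup is hypothetical: it assumes a `store`, an
//! `endpoint` and a `local_pool` are already available, and that `hash` and
//! `addr` identify a blob and a provider node.
//!
//! ```ignore
//! let downloader = Downloader::new(store, endpoint, local_pool);
//! // Request the blob from the given node; `queue` returns a handle that
//! // resolves once the download completes or fails.
//! let request = DownloadRequest::new(HashAndFormat::raw(hash), [addr]);
//! let handle = downloader.queue(request).await;
//! let stats = handle.await?;
//! ```
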
use std::{
    collections::{
        hash_map::{self, Entry},
        HashMap, HashSet,
    },
    fmt,
    future::Future,
    num::NonZeroUsize,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    task::Poll,
    time::Duration,
};

use anyhow::anyhow;
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
use hashlink::LinkedHashSet;
use iroh::{endpoint, Endpoint, NodeAddr, NodeId};
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinSet,
};
use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
use tracing::{debug, error, error_span, trace, warn, Instrument};

use crate::{
    get::{db::DownloadProgress, error::GetError, Stats},
    metrics::Metrics,
    store::Store,
    util::{local_pool::LocalPoolHandle, progress::ProgressSender},
    BlobFormat, Hash, HashAndFormat,
};

mod get;
mod invariants;
mod progress;
mod test;

use self::progress::{BroadcastProgressSender, ProgressSubscriber, ProgressTracker};

/// Duration for which we keep nodes connected after they were last useful to us.
const IDLE_PEER_TIMEOUT: Duration = Duration::from_secs(10);
/// Capacity of the channel used to communicate between the [`Downloader`] and the [`Service`].
const SERVICE_CHANNEL_CAPACITY: usize = 128;

/// Identifier for a download intent.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, derive_more::Display)]
pub struct IntentId(pub u64);

/// Trait modelling a dialer. This allows for IO-less testing.
trait DialerT: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin {
    /// Type of connections returned by the Dialer.
    type Connection: Clone + 'static;
    /// Dial a node.
    fn queue_dial(&mut self, node_id: NodeId);
    /// Get the number of dialing nodes.
    fn pending_count(&self) -> usize;
    /// Check if a node is being dialed.
    fn is_pending(&self, node: NodeId) -> bool;
    /// Get the node id of our node.
    fn node_id(&self) -> NodeId;
}

/// Signals what should be done with the request when it fails.
#[derive(Debug)]
pub enum FailureAction {
    /// The request was cancelled by us.
    AllIntentsDropped,
    /// An error occurred that prevents the request from being retried at all.
    AbortRequest(GetError),
    /// An error occurred that suggests the node should not be used in general.
    DropPeer(anyhow::Error),
    /// An error occurred in which neither the node nor the request are at fault.
    RetryLater(anyhow::Error),
}

/// Future of a get request, for the checking stage.
type GetStartFut<N> = BoxedLocal<Result<GetOutput<N>, FailureAction>>;
/// Future of a get request, for the downloading stage.
type GetProceedFut = BoxedLocal<InternalDownloadResult>;

/// Trait modelling performing a single request over a connection. This allows for IO-less testing.
pub trait Getter {
    /// Type of connections the Getter requires to perform a download.
    type Connection: 'static;
    /// Type of the intermediary state returned from [`Self::get`] if a connection is needed.
    type NeedsConn: NeedsConn<Self::Connection>;
    /// Returns a future that checks whether the request is already complete in the local store,
    /// returning a struct implementing [`NeedsConn`] if a network connection is needed to proceed.
    fn get(
        &mut self,
        kind: DownloadKind,
        progress_sender: BroadcastProgressSender,
    ) -> GetStartFut<Self::NeedsConn>;
}

/// Trait modelling the intermediary state when a connection is needed to proceed.
pub trait NeedsConn<C>: std::fmt::Debug + 'static {
    /// Proceeds the download with the given connection.
    fn proceed(self, conn: C) -> GetProceedFut;
}

/// Output returned from [`Getter::get`].
#[derive(Debug)]
pub enum GetOutput<N> {
    /// The request is already complete in the local store.
    Complete(Stats),
    /// The request needs a connection to continue.
    NeedsConn(N),
}

/// Concurrency limits for the [`Downloader`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConcurrencyLimits {
    /// Maximum number of requests the service performs concurrently.
    pub max_concurrent_requests: usize,
    /// Maximum number of requests performed by a single node concurrently.
    pub max_concurrent_requests_per_node: usize,
    /// Maximum number of open connections the service maintains.
    pub max_open_connections: usize,
    /// Maximum number of nodes to dial concurrently for a single request.
    pub max_concurrent_dials_per_hash: usize,
}

impl Default for ConcurrencyLimits {
    fn default() -> Self {
        // these numbers should be checked against a running node and might depend on platform
        ConcurrencyLimits {
            max_concurrent_requests: 50,
            max_concurrent_requests_per_node: 4,
            max_open_connections: 25,
            max_concurrent_dials_per_hash: 5,
        }
    }
}

impl ConcurrencyLimits {
    /// Checks if the maximum number of concurrent requests has been reached.
    fn at_requests_capacity(&self, active_requests: usize) -> bool {
        active_requests >= self.max_concurrent_requests
    }

    /// Checks if the maximum number of concurrent requests per node has been reached.
    fn node_at_request_capacity(&self, active_node_requests: usize) -> bool {
        active_node_requests >= self.max_concurrent_requests_per_node
    }

    /// Checks if the maximum number of connections has been reached.
    fn at_connections_capacity(&self, active_connections: usize) -> bool {
        active_connections >= self.max_open_connections
    }

    /// Checks if the maximum number of concurrent dials per hash has been reached.
    ///
    /// Note that this limit is not strictly enforced, and not checked in
    /// [`Service::check_invariants`]. A certain hash can exceed this limit in a valid way if some
    /// of its providers are dialed for another hash. However, once the limit is reached,
    /// no new dials will be initiated for the hash.
    fn at_dials_per_hash_capacity(&self, concurrent_dials: usize) -> bool {
        concurrent_dials >= self.max_concurrent_dials_per_hash
    }
}

/// Configuration for retry behavior of the [`Downloader`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryConfig {
    /// Maximum number of retry attempts for a node that failed to dial or failed with IO errors.
    pub max_retries_per_node: u32,
    /// The initial delay to wait before retrying a node. On subsequent failures, the retry delay
    /// will be multiplied by the number of failed retries.
    pub initial_retry_delay: Duration,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries_per_node: 6,
            initial_retry_delay: Duration::from_millis(500),
        }
    }
}
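
// With these defaults, the per-node backoff grows linearly (the service computes
// `initial_retry_delay * retry_count`, see `Service::disconnect_and_retry` below):
// 500 ms, 1 s, 1.5 s, ... up to the sixth retry (3 s), after which the node is
// removed.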

/// A download request.
#[derive(Debug, Clone)]
pub struct DownloadRequest {
    kind: DownloadKind,
    nodes: Vec<NodeAddr>,
    progress: Option<ProgressSubscriber>,
}

impl DownloadRequest {
    /// Create a new download request.
    ///
    /// It is the responsibility of the caller to ensure that the data is tagged, either with a
    /// temp tag or with a persistent tag, so that it is not garbage collected during
    /// the download.
    ///
    /// If this is not done, the download will proceed as normal, but there is no guarantee
    /// that the data is still available when the download is complete.
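    ///
    /// A minimal sketch (`hash` is a [`Hash`] and `addr` a [`NodeAddr`] obtained
    /// elsewhere):
    ///
    /// ```ignore
    /// let request = DownloadRequest::new(HashAndFormat::raw(hash), [addr]);
    /// ```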
    pub fn new(
        resource: impl Into<DownloadKind>,
        nodes: impl IntoIterator<Item = impl Into<NodeAddr>>,
    ) -> Self {
        Self {
            kind: resource.into(),
            nodes: nodes.into_iter().map(|n| n.into()).collect(),
            progress: None,
        }
    }

    /// Pass a progress sender to receive progress updates.
    pub fn progress_sender(mut self, sender: ProgressSubscriber) -> Self {
        self.progress = Some(sender);
        self
    }
}

/// The kind of resource to download.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy, derive_more::From, derive_more::Into)]
pub struct DownloadKind(HashAndFormat);

impl DownloadKind {
    /// Get the hash of this download.
    pub const fn hash(&self) -> Hash {
        self.0.hash
    }

    /// Get the format of this download.
    pub const fn format(&self) -> BlobFormat {
        self.0.format
    }

    /// Get the [`HashAndFormat`] pair of this download.
    pub const fn hash_and_format(&self) -> HashAndFormat {
        self.0
    }
}

impl fmt::Display for DownloadKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}:{:?}", self.0.hash.fmt_short(), self.0.format)
    }
}

/// The result of a download request, as returned to the application code.
type ExternalDownloadResult = Result<Stats, DownloadError>;

/// The result of a download request, as used in this module.
type InternalDownloadResult = Result<Stats, FailureAction>;

/// Error returned when a download could not be completed.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DownloadError {
    /// Failed to download from any provider.
    #[error("Failed to complete download")]
    DownloadFailed,
    /// The download was cancelled by us.
    #[error("Download cancelled by us")]
    Cancelled,
    /// No provider nodes found.
    #[error("No provider nodes found")]
    NoProviders,
    /// Failed to receive response from service.
    #[error("Failed to receive response from download service")]
    ActorClosed,
}

/// Handle to interact with a download request.
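///
/// The handle implements [`Future`]; awaiting it yields the result of the
/// download. A sketch (assumes a `downloader` and a `request` built as above):
///
/// ```ignore
/// let handle = downloader.queue(request).await;
/// match handle.await {
///     Ok(stats) => println!("download complete: {stats:?}"),
///     Err(err) => eprintln!("download failed: {err}"),
/// }
/// ```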
#[derive(Debug)]
pub struct DownloadHandle {
    /// Id used to identify the request in the [`Downloader`].
    id: IntentId,
    /// Kind of download.
    kind: DownloadKind,
    /// Receiver to retrieve the return value of this download.
    receiver: oneshot::Receiver<ExternalDownloadResult>,
}

impl Future for DownloadHandle {
    type Output = ExternalDownloadResult;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        use std::task::Poll::*;
        // make it easier on holders of the handle to poll the result, removing the receiver error
        // from the middle
        match std::pin::Pin::new(&mut self.receiver).poll(cx) {
            Ready(Ok(result)) => Ready(result),
            Ready(Err(_recv_err)) => Ready(Err(DownloadError::ActorClosed)),
            Pending => Pending,
        }
    }
}

/// All numerical config options for the downloader.
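///
/// A sketch of a custom configuration (the values are illustrative, not
/// recommendations):
///
/// ```ignore
/// let config = Config {
///     concurrency: ConcurrencyLimits {
///         max_concurrent_requests: 100,
///         ..Default::default()
///     },
///     retry: RetryConfig::default(),
/// };
/// let downloader = Downloader::with_config(store, endpoint, local_pool, config);
/// ```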
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct Config {
    /// Concurrency limits for the downloader.
    pub concurrency: ConcurrencyLimits,
    /// Retry configuration for the downloader.
    pub retry: RetryConfig,
}

/// Handle for the download service.
#[derive(Debug, Clone)]
pub struct Downloader {
    inner: Arc<Inner>,
}

#[derive(Debug)]
struct Inner {
    /// Next id to use for a download intent.
    next_id: AtomicU64,
    /// Channel to communicate with the service.
    msg_tx: mpsc::Sender<Message>,
    /// Configuration for the downloader.
    config: Arc<Config>,
    metrics: Arc<Metrics>,
}

impl Downloader {
    /// Create a new Downloader with the default [`ConcurrencyLimits`] and [`RetryConfig`].
    pub fn new<S>(store: S, endpoint: Endpoint, rt: LocalPoolHandle) -> Self
    where
        S: Store,
    {
        Self::with_config(store, endpoint, rt, Default::default())
    }

    /// Create a new Downloader with custom [`ConcurrencyLimits`] and [`RetryConfig`].
    pub fn with_config<S>(store: S, endpoint: Endpoint, rt: LocalPoolHandle, config: Config) -> Self
    where
        S: Store,
    {
        let metrics = Arc::new(Metrics::default());
        let metrics2 = metrics.clone();
        let me = endpoint.node_id().fmt_short();
        let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
        let dialer = Dialer::new(endpoint);
        let config = Arc::new(config);
        let config2 = config.clone();
        let create_future = move || {
            let getter = get::IoGetter {
                store: store.clone(),
            };
            let service = Service::new(getter, dialer, config2, msg_rx, metrics2);
            service.run().instrument(error_span!("downloader", %me))
        };
        rt.spawn_detached(create_future);
        Self {
            inner: Arc::new(Inner {
                next_id: AtomicU64::new(0),
                msg_tx,
                config,
                metrics,
            }),
        }
    }

    /// Get the current configuration.
    pub fn config(&self) -> &Config {
        &self.inner.config
    }

    /// Queue a download.
    pub async fn queue(&self, request: DownloadRequest) -> DownloadHandle {
        let kind = request.kind;
        let intent_id = IntentId(self.inner.next_id.fetch_add(1, Ordering::SeqCst));
        let (sender, receiver) = oneshot::channel();
        let handle = DownloadHandle {
            id: intent_id,
            kind,
            receiver,
        };
        let msg = Message::Queue {
            on_finish: sender,
            request,
            intent_id,
        };
        // if this fails, polling the handle will fail as well, since the sender side of the
        // oneshot will be dropped
        if let Err(send_err) = self.inner.msg_tx.send(msg).await {
            let msg = send_err.0;
            debug!(?msg, "download not sent");
        }
        handle
    }

    /// Cancel a download.
    // NOTE: receiving the handle ensures an intent can't be cancelled twice
    pub async fn cancel(&self, handle: DownloadHandle) {
        let DownloadHandle {
            id,
            kind,
            receiver: _,
        } = handle;
        let msg = Message::CancelIntent { id, kind };
        if let Err(send_err) = self.inner.msg_tx.send(msg).await {
            let msg = send_err.0;
            debug!(?msg, "cancel not sent");
        }
    }

    /// Declare that certain nodes can be used to download a hash.
    ///
    /// Note that this does not start a download, but only provides new nodes to already queued
    /// downloads. Use [`Self::queue`] to queue a download.
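    ///
    /// A sketch (assumes a download for `hash` was queued earlier and `node_id`
    /// was discovered elsewhere):
    ///
    /// ```ignore
    /// downloader.nodes_have(hash, vec![node_id]).await;
    /// ```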
    pub async fn nodes_have(&mut self, hash: Hash, nodes: Vec<NodeId>) {
        let msg = Message::NodesHave { hash, nodes };
        if let Err(send_err) = self.inner.msg_tx.send(msg).await {
            let msg = send_err.0;
            debug!(?msg, "nodes have not been sent");
        }
    }

    /// Returns the metrics collected for this downloader.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.inner.metrics
    }
}

/// Messages the service can receive.
#[derive(derive_more::Debug)]
enum Message {
    /// Queue a download intent.
    Queue {
        request: DownloadRequest,
        #[debug(skip)]
        on_finish: oneshot::Sender<ExternalDownloadResult>,
        intent_id: IntentId,
    },
    /// Declare that nodes have a certain hash and can be used for downloading.
    NodesHave { hash: Hash, nodes: Vec<NodeId> },
    /// Cancel an intent. The associated request will be cancelled when the last intent is
    /// cancelled.
    CancelIntent { id: IntentId, kind: DownloadKind },
}

#[derive(derive_more::Debug)]
struct IntentHandlers {
    #[debug("oneshot::Sender<DownloadResult>")]
    on_finish: oneshot::Sender<ExternalDownloadResult>,
    on_progress: Option<ProgressSubscriber>,
}

/// Information about a request.
#[derive(Debug)]
struct RequestInfo<NC> {
    /// Registered intents with progress senders and result callbacks.
    intents: HashMap<IntentId, IntentHandlers>,
    progress_sender: BroadcastProgressSender,
    get_state: Option<NC>,
}

/// Information about a request in progress.
#[derive(derive_more::Debug)]
struct ActiveRequestInfo {
    /// Token used to cancel the future doing the request.
    #[debug(skip)]
    cancellation: CancellationToken,
    /// Peer doing this request attempt.
    node: NodeId,
}

#[derive(Debug, Default)]
struct RetryState {
    /// How many times did we retry this node?
    retry_count: u32,
    /// Whether the node is currently queued for retry.
    retry_is_queued: bool,
}

/// State of the connection to this node.
#[derive(derive_more::Debug)]
struct ConnectionInfo<Conn> {
    /// Connection to this node.
    #[debug(skip)]
    conn: Conn,
    /// State of this node.
    state: ConnectedState,
}

impl<Conn> ConnectionInfo<Conn> {
    /// Create a new idle node.
    fn new_idle(connection: Conn, drop_key: delay_queue::Key) -> Self {
        ConnectionInfo {
            conn: connection,
            state: ConnectedState::Idle { drop_key },
        }
    }

    /// Count of active requests for the node.
    fn active_requests(&self) -> usize {
        match self.state {
            ConnectedState::Busy { active_requests } => active_requests.get(),
            ConnectedState::Idle { .. } => 0,
        }
    }

    /// Returns `true` if the node is currently idle.
    fn is_idle(&self) -> bool {
        matches!(self.state, ConnectedState::Idle { .. })
    }
}

/// State of a connected node.
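///
/// A node transitions from `Idle` to `Busy` when a transfer starts (see
/// [`Service::start_download`]) and back to `Idle` once its last active request
/// completes (see [`Service::on_download_completed`]).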
#[derive(derive_more::Debug)]
enum ConnectedState {
    /// Peer is handling at least one request.
    Busy {
        #[debug("{}", active_requests.get())]
        active_requests: NonZeroUsize,
    },
    /// Peer is idle.
    Idle {
        #[debug(skip)]
        drop_key: delay_queue::Key,
    },
}

#[derive(Debug)]
enum NodeState<'a, Conn> {
    Connected(&'a ConnectionInfo<Conn>),
    Dialing,
    WaitForRetry,
    Disconnected,
}

#[derive(Debug)]
struct Service<G: Getter, D: DialerT> {
    /// The getter performs individual requests.
    getter: G,
    /// Map to query for nodes that we believe have the data we are looking for.
    providers: ProviderMap,
    /// Dialer to get connections for required nodes.
    dialer: D,
    /// Limits to concurrent tasks handled by the service.
    concurrency_limits: ConcurrencyLimits,
    /// Configuration for retry behavior.
    retry_config: RetryConfig,
    /// Channel to receive messages from the service's handle.
    msg_rx: mpsc::Receiver<Message>,
    /// Nodes to which we have an active or idle connection.
    connected_nodes: HashMap<NodeId, ConnectionInfo<D::Connection>>,
    /// We track a retry state for nodes which failed to dial or which failed during a transfer.
    retry_node_state: HashMap<NodeId, RetryState>,
    /// Delay queue for retrying failed nodes.
    retry_nodes_queue: delay_queue::DelayQueue<NodeId>,
    /// Delay queue for dropping idle nodes.
    goodbye_nodes_queue: delay_queue::DelayQueue<NodeId>,
    /// Queue of pending downloads.
    queue: Queue,
    /// Information about pending and active requests.
    requests: HashMap<DownloadKind, RequestInfo<G::NeedsConn>>,
    /// State of running downloads.
    active_requests: HashMap<DownloadKind, ActiveRequestInfo>,
    /// Tasks for currently running downloads.
    in_progress_downloads: JoinSet<(DownloadKind, InternalDownloadResult)>,
    /// Progress tracker.
    progress_tracker: ProgressTracker,
    metrics: Arc<Metrics>,
}

impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
    fn new(
        getter: G,
        dialer: D,
        config: Arc<Config>,
        msg_rx: mpsc::Receiver<Message>,
        metrics: Arc<Metrics>,
    ) -> Self {
        Service {
            getter,
            dialer,
            msg_rx,
            concurrency_limits: config.concurrency,
            retry_config: config.retry,
            connected_nodes: Default::default(),
            retry_node_state: Default::default(),
            providers: Default::default(),
            requests: Default::default(),
            retry_nodes_queue: delay_queue::DelayQueue::default(),
            goodbye_nodes_queue: delay_queue::DelayQueue::default(),
            active_requests: Default::default(),
            in_progress_downloads: Default::default(),
            progress_tracker: ProgressTracker::new(),
            queue: Default::default(),
            metrics,
        }
    }

    /// Main loop for the service.
    async fn run(mut self) {
        loop {
            trace!("wait for tick");
            self.metrics.downloader_tick_main.inc();
            tokio::select! {
                Some((node, conn_result)) = self.dialer.next() => {
                    trace!(node=%node.fmt_short(), "tick: connection ready");
                    self.metrics.downloader_tick_connection_ready.inc();
                    self.on_connection_ready(node, conn_result);
                }
                maybe_msg = self.msg_rx.recv() => {
                    trace!(msg=?maybe_msg, "tick: message received");
                    self.metrics.downloader_tick_message_received.inc();
                    match maybe_msg {
                        Some(msg) => self.handle_message(msg).await,
                        None => return self.shutdown().await,
                    }
                }
                Some(res) = self.in_progress_downloads.join_next(), if !self.in_progress_downloads.is_empty() => {
                    match res {
                        Ok((kind, result)) => {
                            trace!(%kind, "tick: transfer completed");
                            self::get::track_metrics(&result, &self.metrics);
                            self.metrics.downloader_tick_transfer_completed.inc();
                            self.on_download_completed(kind, result);
                        }
                        Err(err) => {
                            warn!(?err, "transfer task panicked");
                            self.metrics.downloader_tick_transfer_failed.inc();
                        }
                    }
                }
                Some(expired) = self.retry_nodes_queue.next() => {
                    let node = expired.into_inner();
                    trace!(node=%node.fmt_short(), "tick: retry node");
                    self.metrics.downloader_tick_retry_node.inc();
                    self.on_retry_wait_elapsed(node);
                }
                Some(expired) = self.goodbye_nodes_queue.next() => {
                    let node = expired.into_inner();
                    trace!(node=%node.fmt_short(), "tick: goodbye node");
                    self.metrics.downloader_tick_goodbye_node.inc();
                    self.disconnect_idle_node(node, "idle expired");
                }
            }

            self.process_head();

            #[cfg(any(test, debug_assertions))]
            self.check_invariants();
        }
    }

    /// Handle receiving a [`Message`].
    // This is called in the actor loop, and only async because subscribing to an existing transfer
    // sends the initial state.
    async fn handle_message(&mut self, msg: Message) {
        match msg {
            Message::Queue {
                request,
                on_finish,
                intent_id,
            } => {
                self.handle_queue_new_download(request, intent_id, on_finish)
                    .await
            }
            Message::CancelIntent { id, kind } => self.handle_cancel_download(id, kind).await,
            Message::NodesHave { hash, nodes } => {
                let updated = self
                    .providers
                    .add_nodes_if_hash_exists(hash, nodes.iter().cloned());
                if updated {
                    self.queue.unpark_hash(hash);
                }
            }
        }
    }

    /// Handle a [`Message::Queue`].
    ///
    /// If this intent maps to a request that already exists, it will be registered with it. If the
    /// request is new it will be scheduled.
    async fn handle_queue_new_download(
        &mut self,
        request: DownloadRequest,
        intent_id: IntentId,
        on_finish: oneshot::Sender<ExternalDownloadResult>,
    ) {
        let DownloadRequest {
            kind,
            nodes,
            progress,
        } = request;
        debug!(%kind, nodes=?nodes.iter().map(|n| n.node_id.fmt_short()).collect::<Vec<_>>(), "queue intent");

        // store the download intent
        let intent_handlers = IntentHandlers {
            on_finish,
            on_progress: progress,
        };

        // add the nodes to the provider map
        // (skip the node id of our own node - we should never attempt to download from ourselves)
        let node_ids = nodes
            .iter()
            .map(|n| n.node_id)
            .filter(|node_id| *node_id != self.dialer.node_id());
        let updated = self.providers.add_hash_with_nodes(kind.hash(), node_ids);

        // queue the transfer (if not running) or attach to transfer progress (if already running)
        match self.requests.entry(kind) {
            hash_map::Entry::Occupied(mut entry) => {
                if let Some(on_progress) = &intent_handlers.on_progress {
                    // this is async because it sends the current state over the progress channel
                    if let Err(err) = self
                        .progress_tracker
                        .subscribe(kind, on_progress.clone())
                        .await
                    {
                        debug!(?err, %kind, "failed to subscribe progress sender to transfer");
                    }
                }
                entry.get_mut().intents.insert(intent_id, intent_handlers);
            }
            hash_map::Entry::Vacant(entry) => {
                let progress_sender = self.progress_tracker.track(
                    kind,
                    intent_handlers
                        .on_progress
                        .clone()
                        .into_iter()
                        .collect::<Vec<_>>(),
                );

                let get_state = match self.getter.get(kind, progress_sender.clone()).await {
                    Err(err) => {
                        // This prints a "FailureAction" which is somewhat weird, but that's all we get here.
                        tracing::error!(?err, "failed queuing new download");
                        self.finalize_download(
                            kind,
                            [(intent_id, intent_handlers)].into(),
                            // TODO: add better error variant? this is only triggered if the local
                            // store failed with local IO.
                            Err(DownloadError::DownloadFailed),
                        );
                        return;
                    }
                    Ok(GetOutput::Complete(stats)) => {
                        self.finalize_download(
                            kind,
                            [(intent_id, intent_handlers)].into(),
                            Ok(stats),
                        );
                        return;
                    }
                    Ok(GetOutput::NeedsConn(state)) => {
                        // early exit if no providers.
                        if self.providers.get_candidates(&kind.hash()).next().is_none() {
                            self.finalize_download(
                                kind,
                                [(intent_id, intent_handlers)].into(),
                                Err(DownloadError::NoProviders),
                            );
                            return;
                        }
                        state
                    }
                };
                entry.insert(RequestInfo {
                    intents: [(intent_id, intent_handlers)].into_iter().collect(),
                    progress_sender,
                    get_state: Some(get_state),
                });
                self.queue.insert(kind);
            }
        }

        if updated && self.queue.is_parked(&kind) {
            // the transfer is on hold for pending retries, and we added new nodes, so move back to queue.
            self.queue.unpark(&kind);
        }
    }

    /// Cancels a download intent.
    ///
    /// This removes the intent from the list of intents for the `kind`. If the removed intent was
    /// the last one for the `kind`, this means that the download is no longer needed. In this
    /// case, the `kind` will be removed from the list of pending downloads and, if the download
    /// was already started, the download task will be cancelled.
    ///
    /// The method is async because it will send a final abort event on the progress sender.
    async fn handle_cancel_download(&mut self, intent_id: IntentId, kind: DownloadKind) {
        let Entry::Occupied(mut occupied_entry) = self.requests.entry(kind) else {
            warn!(%kind, %intent_id, "cancel download called for unknown download");
            return;
        };

        let request_info = occupied_entry.get_mut();
        if let Some(handlers) = request_info.intents.remove(&intent_id) {
            handlers.on_finish.send(Err(DownloadError::Cancelled)).ok();

            if let Some(sender) = handlers.on_progress {
                self.progress_tracker.unsubscribe(&kind, &sender);
                sender
                    .send(DownloadProgress::Abort(serde_error::Error::new(
                        &*anyhow::Error::from(DownloadError::Cancelled),
                    )))
                    .await
                    .ok();
            }
        }

        if request_info.intents.is_empty() {
            occupied_entry.remove();
            if let Entry::Occupied(occupied_entry) = self.active_requests.entry(kind) {
                occupied_entry.remove().cancellation.cancel();
            } else {
                self.queue.remove(&kind);
            }
            self.remove_hash_if_not_queued(&kind.hash());
        }
    }

    /// Handle receiving a new connection.
    fn on_connection_ready(&mut self, node: NodeId, result: anyhow::Result<D::Connection>) {
        debug_assert!(
            !self.connected_nodes.contains_key(&node),
            "newly connected node is not yet connected"
        );
        match result {
            Ok(connection) => {
                trace!(node=%node.fmt_short(), "connected to node");
                let drop_key = self.goodbye_nodes_queue.insert(node, IDLE_PEER_TIMEOUT);
                self.connected_nodes
                    .insert(node, ConnectionInfo::new_idle(connection, drop_key));
            }
            Err(err) => {
                debug!(%node, %err, "connection to node failed");
                self.disconnect_and_retry(node);
            }
        }
    }

    fn on_download_completed(&mut self, kind: DownloadKind, result: InternalDownloadResult) {
        // first remove the request
        let active_request_info = self
            .active_requests
            .remove(&kind)
            .expect("request was active");

        // get general request info
        let request_info = self.requests.remove(&kind).expect("request was active");

        let ActiveRequestInfo { node, .. } = active_request_info;

        // get node info
        let node_info = self
            .connected_nodes
            .get_mut(&node)
            .expect("node exists in the mapping");

        // update node busy/idle state
        node_info.state = match NonZeroUsize::new(node_info.active_requests() - 1) {
            None => {
                // last request of the node was this one, switch to idle
                let drop_key = self.goodbye_nodes_queue.insert(node, IDLE_PEER_TIMEOUT);
                ConnectedState::Idle { drop_key }
            }
            Some(active_requests) => ConnectedState::Busy { active_requests },
        };

        match &result {
            Ok(_) => {
                debug!(%kind, node=%node.fmt_short(), "download successful");
                // clear retry state if operation was successful
                self.retry_node_state.remove(&node);
            }
            Err(FailureAction::AllIntentsDropped) => {
                debug!(%kind, node=%node.fmt_short(), "download cancelled");
            }
            Err(FailureAction::AbortRequest(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: abort request");
                // do not try to download the hash from this node again
                self.providers.remove_hash_from_node(&kind.hash(), &node);
            }
            Err(FailureAction::DropPeer(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: drop node");
                if node_info.is_idle() {
                    // remove the node
                    self.remove_node(node, "explicit drop");
                } else {
                    // do not try to download the hash from this node again
                    self.providers.remove_hash_from_node(&kind.hash(), &node);
                }
            }
            Err(FailureAction::RetryLater(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: retry later");
                if node_info.is_idle() {
                    self.disconnect_and_retry(node);
                }
            }
        };

        // we finalize the download if either the download was successful,
        // or if it should never proceed because all intents were dropped,
        // or if we don't have any candidates to proceed with anymore.
        let finalize = match &result {
            Ok(_) | Err(FailureAction::AllIntentsDropped) => true,
            _ => !self.providers.has_candidates(&kind.hash()),
        };

        if finalize {
            let result = result.map_err(|_| DownloadError::DownloadFailed);
            self.finalize_download(kind, request_info.intents, result);
        } else {
            // reinsert the download at the front of the queue to try from the next node
            self.requests.insert(kind, request_info);
            self.queue.insert_front(kind);
        }
    }

    /// Finalize a download.
    ///
    /// This triggers the intent return channels, and removes the download from the progress tracker
    /// and provider map.
    fn finalize_download(
        &mut self,
        kind: DownloadKind,
        intents: HashMap<IntentId, IntentHandlers>,
        result: ExternalDownloadResult,
    ) {
        self.progress_tracker.remove(&kind);
        self.remove_hash_if_not_queued(&kind.hash());
        for (_id, handlers) in intents.into_iter() {
            handlers.on_finish.send(result.clone()).ok();
        }
    }

    fn on_retry_wait_elapsed(&mut self, node: NodeId) {
        // check if the node is still needed
        let Some(hashes) = self.providers.node_hash.get(&node) else {
            self.retry_node_state.remove(&node);
            return;
        };
        let Some(state) = self.retry_node_state.get_mut(&node) else {
            warn!(node=%node.fmt_short(), "missing retry state for node ready for retry");
            return;
        };
        state.retry_is_queued = false;
        for hash in hashes {
            self.queue.unpark_hash(*hash);
        }
    }

    /// Start the next downloads, or dial nodes, if limits permit and the queue is non-empty.
    ///
    /// This is called after all actions. If there is nothing to do, it will return cheaply.
    /// Otherwise, we will check the next hash in the queue, and:
    /// * start the transfer if we are connected to a provider and limits are ok
    /// * or, connect to a provider, if there is one we are not dialing yet and limits are ok
    /// * or, disconnect an idle node if it would allow us to connect to a provider
    /// * or, if all providers are waiting for retry, park the download
    /// * or, if our limits are reached, do nothing for now
    ///
    /// The download requests will only be popped from the queue once we either start the transfer
    /// from a connected node [`NextStep::StartTransfer`], or if we abort the download on
    /// [`NextStep::OutOfProviders`]. In all other cases, the request is kept at the top of the
    /// queue, so the next call to [`Self::process_head`] will evaluate the situation again, and
    /// so forth, until either [`NextStep::StartTransfer`] or [`NextStep::OutOfProviders`] is
    /// reached.
    fn process_head(&mut self) {
        // start as many queued downloads as allowed by the request limits.
        loop {
            let Some(kind) = self.queue.front().cloned() else {
                break;
            };

            let next_step = self.next_step(&kind);
            trace!(%kind, ?next_step, "process_head");

            match next_step {
                NextStep::Wait => break,
                NextStep::StartTransfer(node) => {
                    let _ = self.queue.pop_front();
                    debug!(%kind, node=%node.fmt_short(), "start transfer");
                    self.start_download(kind, node);
                }
                NextStep::Dial(node) => {
                    debug!(%kind, node=%node.fmt_short(), "dial node");
                    self.dialer.queue_dial(node);
                }
                NextStep::DialQueuedDisconnect(node, key) => {
                    let idle_node = self.goodbye_nodes_queue.remove(&key).into_inner();
                    self.disconnect_idle_node(idle_node, "drop idle for new dial");
                    debug!(%kind, node=%node.fmt_short(), idle_node=%idle_node.fmt_short(), "dial node, disconnect idle node");
                    self.dialer.queue_dial(node);
                }
                NextStep::Park => {
                    debug!(%kind, "park download: all providers waiting for retry");
                    self.queue.park_front();
                }
                NextStep::OutOfProviders => {
                    debug!(%kind, "abort download: out of providers");
                    let _ = self.queue.pop_front();
                    let info = self.requests.remove(&kind).expect("queued downloads exist");
                    self.finalize_download(kind, info.intents, Err(DownloadError::NoProviders));
                }
            }
        }
    }

    /// Drop the connection to a node and insert it into the retry queue.
    fn disconnect_and_retry(&mut self, node: NodeId) {
        self.disconnect_idle_node(node, "queue retry");
        let retry_state = self.retry_node_state.entry(node).or_default();
        retry_state.retry_count += 1;
        if retry_state.retry_count <= self.retry_config.max_retries_per_node {
            // node can be retried
            debug!(node=%node.fmt_short(), retry_count=retry_state.retry_count, "queue retry");
            let timeout = self.retry_config.initial_retry_delay * retry_state.retry_count;
            self.retry_nodes_queue.insert(node, timeout);
            retry_state.retry_is_queued = true;
        } else {
            // node is dead
            self.remove_node(node, "retries exceeded");
        }
    }

    /// Calculate the next step needed to proceed the download for `kind`.
    ///
    /// This is called once `kind` has reached the head of the queue, see [`Self::process_head`].
    /// It can be called repeatedly and has no side effects; it only calculates what *should* be
    /// done next.
    ///
    /// See [`NextStep`] for details on the potential next steps returned from this method.
    fn next_step(&self, kind: &DownloadKind) -> NextStep {
        // If the total requests capacity is reached, we have to wait until an active request
        // completes.
        if self
            .concurrency_limits
            .at_requests_capacity(self.active_requests.len())
        {
            return NextStep::Wait;
        };

        let mut candidates = self.providers.get_candidates(&kind.hash()).peekable();
        // If we have no provider candidates for this download, there's nothing else we can do.
        if candidates.peek().is_none() {
            return NextStep::OutOfProviders;
        }

        // Track if there is a provider node to which we are connected and which is not at its
        // request capacity. If there is more than one, take the one with the least amount of
        // running transfers.
        let mut best_connected: Option<(NodeId, usize)> = None;
        // Track if there is a disconnected provider node to which we can potentially connect.
        let mut next_to_dial = None;
        // Track the number of provider nodes that are currently being dialed.
        let mut currently_dialing = 0;
        // Track if we have at least one provider node which is currently at its request capacity.
        // If this is the case, we will never return [`NextStep::OutOfProviders`] but [`NextStep::Wait`]
        // instead, because we can still try that node once it has finished its work.
        let mut has_exhausted_provider = false;
        // Track if we have at least one provider node that is currently in the retry queue.
        let mut has_retrying_provider = false;

        for node in candidates {
            match self.node_state(node) {
                NodeState::Connected(info) => {
                    let active_requests = info.active_requests();
                    if self
                        .concurrency_limits
                        .node_at_request_capacity(active_requests)
                    {
                        has_exhausted_provider = true;
                    } else {
                        best_connected = Some(match best_connected.take() {
                            Some(old) if old.1 <= active_requests => old,
                            _ => (node, active_requests),
                        });
                    }
                }
                NodeState::Dialing => {
                    currently_dialing += 1;
                }
                NodeState::WaitForRetry => {
                    has_retrying_provider = true;
                }
                NodeState::Disconnected => {
                    if next_to_dial.is_none() {
                        next_to_dial = Some(node);
                    }
                }
            }
        }

        let has_dialing = currently_dialing > 0;

        // If we have a connected provider node with free slots, use it!
        if let Some((node, _active_requests)) = best_connected {
            NextStep::StartTransfer(node)
        }
        // If we have a node which could be dialed: Check capacity and act accordingly.
        else if let Some(node) = next_to_dial {
            // We check if the dial capacity for this hash is exceeded: We only start new dials for
            // the hash if we are below the limit.
            //
            // If other requests trigger dials for providers of this hash, the limit may be
            // exceeded, but then we just don't start further dials and wait until one completes.
            let at_dial_capacity = has_dialing
                && self
                    .concurrency_limits
                    .at_dials_per_hash_capacity(currently_dialing);
            // Check if we reached the global connection limit.
            let at_connections_capacity = self.at_connections_capacity();

            // All slots are free: We can dial our candidate.
            if !at_connections_capacity && !at_dial_capacity {
                NextStep::Dial(node)
            }
            // The hash has free dial capacity, but the global connection capacity is reached.
            // But if we have idle nodes, we will disconnect the longest-idling node, and then dial
            // our candidate.
            else if at_connections_capacity
                && !at_dial_capacity
                && !self.goodbye_nodes_queue.is_empty()
            {
                let key = self.goodbye_nodes_queue.peek().expect("just checked");
                NextStep::DialQueuedDisconnect(node, key)
            }
            // No dial capacity, and no idling nodes: We have to wait until capacity is freed up.
            else {
                NextStep::Wait
            }
        }
        // If we have pending dials to candidates, or connected candidates which are busy
        // with other work: Wait for one of these to become available.
        else if has_exhausted_provider || has_dialing {
            NextStep::Wait
        }
        // All providers are in the retry queue: Park this request until they can be tried again.
        else if has_retrying_provider {
            NextStep::Park
        }
        // We have no candidates left: Nothing more to do.
        else {
            NextStep::OutOfProviders
        }
    }

    /// Start downloading from the given node.
    ///
    /// Panics if the hash is not in `self.requests` or the node is not in `self.connected_nodes`.
    fn start_download(&mut self, kind: DownloadKind, node: NodeId) {
        let node_info = self.connected_nodes.get_mut(&node).expect("node exists");
        let request_info = self.requests.get_mut(&kind).expect("request exists");
        let progress = request_info.progress_sender.clone();
        // create the active request state
        let cancellation = CancellationToken::new();
        let state = ActiveRequestInfo {
            cancellation: cancellation.clone(),
            node,
        };
        let conn = node_info.conn.clone();

        // If this is the first provider node we try, we have an initial state
        // from starting the generator in Self::handle_queue_new_download.
        // If this is not the first provider node we try, we have to recreate the generator,
        // because we can only resume it once.
        let get_state = match request_info.get_state.take() {
            Some(state) => Either::Left(async move { Ok(GetOutput::NeedsConn(state)) }),
            None => Either::Right(self.getter.get(kind, progress)),
        };
        let fut = async move {
            // NOTE: it's an open question if we should do timeouts at this point. Considerations from @Frando:
            // > at this stage we do not know the size of the download, so the timeout would have
            // > to be so large that it won't be useful for non-huge downloads. At the same time,
            // > this means that a super slow node would block a download from succeeding for a long
            // > time, while faster nodes could be readily available.
            // As a conclusion, timeouts should be added only after downloads are known to be bounded
            let fut = async move {
                match get_state.await? {
                    GetOutput::Complete(stats) => Ok(stats),
                    GetOutput::NeedsConn(state) => state.proceed(conn).await,
                }
            };
            tokio::pin!(fut);
            let res = tokio::select! {
                _ = cancellation.cancelled() => Err(FailureAction::AllIntentsDropped),
                res = &mut fut => res
            };
            trace!("transfer finished");

            (kind, res)
        }
        .instrument(error_span!("transfer", %kind, node=%node.fmt_short()));
        node_info.state = match &node_info.state {
            ConnectedState::Busy { active_requests } => ConnectedState::Busy {
                active_requests: active_requests.saturating_add(1),
            },
            ConnectedState::Idle { drop_key } => {
                self.goodbye_nodes_queue.remove(drop_key);
                ConnectedState::Busy {
                    active_requests: NonZeroUsize::new(1).expect("clearly non zero"),
                }
            }
        };
        self.active_requests.insert(kind, state);
        self.in_progress_downloads.spawn_local(fut);
    }

    fn disconnect_idle_node(&mut self, node: NodeId, reason: &'static str) -> bool {
        if let Some(info) = self.connected_nodes.remove(&node) {
            match info.state {
                ConnectedState::Idle { drop_key } => {
                    self.goodbye_nodes_queue.try_remove(&drop_key);
                    true
                }
                ConnectedState::Busy { .. } => {
                    warn!("expected removed node to be idle, but it is busy (removal reason: {reason:?})");
                    self.connected_nodes.insert(node, info);
                    false
                }
            }
        } else {
            true
        }
    }

    fn remove_node(&mut self, node: NodeId, reason: &'static str) {
        debug!(node = %node.fmt_short(), %reason, "remove node");
        if self.disconnect_idle_node(node, reason) {
            self.providers.remove_node(&node);
            self.retry_node_state.remove(&node);
        }
    }

    fn node_state(&self, node: NodeId) -> NodeState<'_, D::Connection> {
        if let Some(info) = self.connected_nodes.get(&node) {
            NodeState::Connected(info)
        } else if self.dialer.is_pending(node) {
            NodeState::Dialing
        } else {
            match self.retry_node_state.get(&node) {
                Some(state) if state.retry_is_queued => NodeState::WaitForRetry,
                _ => NodeState::Disconnected,
            }
        }
    }

    /// Check if we have maxed out our connection capacity.
    fn at_connections_capacity(&self) -> bool {
        self.concurrency_limits
            .at_connections_capacity(self.connections_count())
    }

    /// Get the total number of connected and dialing nodes.
    fn connections_count(&self) -> usize {
        let connected_nodes = self.connected_nodes.values().count();
        let dialing_nodes = self.dialer.pending_count();
        connected_nodes + dialing_nodes
    }

    /// Remove a `hash` from the [`ProviderMap`], but only if [`Self::queue`] does not contain the
    /// hash at all, even with the other [`BlobFormat`].
    fn remove_hash_if_not_queued(&mut self, hash: &Hash) {
        if !self.queue.contains_hash(*hash) {
            self.providers.remove_hash(hash);
        }
    }

    #[allow(clippy::unused_async)]
    async fn shutdown(self) {
        debug!("shutting down");
        // TODO(@divma): how to make sure the download futures end gracefully?
    }
}

/// The next step needed to continue a download.
///
/// See [`Service::next_step`] for details.
#[derive(Debug)]
enum NextStep {
    /// Provider connection is ready; initiate the transfer.
    StartTransfer(NodeId),
    /// Start to dial `NodeId`.
    ///
    /// This means we have no usable (non-exhausted) connection to a provider node, but we do
    /// have a free connection slot and a provider node we are not yet connected to.
    Dial(NodeId),
    /// Start to dial `NodeId`, but first disconnect the idle node behind [`delay_queue::Key`] in
    /// [`Service::goodbye_nodes_queue`] to free up a connection slot.
    DialQueuedDisconnect(NodeId, delay_queue::Key),
    /// Resource limits are exhausted; do nothing for now and wait until a slot frees up.
    Wait,
    /// All providers are currently in a retry timeout. Park the download aside and move
    /// on to the next download in the queue.
    Park,
    /// We have tried all available providers. There is nothing else to do.
    OutOfProviders,
}
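
// What follows is a minimal decision sketch, assuming simplified inputs: the real
// `Service::next_step` derives these from the provider map, the connection state and
// the concurrency limits. All parameter names here are hypothetical.
#[cfg(test)]
#[allow(dead_code)]
fn next_step_sketch(
    connected_provider: Option<NodeId>,
    unconnected_provider: Option<NodeId>,
    idle_disconnect_candidate: Option<delay_queue::Key>,
    at_connections_capacity: bool,
    dial_in_progress: bool,
    all_providers_retrying: bool,
) -> NextStep {
    if let Some(node) = connected_provider {
        // A usable connection exists: start the transfer right away.
        return NextStep::StartTransfer(node);
    }
    match unconnected_provider {
        // A free connection slot is available: dial the provider directly.
        Some(node) if !at_connections_capacity => NextStep::Dial(node),
        // No free slot, but an idle node can be disconnected to make room.
        Some(node) => match idle_disconnect_candidate {
            Some(key) => NextStep::DialQueuedDisconnect(node, key),
            None => NextStep::Wait,
        },
        // A dial is already underway: wait for it to settle.
        None if dial_in_progress => NextStep::Wait,
        // Every known provider is in a retry timeout: park the download.
        None if all_providers_retrying => NextStep::Park,
        // Nothing left to try.
        None => NextStep::OutOfProviders,
    }
}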

/// Map of potential providers for a hash.
#[derive(Default, Debug)]
struct ProviderMap {
    hash_node: HashMap<Hash, HashSet<NodeId>>,
    node_hash: HashMap<NodeId, HashSet<Hash>>,
}

impl ProviderMap {
    /// Get candidates to download this hash.
    pub fn get_candidates<'a>(&'a self, hash: &Hash) -> impl Iterator<Item = NodeId> + 'a {
        self.hash_node
            .get(hash)
            .map(|nodes| nodes.iter())
            .into_iter()
            .flatten()
            .copied()
    }

    /// Whether we have any candidates to download this hash.
    pub fn has_candidates(&self, hash: &Hash) -> bool {
        self.hash_node
            .get(hash)
            .map(|nodes| !nodes.is_empty())
            .unwrap_or(false)
    }

    /// Register nodes for a hash. This should only be done for hashes we intend to download.
    ///
    /// Returns `true` if new providers were added.
    fn add_hash_with_nodes(&mut self, hash: Hash, nodes: impl Iterator<Item = NodeId>) -> bool {
        let mut updated = false;
        let hash_entry = self.hash_node.entry(hash).or_default();
        for node in nodes {
            updated |= hash_entry.insert(node);
            let node_entry = self.node_hash.entry(node).or_default();
            node_entry.insert(hash);
        }
        updated
    }

    /// Register nodes for a hash, but only if the hash is already in our queue.
    ///
    /// Returns `true` if a new node was added.
    fn add_nodes_if_hash_exists(
        &mut self,
        hash: Hash,
        nodes: impl Iterator<Item = NodeId>,
    ) -> bool {
        let mut updated = false;
        if let Some(hash_entry) = self.hash_node.get_mut(&hash) {
            for node in nodes {
                updated |= hash_entry.insert(node);
                let node_entry = self.node_hash.entry(node).or_default();
                node_entry.insert(hash);
            }
        }
        updated
    }

    /// Signal the registry that this hash is no longer of interest.
    fn remove_hash(&mut self, hash: &Hash) {
        if let Some(nodes) = self.hash_node.remove(hash) {
            for node in nodes {
                if let Some(hashes) = self.node_hash.get_mut(&node) {
                    hashes.remove(hash);
                    if hashes.is_empty() {
                        self.node_hash.remove(&node);
                    }
                }
            }
        }
    }

    fn remove_node(&mut self, node: &NodeId) {
        if let Some(hashes) = self.node_hash.remove(node) {
            for hash in hashes {
                if let Some(nodes) = self.hash_node.get_mut(&hash) {
                    nodes.remove(node);
                    if nodes.is_empty() {
                        self.hash_node.remove(&hash);
                    }
                }
            }
        }
    }

    fn remove_hash_from_node(&mut self, hash: &Hash, node: &NodeId) {
        if let Some(nodes) = self.hash_node.get_mut(hash) {
            nodes.remove(node);
            if nodes.is_empty() {
                self.remove_hash(hash);
            }
        }
        if let Some(hashes) = self.node_hash.get_mut(node) {
            hashes.remove(hash);
            if hashes.is_empty() {
                self.remove_node(node);
            }
        }
    }
}
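
// A small invariant sketch for the bidirectional bookkeeping above, assuming nothing
// beyond the two fields: every hash→node edge must have a matching node→hash edge and
// vice versa (a check in the spirit of the `invariants` module declared at the top of
// this file).
#[cfg(test)]
#[allow(dead_code)]
fn provider_map_is_consistent(map: &ProviderMap) -> bool {
    let hash_to_node_ok = map.hash_node.iter().all(|(hash, nodes)| {
        nodes.iter().all(|node| {
            map.node_hash
                .get(node)
                .is_some_and(|hashes| hashes.contains(hash))
        })
    });
    let node_to_hash_ok = map.node_hash.iter().all(|(node, hashes)| {
        hashes.iter().all(|hash| {
            map.hash_node
                .get(hash)
                .is_some_and(|nodes| nodes.contains(node))
        })
    });
    hash_to_node_ok && node_to_hash_ok
}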

/// The queue of requested downloads.
///
/// This manages two data structures:
/// * The main queue, a FIFO queue where each item can only appear once.
///   New downloads are pushed to the back of the queue, and the next download to process is popped
///   from the front.
/// * The parked set, a hash set. Items can be moved from the main queue into the parked set.
///   Parked items will not be popped unless they are moved back into the main queue.
#[derive(Debug, Default)]
struct Queue {
    main: LinkedHashSet<DownloadKind>,
    parked: HashSet<DownloadKind>,
}

impl Queue {
    /// Peek at the front element of the main queue.
    pub fn front(&self) -> Option<&DownloadKind> {
        self.main.front()
    }

    #[cfg(any(test, debug_assertions))]
    pub fn iter_parked(&self) -> impl Iterator<Item = &DownloadKind> {
        self.parked.iter()
    }

    #[cfg(any(test, debug_assertions))]
    pub fn iter(&self) -> impl Iterator<Item = &DownloadKind> {
        self.main.iter().chain(self.parked.iter())
    }

    /// Returns `true` if either the main queue or the parked set contains a download.
    pub fn contains(&self, kind: &DownloadKind) -> bool {
        self.main.contains(kind) || self.parked.contains(kind)
    }

    /// Returns `true` if either the main queue or the parked set contains a download for a hash.
    pub fn contains_hash(&self, hash: Hash) -> bool {
        let as_raw = HashAndFormat::raw(hash).into();
        let as_hash_seq = HashAndFormat::hash_seq(hash).into();
        self.contains(&as_raw) || self.contains(&as_hash_seq)
    }

    /// Returns `true` if a download is in the parked set.
    pub fn is_parked(&self, kind: &DownloadKind) -> bool {
        self.parked.contains(kind)
    }

    /// Insert an element at the back of the main queue.
    pub fn insert(&mut self, kind: DownloadKind) {
        if !self.main.contains(&kind) {
            self.main.insert(kind);
        }
    }

    /// Insert an element at the front of the main queue.
    pub fn insert_front(&mut self, kind: DownloadKind) {
        if !self.main.contains(&kind) {
            self.main.insert(kind);
        }
        self.main.to_front(&kind);
    }

    /// Dequeue the first download of the main queue.
    pub fn pop_front(&mut self) -> Option<DownloadKind> {
        self.main.pop_front()
    }

    /// Move the front item of the main queue into the parked set.
    pub fn park_front(&mut self) {
        if let Some(item) = self.pop_front() {
            self.parked.insert(item);
        }
    }

    /// Move a download from the parked set to the front of the main queue.
    pub fn unpark(&mut self, kind: &DownloadKind) {
        if self.parked.remove(kind) {
            self.main.insert(*kind);
            self.main.to_front(kind);
        }
    }

    /// Move any download for a hash from the parked set to the front of the main queue.
    pub fn unpark_hash(&mut self, hash: Hash) {
        let as_raw = HashAndFormat::raw(hash).into();
        let as_hash_seq = HashAndFormat::hash_seq(hash).into();
        self.unpark(&as_raw);
        self.unpark(&as_hash_seq);
    }

    /// Remove a download from both the main queue and the parked set.
    pub fn remove(&mut self, kind: &DownloadKind) -> bool {
        self.main.remove(kind) || self.parked.remove(kind)
    }
}
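
// A minimal usage sketch of the park/unpark flow. It assumes `Hash::new` hashes
// arbitrary bytes (as in the rest of this crate); everything else is the API above.
#[cfg(test)]
#[allow(dead_code)]
fn queue_park_unpark_sketch() {
    let mut queue = Queue::default();
    let kind: DownloadKind = HashAndFormat::raw(Hash::new(b"example")).into();

    // New downloads are pushed to the back of the main queue.
    queue.insert(kind);
    assert!(queue.contains(&kind) && !queue.is_parked(&kind));

    // Parking moves the front item aside; parked items are never popped.
    queue.park_front();
    assert!(queue.is_parked(&kind));
    assert!(queue.pop_front().is_none());

    // Unparking moves the download back to the front of the main queue.
    queue.unpark(&kind);
    assert_eq!(queue.pop_front(), Some(kind));
}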

impl DialerT for Dialer {
    type Connection = endpoint::Connection;

    fn queue_dial(&mut self, node_id: NodeId) {
        self.queue_dial(node_id, crate::protocol::ALPN)
    }

    fn pending_count(&self) -> usize {
        self.pending_count()
    }

    fn is_pending(&self, node: NodeId) -> bool {
        self.is_pending(node)
    }

    fn node_id(&self) -> NodeId {
        self.endpoint().node_id()
    }
}

/// Dials nodes and maintains a queue of pending dials.
///
/// The [`Dialer`] wraps an [`Endpoint`], connects to nodes through the endpoint, stores the
/// pending connect futures and emits finished connect results.
///
/// The [`Dialer`] also implements [`Stream`] to retrieve the dialed connections.
#[derive(Debug)]
struct Dialer {
    endpoint: Endpoint,
    pending: JoinSet<(NodeId, anyhow::Result<endpoint::Connection>)>,
    pending_dials: HashMap<NodeId, CancellationToken>,
}

impl Dialer {
    /// Creates a new dialer for an [`Endpoint`].
    fn new(endpoint: Endpoint) -> Self {
        Self {
            endpoint,
            pending: Default::default(),
            pending_dials: Default::default(),
        }
    }

    /// Starts to dial a node by [`NodeId`].
    fn queue_dial(&mut self, node_id: NodeId, alpn: &'static [u8]) {
        if self.is_pending(node_id) {
            return;
        }
        let cancel = CancellationToken::new();
        self.pending_dials.insert(node_id, cancel.clone());
        let endpoint = self.endpoint.clone();
        self.pending.spawn(async move {
            let res = tokio::select! {
                biased;
                _ = cancel.cancelled() => Err(anyhow!("Cancelled")),
                res = endpoint.connect(node_id, alpn) => res
            };
            (node_id, res)
        });
    }

    /// Checks if a node is currently being dialed.
    fn is_pending(&self, node: NodeId) -> bool {
        self.pending_dials.contains_key(&node)
    }

    /// Returns the number of pending dials.
    fn pending_count(&self) -> usize {
        self.pending_dials.len()
    }

    /// Returns a reference to the endpoint used in this dialer.
    fn endpoint(&self) -> &Endpoint {
        &self.endpoint
    }
}

impl Stream for Dialer {
    type Item = (NodeId, anyhow::Result<endpoint::Connection>);

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Loop so that a failed join (a panicked or cancelled dial task) does not
        // return `Poll::Pending` without the waker being registered again, which
        // could stall the stream until an unrelated wakeup.
        loop {
            match self.pending.poll_join_next(cx) {
                Poll::Ready(Some(Ok((node_id, result)))) => {
                    self.pending_dials.remove(&node_id);
                    return Poll::Ready(Some((node_id, result)));
                }
                Poll::Ready(Some(Err(e))) => {
                    error!("dialer error: {:?}", e);
                    continue;
                }
                _ => return Poll::Pending,
            }
        }
    }
}
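
// A minimal driving sketch, assuming an already-configured `Endpoint`; the real
// service instead polls the dialer inside its main event loop, next to its other
// event sources.
#[cfg(test)]
#[allow(dead_code)]
async fn drive_dialer_sketch(endpoint: Endpoint, node: NodeId) {
    let mut dialer = Dialer::new(endpoint);
    dialer.queue_dial(node, crate::protocol::ALPN);
    // As a [`Stream`], the dialer yields `(NodeId, Result<Connection>)` pairs once
    // dial attempts settle.
    if let Some((node_id, result)) = dialer.next().await {
        match result {
            Ok(_connection) => debug!(node = %node_id.fmt_short(), "dial succeeded"),
            Err(err) => debug!(node = %node_id.fmt_short(), ?err, "dial failed"),
        }
    }
}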