1use std::{
29 collections::{
30 hash_map::{self, Entry},
31 HashMap, HashSet,
32 },
33 fmt,
34 future::Future,
35 num::NonZeroUsize,
36 pin::Pin,
37 sync::{
38 atomic::{AtomicU64, Ordering},
39 Arc,
40 },
41 task::Poll,
42 time::Duration,
43};
44
45use anyhow::anyhow;
46use futures_lite::{future::BoxedLocal, Stream, StreamExt};
47use hashlink::LinkedHashSet;
48use iroh::{endpoint, Endpoint, NodeAddr, NodeId};
49use tokio::{
50 sync::{mpsc, oneshot},
51 task::JoinSet,
52};
53use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
54use tracing::{debug, error, error_span, trace, warn, Instrument};
55
56use crate::{
57 get::{db::DownloadProgress, error::GetError, Stats},
58 metrics::Metrics,
59 store::Store,
60 util::{local_pool::LocalPoolHandle, progress::ProgressSender},
61 BlobFormat, Hash, HashAndFormat,
62};
63
64mod get;
65mod invariants;
66mod progress;
67mod test;
68
69use self::progress::{BroadcastProgressSender, ProgressSubscriber, ProgressTracker};
70
/// Delay after which a node that became idle is scheduled for disconnection.
///
/// Used as the timeout for entries in the service's goodbye queue.
const IDLE_PEER_TIMEOUT: Duration = Duration::from_secs(10);
/// Capacity of the bounded channel that carries [`Message`]s from [`Downloader`] handles to
/// the service.
const SERVICE_CHANNEL_CAPACITY: usize = 128;
75
/// Identifier of a single download intent, i.e. one call to [`Downloader::queue`].
///
/// Ids are taken from a counter owned by the [`Downloader`] that is incremented on every
/// queued request.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, derive_more::Display)]
pub struct IntentId(pub u64);
79
/// Abstraction over the component that establishes connections to nodes.
///
/// As a [`Stream`] it yields, per finished dial attempt, the node id together with either the
/// established connection or the error that made the dial fail.
trait DialerT: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin {
    /// Type of the connections produced by this dialer.
    type Connection: Clone + 'static;
    /// Requests that `node_id` be dialed.
    fn queue_dial(&mut self, node_id: NodeId);
    /// Number of dials currently pending; counted towards the open-connections limit.
    fn pending_count(&self) -> usize;
    /// Whether a dial to `node` is currently pending.
    fn is_pending(&self, node: NodeId) -> bool;
    /// The id of the local node; used to filter ourselves out of provider lists.
    fn node_id(&self) -> NodeId;
}
93
/// Outcome of a failed transfer, telling the service how to react.
#[derive(Debug)]
pub enum FailureAction {
    /// The transfer was cancelled because nobody is interested in the result anymore.
    AllIntentsDropped,
    /// The request failed for this node; the hash is no longer requested from that node.
    AbortRequest(GetError),
    /// The node misbehaved: it is removed if idle, otherwise only this hash is dropped for it.
    DropPeer(anyhow::Error),
    /// The transfer failed in a way that may succeed later; an idle node is disconnected and
    /// scheduled for a retry.
    RetryLater(anyhow::Error),
}
106
/// Future returned by [`Getter::get`]: resolves to the initial [`GetOutput`] or a failure.
type GetStartFut<N> = BoxedLocal<Result<GetOutput<N>, FailureAction>>;
/// Future returned by [`NeedsConn::proceed`]: resolves to the final result of a transfer.
type GetProceedFut = BoxedLocal<InternalDownloadResult>;
111
/// Strategy used by the service to obtain a download.
///
/// A get runs in two steps: [`Getter::get`] produces a [`GetOutput`], which is either already
/// complete or a state that needs a connection to proceed (see [`NeedsConn`]).
pub trait Getter {
    /// Connection type handed to [`NeedsConn::proceed`].
    type Connection: 'static;
    /// Intermediate state returned when the download cannot complete without a connection.
    type NeedsConn: NeedsConn<Self::Connection>;
    /// Starts a get for `kind`, with `progress_sender` passed along for progress events.
    fn get(
        &mut self,
        kind: DownloadKind,
        progress_sender: BroadcastProgressSender,
    ) -> GetStartFut<Self::NeedsConn>;
}
126
/// Intermediate state of a get that requires a connection of type `C` to continue.
pub trait NeedsConn<C>: std::fmt::Debug + 'static {
    /// Consumes the state and continues the get over `conn`.
    fn proceed(self, conn: C) -> GetProceedFut;
}
132
/// Result of the first step of a get (see [`Getter::get`]).
#[derive(Debug)]
pub enum GetOutput<N> {
    /// The download finished without needing a connection.
    Complete(Stats),
    /// The download needs a connection; the payload is the state to proceed with.
    NeedsConn(N),
}
141
/// Limits the service applies when scheduling transfers and dials.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConcurrencyLimits {
    /// Maximum number of requests that may be active at the same time.
    pub max_concurrent_requests: usize,
    /// Maximum number of requests that may be active on a single node.
    pub max_concurrent_requests_per_node: usize,
    /// Maximum number of connections, counting established and pending ones.
    pub max_open_connections: usize,
    /// Maximum number of candidate nodes of one hash that may be dialed at the same time.
    pub max_concurrent_dials_per_hash: usize,
}

impl Default for ConcurrencyLimits {
    fn default() -> Self {
        Self {
            max_concurrent_dials_per_hash: 5,
            max_open_connections: 25,
            max_concurrent_requests_per_node: 4,
            max_concurrent_requests: 50,
        }
    }
}

impl ConcurrencyLimits {
    /// Whether `active_requests` has reached the global request limit.
    fn at_requests_capacity(&self, active_requests: usize) -> bool {
        self.max_concurrent_requests <= active_requests
    }

    /// Whether `active_node_requests` has reached the per-node request limit.
    fn node_at_request_capacity(&self, active_node_requests: usize) -> bool {
        self.max_concurrent_requests_per_node <= active_node_requests
    }

    /// Whether `active_connections` has reached the connection limit.
    fn at_connections_capacity(&self, active_connections: usize) -> bool {
        self.max_open_connections <= active_connections
    }

    /// Whether `concurrent_dials` has reached the per-hash dial limit.
    fn at_dials_per_hash_capacity(&self, concurrent_dials: usize) -> bool {
        self.max_concurrent_dials_per_hash <= concurrent_dials
    }
}
193
/// Retry policy for nodes that failed to connect or transfer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryConfig {
    /// Number of retries granted to a node before it is removed.
    pub max_retries_per_node: u32,
    /// Base delay before a retry; the service multiplies it by the node's retry count.
    pub initial_retry_delay: Duration,
}

impl Default for RetryConfig {
    fn default() -> Self {
        let initial_retry_delay = Duration::from_millis(500);
        RetryConfig {
            initial_retry_delay,
            max_retries_per_node: 6,
        }
    }
}
212
213#[derive(Debug, Clone)]
215pub struct DownloadRequest {
216 kind: DownloadKind,
217 nodes: Vec<NodeAddr>,
218 progress: Option<ProgressSubscriber>,
219}
220
221impl DownloadRequest {
222 pub fn new(
231 resource: impl Into<DownloadKind>,
232 nodes: impl IntoIterator<Item = impl Into<NodeAddr>>,
233 ) -> Self {
234 Self {
235 kind: resource.into(),
236 nodes: nodes.into_iter().map(|n| n.into()).collect(),
237 progress: None,
238 }
239 }
240
241 pub fn progress_sender(mut self, sender: ProgressSubscriber) -> Self {
243 self.progress = Some(sender);
244 self
245 }
246}
247
248#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy, derive_more::From, derive_more::Into)]
250pub struct DownloadKind(HashAndFormat);
251
252impl DownloadKind {
253 pub const fn hash(&self) -> Hash {
255 self.0.hash
256 }
257
258 pub const fn format(&self) -> BlobFormat {
260 self.0.format
261 }
262
263 pub const fn hash_and_format(&self) -> HashAndFormat {
265 self.0
266 }
267}
268
269impl fmt::Display for DownloadKind {
270 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271 write!(f, "{}:{:?}", self.0.hash.fmt_short(), self.0.format)
272 }
273}
274
/// Result delivered to callers through a [`DownloadHandle`].
type ExternalDownloadResult = Result<Stats, DownloadError>;

/// Result of a transfer attempt as seen inside the service.
type InternalDownloadResult = Result<Stats, FailureAction>;
280
/// Error returned to callers when a download does not succeed.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DownloadError {
    /// The download was attempted but failed.
    #[error("Failed to complete download")]
    DownloadFailed,
    /// The intent was cancelled through [`Downloader::cancel`].
    #[error("Download cancelled by us")]
    Cancelled,
    /// No candidate node was available for the requested hash.
    #[error("No provider nodes found")]
    NoProviders,
    /// The result channel was closed without the service sending a result.
    #[error("Failed to receive response from download service")]
    ActorClosed,
}
297
/// Handle to a queued download.
///
/// Await it to obtain the result, or pass it to [`Downloader::cancel`] to cancel the intent.
#[derive(Debug)]
pub struct DownloadHandle {
    /// Id of the intent this handle belongs to.
    id: IntentId,
    /// What is being downloaded.
    kind: DownloadKind,
    /// Receives the final result from the service.
    receiver: oneshot::Receiver<ExternalDownloadResult>,
}
308
309impl Future for DownloadHandle {
310 type Output = ExternalDownloadResult;
311
312 fn poll(
313 mut self: std::pin::Pin<&mut Self>,
314 cx: &mut std::task::Context<'_>,
315 ) -> std::task::Poll<Self::Output> {
316 use std::task::Poll::*;
317 match std::pin::Pin::new(&mut self.receiver).poll(cx) {
320 Ready(Ok(result)) => Ready(result),
321 Ready(Err(_recv_err)) => Ready(Err(DownloadError::ActorClosed)),
322 Pending => Pending,
323 }
324 }
325}
326
/// Configuration of a [`Downloader`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct Config {
    /// Limits on concurrent requests, connections and dials.
    pub concurrency: ConcurrencyLimits,
    /// Retry policy for failing nodes.
    pub retry: RetryConfig,
}
335
/// Cloneable handle to the download service.
///
/// All clones share the same inner state and talk to the same service task.
#[derive(Debug, Clone)]
pub struct Downloader {
    inner: Arc<Inner>,
}
341
/// State shared between all clones of a [`Downloader`].
#[derive(Debug)]
struct Inner {
    /// Counter from which the next [`IntentId`] is taken.
    next_id: AtomicU64,
    /// Channel used to send [`Message`]s to the service.
    msg_tx: mpsc::Sender<Message>,
    /// Configuration the service was started with.
    config: Arc<Config>,
    /// Metrics shared with the service.
    metrics: Arc<Metrics>,
}
352
353impl Downloader {
354 pub fn new<S>(store: S, endpoint: Endpoint, rt: LocalPoolHandle) -> Self
356 where
357 S: Store,
358 {
359 Self::with_config(store, endpoint, rt, Default::default())
360 }
361
    /// Creates a downloader with the given `config` and spawns its service on `rt`.
    ///
    /// The service is built from a getter backed by `store` and a dialer backed by `endpoint`.
    /// It is spawned detached, so the returned handle does not own the task; the handle only
    /// holds the message channel, the config and the metrics.
    pub fn with_config<S>(store: S, endpoint: Endpoint, rt: LocalPoolHandle, config: Config) -> Self
    where
        S: Store,
    {
        let metrics = Arc::new(Metrics::default());
        let metrics2 = metrics.clone();
        // Capture the short node id for the tracing span before `endpoint` moves into the dialer.
        let me = endpoint.node_id().fmt_short();
        let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
        let dialer = Dialer::new(endpoint);
        let config = Arc::new(config);
        let config2 = config.clone();
        // The service is constructed inside the closure, i.e. when the pool runs it.
        let create_future = move || {
            let getter = get::IoGetter {
                store: store.clone(),
            };
            let service = Service::new(getter, dialer, config2, msg_rx, metrics2);
            service.run().instrument(error_span!("downloader", %me))
        };
        rt.spawn_detached(create_future);
        Self {
            inner: Arc::new(Inner {
                next_id: AtomicU64::new(0),
                msg_tx,
                config,
                metrics,
            }),
        }
    }
391
392 pub fn config(&self) -> &Config {
394 &self.inner.config
395 }
396
397 pub async fn queue(&self, request: DownloadRequest) -> DownloadHandle {
399 let kind = request.kind;
400 let intent_id = IntentId(self.inner.next_id.fetch_add(1, Ordering::SeqCst));
401 let (sender, receiver) = oneshot::channel();
402 let handle = DownloadHandle {
403 id: intent_id,
404 kind,
405 receiver,
406 };
407 let msg = Message::Queue {
408 on_finish: sender,
409 request,
410 intent_id,
411 };
412 if let Err(send_err) = self.inner.msg_tx.send(msg).await {
415 let msg = send_err.0;
416 debug!(?msg, "download not sent");
417 }
418 handle
419 }
420
421 pub async fn cancel(&self, handle: DownloadHandle) {
424 let DownloadHandle {
425 id,
426 kind,
427 receiver: _,
428 } = handle;
429 let msg = Message::CancelIntent { id, kind };
430 if let Err(send_err) = self.inner.msg_tx.send(msg).await {
431 let msg = send_err.0;
432 debug!(?msg, "cancel not sent");
433 }
434 }
435
436 pub async fn nodes_have(&mut self, hash: Hash, nodes: Vec<NodeId>) {
441 let msg = Message::NodesHave { hash, nodes };
442 if let Err(send_err) = self.inner.msg_tx.send(msg).await {
443 let msg = send_err.0;
444 debug!(?msg, "nodes have not been sent")
445 }
446 }
447
448 pub fn metrics(&self) -> &Arc<Metrics> {
450 &self.inner.metrics
451 }
452}
453
/// Messages sent from [`Downloader`] handles to the service.
#[derive(derive_more::Debug)]
enum Message {
    /// Queue a new download intent.
    Queue {
        /// The request to queue.
        request: DownloadRequest,
        /// Channel on which the final result is reported.
        #[debug(skip)]
        on_finish: oneshot::Sender<ExternalDownloadResult>,
        /// Id assigned to this intent.
        intent_id: IntentId,
    },
    /// Declare that the given nodes provide the given hash.
    NodesHave { hash: Hash, nodes: Vec<NodeId> },
    /// Cancel the intent `id` for the download `kind`.
    CancelIntent { id: IntentId, kind: DownloadKind },
}
470
/// Callbacks registered by a single intent.
#[derive(derive_more::Debug)]
struct IntentHandlers {
    /// Channel on which the final result is reported.
    #[debug("oneshot::Sender<DownloadResult>")]
    on_finish: oneshot::Sender<ExternalDownloadResult>,
    /// Optional subscriber for progress events.
    on_progress: Option<ProgressSubscriber>,
}
477
/// Information about a request known to the service, whether queued or active.
#[derive(Debug)]
struct RequestInfo<NC> {
    /// All intents waiting for this request, keyed by intent id.
    intents: HashMap<IntentId, IntentHandlers>,
    /// Sender through which progress of this request is broadcast.
    progress_sender: BroadcastProgressSender,
    /// Getter state obtained when the request was queued; taken by the first transfer attempt.
    get_state: Option<NC>,
}
486
/// Information about a request whose transfer task is running.
#[derive(derive_more::Debug)]
struct ActiveRequestInfo {
    /// Token used to cancel the running transfer task.
    #[debug(skip)]
    cancellation: CancellationToken,
    /// Node the transfer runs against.
    node: NodeId,
}
496
/// Retry bookkeeping for a node.
#[derive(Debug, Default)]
struct RetryState {
    /// How many retries have been scheduled for the node so far.
    retry_count: u32,
    /// Whether a retry is currently waiting in the retry queue.
    retry_is_queued: bool,
}
504
/// A connected node: its connection plus whether it is busy or idle.
#[derive(derive_more::Debug)]
struct ConnectionInfo<Conn> {
    /// The connection to the node.
    #[debug(skip)]
    conn: Conn,
    /// Busy/idle state of the connection.
    state: ConnectedState,
}
514
515impl<Conn> ConnectionInfo<Conn> {
516 fn new_idle(connection: Conn, drop_key: delay_queue::Key) -> Self {
518 ConnectionInfo {
519 conn: connection,
520 state: ConnectedState::Idle { drop_key },
521 }
522 }
523
524 fn active_requests(&self) -> usize {
526 match self.state {
527 ConnectedState::Busy { active_requests } => active_requests.get(),
528 ConnectedState::Idle { .. } => 0,
529 }
530 }
531
532 fn is_idle(&self) -> bool {
534 matches!(self.state, ConnectedState::Idle { .. })
535 }
536}
537
/// Busy/idle state of a connected node.
#[derive(derive_more::Debug)]
enum ConnectedState {
    /// At least one request is running on the connection.
    Busy {
        /// Number of running requests.
        #[debug("{}", active_requests.get())]
        active_requests: NonZeroUsize,
    },
    /// No request is running; the node is scheduled for disconnection.
    Idle {
        /// Key of the node's entry in the goodbye queue.
        #[debug(skip)]
        drop_key: delay_queue::Key,
    },
}
552
/// State of a node as seen by the scheduler (see `Service::node_state`).
#[derive(Debug)]
enum NodeState<'a, Conn> {
    /// A connection to the node exists.
    Connected(&'a ConnectionInfo<Conn>),
    /// A dial to the node is pending.
    Dialing,
    /// The node is waiting for a queued retry.
    WaitForRetry,
    /// None of the above: the node is neither connected, being dialed nor awaiting a retry.
    Disconnected,
}
560
/// The download service: a single task that owns all scheduling state.
#[derive(Debug)]
struct Service<G: Getter, D: DialerT> {
    /// Performs the gets.
    getter: G,
    /// Which nodes are candidates for which hashes.
    providers: ProviderMap,
    /// Establishes connections to nodes.
    dialer: D,
    /// Limits consulted when deciding what to do next for a download.
    concurrency_limits: ConcurrencyLimits,
    /// Retry policy for failing nodes.
    retry_config: RetryConfig,
    /// Messages from [`Downloader`] handles.
    msg_rx: mpsc::Receiver<Message>,
    /// Nodes with an established connection.
    connected_nodes: HashMap<NodeId, ConnectionInfo<D::Connection>>,
    /// Retry bookkeeping per node.
    retry_node_state: HashMap<NodeId, RetryState>,
    /// Nodes waiting for their retry delay to elapse.
    retry_nodes_queue: delay_queue::DelayQueue<NodeId>,
    /// Idle nodes scheduled for disconnection.
    goodbye_nodes_queue: delay_queue::DelayQueue<NodeId>,
    /// Downloads waiting to be started, including parked ones.
    queue: Queue,
    /// All known requests, queued or active.
    requests: HashMap<DownloadKind, RequestInfo<G::NeedsConn>>,
    /// Requests whose transfer task is running.
    active_requests: HashMap<DownloadKind, ActiveRequestInfo>,
    /// The running transfer tasks.
    in_progress_downloads: JoinSet<(DownloadKind, InternalDownloadResult)>,
    /// Tracks progress subscribers per download.
    progress_tracker: ProgressTracker,
    /// Metrics counters.
    metrics: Arc<Metrics>,
}
595impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
596 fn new(
597 getter: G,
598 dialer: D,
599 config: Arc<Config>,
600 msg_rx: mpsc::Receiver<Message>,
601 metrics: Arc<Metrics>,
602 ) -> Self {
603 Service {
604 getter,
605 dialer,
606 msg_rx,
607 concurrency_limits: config.concurrency,
608 retry_config: config.retry,
609 connected_nodes: Default::default(),
610 retry_node_state: Default::default(),
611 providers: Default::default(),
612 requests: Default::default(),
613 retry_nodes_queue: delay_queue::DelayQueue::default(),
614 goodbye_nodes_queue: delay_queue::DelayQueue::default(),
615 active_requests: Default::default(),
616 in_progress_downloads: Default::default(),
617 progress_tracker: ProgressTracker::new(),
618 queue: Default::default(),
619 metrics,
620 }
621 }
622
    /// Main loop of the service.
    ///
    /// Each iteration waits for one event (finished dial, incoming message, finished transfer,
    /// elapsed retry delay, or elapsed idle timeout), handles it, and then tries to make
    /// progress on the queue via `process_head`. The loop ends when the message channel is
    /// closed.
    async fn run(mut self) {
        loop {
            trace!("wait for tick");
            self.metrics.downloader_tick_main.inc();
            tokio::select! {
                // A dial finished, successfully or not.
                Some((node, conn_result)) = self.dialer.next() => {
                    trace!(node=%node.fmt_short(), "tick: connection ready");
                    self.metrics.downloader_tick_connection_ready.inc();
                    self.on_connection_ready(node, conn_result);
                }
                // A message from a handle arrived, or every sender was dropped.
                maybe_msg = self.msg_rx.recv() => {
                    trace!(msg=?maybe_msg, "tick: message received");
                    self.metrics.downloader_tick_message_received.inc();
                    match maybe_msg {
                        Some(msg) => self.handle_message(msg).await,
                        None => return self.shutdown().await,
                    }
                }
                // A transfer task finished. The branch is disabled while no task is running.
                Some(res) = self.in_progress_downloads.join_next(), if !self.in_progress_downloads.is_empty() => {
                    match res {
                        Ok((kind, result)) => {
                            trace!(%kind, "tick: transfer completed");
                            self::get::track_metrics(&result, &self.metrics);
                            self.metrics.downloader_tick_transfer_completed.inc();
                            self.on_download_completed(kind, result);
                        }
                        Err(err) => {
                            // NOTE(review): only logged and counted here; the request's entries
                            // in `requests`/`active_requests` and the node's busy counter are
                            // not touched on this path — confirm this is intended.
                            warn!(?err, "transfer task panicked");
                            self.metrics.downloader_tick_transfer_failed.inc();
                        }
                    }
                }
                // The retry delay of a node elapsed.
                Some(expired) = self.retry_nodes_queue.next() => {
                    let node = expired.into_inner();
                    trace!(node=%node.fmt_short(), "tick: retry node");
                    self.metrics.downloader_tick_retry_node.inc();
                    self.on_retry_wait_elapsed(node);
                }
                // The idle timeout of a node elapsed.
                Some(expired) = self.goodbye_nodes_queue.next() => {
                    let node = expired.into_inner();
                    trace!(node=%node.fmt_short(), "tick: goodbye node");
                    self.metrics.downloader_tick_goodbye_node.inc();
                    self.disconnect_idle_node(node, "idle expired");
                }
            }

            // Whatever happened may have unblocked queued downloads.
            self.process_head();

            #[cfg(any(test, debug_assertions))]
            self.check_invariants();
        }
    }
676
677 async fn handle_message(&mut self, msg: Message) {
681 match msg {
682 Message::Queue {
683 request,
684 on_finish,
685 intent_id,
686 } => {
687 self.handle_queue_new_download(request, intent_id, on_finish)
688 .await
689 }
690 Message::CancelIntent { id, kind } => self.handle_cancel_download(id, kind).await,
691 Message::NodesHave { hash, nodes } => {
692 let updated = self
693 .providers
694 .add_nodes_if_hash_exists(hash, nodes.iter().cloned());
695 if updated {
696 self.queue.unpark_hash(hash);
697 }
698 }
699 }
700 }
701
    /// Handles a [`Message::Queue`].
    ///
    /// The provider nodes of the request are registered for the hash. If a request for the same
    /// `kind` already exists, the intent is attached to it. Otherwise the getter is asked for
    /// the download: the intent is finalized right away if the getter fails, reports
    /// completion, or if no candidate nodes are known; else the request is stored and queued.
    async fn handle_queue_new_download(
        &mut self,
        request: DownloadRequest,
        intent_id: IntentId,
        on_finish: oneshot::Sender<ExternalDownloadResult>,
    ) {
        let DownloadRequest {
            kind,
            nodes,
            progress,
        } = request;
        debug!(%kind, nodes=?nodes.iter().map(|n| n.node_id.fmt_short()).collect::<Vec<_>>(), "queue intent");

        let intent_handlers = IntentHandlers {
            on_finish,
            on_progress: progress,
        };

        // Register the providers, skipping our own node id.
        let node_ids = nodes
            .iter()
            .map(|n| n.node_id)
            .filter(|node_id| *node_id != self.dialer.node_id());
        let updated = self.providers.add_hash_with_nodes(kind.hash(), node_ids);

        match self.requests.entry(kind) {
            // The request already exists: attach this intent to it.
            hash_map::Entry::Occupied(mut entry) => {
                if let Some(on_progress) = &intent_handlers.on_progress {
                    // A failed subscription is only logged; the intent is still registered.
                    if let Err(err) = self
                        .progress_tracker
                        .subscribe(kind, on_progress.clone())
                        .await
                    {
                        debug!(?err, %kind, "failed to subscribe progress sender to transfer");
                    }
                }
                entry.get_mut().intents.insert(intent_id, intent_handlers);
            }
            // First intent for this kind: start tracking and ask the getter.
            hash_map::Entry::Vacant(entry) => {
                let progress_sender = self.progress_tracker.track(
                    kind,
                    intent_handlers
                        .on_progress
                        .clone()
                        .into_iter()
                        .collect::<Vec<_>>(),
                );

                // This await happens inside the service loop, so no other event is handled
                // until the getter's future resolves.
                let get_state = match self.getter.get(kind, progress_sender.clone()).await {
                    Err(err) => {
                        tracing::error!(?err, "failed queuing new download");
                        self.finalize_download(
                            kind,
                            [(intent_id, intent_handlers)].into(),
                            Err(DownloadError::DownloadFailed),
                        );
                        return;
                    }
                    Ok(GetOutput::Complete(stats)) => {
                        self.finalize_download(
                            kind,
                            [(intent_id, intent_handlers)].into(),
                            Ok(stats),
                        );
                        return;
                    }
                    Ok(GetOutput::NeedsConn(state)) => {
                        // A connection is needed but nobody is known to provide the hash.
                        if self.providers.get_candidates(&kind.hash()).next().is_none() {
                            self.finalize_download(
                                kind,
                                [(intent_id, intent_handlers)].into(),
                                Err(DownloadError::NoProviders),
                            );
                            return;
                        }
                        state
                    }
                };
                entry.insert(RequestInfo {
                    intents: [(intent_id, intent_handlers)].into_iter().collect(),
                    progress_sender,
                    get_state: Some(get_state),
                });
                self.queue.insert(kind);
            }
        }

        // New providers were added: a parked download can be tried again.
        if updated && self.queue.is_parked(&kind) {
            self.queue.unpark(&kind);
        }
    }
806
    /// Handles a [`Message::CancelIntent`].
    ///
    /// Removes the intent from its request and notifies it (result channel and, if present,
    /// progress subscriber) that the download was cancelled. If it was the last intent, the
    /// request itself is removed: an active transfer is cancelled through its token, a queued
    /// one is taken out of the queue.
    ///
    /// NOTE(review): for an active request this removes both the `requests` and the
    /// `active_requests` entry right away, while the cancelled task still reports its result
    /// through `in_progress_downloads`; `on_download_completed` expects both entries to be
    /// present. Confirm that this path cannot panic or leave the node's busy counter stale.
    async fn handle_cancel_download(&mut self, intent_id: IntentId, kind: DownloadKind) {
        let Entry::Occupied(mut occupied_entry) = self.requests.entry(kind) else {
            warn!(%kind, %intent_id, "cancel download called for unknown download");
            return;
        };

        let request_info = occupied_entry.get_mut();
        if let Some(handlers) = request_info.intents.remove(&intent_id) {
            // Send errors are ignored: the receiver may already be gone.
            handlers.on_finish.send(Err(DownloadError::Cancelled)).ok();

            if let Some(sender) = handlers.on_progress {
                self.progress_tracker.unsubscribe(&kind, &sender);
                sender
                    .send(DownloadProgress::Abort(serde_error::Error::new(
                        &*anyhow::Error::from(DownloadError::Cancelled),
                    )))
                    .await
                    .ok();
            }
        }

        // Nobody is interested in this request anymore: drop it.
        if request_info.intents.is_empty() {
            occupied_entry.remove();
            if let Entry::Occupied(occupied_entry) = self.active_requests.entry(kind) {
                occupied_entry.remove().cancellation.cancel();
            } else {
                self.queue.remove(&kind);
            }
            self.remove_hash_if_not_queued(&kind.hash());
        }
    }
846
847 fn on_connection_ready(&mut self, node: NodeId, result: anyhow::Result<D::Connection>) {
849 debug_assert!(
850 !self.connected_nodes.contains_key(&node),
851 "newly connected node is not yet connected"
852 );
853 match result {
854 Ok(connection) => {
855 trace!(node=%node.fmt_short(), "connected to node");
856 let drop_key = self.goodbye_nodes_queue.insert(node, IDLE_PEER_TIMEOUT);
857 self.connected_nodes
858 .insert(node, ConnectionInfo::new_idle(connection, drop_key));
859 }
860 Err(err) => {
861 debug!(%node, %err, "connection to node failed");
862 self.disconnect_and_retry(node);
863 }
864 }
865 }
866
    /// Handles the result of a finished transfer task.
    ///
    /// Updates the busy/idle state of the node, reacts to the kind of failure (if any), and
    /// then either finalizes the request or puts it back at the front of the queue so another
    /// provider can be tried.
    ///
    /// # Panics
    ///
    /// Panics if the request is not present in both `active_requests` and `requests`, or if
    /// its node is not connected.
    fn on_download_completed(&mut self, kind: DownloadKind, result: InternalDownloadResult) {
        let active_request_info = self
            .active_requests
            .remove(&kind)
            .expect("request was active");

        let request_info = self.requests.remove(&kind).expect("request was active");

        let ActiveRequestInfo { node, .. } = active_request_info;

        let node_info = self
            .connected_nodes
            .get_mut(&node)
            .expect("node exists in the mapping");

        // One request less on this node. The subtraction relies on the node having been
        // marked busy by `start_download`.
        node_info.state = match NonZeroUsize::new(node_info.active_requests() - 1) {
            None => {
                // Last request finished: start the idle timer.
                let drop_key = self.goodbye_nodes_queue.insert(node, IDLE_PEER_TIMEOUT);
                ConnectedState::Idle { drop_key }
            }
            Some(active_requests) => ConnectedState::Busy { active_requests },
        };

        match &result {
            Ok(_) => {
                debug!(%kind, node=%node.fmt_short(), "download successful");
                // A success clears the node's retry bookkeeping.
                self.retry_node_state.remove(&node);
            }
            Err(FailureAction::AllIntentsDropped) => {
                debug!(%kind, node=%node.fmt_short(), "download cancelled");
            }
            Err(FailureAction::AbortRequest(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: abort request");
                // Do not ask this node for this hash again.
                self.providers.remove_hash_from_node(&kind.hash(), &node);
            }
            Err(FailureAction::DropPeer(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: drop node");
                if node_info.is_idle() {
                    self.remove_node(node, "explicit drop");
                } else {
                    // Other requests still run on the node: only stop using it for this hash.
                    self.providers.remove_hash_from_node(&kind.hash(), &node);
                }
            }
            Err(FailureAction::RetryLater(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: retry later");
                if node_info.is_idle() {
                    self.disconnect_and_retry(node);
                }
            }
        };

        // Success and cancellation always end the request; failures end it only once no
        // candidate nodes are left for the hash.
        let finalize = match &result {
            Ok(_) | Err(FailureAction::AllIntentsDropped) => true,
            _ => !self.providers.has_candidates(&kind.hash()),
        };

        if finalize {
            let result = result.map_err(|_| DownloadError::DownloadFailed);
            self.finalize_download(kind, request_info.intents, result);
        } else {
            // Try again with another provider, ahead of everything else in the queue.
            self.requests.insert(kind, request_info);
            self.queue.insert_front(kind);
        }
    }
944
945 fn finalize_download(
950 &mut self,
951 kind: DownloadKind,
952 intents: HashMap<IntentId, IntentHandlers>,
953 result: ExternalDownloadResult,
954 ) {
955 self.progress_tracker.remove(&kind);
956 self.remove_hash_if_not_queued(&kind.hash());
957 for (_id, handlers) in intents.into_iter() {
958 handlers.on_finish.send(result.clone()).ok();
959 }
960 }
961
962 fn on_retry_wait_elapsed(&mut self, node: NodeId) {
963 let Some(hashes) = self.providers.node_hash.get(&node) else {
965 self.retry_node_state.remove(&node);
966 return;
967 };
968 let Some(state) = self.retry_node_state.get_mut(&node) else {
969 warn!(node=%node.fmt_short(), "missing retry state for node ready for retry");
970 return;
971 };
972 state.retry_is_queued = false;
973 for hash in hashes {
974 self.queue.unpark_hash(*hash);
975 }
976 }
977
    /// Processes the front of the queue until it is empty or has to wait.
    ///
    /// For the download at the front, `next_step` decides what to do: start a transfer, dial a
    /// node (possibly after disconnecting an idle one), park the download, fail it for lack of
    /// providers, or stop and wait for the next event.
    fn process_head(&mut self) {
        loop {
            let Some(kind) = self.queue.front().cloned() else {
                break;
            };

            let next_step = self.next_step(&kind);
            trace!(%kind, ?next_step, "process_head");

            match next_step {
                NextStep::Wait => break,
                NextStep::StartTransfer(node) => {
                    let _ = self.queue.pop_front();
                    debug!(%kind, node=%node.fmt_short(), "start transfer");
                    self.start_download(kind, node);
                }
                NextStep::Dial(node) => {
                    // The download stays at the front; the next iteration sees the node as
                    // dialing.
                    debug!(%kind, node=%node.fmt_short(), "dial node");
                    self.dialer.queue_dial(node);
                }
                NextStep::DialQueuedDisconnect(node, key) => {
                    // Make room for the new dial by dropping an idle connection first.
                    let idle_node = self.goodbye_nodes_queue.remove(&key).into_inner();
                    self.disconnect_idle_node(idle_node, "drop idle for new dial");
                    debug!(%kind, node=%node.fmt_short(), idle_node=%idle_node.fmt_short(), "dial node, disconnect idle node)");
                    self.dialer.queue_dial(node);
                }
                NextStep::Park => {
                    debug!(%kind, "park download: all providers waiting for retry");
                    self.queue.park_front();
                }
                NextStep::OutOfProviders => {
                    debug!(%kind, "abort download: out of providers");
                    let _ = self.queue.pop_front();
                    let info = self.requests.remove(&kind).expect("queued downloads exist");
                    self.finalize_download(kind, info.intents, Err(DownloadError::NoProviders));
                }
            }
        }
    }
1034
1035 fn disconnect_and_retry(&mut self, node: NodeId) {
1037 self.disconnect_idle_node(node, "queue retry");
1038 let retry_state = self.retry_node_state.entry(node).or_default();
1039 retry_state.retry_count += 1;
1040 if retry_state.retry_count <= self.retry_config.max_retries_per_node {
1041 debug!(node=%node.fmt_short(), retry_count=retry_state.retry_count, "queue retry");
1043 let timeout = self.retry_config.initial_retry_delay * retry_state.retry_count;
1044 self.retry_nodes_queue.insert(node, timeout);
1045 retry_state.retry_is_queued = true;
1046 } else {
1047 self.remove_node(node, "retries exceeded");
1049 }
1050 }
1051
    /// Decides what to do next for the download `kind`.
    ///
    /// Looks at the state of every candidate node for the hash and returns, in order of
    /// preference: start a transfer on the least busy connected node with free capacity; dial
    /// a disconnected node (dropping an idle connection if the connection limit is reached);
    /// wait if a node is at capacity or being dialed; park if nodes are only waiting for a
    /// retry; otherwise report that no providers are left.
    fn next_step(&self, kind: &DownloadKind) -> NextStep {
        // Nothing can be started while the global request limit is reached.
        if self
            .concurrency_limits
            .at_requests_capacity(self.active_requests.len())
        {
            return NextStep::Wait;
        };

        let mut candidates = self.providers.get_candidates(&kind.hash()).peekable();
        if candidates.peek().is_none() {
            return NextStep::OutOfProviders;
        }

        // Connected node with the fewest active requests seen so far, and that count.
        let mut best_connected: Option<(NodeId, usize)> = None;
        // First disconnected candidate.
        let mut next_to_dial = None;
        // Number of candidates with a pending dial.
        let mut currently_dialing = 0;
        // Whether some connected candidate is at its per-node request limit.
        let mut has_exhausted_provider = false;
        // Whether some candidate is waiting for a retry.
        let mut has_retrying_provider = false;

        for node in candidates {
            match self.node_state(node) {
                NodeState::Connected(info) => {
                    let active_requests = info.active_requests();
                    if self
                        .concurrency_limits
                        .node_at_request_capacity(active_requests)
                    {
                        has_exhausted_provider = true;
                    } else {
                        // Keep the node with fewer active requests; ties keep the earlier one.
                        best_connected = Some(match best_connected.take() {
                            Some(old) if old.1 <= active_requests => old,
                            _ => (node, active_requests),
                        });
                    }
                }
                NodeState::Dialing => {
                    currently_dialing += 1;
                }
                NodeState::WaitForRetry => {
                    has_retrying_provider = true;
                }
                NodeState::Disconnected => {
                    if next_to_dial.is_none() {
                        next_to_dial = Some(node);
                    }
                }
            }
        }

        let has_dialing = currently_dialing > 0;

        if let Some((node, _active_requests)) = best_connected {
            NextStep::StartTransfer(node)
        }
        else if let Some(node) = next_to_dial {
            let at_dial_capacity = has_dialing
                && self
                    .concurrency_limits
                    .at_dials_per_hash_capacity(currently_dialing);
            let at_connections_capacity = self.at_connections_capacity();

            if !at_connections_capacity && !at_dial_capacity {
                NextStep::Dial(node)
            }
            else if at_connections_capacity
                && !at_dial_capacity
                && !self.goodbye_nodes_queue.is_empty()
            {
                // At the connection limit, but an idle node can be dropped to make room.
                let key = self.goodbye_nodes_queue.peek().expect("just checked");
                NextStep::DialQueuedDisconnect(node, key)
            }
            else {
                NextStep::Wait
            }
        }
        else if has_exhausted_provider || has_dialing {
            // A provider will become usable once a request finishes or a dial completes.
            NextStep::Wait
        }
        else if has_retrying_provider {
            NextStep::Park
        }
        else {
            NextStep::OutOfProviders
        }
    }
1172
    /// Starts the transfer of `kind` from the connected node `node`.
    ///
    /// Spawns a local task that runs the get over the node's connection and can be cancelled
    /// through the token stored in `active_requests`. The node is marked busy.
    ///
    /// # Panics
    ///
    /// Panics if the node is not connected or the request is unknown.
    fn start_download(&mut self, kind: DownloadKind, node: NodeId) {
        let node_info = self.connected_nodes.get_mut(&node).expect("node exists");
        let request_info = self.requests.get_mut(&kind).expect("request exists");
        let progress = request_info.progress_sender.clone();
        let cancellation = CancellationToken::new();
        let state = ActiveRequestInfo {
            cancellation: cancellation.clone(),
            node,
        };
        let conn = node_info.conn.clone();

        // The first attempt reuses the getter state obtained when the request was queued;
        // later attempts ask the getter again.
        let get_state = match request_info.get_state.take() {
            Some(state) => Either::Left(async move { Ok(GetOutput::NeedsConn(state)) }),
            None => Either::Right(self.getter.get(kind, progress)),
        };
        let fut = async move {
            let fut = async move {
                match get_state.await? {
                    GetOutput::Complete(stats) => Ok(stats),
                    GetOutput::NeedsConn(state) => state.proceed(conn).await,
                }
            };
            tokio::pin!(fut);
            // Cancellation wins over the transfer and is reported as `AllIntentsDropped`.
            let res = tokio::select! {
                _ = cancellation.cancelled() => Err(FailureAction::AllIntentsDropped),
                res = &mut fut => res
            };
            trace!("transfer finished");

            (kind, res)
        }
        .instrument(error_span!("transfer", %kind, node=%node.fmt_short()));
        node_info.state = match &node_info.state {
            ConnectedState::Busy { active_requests } => ConnectedState::Busy {
                active_requests: active_requests.saturating_add(1),
            },
            ConnectedState::Idle { drop_key } => {
                // The node is in use again: stop its idle timer.
                self.goodbye_nodes_queue.remove(drop_key);
                ConnectedState::Busy {
                    active_requests: NonZeroUsize::new(1).expect("clearly non zero"),
                }
            }
        };
        self.active_requests.insert(kind, state);
        self.in_progress_downloads.spawn_local(fut);
    }
1235
1236 fn disconnect_idle_node(&mut self, node: NodeId, reason: &'static str) -> bool {
1237 if let Some(info) = self.connected_nodes.remove(&node) {
1238 match info.state {
1239 ConnectedState::Idle { drop_key } => {
1240 self.goodbye_nodes_queue.try_remove(&drop_key);
1241 true
1242 }
1243 ConnectedState::Busy { .. } => {
1244 warn!("expected removed node to be idle, but is busy (removal reason: {reason:?})");
1245 self.connected_nodes.insert(node, info);
1246 false
1247 }
1248 }
1249 } else {
1250 true
1251 }
1252 }
1253
1254 fn remove_node(&mut self, node: NodeId, reason: &'static str) {
1255 debug!(node = %node.fmt_short(), %reason, "remove node");
1256 if self.disconnect_idle_node(node, reason) {
1257 self.providers.remove_node(&node);
1258 self.retry_node_state.remove(&node);
1259 }
1260 }
1261
1262 fn node_state(&self, node: NodeId) -> NodeState<'_, D::Connection> {
1263 if let Some(info) = self.connected_nodes.get(&node) {
1264 NodeState::Connected(info)
1265 } else if self.dialer.is_pending(node) {
1266 NodeState::Dialing
1267 } else {
1268 match self.retry_node_state.get(&node) {
1269 Some(state) if state.retry_is_queued => NodeState::WaitForRetry,
1270 _ => NodeState::Disconnected,
1271 }
1272 }
1273 }
1274
1275 fn at_connections_capacity(&self) -> bool {
1277 self.concurrency_limits
1278 .at_connections_capacity(self.connections_count())
1279 }
1280
1281 fn connections_count(&self) -> usize {
1283 let connected_nodes = self.connected_nodes.values().count();
1284 let dialing_nodes = self.dialer.pending_count();
1285 connected_nodes + dialing_nodes
1286 }
1287
1288 fn remove_hash_if_not_queued(&mut self, hash: &Hash) {
1291 if !self.queue.contains_hash(*hash) {
1292 self.providers.remove_hash(hash);
1293 }
1294 }
1295
    /// Shuts the service down. Currently this only logs; the service state is dropped when
    /// `self` goes out of scope.
    // The function is async although it does not await anything, hence the lint exception.
    #[allow(clippy::unused_async)]
    async fn shutdown(self) {
        debug!("shutting down");
    }
1301}
1302
/// What the scheduler should do next for the download at the front of the queue.
#[derive(Debug)]
enum NextStep {
    /// Start the transfer on the given connected node.
    StartTransfer(NodeId),
    /// Dial the given node.
    Dial(NodeId),
    /// Dial the given node after disconnecting the idle node behind the given goodbye-queue
    /// key.
    DialQueuedDisconnect(NodeId, delay_queue::Key),
    /// Nothing can be done right now; wait for the next event.
    Wait,
    /// Park the download: its providers are only waiting for a retry.
    Park,
    /// Fail the download: no providers are left.
    OutOfProviders,
}
1326
/// Bidirectional map between hashes and the nodes that are candidates for providing them.
#[derive(Default, Debug)]
struct ProviderMap {
    /// Candidate nodes per hash.
    hash_node: HashMap<Hash, HashSet<NodeId>>,
    /// Hashes per candidate node.
    node_hash: HashMap<NodeId, HashSet<Hash>>,
}
1333
1334impl ProviderMap {
1335 pub fn get_candidates<'a>(&'a self, hash: &Hash) -> impl Iterator<Item = NodeId> + 'a {
1337 self.hash_node
1338 .get(hash)
1339 .map(|nodes| nodes.iter())
1340 .into_iter()
1341 .flatten()
1342 .copied()
1343 }
1344
1345 pub fn has_candidates(&self, hash: &Hash) -> bool {
1347 self.hash_node
1348 .get(hash)
1349 .map(|nodes| !nodes.is_empty())
1350 .unwrap_or(false)
1351 }
1352
1353 fn add_hash_with_nodes(&mut self, hash: Hash, nodes: impl Iterator<Item = NodeId>) -> bool {
1357 let mut updated = false;
1358 let hash_entry = self.hash_node.entry(hash).or_default();
1359 for node in nodes {
1360 updated |= hash_entry.insert(node);
1361 let node_entry = self.node_hash.entry(node).or_default();
1362 node_entry.insert(hash);
1363 }
1364 updated
1365 }
1366
1367 fn add_nodes_if_hash_exists(
1371 &mut self,
1372 hash: Hash,
1373 nodes: impl Iterator<Item = NodeId>,
1374 ) -> bool {
1375 let mut updated = false;
1376 if let Some(hash_entry) = self.hash_node.get_mut(&hash) {
1377 for node in nodes {
1378 updated |= hash_entry.insert(node);
1379 let node_entry = self.node_hash.entry(node).or_default();
1380 node_entry.insert(hash);
1381 }
1382 }
1383 updated
1384 }
1385
1386 fn remove_hash(&mut self, hash: &Hash) {
1388 if let Some(nodes) = self.hash_node.remove(hash) {
1389 for node in nodes {
1390 if let Some(hashes) = self.node_hash.get_mut(&node) {
1391 hashes.remove(hash);
1392 if hashes.is_empty() {
1393 self.node_hash.remove(&node);
1394 }
1395 }
1396 }
1397 }
1398 }
1399
1400 fn remove_node(&mut self, node: &NodeId) {
1401 if let Some(hashes) = self.node_hash.remove(node) {
1402 for hash in hashes {
1403 if let Some(nodes) = self.hash_node.get_mut(&hash) {
1404 nodes.remove(node);
1405 if nodes.is_empty() {
1406 self.hash_node.remove(&hash);
1407 }
1408 }
1409 }
1410 }
1411 }
1412
1413 fn remove_hash_from_node(&mut self, hash: &Hash, node: &NodeId) {
1414 if let Some(nodes) = self.hash_node.get_mut(hash) {
1415 nodes.remove(node);
1416 if nodes.is_empty() {
1417 self.remove_hash(hash);
1418 }
1419 }
1420 if let Some(hashes) = self.node_hash.get_mut(node) {
1421 hashes.remove(hash);
1422 if hashes.is_empty() {
1423 self.remove_node(node);
1424 }
1425 }
1426 }
1427}
1428
1429#[derive(Debug, Default)]
1438struct Queue {
1439 main: LinkedHashSet<DownloadKind>,
1440 parked: HashSet<DownloadKind>,
1441}
1442
1443impl Queue {
1444 pub fn front(&self) -> Option<&DownloadKind> {
1446 self.main.front()
1447 }
1448
1449 #[cfg(any(test, debug_assertions))]
1450 pub fn iter_parked(&self) -> impl Iterator<Item = &DownloadKind> {
1451 self.parked.iter()
1452 }
1453
1454 #[cfg(any(test, debug_assertions))]
1455 pub fn iter(&self) -> impl Iterator<Item = &DownloadKind> {
1456 self.main.iter().chain(self.parked.iter())
1457 }
1458
1459 pub fn contains(&self, kind: &DownloadKind) -> bool {
1461 self.main.contains(kind) || self.parked.contains(kind)
1462 }
1463
1464 pub fn contains_hash(&self, hash: Hash) -> bool {
1466 let as_raw = HashAndFormat::raw(hash).into();
1467 let as_hash_seq = HashAndFormat::hash_seq(hash).into();
1468 self.contains(&as_raw) || self.contains(&as_hash_seq)
1469 }
1470
1471 pub fn is_parked(&self, kind: &DownloadKind) -> bool {
1473 self.parked.contains(kind)
1474 }
1475
1476 pub fn insert(&mut self, kind: DownloadKind) {
1478 if !self.main.contains(&kind) {
1479 self.main.insert(kind);
1480 }
1481 }
1482
1483 pub fn insert_front(&mut self, kind: DownloadKind) {
1485 if !self.main.contains(&kind) {
1486 self.main.insert(kind);
1487 }
1488 self.main.to_front(&kind);
1489 }
1490
1491 pub fn pop_front(&mut self) -> Option<DownloadKind> {
1493 self.main.pop_front()
1494 }
1495
1496 pub fn park_front(&mut self) {
1498 if let Some(item) = self.pop_front() {
1499 self.parked.insert(item);
1500 }
1501 }
1502
1503 pub fn unpark(&mut self, kind: &DownloadKind) {
1505 if self.parked.remove(kind) {
1506 self.main.insert(*kind);
1507 self.main.to_front(kind);
1508 }
1509 }
1510
1511 pub fn unpark_hash(&mut self, hash: Hash) {
1513 let as_raw = HashAndFormat::raw(hash).into();
1514 let as_hash_seq = HashAndFormat::hash_seq(hash).into();
1515 self.unpark(&as_raw);
1516 self.unpark(&as_hash_seq);
1517 }
1518
1519 pub fn remove(&mut self, kind: &DownloadKind) -> bool {
1521 self.main.remove(kind) || self.parked.remove(kind)
1522 }
1523}
1524
1525impl DialerT for Dialer {
1526 type Connection = endpoint::Connection;
1527
1528 fn queue_dial(&mut self, node_id: NodeId) {
1529 self.queue_dial(node_id, crate::protocol::ALPN)
1530 }
1531
1532 fn pending_count(&self) -> usize {
1533 self.pending_count()
1534 }
1535
1536 fn is_pending(&self, node: NodeId) -> bool {
1537 self.is_pending(node)
1538 }
1539
1540 fn node_id(&self) -> NodeId {
1541 self.endpoint().node_id()
1542 }
1543}
1544
1545#[derive(Debug)]
1552struct Dialer {
1553 endpoint: Endpoint,
1554 pending: JoinSet<(NodeId, anyhow::Result<endpoint::Connection>)>,
1555 pending_dials: HashMap<NodeId, CancellationToken>,
1556}
1557
1558impl Dialer {
1559 fn new(endpoint: Endpoint) -> Self {
1561 Self {
1562 endpoint,
1563 pending: Default::default(),
1564 pending_dials: Default::default(),
1565 }
1566 }
1567
1568 fn queue_dial(&mut self, node_id: NodeId, alpn: &'static [u8]) {
1570 if self.is_pending(node_id) {
1571 return;
1572 }
1573 let cancel = CancellationToken::new();
1574 self.pending_dials.insert(node_id, cancel.clone());
1575 let endpoint = self.endpoint.clone();
1576 self.pending.spawn(async move {
1577 let res = tokio::select! {
1578 biased;
1579 _ = cancel.cancelled() => Err(anyhow!("Cancelled")),
1580 res = endpoint.connect(node_id, alpn) => res
1581 };
1582 (node_id, res)
1583 });
1584 }
1585
1586 fn is_pending(&self, node: NodeId) -> bool {
1588 self.pending_dials.contains_key(&node)
1589 }
1590
1591 fn pending_count(&self) -> usize {
1593 self.pending_dials.len()
1594 }
1595
1596 fn endpoint(&self) -> &Endpoint {
1598 &self.endpoint
1599 }
1600}
1601
1602impl Stream for Dialer {
1603 type Item = (NodeId, anyhow::Result<endpoint::Connection>);
1604
1605 fn poll_next(
1606 mut self: Pin<&mut Self>,
1607 cx: &mut std::task::Context<'_>,
1608 ) -> Poll<Option<Self::Item>> {
1609 match self.pending.poll_join_next(cx) {
1610 Poll::Ready(Some(Ok((node_id, result)))) => {
1611 self.pending_dials.remove(&node_id);
1612 Poll::Ready(Some((node_id, result)))
1613 }
1614 Poll::Ready(Some(Err(e))) => {
1615 error!("dialer error: {:?}", e);
1616 Poll::Pending
1617 }
1618 _ => Poll::Pending,
1619 }
1620 }
1621}