1use std::{
29 collections::{
30 hash_map::{self, Entry},
31 HashMap, HashSet,
32 },
33 fmt,
34 future::Future,
35 num::NonZeroUsize,
36 pin::Pin,
37 sync::{
38 atomic::{AtomicU64, Ordering},
39 Arc,
40 },
41 task::Poll,
42 time::Duration,
43};
44
45use anyhow::anyhow;
46use futures_lite::{future::BoxedLocal, Stream, StreamExt};
47use hashlink::LinkedHashSet;
48use iroh::{endpoint, Endpoint, NodeAddr, NodeId};
49use iroh_metrics::inc;
50use tokio::{
51 sync::{mpsc, oneshot},
52 task::JoinSet,
53};
54use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
55use tracing::{debug, error, error_span, trace, warn, Instrument};
56
57use crate::{
58 get::{db::DownloadProgress, Stats},
59 metrics::Metrics,
60 store::Store,
61 util::{local_pool::LocalPoolHandle, progress::ProgressSender},
62 BlobFormat, Hash, HashAndFormat,
63};
64
65mod get;
66mod invariants;
67mod progress;
68mod test;
69
70use self::progress::{BroadcastProgressSender, ProgressSubscriber, ProgressTracker};
71
/// Delay after which a connected node without active requests is disconnected.
///
/// Used as the timeout whenever a node is inserted into the service's goodbye queue.
const IDLE_PEER_TIMEOUT: Duration = Duration::from_secs(10);
/// Capacity of the bounded channel that carries [`Message`]s from [`Downloader`] handles to
/// the service.
const SERVICE_CHANNEL_CAPACITY: usize = 128;
76
/// Identifier of a single download intent.
///
/// [`Downloader::queue`] assigns a fresh id to every queued request by incrementing a shared
/// counter, so several intents may refer to the same [`DownloadKind`].
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, derive_more::Display)]
pub struct IntentId(pub u64);
80
/// Abstraction over the dialer driven by the service.
///
/// An implementor is a stream that yields one `(node, connection result)` pair per finished
/// dial.
trait DialerT: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin {
    /// Connection type produced by a successful dial.
    type Connection: Clone + 'static;
    /// Requests a dial to `node_id`.
    fn queue_dial(&mut self, node_id: NodeId);
    /// Number of dials that are currently pending.
    fn pending_count(&self) -> usize;
    /// Whether a dial to `node` is currently pending.
    fn is_pending(&self, node: NodeId) -> bool;
    /// Node id of the dialing side; the service uses it to filter itself out of provider
    /// lists.
    fn node_id(&self) -> NodeId;
}
94
/// Outcome of a failed transfer, telling the service how to react.
#[derive(Debug)]
pub enum FailureAction {
    /// The transfer was cancelled through its cancellation token; the request is finalized
    /// without trying another provider.
    AllIntentsDropped,
    /// The request failed on this node; the node is no longer considered a provider for the
    /// hash.
    AbortRequest(anyhow::Error),
    /// The node misbehaved; it is removed entirely if it is idle, otherwise it is only dropped
    /// as a provider for the hash.
    DropPeer(anyhow::Error),
    /// The transfer failed in a way that warrants a later retry; an idle node is disconnected
    /// and scheduled for retry.
    RetryLater(anyhow::Error),
}
107
/// Future returned by [`Getter::get`]: resolves to a [`GetOutput`] or a [`FailureAction`].
type GetStartFut<N> = BoxedLocal<Result<GetOutput<N>, FailureAction>>;
/// Future returned by [`NeedsConn::proceed`]: resolves to the service-internal download result.
type GetProceedFut = BoxedLocal<InternalDownloadResult>;
112
/// Starts downloads on behalf of the service.
pub trait Getter {
    /// Connection type a started download is later given to proceed.
    type Connection: 'static;
    /// Intermediate state of a download that still requires a connection.
    type NeedsConn: NeedsConn<Self::Connection>;
    /// Begins the download of `kind`, reporting progress through `progress_sender`.
    ///
    /// The returned future resolves to [`GetOutput::Complete`] when no connection was needed,
    /// or to [`GetOutput::NeedsConn`] carrying the state to continue with once a connection
    /// is available.
    fn get(
        &mut self,
        kind: DownloadKind,
        progress_sender: BroadcastProgressSender,
    ) -> GetStartFut<Self::NeedsConn>;
}
127
/// Download state that is waiting for a connection of type `C`.
pub trait NeedsConn<C>: std::fmt::Debug + 'static {
    /// Consumes the state and continues the download over `conn`.
    fn proceed(self, conn: C) -> GetProceedFut;
}
133
/// Result of starting a download through [`Getter::get`].
#[derive(Debug)]
pub enum GetOutput<N> {
    /// The download finished without needing a connection.
    Complete(Stats),
    /// The download needs a connection; `N` is the state to proceed with.
    NeedsConn(N),
}
142
/// Upper bounds the download service enforces on concurrent work.
#[derive(Debug)]
pub struct ConcurrencyLimits {
    /// Maximum number of requests that may be active at the same time.
    pub max_concurrent_requests: usize,
    /// Maximum number of requests that may be active on a single node.
    pub max_concurrent_requests_per_node: usize,
    /// Maximum number of connections, counting both established and pending ones.
    pub max_open_connections: usize,
    /// Maximum number of simultaneous dials towards providers of one hash.
    pub max_concurrent_dials_per_hash: usize,
}

impl Default for ConcurrencyLimits {
    /// Limits used when no explicit configuration is given.
    fn default() -> Self {
        Self {
            max_concurrent_requests: 50,
            max_concurrent_requests_per_node: 4,
            max_open_connections: 25,
            max_concurrent_dials_per_hash: 5,
        }
    }
}

impl ConcurrencyLimits {
    /// Returns `true` once `active_requests` has reached the global request limit.
    fn at_requests_capacity(&self, active_requests: usize) -> bool {
        self.max_concurrent_requests <= active_requests
    }

    /// Returns `true` once `active_node_requests` has reached the per-node request limit.
    fn node_at_request_capacity(&self, active_node_requests: usize) -> bool {
        self.max_concurrent_requests_per_node <= active_node_requests
    }

    /// Returns `true` once `active_connections` has reached the connection limit.
    fn at_connections_capacity(&self, active_connections: usize) -> bool {
        self.max_open_connections <= active_connections
    }

    /// Returns `true` once `concurrent_dials` has reached the per-hash dial limit.
    fn at_dials_per_hash_capacity(&self, concurrent_dials: usize) -> bool {
        self.max_concurrent_dials_per_hash <= concurrent_dials
    }
}
194
/// Settings that govern how often and how quickly failed nodes are retried.
#[derive(Debug)]
pub struct RetryConfig {
    /// Number of retries allowed per node before the node is removed.
    pub max_retries_per_node: u32,
    /// Base delay between retries; the service multiplies it by the node's retry count.
    pub initial_retry_delay: Duration,
}

impl Default for RetryConfig {
    /// Retry settings used when no explicit configuration is given.
    fn default() -> Self {
        let initial_retry_delay = Duration::from_millis(500);
        RetryConfig {
            max_retries_per_node: 6,
            initial_retry_delay,
        }
    }
}
213
214#[derive(Debug, Clone)]
216pub struct DownloadRequest {
217 kind: DownloadKind,
218 nodes: Vec<NodeAddr>,
219 progress: Option<ProgressSubscriber>,
220}
221
222impl DownloadRequest {
223 pub fn new(
232 resource: impl Into<DownloadKind>,
233 nodes: impl IntoIterator<Item = impl Into<NodeAddr>>,
234 ) -> Self {
235 Self {
236 kind: resource.into(),
237 nodes: nodes.into_iter().map(|n| n.into()).collect(),
238 progress: None,
239 }
240 }
241
242 pub fn progress_sender(mut self, sender: ProgressSubscriber) -> Self {
244 self.progress = Some(sender);
245 self
246 }
247}
248
249#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy, derive_more::From, derive_more::Into)]
251pub struct DownloadKind(HashAndFormat);
252
253impl DownloadKind {
254 pub const fn hash(&self) -> Hash {
256 self.0.hash
257 }
258
259 pub const fn format(&self) -> BlobFormat {
261 self.0.format
262 }
263
264 pub const fn hash_and_format(&self) -> HashAndFormat {
266 self.0
267 }
268}
269
270impl fmt::Display for DownloadKind {
271 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272 write!(f, "{}:{:?}", self.0.hash.fmt_short(), self.0.format)
273 }
274}
275
/// Result delivered to the owner of a [`DownloadHandle`].
type ExternalDownloadResult = Result<Stats, DownloadError>;

/// Result of a transfer as seen inside the service; failures carry the [`FailureAction`] to take.
type InternalDownloadResult = Result<Stats, FailureAction>;
281
/// Error reported to callers when a download does not succeed.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DownloadError {
    /// The transfer failed and no further provider was available to try.
    #[error("Failed to complete download")]
    DownloadFailed,
    /// The intent was cancelled through [`Downloader::cancel`].
    #[error("Download cancelled by us")]
    Cancelled,
    /// No provider node is known for the requested hash.
    #[error("No provider nodes found")]
    NoProviders,
    /// The result channel was closed before a result arrived.
    #[error("Failed to receive response from download service")]
    ActorClosed,
}
298
/// Handle to a queued download intent.
///
/// Awaiting the handle yields the download result; passing it to [`Downloader::cancel`]
/// cancels the intent.
#[derive(Debug)]
pub struct DownloadHandle {
    /// Id of the intent this handle belongs to.
    id: IntentId,
    /// Resource the intent asks for.
    kind: DownloadKind,
    /// Receives the final result from the service.
    receiver: oneshot::Receiver<ExternalDownloadResult>,
}
309
310impl Future for DownloadHandle {
311 type Output = ExternalDownloadResult;
312
313 fn poll(
314 mut self: std::pin::Pin<&mut Self>,
315 cx: &mut std::task::Context<'_>,
316 ) -> std::task::Poll<Self::Output> {
317 use std::task::Poll::*;
318 match std::pin::Pin::new(&mut self.receiver).poll(cx) {
321 Ready(Ok(result)) => Ready(result),
322 Ready(Err(_recv_err)) => Ready(Err(DownloadError::ActorClosed)),
323 Pending => Pending,
324 }
325 }
326}
327
/// Cloneable handle used to talk to the download service.
#[derive(Clone, Debug)]
pub struct Downloader {
    /// Counter from which the next [`IntentId`] is taken; shared between clones.
    next_id: Arc<AtomicU64>,
    /// Sending half of the channel to the service.
    msg_tx: mpsc::Sender<Message>,
}
336
337impl Downloader {
338 pub fn new<S>(store: S, endpoint: Endpoint, rt: LocalPoolHandle) -> Self
340 where
341 S: Store,
342 {
343 Self::with_config(store, endpoint, rt, Default::default(), Default::default())
344 }
345
346 pub fn with_config<S>(
348 store: S,
349 endpoint: Endpoint,
350 rt: LocalPoolHandle,
351 concurrency_limits: ConcurrencyLimits,
352 retry_config: RetryConfig,
353 ) -> Self
354 where
355 S: Store,
356 {
357 let me = endpoint.node_id().fmt_short();
358 let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
359 let dialer = Dialer::new(endpoint);
360
361 let create_future = move || {
362 let getter = get::IoGetter {
363 store: store.clone(),
364 };
365
366 let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx);
367
368 service.run().instrument(error_span!("downloader", %me))
369 };
370 rt.spawn_detached(create_future);
371 Self {
372 next_id: Arc::new(AtomicU64::new(0)),
373 msg_tx,
374 }
375 }
376
377 pub async fn queue(&self, request: DownloadRequest) -> DownloadHandle {
379 let kind = request.kind;
380 let intent_id = IntentId(self.next_id.fetch_add(1, Ordering::SeqCst));
381 let (sender, receiver) = oneshot::channel();
382 let handle = DownloadHandle {
383 id: intent_id,
384 kind,
385 receiver,
386 };
387 let msg = Message::Queue {
388 on_finish: sender,
389 request,
390 intent_id,
391 };
392 if let Err(send_err) = self.msg_tx.send(msg).await {
395 let msg = send_err.0;
396 debug!(?msg, "download not sent");
397 }
398 handle
399 }
400
401 pub async fn cancel(&self, handle: DownloadHandle) {
404 let DownloadHandle {
405 id,
406 kind,
407 receiver: _,
408 } = handle;
409 let msg = Message::CancelIntent { id, kind };
410 if let Err(send_err) = self.msg_tx.send(msg).await {
411 let msg = send_err.0;
412 debug!(?msg, "cancel not sent");
413 }
414 }
415
416 pub async fn nodes_have(&mut self, hash: Hash, nodes: Vec<NodeId>) {
421 let msg = Message::NodesHave { hash, nodes };
422 if let Err(send_err) = self.msg_tx.send(msg).await {
423 let msg = send_err.0;
424 debug!(?msg, "nodes have not been sent")
425 }
426 }
427}
428
/// Messages sent from [`Downloader`] handles to the service.
#[derive(derive_more::Debug)]
enum Message {
    /// Queue a download intent.
    Queue {
        /// The request to queue.
        request: DownloadRequest,
        /// Channel on which the final result is delivered.
        #[debug(skip)]
        on_finish: oneshot::Sender<ExternalDownloadResult>,
        /// Id assigned to this intent.
        intent_id: IntentId,
    },
    /// Declare that `nodes` are providers for `hash`.
    NodesHave { hash: Hash, nodes: Vec<NodeId> },
    /// Cancel the intent `id` of the download `kind`.
    CancelIntent { id: IntentId, kind: DownloadKind },
}
445
/// Channels through which a single intent is notified.
#[derive(derive_more::Debug)]
struct IntentHandlers {
    /// Receives the final result of the download.
    #[debug("oneshot::Sender<DownloadResult>")]
    on_finish: oneshot::Sender<ExternalDownloadResult>,
    /// Optional subscriber for progress events.
    on_progress: Option<ProgressSubscriber>,
}
452
/// Bookkeeping for one download, shared by all intents that asked for it.
#[derive(Debug)]
struct RequestInfo<NC> {
    /// Intents registered for this download.
    intents: HashMap<IntentId, IntentHandlers>,
    /// Sender through which progress is broadcast to subscribers.
    progress_sender: BroadcastProgressSender,
    /// State obtained when the download was queued; taken when the transfer is first started.
    get_state: Option<NC>,
}
461
/// Information about a transfer that is currently running.
#[derive(derive_more::Debug)]
struct ActiveRequestInfo {
    /// Token that aborts the transfer when cancelled.
    #[debug(skip)]
    cancellation: CancellationToken,
    /// Node the transfer is running against.
    node: NodeId,
}
471
/// Retry bookkeeping for one node.
#[derive(Debug, Default)]
struct RetryState {
    /// How many retries have been scheduled for the node so far.
    retry_count: u32,
    /// Whether the node currently sits in the retry queue.
    retry_is_queued: bool,
}
479
/// A connection to a node together with its usage state.
#[derive(derive_more::Debug)]
struct ConnectionInfo<Conn> {
    /// The connection itself.
    #[debug(skip)]
    conn: Conn,
    /// Whether the connection is busy or idle.
    state: ConnectedState,
}
489
490impl<Conn> ConnectionInfo<Conn> {
491 fn new_idle(connection: Conn, drop_key: delay_queue::Key) -> Self {
493 ConnectionInfo {
494 conn: connection,
495 state: ConnectedState::Idle { drop_key },
496 }
497 }
498
499 fn active_requests(&self) -> usize {
501 match self.state {
502 ConnectedState::Busy { active_requests } => active_requests.get(),
503 ConnectedState::Idle { .. } => 0,
504 }
505 }
506
507 fn is_idle(&self) -> bool {
509 matches!(self.state, ConnectedState::Idle { .. })
510 }
511}
512
/// Usage state of a connected node.
#[derive(derive_more::Debug)]
enum ConnectedState {
    /// At least one request is running on the connection.
    Busy {
        /// Number of running requests.
        #[debug("{}", active_requests.get())]
        active_requests: NonZeroUsize,
    },
    /// No request is running; the node is scheduled for disconnection.
    Idle {
        /// Key of the node's entry in the goodbye queue.
        #[debug(skip)]
        drop_key: delay_queue::Key,
    },
}
527
/// State of a node as seen by the scheduler.
#[derive(Debug)]
enum NodeState<'a, Conn> {
    /// The node is connected.
    Connected(&'a ConnectionInfo<Conn>),
    /// A dial to the node is pending.
    Dialing,
    /// A retry for the node is queued.
    WaitForRetry,
    /// None of the above: the node can be dialed.
    Disconnected,
}
535
/// The download service: owns all scheduling state and is driven by [`Service::run`].
#[derive(Debug)]
struct Service<G: Getter, D: DialerT> {
    /// Starts downloads.
    getter: G,
    /// Which nodes provide which hashes.
    providers: ProviderMap,
    /// Establishes connections to nodes.
    dialer: D,
    /// Limits on concurrent requests, connections and dials.
    concurrency_limits: ConcurrencyLimits,
    /// Retry settings for failing nodes.
    retry_config: RetryConfig,
    /// Incoming messages from [`Downloader`] handles.
    msg_rx: mpsc::Receiver<Message>,
    /// Nodes with an established connection.
    connected_nodes: HashMap<NodeId, ConnectionInfo<D::Connection>>,
    /// Retry bookkeeping per node.
    retry_node_state: HashMap<NodeId, RetryState>,
    /// Nodes waiting for their retry delay to elapse.
    retry_nodes_queue: delay_queue::DelayQueue<NodeId>,
    /// Idle nodes waiting to be disconnected.
    goodbye_nodes_queue: delay_queue::DelayQueue<NodeId>,
    /// Downloads waiting to be started, including parked ones.
    queue: Queue,
    /// All known downloads with their registered intents.
    requests: HashMap<DownloadKind, RequestInfo<G::NeedsConn>>,
    /// Downloads whose transfer is currently running.
    active_requests: HashMap<DownloadKind, ActiveRequestInfo>,
    /// Running transfer tasks.
    in_progress_downloads: JoinSet<(DownloadKind, InternalDownloadResult)>,
    /// Tracks progress subscriptions per download.
    progress_tracker: ProgressTracker,
}
569impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
570 fn new(
571 getter: G,
572 dialer: D,
573 concurrency_limits: ConcurrencyLimits,
574 retry_config: RetryConfig,
575 msg_rx: mpsc::Receiver<Message>,
576 ) -> Self {
577 Service {
578 getter,
579 dialer,
580 msg_rx,
581 concurrency_limits,
582 retry_config,
583 connected_nodes: Default::default(),
584 retry_node_state: Default::default(),
585 providers: Default::default(),
586 requests: Default::default(),
587 retry_nodes_queue: delay_queue::DelayQueue::default(),
588 goodbye_nodes_queue: delay_queue::DelayQueue::default(),
589 active_requests: Default::default(),
590 in_progress_downloads: Default::default(),
591 progress_tracker: ProgressTracker::new(),
592 queue: Default::default(),
593 }
594 }
595
    /// Main loop of the service.
    ///
    /// Each iteration waits for one event (a finished dial, an incoming message, a finished
    /// transfer, an elapsed retry delay or an expired idle timeout), handles it, and then
    /// lets [`Self::process_head`] schedule as much queued work as possible. The loop ends
    /// when the message channel is closed.
    async fn run(mut self) {
        loop {
            trace!("wait for tick");
            inc!(Metrics, downloader_tick_main);
            tokio::select! {
                // A dial finished, successfully or not.
                Some((node, conn_result)) = self.dialer.next() => {
                    trace!(node=%node.fmt_short(), "tick: connection ready");
                    inc!(Metrics, downloader_tick_connection_ready);
                    self.on_connection_ready(node, conn_result);
                }
                // A handle sent a message, or all handles were dropped.
                maybe_msg = self.msg_rx.recv() => {
                    trace!(msg=?maybe_msg, "tick: message received");
                    inc!(Metrics, downloader_tick_message_received);
                    match maybe_msg {
                        Some(msg) => self.handle_message(msg).await,
                        None => return self.shutdown().await,
                    }
                }
                // A transfer task finished; the guard skips the branch while no task is running.
                Some(res) = self.in_progress_downloads.join_next(), if !self.in_progress_downloads.is_empty() => {
                    match res {
                        Ok((kind, result)) => {
                            trace!(%kind, "tick: transfer completed");
                            inc!(Metrics, downloader_tick_transfer_completed);
                            self.on_download_completed(kind, result);
                        }
                        Err(err) => {
                            // NOTE(review): the join error does not carry the download kind, so
                            // this branch leaves the request bookkeeping untouched.
                            warn!(?err, "transfer task panicked");
                            inc!(Metrics, downloader_tick_transfer_failed);
                        }
                    }
                }
                // The retry delay of a node elapsed.
                Some(expired) = self.retry_nodes_queue.next() => {
                    let node = expired.into_inner();
                    trace!(node=%node.fmt_short(), "tick: retry node");
                    inc!(Metrics, downloader_tick_retry_node);
                    self.on_retry_wait_elapsed(node);
                }
                // The idle timeout of a connected node expired.
                Some(expired) = self.goodbye_nodes_queue.next() => {
                    let node = expired.into_inner();
                    trace!(node=%node.fmt_short(), "tick: goodbye node");
                    inc!(Metrics, downloader_tick_goodbye_node);
                    self.disconnect_idle_node(node, "idle expired");
                }
            }

            // Schedule whatever the handled event made possible.
            self.process_head();

            #[cfg(any(test, debug_assertions))]
            self.check_invariants();
        }
    }
648
649 async fn handle_message(&mut self, msg: Message) {
653 match msg {
654 Message::Queue {
655 request,
656 on_finish,
657 intent_id,
658 } => {
659 self.handle_queue_new_download(request, intent_id, on_finish)
660 .await
661 }
662 Message::CancelIntent { id, kind } => self.handle_cancel_download(id, kind).await,
663 Message::NodesHave { hash, nodes } => {
664 let updated = self
665 .providers
666 .add_nodes_if_hash_exists(hash, nodes.iter().cloned());
667 if updated {
668 self.queue.unpark_hash(hash);
669 }
670 }
671 }
672 }
673
    /// Handles a [`Message::Queue`]: registers the intent and, for a download that is not yet
    /// known, starts it through the getter.
    ///
    /// The proposed nodes (minus the local node) are recorded as providers first. If the
    /// download already exists the intent simply joins it. Otherwise the getter is asked to
    /// start the download; the intent is finalized immediately when the getter fails, when the
    /// download completes without a connection, or when no provider is known. In all other
    /// cases the request is stored and queued.
    async fn handle_queue_new_download(
        &mut self,
        request: DownloadRequest,
        intent_id: IntentId,
        on_finish: oneshot::Sender<ExternalDownloadResult>,
    ) {
        let DownloadRequest {
            kind,
            nodes,
            progress,
        } = request;
        debug!(%kind, nodes=?nodes.iter().map(|n| n.node_id.fmt_short()).collect::<Vec<_>>(), "queue intent");

        // Handlers through which this intent is notified.
        let intent_handlers = IntentHandlers {
            on_finish,
            on_progress: progress,
        };

        // Record the proposed providers, never listing the local node as one.
        let node_ids = nodes
            .iter()
            .map(|n| n.node_id)
            .filter(|node_id| *node_id != self.dialer.node_id());
        let updated = self.providers.add_hash_with_nodes(kind.hash(), node_ids);

        match self.requests.entry(kind) {
            // The download is already known: attach the intent to it.
            hash_map::Entry::Occupied(mut entry) => {
                if let Some(on_progress) = &intent_handlers.on_progress {
                    // A failed subscription is logged but does not reject the intent.
                    if let Err(err) = self
                        .progress_tracker
                        .subscribe(kind, on_progress.clone())
                        .await
                    {
                        debug!(?err, %kind, "failed to subscribe progress sender to transfer");
                    }
                }
                entry.get_mut().intents.insert(intent_id, intent_handlers);
            }
            // First intent for this download.
            hash_map::Entry::Vacant(entry) => {
                let progress_sender = self.progress_tracker.track(
                    kind,
                    intent_handlers
                        .on_progress
                        .clone()
                        .into_iter()
                        .collect::<Vec<_>>(),
                );

                let get_state = match self.getter.get(kind, progress_sender.clone()).await {
                    Err(err) => {
                        tracing::error!(?err, "failed queuing new download");
                        // The concrete failure is only logged; the intent sees a generic error.
                        self.finalize_download(
                            kind,
                            [(intent_id, intent_handlers)].into(),
                            Err(DownloadError::DownloadFailed),
                        );
                        return;
                    }
                    Ok(GetOutput::Complete(stats)) => {
                        // Nothing needs to be transferred.
                        self.finalize_download(
                            kind,
                            [(intent_id, intent_handlers)].into(),
                            Ok(stats),
                        );
                        return;
                    }
                    Ok(GetOutput::NeedsConn(state)) => {
                        // A connection is needed, so at least one provider must be known.
                        if self.providers.get_candidates(&kind.hash()).next().is_none() {
                            self.finalize_download(
                                kind,
                                [(intent_id, intent_handlers)].into(),
                                Err(DownloadError::NoProviders),
                            );
                            return;
                        }
                        state
                    }
                };
                entry.insert(RequestInfo {
                    intents: [(intent_id, intent_handlers)].into_iter().collect(),
                    progress_sender,
                    get_state: Some(get_state),
                });
                self.queue.insert(kind);
            }
        }

        // New providers may make a parked download schedulable again.
        if updated && self.queue.is_parked(&kind) {
            self.queue.unpark(&kind);
        }
    }
778
779 async fn handle_cancel_download(&mut self, intent_id: IntentId, kind: DownloadKind) {
788 let Entry::Occupied(mut occupied_entry) = self.requests.entry(kind) else {
789 warn!(%kind, %intent_id, "cancel download called for unknown download");
790 return;
791 };
792
793 let request_info = occupied_entry.get_mut();
794 if let Some(handlers) = request_info.intents.remove(&intent_id) {
795 handlers.on_finish.send(Err(DownloadError::Cancelled)).ok();
796
797 if let Some(sender) = handlers.on_progress {
798 self.progress_tracker.unsubscribe(&kind, &sender);
799 sender
800 .send(DownloadProgress::Abort(serde_error::Error::new(
801 &*anyhow::Error::from(DownloadError::Cancelled),
802 )))
803 .await
804 .ok();
805 }
806 }
807
808 if request_info.intents.is_empty() {
809 occupied_entry.remove();
810 if let Entry::Occupied(occupied_entry) = self.active_requests.entry(kind) {
811 occupied_entry.remove().cancellation.cancel();
812 } else {
813 self.queue.remove(&kind);
814 }
815 self.remove_hash_if_not_queued(&kind.hash());
816 }
817 }
818
819 fn on_connection_ready(&mut self, node: NodeId, result: anyhow::Result<D::Connection>) {
821 debug_assert!(
822 !self.connected_nodes.contains_key(&node),
823 "newly connected node is not yet connected"
824 );
825 match result {
826 Ok(connection) => {
827 trace!(node=%node.fmt_short(), "connected to node");
828 let drop_key = self.goodbye_nodes_queue.insert(node, IDLE_PEER_TIMEOUT);
829 self.connected_nodes
830 .insert(node, ConnectionInfo::new_idle(connection, drop_key));
831 }
832 Err(err) => {
833 debug!(%node, %err, "connection to node failed");
834 self.disconnect_and_retry(node);
835 }
836 }
837 }
838
    /// Handles the result of a finished transfer task for `kind`.
    ///
    /// Releases the request slot on the node that served the transfer, reacts to the outcome
    /// (see [`FailureAction`]), and then either finalizes the download or puts it back at the
    /// front of the queue so that another provider can be tried.
    ///
    /// # Panics
    ///
    /// Panics if `kind` has no entry in `active_requests` or `requests`, or if the node of the
    /// transfer is no longer in `connected_nodes`.
    fn on_download_completed(&mut self, kind: DownloadKind, result: InternalDownloadResult) {
        // The transfer is over, so it is no longer active.
        let active_request_info = self
            .active_requests
            .remove(&kind)
            .expect("request was active");

        // Taken out here; re-inserted below if the download has to be retried.
        let request_info = self.requests.remove(&kind).expect("request was active");

        let ActiveRequestInfo { node, .. } = active_request_info;

        let node_info = self
            .connected_nodes
            .get_mut(&node)
            .expect("node exists in the mapping");

        // Release one request slot; a node without requests becomes idle and is scheduled
        // for disconnection.
        node_info.state = match NonZeroUsize::new(node_info.active_requests() - 1) {
            None => {
                let drop_key = self.goodbye_nodes_queue.insert(node, IDLE_PEER_TIMEOUT);
                ConnectedState::Idle { drop_key }
            }
            Some(active_requests) => ConnectedState::Busy { active_requests },
        };

        match &result {
            Ok(_) => {
                debug!(%kind, node=%node.fmt_short(), "download successful");
                // A success clears the node's retry bookkeeping.
                self.retry_node_state.remove(&node);
            }
            Err(FailureAction::AllIntentsDropped) => {
                debug!(%kind, node=%node.fmt_short(), "download cancelled");
            }
            Err(FailureAction::AbortRequest(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: abort request");
                // Do not ask this node for this hash again.
                self.providers.remove_hash_from_node(&kind.hash(), &node);
            }
            Err(FailureAction::DropPeer(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: drop node");
                if node_info.is_idle() {
                    self.remove_node(node, "explicit drop");
                } else {
                    // Other requests still run on the node: only stop using it for this hash.
                    self.providers.remove_hash_from_node(&kind.hash(), &node);
                }
            }
            Err(FailureAction::RetryLater(reason)) => {
                debug!(%kind, node=%node.fmt_short(), %reason, "download failed: retry later");
                if node_info.is_idle() {
                    self.disconnect_and_retry(node);
                }
            }
        };

        // Success and cancellation always end the download; any other failure ends it only
        // when no provider is left for the hash.
        let finalize = match &result {
            Ok(_) | Err(FailureAction::AllIntentsDropped) => true,
            _ => !self.providers.has_candidates(&kind.hash()),
        };

        if finalize {
            let result = result.map_err(|_| DownloadError::DownloadFailed);
            self.finalize_download(kind, request_info.intents, result);
        } else {
            self.requests.insert(kind, request_info);
            // Retry ahead of downloads that have not been attempted yet.
            self.queue.insert_front(kind);
        }
    }
916
917 fn finalize_download(
922 &mut self,
923 kind: DownloadKind,
924 intents: HashMap<IntentId, IntentHandlers>,
925 result: ExternalDownloadResult,
926 ) {
927 self.progress_tracker.remove(&kind);
928 self.remove_hash_if_not_queued(&kind.hash());
929 for (_id, handlers) in intents.into_iter() {
930 handlers.on_finish.send(result.clone()).ok();
931 }
932 }
933
934 fn on_retry_wait_elapsed(&mut self, node: NodeId) {
935 let Some(hashes) = self.providers.node_hash.get(&node) else {
937 self.retry_node_state.remove(&node);
938 return;
939 };
940 let Some(state) = self.retry_node_state.get_mut(&node) else {
941 warn!(node=%node.fmt_short(), "missing retry state for node ready for retry");
942 return;
943 };
944 state.retry_is_queued = false;
945 for hash in hashes {
946 self.queue.unpark_hash(*hash);
947 }
948 }
949
950 fn process_head(&mut self) {
967 loop {
969 let Some(kind) = self.queue.front().cloned() else {
970 break;
971 };
972
973 let next_step = self.next_step(&kind);
974 trace!(%kind, ?next_step, "process_head");
975
976 match next_step {
977 NextStep::Wait => break,
978 NextStep::StartTransfer(node) => {
979 let _ = self.queue.pop_front();
980 debug!(%kind, node=%node.fmt_short(), "start transfer");
981 self.start_download(kind, node);
982 }
983 NextStep::Dial(node) => {
984 debug!(%kind, node=%node.fmt_short(), "dial node");
985 self.dialer.queue_dial(node);
986 }
987 NextStep::DialQueuedDisconnect(node, key) => {
988 let idle_node = self.goodbye_nodes_queue.remove(&key).into_inner();
989 self.disconnect_idle_node(idle_node, "drop idle for new dial");
990 debug!(%kind, node=%node.fmt_short(), idle_node=%idle_node.fmt_short(), "dial node, disconnect idle node)");
991 self.dialer.queue_dial(node);
992 }
993 NextStep::Park => {
994 debug!(%kind, "park download: all providers waiting for retry");
995 self.queue.park_front();
996 }
997 NextStep::OutOfProviders => {
998 debug!(%kind, "abort download: out of providers");
999 let _ = self.queue.pop_front();
1000 let info = self.requests.remove(&kind).expect("queued downloads exist");
1001 self.finalize_download(kind, info.intents, Err(DownloadError::NoProviders));
1002 }
1003 }
1004 }
1005 }
1006
1007 fn disconnect_and_retry(&mut self, node: NodeId) {
1009 self.disconnect_idle_node(node, "queue retry");
1010 let retry_state = self.retry_node_state.entry(node).or_default();
1011 retry_state.retry_count += 1;
1012 if retry_state.retry_count <= self.retry_config.max_retries_per_node {
1013 debug!(node=%node.fmt_short(), retry_count=retry_state.retry_count, "queue retry");
1015 let timeout = self.retry_config.initial_retry_delay * retry_state.retry_count;
1016 self.retry_nodes_queue.insert(node, timeout);
1017 retry_state.retry_is_queued = true;
1018 } else {
1019 self.remove_node(node, "retries exceeded");
1021 }
1022 }
1023
    /// Decides what to do next for the queued download `kind`.
    ///
    /// The decision, in order of precedence:
    ///
    /// 1. [`NextStep::Wait`] if the global request limit is reached.
    /// 2. [`NextStep::OutOfProviders`] if no provider is known for the hash.
    /// 3. [`NextStep::StartTransfer`] on the connected provider with the fewest active
    ///    requests, if one has a free slot.
    /// 4. For a disconnected provider: [`NextStep::Dial`] if connection and dial limits allow
    ///    it, [`NextStep::DialQueuedDisconnect`] if only the connection limit is in the way
    ///    and an idle node can be dropped, otherwise [`NextStep::Wait`].
    /// 5. [`NextStep::Wait`] if a provider is connected but full, or is being dialed.
    /// 6. [`NextStep::Park`] if providers are only waiting for a retry.
    /// 7. [`NextStep::OutOfProviders`] otherwise.
    fn next_step(&self, kind: &DownloadKind) -> NextStep {
        // Nothing can be started while the global request limit is reached.
        if self
            .concurrency_limits
            .at_requests_capacity(self.active_requests.len())
        {
            return NextStep::Wait;
        };

        let mut candidates = self.providers.get_candidates(&kind.hash()).peekable();
        if candidates.peek().is_none() {
            // No provider is known for this hash at all.
            return NextStep::OutOfProviders;
        }

        // Connected provider with a free slot and the fewest active requests seen so far.
        let mut best_connected: Option<(NodeId, usize)> = None;
        // First disconnected provider encountered.
        let mut next_to_dial = None;
        // Number of providers with a pending dial.
        let mut currently_dialing = 0;
        // Whether some connected provider is at its per-node request limit.
        let mut has_exhausted_provider = false;
        // Whether some provider is waiting for a retry.
        let mut has_retrying_provider = false;

        for node in candidates {
            match self.node_state(node) {
                NodeState::Connected(info) => {
                    let active_requests = info.active_requests();
                    if self
                        .concurrency_limits
                        .node_at_request_capacity(active_requests)
                    {
                        has_exhausted_provider = true;
                    } else {
                        // On a tie the earlier candidate is kept.
                        best_connected = Some(match best_connected.take() {
                            Some(old) if old.1 <= active_requests => old,
                            _ => (node, active_requests),
                        });
                    }
                }
                NodeState::Dialing => {
                    currently_dialing += 1;
                }
                NodeState::WaitForRetry => {
                    has_retrying_provider = true;
                }
                NodeState::Disconnected => {
                    if next_to_dial.is_none() {
                        next_to_dial = Some(node);
                    }
                }
            }
        }

        let has_dialing = currently_dialing > 0;

        if let Some((node, _active_requests)) = best_connected {
            // A connected provider has capacity: use it.
            NextStep::StartTransfer(node)
        }
        else if let Some(node) = next_to_dial {
            // A provider could be dialed, provided the limits allow it.
            let at_dial_capacity = has_dialing
                && self
                    .concurrency_limits
                    .at_dials_per_hash_capacity(currently_dialing);
            let at_connections_capacity = self.at_connections_capacity();

            if !at_connections_capacity && !at_dial_capacity {
                NextStep::Dial(node)
            }
            else if at_connections_capacity
                && !at_dial_capacity
                && !self.goodbye_nodes_queue.is_empty()
            {
                // Only the connection limit is in the way: trade an idle node for the dial.
                let key = self.goodbye_nodes_queue.peek().expect("just checked");
                NextStep::DialQueuedDisconnect(node, key)
            }
            else {
                NextStep::Wait
            }
        }
        else if has_exhausted_provider || has_dialing {
            // A provider will become usable without any action from here.
            NextStep::Wait
        }
        else if has_retrying_provider {
            // Only retrying providers are left: park until one of them is retried.
            NextStep::Park
        }
        else {
            NextStep::OutOfProviders
        }
    }
1144
    /// Starts the transfer of `kind` over the existing connection to `node`.
    ///
    /// The transfer runs as a local task in `in_progress_downloads` and can be aborted
    /// through the cancellation token stored in `active_requests`. The node's connection is
    /// marked busy, or its request count is incremented.
    ///
    /// # Panics
    ///
    /// Panics if `node` is not connected or `kind` has no entry in `requests`.
    fn start_download(&mut self, kind: DownloadKind, node: NodeId) {
        let node_info = self.connected_nodes.get_mut(&node).expect("node exists");
        let request_info = self.requests.get_mut(&kind).expect("request exists");
        let progress = request_info.progress_sender.clone();
        // One clone of the token goes into the task, the other into the active-request entry.
        let cancellation = CancellationToken::new();
        let state = ActiveRequestInfo {
            cancellation: cancellation.clone(),
            node,
        };
        let conn = node_info.conn.clone();

        // Reuse the state obtained when the download was queued, if it is still there;
        // otherwise ask the getter again.
        let get_state = match request_info.get_state.take() {
            Some(state) => Either::Left(async move { Ok(GetOutput::NeedsConn(state)) }),
            None => Either::Right(self.getter.get(kind, progress)),
        };
        let fut = async move {
            let fut = async move {
                match get_state.await? {
                    GetOutput::Complete(stats) => Ok(stats),
                    GetOutput::NeedsConn(state) => state.proceed(conn).await,
                }
            };
            tokio::pin!(fut);
            // Whichever finishes first decides the result: cancellation or the transfer.
            let res = tokio::select! {
                _ = cancellation.cancelled() => Err(FailureAction::AllIntentsDropped),
                res = &mut fut => res
            };
            trace!("transfer finished");

            (kind, res)
        }
        .instrument(error_span!("transfer", %kind, node=%node.fmt_short()));
        node_info.state = match &node_info.state {
            ConnectedState::Busy { active_requests } => ConnectedState::Busy {
                active_requests: active_requests.saturating_add(1),
            },
            ConnectedState::Idle { drop_key } => {
                // The node is in use again: cancel its scheduled disconnection.
                self.goodbye_nodes_queue.remove(drop_key);
                ConnectedState::Busy {
                    active_requests: NonZeroUsize::new(1).expect("clearly non zero"),
                }
            }
        };
        self.active_requests.insert(kind, state);
        self.in_progress_downloads.spawn_local(fut);
    }
1207
1208 fn disconnect_idle_node(&mut self, node: NodeId, reason: &'static str) -> bool {
1209 if let Some(info) = self.connected_nodes.remove(&node) {
1210 match info.state {
1211 ConnectedState::Idle { drop_key } => {
1212 self.goodbye_nodes_queue.try_remove(&drop_key);
1213 true
1214 }
1215 ConnectedState::Busy { .. } => {
1216 warn!("expected removed node to be idle, but is busy (removal reason: {reason:?})");
1217 self.connected_nodes.insert(node, info);
1218 false
1219 }
1220 }
1221 } else {
1222 true
1223 }
1224 }
1225
1226 fn remove_node(&mut self, node: NodeId, reason: &'static str) {
1227 debug!(node = %node.fmt_short(), %reason, "remove node");
1228 if self.disconnect_idle_node(node, reason) {
1229 self.providers.remove_node(&node);
1230 self.retry_node_state.remove(&node);
1231 }
1232 }
1233
1234 fn node_state(&self, node: NodeId) -> NodeState<'_, D::Connection> {
1235 if let Some(info) = self.connected_nodes.get(&node) {
1236 NodeState::Connected(info)
1237 } else if self.dialer.is_pending(node) {
1238 NodeState::Dialing
1239 } else {
1240 match self.retry_node_state.get(&node) {
1241 Some(state) if state.retry_is_queued => NodeState::WaitForRetry,
1242 _ => NodeState::Disconnected,
1243 }
1244 }
1245 }
1246
1247 fn at_connections_capacity(&self) -> bool {
1249 self.concurrency_limits
1250 .at_connections_capacity(self.connections_count())
1251 }
1252
1253 fn connections_count(&self) -> usize {
1255 let connected_nodes = self.connected_nodes.values().count();
1256 let dialing_nodes = self.dialer.pending_count();
1257 connected_nodes + dialing_nodes
1258 }
1259
1260 fn remove_hash_if_not_queued(&mut self, hash: &Hash) {
1263 if !self.queue.contains_hash(*hash) {
1264 self.providers.remove_hash(hash);
1265 }
1266 }
1267
    /// Consumes the service when the message channel has been closed.
    ///
    /// Currently this only logs; the function is `async` without awaiting anything, hence
    /// the lint exception.
    #[allow(clippy::unused_async)]
    async fn shutdown(self) {
        debug!("shutting down");
    }
1273}
1274
/// What [`Service::process_head`] should do with the download at the front of the queue.
#[derive(Debug)]
enum NextStep {
    /// Start the transfer on this connected node.
    StartTransfer(NodeId),
    /// Dial this node.
    Dial(NodeId),
    /// Dial this node after disconnecting the idle node behind the given goodbye-queue key.
    DialQueuedDisconnect(NodeId, delay_queue::Key),
    /// Nothing can be done right now; stop processing the queue.
    Wait,
    /// Park the download: its providers are only waiting for a retry.
    Park,
    /// Abort the download: no usable provider is left.
    OutOfProviders,
}
1298
1299#[derive(Default, Debug)]
1301struct ProviderMap {
1302 hash_node: HashMap<Hash, HashSet<NodeId>>,
1303 node_hash: HashMap<NodeId, HashSet<Hash>>,
1304}
1305
1306impl ProviderMap {
1307 pub fn get_candidates<'a>(&'a self, hash: &Hash) -> impl Iterator<Item = NodeId> + 'a {
1309 self.hash_node
1310 .get(hash)
1311 .map(|nodes| nodes.iter())
1312 .into_iter()
1313 .flatten()
1314 .copied()
1315 }
1316
1317 pub fn has_candidates(&self, hash: &Hash) -> bool {
1319 self.hash_node
1320 .get(hash)
1321 .map(|nodes| !nodes.is_empty())
1322 .unwrap_or(false)
1323 }
1324
1325 fn add_hash_with_nodes(&mut self, hash: Hash, nodes: impl Iterator<Item = NodeId>) -> bool {
1329 let mut updated = false;
1330 let hash_entry = self.hash_node.entry(hash).or_default();
1331 for node in nodes {
1332 updated |= hash_entry.insert(node);
1333 let node_entry = self.node_hash.entry(node).or_default();
1334 node_entry.insert(hash);
1335 }
1336 updated
1337 }
1338
1339 fn add_nodes_if_hash_exists(
1343 &mut self,
1344 hash: Hash,
1345 nodes: impl Iterator<Item = NodeId>,
1346 ) -> bool {
1347 let mut updated = false;
1348 if let Some(hash_entry) = self.hash_node.get_mut(&hash) {
1349 for node in nodes {
1350 updated |= hash_entry.insert(node);
1351 let node_entry = self.node_hash.entry(node).or_default();
1352 node_entry.insert(hash);
1353 }
1354 }
1355 updated
1356 }
1357
1358 fn remove_hash(&mut self, hash: &Hash) {
1360 if let Some(nodes) = self.hash_node.remove(hash) {
1361 for node in nodes {
1362 if let Some(hashes) = self.node_hash.get_mut(&node) {
1363 hashes.remove(hash);
1364 if hashes.is_empty() {
1365 self.node_hash.remove(&node);
1366 }
1367 }
1368 }
1369 }
1370 }
1371
1372 fn remove_node(&mut self, node: &NodeId) {
1373 if let Some(hashes) = self.node_hash.remove(node) {
1374 for hash in hashes {
1375 if let Some(nodes) = self.hash_node.get_mut(&hash) {
1376 nodes.remove(node);
1377 if nodes.is_empty() {
1378 self.hash_node.remove(&hash);
1379 }
1380 }
1381 }
1382 }
1383 }
1384
1385 fn remove_hash_from_node(&mut self, hash: &Hash, node: &NodeId) {
1386 if let Some(nodes) = self.hash_node.get_mut(hash) {
1387 nodes.remove(node);
1388 if nodes.is_empty() {
1389 self.remove_hash(hash);
1390 }
1391 }
1392 if let Some(hashes) = self.node_hash.get_mut(node) {
1393 hashes.remove(hash);
1394 if hashes.is_empty() {
1395 self.remove_node(node);
1396 }
1397 }
1398 }
1399}
1400
1401#[derive(Debug, Default)]
1410struct Queue {
1411 main: LinkedHashSet<DownloadKind>,
1412 parked: HashSet<DownloadKind>,
1413}
1414
1415impl Queue {
1416 pub fn front(&self) -> Option<&DownloadKind> {
1418 self.main.front()
1419 }
1420
1421 #[cfg(any(test, debug_assertions))]
1422 pub fn iter_parked(&self) -> impl Iterator<Item = &DownloadKind> {
1423 self.parked.iter()
1424 }
1425
1426 #[cfg(any(test, debug_assertions))]
1427 pub fn iter(&self) -> impl Iterator<Item = &DownloadKind> {
1428 self.main.iter().chain(self.parked.iter())
1429 }
1430
1431 pub fn contains(&self, kind: &DownloadKind) -> bool {
1433 self.main.contains(kind) || self.parked.contains(kind)
1434 }
1435
1436 pub fn contains_hash(&self, hash: Hash) -> bool {
1438 let as_raw = HashAndFormat::raw(hash).into();
1439 let as_hash_seq = HashAndFormat::hash_seq(hash).into();
1440 self.contains(&as_raw) || self.contains(&as_hash_seq)
1441 }
1442
1443 pub fn is_parked(&self, kind: &DownloadKind) -> bool {
1445 self.parked.contains(kind)
1446 }
1447
1448 pub fn insert(&mut self, kind: DownloadKind) {
1450 if !self.main.contains(&kind) {
1451 self.main.insert(kind);
1452 }
1453 }
1454
1455 pub fn insert_front(&mut self, kind: DownloadKind) {
1457 if !self.main.contains(&kind) {
1458 self.main.insert(kind);
1459 }
1460 self.main.to_front(&kind);
1461 }
1462
1463 pub fn pop_front(&mut self) -> Option<DownloadKind> {
1465 self.main.pop_front()
1466 }
1467
1468 pub fn park_front(&mut self) {
1470 if let Some(item) = self.pop_front() {
1471 self.parked.insert(item);
1472 }
1473 }
1474
1475 pub fn unpark(&mut self, kind: &DownloadKind) {
1477 if self.parked.remove(kind) {
1478 self.main.insert(*kind);
1479 self.main.to_front(kind);
1480 }
1481 }
1482
1483 pub fn unpark_hash(&mut self, hash: Hash) {
1485 let as_raw = HashAndFormat::raw(hash).into();
1486 let as_hash_seq = HashAndFormat::hash_seq(hash).into();
1487 self.unpark(&as_raw);
1488 self.unpark(&as_hash_seq);
1489 }
1490
1491 pub fn remove(&mut self, kind: &DownloadKind) -> bool {
1493 self.main.remove(kind) || self.parked.remove(kind)
1494 }
1495}
1496
1497impl DialerT for Dialer {
1498 type Connection = endpoint::Connection;
1499
1500 fn queue_dial(&mut self, node_id: NodeId) {
1501 self.queue_dial(node_id, crate::protocol::ALPN)
1502 }
1503
1504 fn pending_count(&self) -> usize {
1505 self.pending_count()
1506 }
1507
1508 fn is_pending(&self, node: NodeId) -> bool {
1509 self.is_pending(node)
1510 }
1511
1512 fn node_id(&self) -> NodeId {
1513 self.endpoint().node_id()
1514 }
1515}
1516
/// Dials nodes in background tasks and yields the results as a stream.
#[derive(Debug)]
struct Dialer {
    /// Endpoint used to connect.
    endpoint: Endpoint,
    /// Running dial tasks.
    pending: JoinSet<(NodeId, anyhow::Result<endpoint::Connection>)>,
    /// Nodes with a pending dial, each with the token that cancels its dial task.
    pending_dials: HashMap<NodeId, CancellationToken>,
}
1529
1530impl Dialer {
1531 fn new(endpoint: Endpoint) -> Self {
1533 Self {
1534 endpoint,
1535 pending: Default::default(),
1536 pending_dials: Default::default(),
1537 }
1538 }
1539
1540 fn queue_dial(&mut self, node_id: NodeId, alpn: &'static [u8]) {
1542 if self.is_pending(node_id) {
1543 return;
1544 }
1545 let cancel = CancellationToken::new();
1546 self.pending_dials.insert(node_id, cancel.clone());
1547 let endpoint = self.endpoint.clone();
1548 self.pending.spawn(async move {
1549 let res = tokio::select! {
1550 biased;
1551 _ = cancel.cancelled() => Err(anyhow!("Cancelled")),
1552 res = endpoint.connect(node_id, alpn) => res
1553 };
1554 (node_id, res)
1555 });
1556 }
1557
1558 fn is_pending(&self, node: NodeId) -> bool {
1560 self.pending_dials.contains_key(&node)
1561 }
1562
1563 fn pending_count(&self) -> usize {
1565 self.pending_dials.len()
1566 }
1567
1568 fn endpoint(&self) -> &Endpoint {
1570 &self.endpoint
1571 }
1572}
1573
1574impl Stream for Dialer {
1575 type Item = (NodeId, anyhow::Result<endpoint::Connection>);
1576
1577 fn poll_next(
1578 mut self: Pin<&mut Self>,
1579 cx: &mut std::task::Context<'_>,
1580 ) -> Poll<Option<Self::Item>> {
1581 match self.pending.poll_join_next(cx) {
1582 Poll::Ready(Some(Ok((node_id, result)))) => {
1583 self.pending_dials.remove(&node_id);
1584 Poll::Ready(Some((node_id, result)))
1585 }
1586 Poll::Ready(Some(Err(e))) => {
1587 error!("dialer error: {:?}", e);
1588 Poll::Pending
1589 }
1590 _ => Poll::Pending,
1591 }
1592 }
1593}