iroh_blobs/downloader/
get.rs

1//! [`Getter`] implementation that performs requests over [`Connection`]s.
2//!
3//! [`Connection`]: iroh::endpoint::Connection
4
5use futures_lite::FutureExt;
6use iroh::endpoint;
7
8use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
9use crate::{
10    get::{db::get_to_db_in_steps, error::GetError},
11    store::Store,
12};
13
14impl From<GetError> for FailureAction {
15    fn from(e: GetError) -> Self {
16        match e {
17            e @ GetError::NotFound(_) => FailureAction::AbortRequest(e),
18            e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
19            e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
20            e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
21            e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e),
22            // TODO: what do we want to do on local failures?
23            e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e),
24        }
25    }
26}
27
28/// [`Getter`] implementation that performs requests over [`Connection`]s.
29///
30/// [`Connection`]: iroh::endpoint::Connection
31pub(crate) struct IoGetter<S: Store> {
32    pub store: S,
33}
34
35impl<S: Store> Getter for IoGetter<S> {
36    type Connection = endpoint::Connection;
37    type NeedsConn = crate::get::db::GetStateNeedsConn;
38
39    fn get(
40        &mut self,
41        kind: DownloadKind,
42        progress_sender: BroadcastProgressSender,
43    ) -> GetStartFut<Self::NeedsConn> {
44        let store = self.store.clone();
45        async move {
46            match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
47                Err(err) => Err(err.into()),
48                Ok(crate::get::db::GetState::Complete(stats)) => {
49                    Ok(super::GetOutput::Complete(stats))
50                }
51                Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
52                    Ok(super::GetOutput::NeedsConn(needs_conn))
53                }
54            }
55        }
56        .boxed_local()
57    }
58}
59
60impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
61    fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
62        async move {
63            let res = self.proceed(conn).await;
64            match res {
65                Ok(stats) => Ok(stats),
66                Err(err) => Err(err.into()),
67            }
68        }
69        .boxed_local()
70    }
71}
72
73pub(super) fn track_metrics(
74    res: &Result<crate::get::Stats, FailureAction>,
75    metrics: &crate::metrics::Metrics,
76) {
77    match res {
78        Ok(stats) => {
79            let crate::get::Stats {
80                bytes_written,
81                bytes_read: _,
82                elapsed,
83            } = stats;
84
85            metrics.downloads_success.inc();
86            metrics.download_bytes_total.inc_by(*bytes_written);
87            metrics
88                .download_time_total
89                .inc_by(elapsed.as_millis() as u64);
90        }
91        Err(e) => match &e {
92            FailureAction::AbortRequest(GetError::NotFound(_)) => {
93                metrics.downloads_notfound.inc();
94            }
95            _ => {
96                metrics.downloads_error.inc();
97            }
98        },
99    }
100}