iroh_blobs/downloader/
get.rs1use futures_lite::FutureExt;
6use iroh::endpoint;
7
8use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
9use crate::{
10 get::{db::get_to_db_in_steps, error::GetError},
11 store::Store,
12};
13
14impl From<GetError> for FailureAction {
15 fn from(e: GetError) -> Self {
16 match e {
17 e @ GetError::NotFound(_) => FailureAction::AbortRequest(e),
18 e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
19 e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
20 e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
21 e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e),
22 e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e),
24 }
25 }
26}
27
28pub(crate) struct IoGetter<S: Store> {
32 pub store: S,
33}
34
35impl<S: Store> Getter for IoGetter<S> {
36 type Connection = endpoint::Connection;
37 type NeedsConn = crate::get::db::GetStateNeedsConn;
38
39 fn get(
40 &mut self,
41 kind: DownloadKind,
42 progress_sender: BroadcastProgressSender,
43 ) -> GetStartFut<Self::NeedsConn> {
44 let store = self.store.clone();
45 async move {
46 match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
47 Err(err) => Err(err.into()),
48 Ok(crate::get::db::GetState::Complete(stats)) => {
49 Ok(super::GetOutput::Complete(stats))
50 }
51 Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
52 Ok(super::GetOutput::NeedsConn(needs_conn))
53 }
54 }
55 }
56 .boxed_local()
57 }
58}
59
60impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
61 fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
62 async move {
63 let res = self.proceed(conn).await;
64 match res {
65 Ok(stats) => Ok(stats),
66 Err(err) => Err(err.into()),
67 }
68 }
69 .boxed_local()
70 }
71}
72
73pub(super) fn track_metrics(
74 res: &Result<crate::get::Stats, FailureAction>,
75 metrics: &crate::metrics::Metrics,
76) {
77 match res {
78 Ok(stats) => {
79 let crate::get::Stats {
80 bytes_written,
81 bytes_read: _,
82 elapsed,
83 } = stats;
84
85 metrics.downloads_success.inc();
86 metrics.download_bytes_total.inc_by(*bytes_written);
87 metrics
88 .download_time_total
89 .inc_by(elapsed.as_millis() as u64);
90 }
91 Err(e) => match &e {
92 FailureAction::AbortRequest(GetError::NotFound(_)) => {
93 metrics.downloads_notfound.inc();
94 }
95 _ => {
96 metrics.downloads_error.inc();
97 }
98 },
99 }
100}