1pub use async_fetcher::Fetcher;
5
6use crate::request::Request as AptRequest;
7
8use futures::stream::{Stream, StreamExt};
9use std::{path::Path, pin::Pin, sync::Arc};
10use thiserror::Error;
11use tokio::sync::mpsc;
12
13pub type FetchEvents = Pin<Box<dyn Stream<Item = FetchEvent>>>;
14
15#[derive(Debug)]
16pub struct FetchEvent {
17 pub package: Arc<AptRequest>,
18 pub kind: EventKind,
19}
20
21impl FetchEvent {
22 pub fn new(package: Arc<AptRequest>, kind: EventKind) -> Self {
23 Self { package, kind }
24 }
25}
26
27#[derive(Debug)]
28pub enum EventKind {
29 Fetching,
31
32 Fetched,
34
35 Error(FetchError),
37
38 Validated,
40
41 Retrying,
43}
44
45#[derive(Debug, Error)]
46pub enum FetchError {
47 #[error("{}: fetched package had checksum error", package)]
48 Checksum {
49 package: String,
50 source: crate::hash::ChecksumError,
51 },
52
53 #[error("{}: download failed", package)]
54 Fetch {
55 package: String,
56 source: async_fetcher::Error,
57 },
58}
59
60pub struct FetchRequest {
61 pub package: AptRequest,
62 pub attempt: usize,
63}
64
65#[derive(Default)]
66pub struct PackageFetcher {
67 fetcher: Fetcher<AptRequest>,
68 concurrent: usize,
69}
70
71pub trait FetcherExt {
72 fn into_package_fetcher(self) -> PackageFetcher;
73}
74
75impl FetcherExt for Fetcher<AptRequest> {
76 fn into_package_fetcher(self) -> PackageFetcher {
77 PackageFetcher::from(self)
78 }
79}
80
81impl From<Fetcher<AptRequest>> for PackageFetcher {
82 fn from(fetcher: Fetcher<AptRequest>) -> Self {
83 PackageFetcher::new(fetcher)
84 }
85}
86
87impl PackageFetcher {
88 pub fn new(fetcher: Fetcher<AptRequest>) -> Self {
89 Self {
90 fetcher,
91 concurrent: 1,
92 }
93 }
94
95 pub fn concurrent(mut self, concurrent: usize) -> Self {
96 self.concurrent = concurrent;
97 self
98 }
99
100 pub fn fetch(
101 self,
102 packages: impl Stream<Item = Arc<AptRequest>> + Send + Unpin + 'static,
103 destination: Arc<Path>,
104 ) -> (
105 impl std::future::Future<Output = ()> + Send + 'static,
106 mpsc::UnboundedReceiver<FetchEvent>,
107 ) {
108 let (tx, rx) = mpsc::unbounded_channel::<FetchEvent>();
109 let (events_tx, mut events_rx) = mpsc::unbounded_channel();
110
111 let input_stream = packages.map(move |package| {
112 (
113 async_fetcher::Source::new(
114 Arc::from(vec![Box::from(&*package.uri)].into_boxed_slice()),
115 Arc::from(destination.join(&package.name)),
116 ),
117 package,
118 )
119 });
120
121 let mut fetch_results = self
122 .fetcher
123 .events(events_tx)
124 .build()
125 .stream_from(input_stream, self.concurrent.min(1));
126
127 let event_handler = {
128 let tx = tx.clone();
129 async move {
130 while let Some((dest, package, event)) = events_rx.recv().await {
131 match event {
132 async_fetcher::FetchEvent::Fetching => {
133 let _ = tx.send(FetchEvent::new(package, EventKind::Fetching));
134 }
135
136 async_fetcher::FetchEvent::Fetched => {
137 let _ = tx.send(FetchEvent::new(package.clone(), EventKind::Fetched));
138 let tx = tx.clone();
139
140 rayon::spawn(move || {
141 let event = match crate::hash::compare_hash(
142 &dest,
143 package.size,
144 &package.checksum,
145 ) {
146 Ok(()) => EventKind::Validated,
147 Err(source) => {
148 let _ = std::fs::remove_file(&dest);
149 EventKind::Error(FetchError::Checksum {
150 package: package.uri.clone(),
151 source,
152 })
153 }
154 };
155
156 let _ = tx.send(FetchEvent::new(package, event));
157 });
158 }
159
160 async_fetcher::FetchEvent::Retrying => {
161 let _ = tx.send(FetchEvent::new(package, EventKind::Retrying));
162 }
163
164 _ => (),
165 }
166 }
167 }
168 };
169
170 let fetcher = async move {
171 while let Some((dest, package, result)) = fetch_results.next().await {
172 if let Err(source) = result {
173 let _ = tx.send(FetchEvent::new(
174 package.clone(),
175 EventKind::Error(FetchError::Fetch {
176 package: package.uri.clone(),
177 source,
178 }),
179 ));
180
181 let _ = tokio::fs::remove_file(&dest).await;
182 }
183 }
184 };
185
186 let future = async move {
187 let _ = futures::future::join(event_handler, fetcher).await;
188 };
189
190 (future, rx)
191 }
192}