apt_cmd/
fetch.rs

1// Copyright 2021-2022 System76 <info@system76.com>
2// SPDX-License-Identifier: MPL-2.0
3
4pub use async_fetcher::Fetcher;
5
6use crate::request::Request as AptRequest;
7
8use futures::stream::{Stream, StreamExt};
9use std::{path::Path, pin::Pin, sync::Arc};
10use thiserror::Error;
11use tokio::sync::mpsc;
12
13pub type FetchEvents = Pin<Box<dyn Stream<Item = FetchEvent>>>;
14
15#[derive(Debug)]
16pub struct FetchEvent {
17    pub package: Arc<AptRequest>,
18    pub kind: EventKind,
19}
20
21impl FetchEvent {
22    pub fn new(package: Arc<AptRequest>, kind: EventKind) -> Self {
23        Self { package, kind }
24    }
25}
26
27#[derive(Debug)]
28pub enum EventKind {
29    /// Request to download package is being initiated
30    Fetching,
31
32    /// Package was downloaded successfully
33    Fetched,
34
35    /// An error occurred fetching package
36    Error(FetchError),
37
38    /// The package has been validated
39    Validated,
40
41    // Package is being retried
42    Retrying,
43}
44
45#[derive(Debug, Error)]
46pub enum FetchError {
47    #[error("{}: fetched package had checksum error", package)]
48    Checksum {
49        package: String,
50        source: crate::hash::ChecksumError,
51    },
52
53    #[error("{}: download failed", package)]
54    Fetch {
55        package: String,
56        source: async_fetcher::Error,
57    },
58}
59
60pub struct FetchRequest {
61    pub package: AptRequest,
62    pub attempt: usize,
63}
64
65#[derive(Default)]
66pub struct PackageFetcher {
67    fetcher: Fetcher<AptRequest>,
68    concurrent: usize,
69}
70
71pub trait FetcherExt {
72    fn into_package_fetcher(self) -> PackageFetcher;
73}
74
75impl FetcherExt for Fetcher<AptRequest> {
76    fn into_package_fetcher(self) -> PackageFetcher {
77        PackageFetcher::from(self)
78    }
79}
80
81impl From<Fetcher<AptRequest>> for PackageFetcher {
82    fn from(fetcher: Fetcher<AptRequest>) -> Self {
83        PackageFetcher::new(fetcher)
84    }
85}
86
87impl PackageFetcher {
88    pub fn new(fetcher: Fetcher<AptRequest>) -> Self {
89        Self {
90            fetcher,
91            concurrent: 1,
92        }
93    }
94
95    pub fn concurrent(mut self, concurrent: usize) -> Self {
96        self.concurrent = concurrent;
97        self
98    }
99
100    pub fn fetch(
101        self,
102        packages: impl Stream<Item = Arc<AptRequest>> + Send + Unpin + 'static,
103        destination: Arc<Path>,
104    ) -> (
105        impl std::future::Future<Output = ()> + Send + 'static,
106        mpsc::UnboundedReceiver<FetchEvent>,
107    ) {
108        let (tx, rx) = mpsc::unbounded_channel::<FetchEvent>();
109        let (events_tx, mut events_rx) = mpsc::unbounded_channel();
110
111        let input_stream = packages.map(move |package| {
112            (
113                async_fetcher::Source::new(
114                    Arc::from(vec![Box::from(&*package.uri)].into_boxed_slice()),
115                    Arc::from(destination.join(&package.name)),
116                ),
117                package,
118            )
119        });
120
121        let mut fetch_results = self
122            .fetcher
123            .events(events_tx)
124            .build()
125            .stream_from(input_stream, self.concurrent.min(1));
126
127        let event_handler = {
128            let tx = tx.clone();
129            async move {
130                while let Some((dest, package, event)) = events_rx.recv().await {
131                    match event {
132                        async_fetcher::FetchEvent::Fetching => {
133                            let _ = tx.send(FetchEvent::new(package, EventKind::Fetching));
134                        }
135
136                        async_fetcher::FetchEvent::Fetched => {
137                            let _ = tx.send(FetchEvent::new(package.clone(), EventKind::Fetched));
138                            let tx = tx.clone();
139
140                            rayon::spawn(move || {
141                                let event = match crate::hash::compare_hash(
142                                    &dest,
143                                    package.size,
144                                    &package.checksum,
145                                ) {
146                                    Ok(()) => EventKind::Validated,
147                                    Err(source) => {
148                                        let _ = std::fs::remove_file(&dest);
149                                        EventKind::Error(FetchError::Checksum {
150                                            package: package.uri.clone(),
151                                            source,
152                                        })
153                                    }
154                                };
155
156                                let _ = tx.send(FetchEvent::new(package, event));
157                            });
158                        }
159
160                        async_fetcher::FetchEvent::Retrying => {
161                            let _ = tx.send(FetchEvent::new(package, EventKind::Retrying));
162                        }
163
164                        _ => (),
165                    }
166                }
167            }
168        };
169
170        let fetcher = async move {
171            while let Some((dest, package, result)) = fetch_results.next().await {
172                if let Err(source) = result {
173                    let _ = tx.send(FetchEvent::new(
174                        package.clone(),
175                        EventKind::Error(FetchError::Fetch {
176                            package: package.uri.clone(),
177                            source,
178                        }),
179                    ));
180
181                    let _ = tokio::fs::remove_file(&dest).await;
182                }
183            }
184        };
185
186        let future = async move {
187            let _ = futures::future::join(event_handler, fetcher).await;
188        };
189
190        (future, rx)
191    }
192}