apt_swarm/
fetch.rs

1use crate::config;
2use crate::config::Repository;
3use crate::db::DatabaseClient;
4use crate::errors::*;
5use crate::keyring::Keyring;
6use crate::signed::Signed;
7use sequoia_openpgp::Fingerprint;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::task::JoinSet;
12
/// Number of repositories fetched in parallel when the caller does not
/// specify a concurrency limit.
pub const DEFAULT_CONCURRENCY: usize = 4;
/// Connect timeout configured on the HTTP client builder.
pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
/// Read timeout configured on the HTTP client builder.
pub const READ_TIMEOUT: Duration = Duration::from_secs(60);
16
17async fn fetch_repository_updates(
18    client: &reqwest::Client,
19    keyring: &Option<Keyring>,
20    repository: &config::Repository,
21) -> Result<Vec<(Option<Fingerprint>, Signed)>> {
22    let mut out = Vec::new();
23
24    for source in &repository.urls {
25        let Ok(signed) = source
26            .fetch(client)
27            .await
28            .inspect_err(|err| error!("Error fetching latest release: {err:#}"))
29        else {
30            continue;
31        };
32
33        for item in signed.canonicalize(keyring.as_ref())? {
34            out.push(item);
35        }
36    }
37
38    Ok(out)
39}
40
41pub fn client(proxy: Option<SocketAddr>) -> Result<reqwest::Client> {
42    let mut client = reqwest::Client::builder()
43        .connect_timeout(CONNECT_TIMEOUT)
44        .read_timeout(READ_TIMEOUT);
45    if let Some(proxy) = proxy {
46        let proxy = format!("socks5h://{proxy:?}");
47        let proxy = reqwest::Proxy::all(&proxy)
48            .with_context(|| anyhow!("Failed to parse as proxy: {proxy:?}"))?;
49        client = client.proxy(proxy);
50    }
51    let client = client.build().context("Failed to setup http client")?;
52    Ok(client)
53}
54
55pub async fn fetch_updates<D: DatabaseClient>(
56    db: &mut D,
57    keyring: Arc<Option<Keyring>>,
58    concurrency: Option<usize>,
59    repositories: Vec<Repository>,
60    proxy: Option<SocketAddr>,
61) -> Result<()> {
62    let concurrency = concurrency.unwrap_or(DEFAULT_CONCURRENCY);
63    let mut queue = repositories.into_iter();
64    let mut pool = JoinSet::new();
65    let client = client(proxy)?;
66
67    loop {
68        while pool.len() < concurrency {
69            if let Some(repository) = queue.next() {
70                let client = client.clone();
71                let keyring = keyring.clone();
72                pool.spawn(async move {
73                    fetch_repository_updates(&client, &keyring, &repository).await
74                });
75            } else {
76                // no more tasks to schedule
77                break;
78            }
79        }
80        if let Some(join) = pool.join_next().await {
81            match join.context("Failed to join task")? {
82                Ok(list) => {
83                    for (fp, variant) in list {
84                        let fp = fp.context(
85                            "Signature can't be imported because the signature is unverified",
86                        )?;
87                        db.add_release(&fp, &variant).await?;
88                    }
89                }
90                Err(err) => error!("Error fetching latest release: {err:#}"),
91            }
92        } else {
93            // no more tasks in pool
94            break;
95        }
96    }
97
98    Ok(())
99}