forest/utils/net.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4mod download_file;
5pub use download_file::*;
6
7use crate::utils::io::WithProgress;
8use crate::utils::reqwest_resume;
9use cid::Cid;
10use futures::{AsyncWriteExt, TryStreamExt};
11use reqwest::Response;
12use std::path::Path;
13use std::sync::{Arc, LazyLock};
14use tap::Pipe;
15use tokio::io::AsyncBufRead;
16use tokio_util::{
17    compat::TokioAsyncReadCompatExt,
18    either::Either::{Left, Right},
19};
20use tracing::info;
21use url::Url;
22
23pub fn global_http_client() -> reqwest::Client {
24    static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
25    CLIENT.clone()
26}
27
28/// Download a file via IPFS HTTP gateway in trustless mode.
29/// See <https://github.com/ipfs/specs/blob/main/http-gateways/TRUSTLESS_GATEWAY.md>
30pub async fn download_ipfs_file_trustlessly(
31    cid: &Cid,
32    gateway: &Url,
33    destination: &Path,
34) -> anyhow::Result<()> {
35    let url = {
36        let mut url = gateway.join(&cid.to_string())?;
37        url.set_query(Some("format=car"));
38        Ok::<_, anyhow::Error>(url)
39    }?;
40
41    let tmp =
42        tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))?
43            .into_temp_path();
44    {
45        let mut reader = reader(url.as_str(), DownloadFileOption::Resumable, None)
46            .await?
47            .compat();
48        let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
49        rs_car_ipfs::single_file::read_single_file_seek(&mut reader, &mut writer, Some(cid))
50            .await?;
51        writer.flush().await?;
52        writer.close().await?;
53    }
54
55    tmp.persist(destination)?;
56
57    Ok(())
58}
59
60/// `location` may be:
61/// - a path to a local file
62/// - a URL to a web resource
63/// - compressed
64/// - uncompressed
65///
66/// This function returns a reader of uncompressed data.
67pub async fn reader(
68    location: &str,
69    option: DownloadFileOption,
70    callback: Option<Arc<dyn Fn(String) + Sync + Send>>,
71) -> anyhow::Result<impl AsyncBufRead> {
72    // This isn't the cleanest approach in terms of error-handling, but it works. If the URL is
73    // malformed it'll end up trying to treat it as a local filepath. If that fails - an error
74    // is thrown.
75    let (stream, content_length) = match Url::parse(location) {
76        Ok(url) => {
77            info!("Downloading file: {}", url);
78            match option {
79                DownloadFileOption::Resumable => {
80                    let resume_resp = reqwest_resume::get(url).await?;
81                    let resp = resume_resp.response().error_for_status_ref()?;
82                    let content_length = resp.content_length().unwrap_or_default();
83                    let stream = resume_resp
84                        .bytes_stream()
85                        .map_err(std::io::Error::other)
86                        .pipe(tokio_util::io::StreamReader::new);
87                    (Left(Left(stream)), content_length)
88                }
89                DownloadFileOption::NonResumable => {
90                    let resp = global_http_client().get(url).send().await?;
91                    let content_length = resp.content_length().unwrap_or_default();
92                    let stream = resp
93                        .bytes_stream()
94                        .map_err(std::io::Error::other)
95                        .pipe(tokio_util::io::StreamReader::new);
96                    (Left(Right(stream)), content_length)
97                }
98            }
99        }
100        Err(_) => {
101            info!("Reading file: {}", location);
102            let stream = tokio::fs::File::open(location).await?;
103            let content_length = stream.metadata().await?.len();
104            (Right(stream), content_length)
105        }
106    };
107
108    Ok(tokio::io::BufReader::new(
109        WithProgress::wrap_sync_read_with_callback("Loading", stream, content_length, callback)
110            .bytes(),
111    ))
112}
113
114pub async fn http_get(url: &Url) -> anyhow::Result<Response> {
115    info!(%url, "GET");
116    Ok(global_http_client()
117        .get(url.clone())
118        .send()
119        .await?
120        .error_for_status()?)
121}