1mod download_file;
5pub use download_file::*;
6
7use crate::utils::io::WithProgress;
8use crate::utils::reqwest_resume;
9use cid::Cid;
10use futures::{AsyncWriteExt, TryStreamExt};
11use reqwest::Response;
12use std::path::Path;
13use std::sync::{Arc, LazyLock};
14use tap::Pipe;
15use tokio::io::AsyncBufRead;
16use tokio_util::{
17 compat::TokioAsyncReadCompatExt,
18 either::Either::{Left, Right},
19};
20use tracing::info;
21use url::Url;
22
23pub fn global_http_client() -> reqwest::Client {
24 static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
25 CLIENT.clone()
26}
27
28pub async fn download_ipfs_file_trustlessly(
31 cid: &Cid,
32 gateway: &Url,
33 destination: &Path,
34) -> anyhow::Result<()> {
35 let url = {
36 let mut url = gateway.join(&cid.to_string())?;
37 url.set_query(Some("format=car"));
38 Ok::<_, anyhow::Error>(url)
39 }?;
40
41 let tmp =
42 tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))?
43 .into_temp_path();
44 {
45 let mut reader = reader(url.as_str(), DownloadFileOption::Resumable, None)
46 .await?
47 .compat();
48 let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
49 rs_car_ipfs::single_file::read_single_file_seek(&mut reader, &mut writer, Some(cid))
50 .await?;
51 writer.flush().await?;
52 writer.close().await?;
53 }
54
55 tmp.persist(destination)?;
56
57 Ok(())
58}
59
60pub async fn reader(
68 location: &str,
69 option: DownloadFileOption,
70 callback: Option<Arc<dyn Fn(String) + Sync + Send>>,
71) -> anyhow::Result<impl AsyncBufRead> {
72 let (stream, content_length) = match Url::parse(location) {
76 Ok(url) => {
77 info!("Downloading file: {}", url);
78 match option {
79 DownloadFileOption::Resumable => {
80 let resume_resp = reqwest_resume::get(url).await?;
81 let resp = resume_resp.response().error_for_status_ref()?;
82 let content_length = resp.content_length().unwrap_or_default();
83 let stream = resume_resp
84 .bytes_stream()
85 .map_err(std::io::Error::other)
86 .pipe(tokio_util::io::StreamReader::new);
87 (Left(Left(stream)), content_length)
88 }
89 DownloadFileOption::NonResumable => {
90 let resp = global_http_client().get(url).send().await?;
91 let content_length = resp.content_length().unwrap_or_default();
92 let stream = resp
93 .bytes_stream()
94 .map_err(std::io::Error::other)
95 .pipe(tokio_util::io::StreamReader::new);
96 (Left(Right(stream)), content_length)
97 }
98 }
99 }
100 Err(_) => {
101 info!("Reading file: {}", location);
102 let stream = tokio::fs::File::open(location).await?;
103 let content_length = stream.metadata().await?.len();
104 (Right(stream), content_length)
105 }
106 };
107
108 Ok(tokio::io::BufReader::new(
109 WithProgress::wrap_sync_read_with_callback("Loading", stream, content_length, callback)
110 .bytes(),
111 ))
112}
113
114pub async fn http_get(url: &Url) -> anyhow::Result<Response> {
115 info!(%url, "GET");
116 Ok(global_http_client()
117 .get(url.clone())
118 .send()
119 .await?
120 .error_for_status()?)
121}