1mod download_file;
5pub use download_file::*;
6
7use crate::utils::io::WithProgress;
8use crate::utils::reqwest_resume;
9use anyhow::Context as _;
10use cid::Cid;
11use futures::{AsyncWriteExt, TryStreamExt};
12use reqwest::Response;
13use std::net::SocketAddr;
14use std::path::Path;
15use std::sync::{Arc, LazyLock};
16use tap::Pipe;
17use tokio::io::AsyncBufRead;
18use tokio::net::TcpListener;
19use tokio_util::{
20 compat::TokioAsyncReadCompatExt,
21 either::Either::{Left, Right},
22};
23use tracing::info;
24use url::Url;
25
26const MIN_LISTEN_BACKLOG: u32 = 4096;
37
38pub async fn bind_tcp_listener(addr: SocketAddr, backlog: u32) -> anyhow::Result<TcpListener> {
42 let socket = if addr.is_ipv6() {
43 tokio::net::TcpSocket::new_v6()
44 } else {
45 tokio::net::TcpSocket::new_v4()
46 }
47 .with_context(|| format!("could not create TCP socket for {addr}"))?;
48 let _ = socket.set_reuseaddr(true);
49 socket
50 .bind(addr)
51 .with_context(|| format!("could not bind to {addr}"))?;
52 socket
53 .listen(backlog.max(MIN_LISTEN_BACKLOG))
54 .with_context(|| format!("could not listen on {addr}"))
55}
56
57pub fn global_http_client() -> reqwest::Client {
58 static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
59 CLIENT.clone()
60}
61
62pub async fn download_ipfs_file_trustlessly(
65 cid: &Cid,
66 gateway: &Url,
67 destination: &Path,
68) -> anyhow::Result<()> {
69 let url = {
70 let mut url = gateway.join(&cid.to_string())?;
71 url.set_query(Some("format=car"));
72 Ok::<_, anyhow::Error>(url)
73 }?;
74
75 let tmp =
76 tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))?
77 .into_temp_path();
78 {
79 let mut reader = reader(url.as_str(), DownloadFileOption::Resumable, None)
80 .await?
81 .compat();
82 let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
83 rs_car_ipfs::single_file::read_single_file_seek(&mut reader, &mut writer, Some(cid))
84 .await?;
85 writer.flush().await?;
86 writer.close().await?;
87 }
88
89 tmp.persist(destination)?;
90
91 Ok(())
92}
93
94pub async fn reader(
102 location: &str,
103 option: DownloadFileOption,
104 callback: Option<Arc<dyn Fn(String) + Sync + Send>>,
105) -> anyhow::Result<impl AsyncBufRead> {
106 let (stream, content_length) = match Url::parse(location) {
110 Ok(url) => {
111 info!("Downloading file: {}", url);
112 match option {
113 DownloadFileOption::Resumable => {
114 let resume_resp = reqwest_resume::get(url).await?;
115 let resp = resume_resp.response().error_for_status_ref()?;
116 let content_length = resp.content_length().unwrap_or_default();
117 let stream = resume_resp
118 .bytes_stream()
119 .map_err(std::io::Error::other)
120 .pipe(tokio_util::io::StreamReader::new);
121 (Left(Left(stream)), content_length)
122 }
123 DownloadFileOption::NonResumable => {
124 let resp = global_http_client().get(url).send().await?;
125 let content_length = resp.content_length().unwrap_or_default();
126 let stream = resp
127 .bytes_stream()
128 .map_err(std::io::Error::other)
129 .pipe(tokio_util::io::StreamReader::new);
130 (Left(Right(stream)), content_length)
131 }
132 }
133 }
134 Err(_) => {
135 info!("Reading file: {}", location);
136 let stream = tokio::fs::File::open(location).await?;
137 let content_length = stream.metadata().await?.len();
138 (Right(stream), content_length)
139 }
140 };
141
142 const DOWNLOAD_BUFFER_SIZE: usize = 512 * 1024;
144 Ok(tokio::io::BufReader::with_capacity(
145 DOWNLOAD_BUFFER_SIZE,
146 WithProgress::wrap_sync_read_with_callback("Loading", stream, content_length, callback)
147 .bytes(),
148 ))
149}
150
151pub async fn http_get(url: &Url) -> anyhow::Result<Response> {
152 info!(%url, "GET");
153 Ok(global_http_client()
154 .get(url.clone())
155 .send()
156 .await?
157 .error_for_status()?)
158}