Skip to main content

forest/utils/
net.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4mod download_file;
5pub use download_file::*;
6
7use crate::utils::io::WithProgress;
8use crate::utils::reqwest_resume;
9use anyhow::Context as _;
10use cid::Cid;
11use futures::{AsyncWriteExt, TryStreamExt};
12use reqwest::Response;
13use std::net::SocketAddr;
14use std::path::Path;
15use std::sync::{Arc, LazyLock};
16use tap::Pipe;
17use tokio::io::AsyncBufRead;
18use tokio::net::TcpListener;
19use tokio_util::{
20    compat::TokioAsyncReadCompatExt,
21    either::Either::{Left, Right},
22};
23use tracing::info;
24use url::Url;
25
/// Lower bound on the listen backlog requested by [`bind_tcp_listener`].
///
/// The stock `tokio::net::TcpListener::bind` path passes a small, hard-coded
/// backlog to `listen(2)` (NOTE(review): this comment previously said 128 via
/// `mio` and the Rust standard library; recent `mio` releases appear to pass
/// 1024 — confirm against the pinned version). When the accept queue
/// overflows during a burst of simultaneous connection attempts, the kernel
/// silently drops completed handshakes and clients only retry after
/// `TCP_RTO_MIN` (~1s on Linux).
///
/// The kernel clamps the requested value to `/proc/sys/net/core/somaxconn`, so
/// asking for a large number is harmless. 4096 is the Linux default
/// `somaxconn` on kernels 5.4 and newer (and reportedly what Lotus and most
/// other servers use).
const MIN_LISTEN_BACKLOG: u32 = 4_096;
37
38/// Bind a TCP listener with an explicit listen backlog, floored at
39/// [`MIN_LISTEN_BACKLOG`]. Use this for any externally-facing listener that
40/// might face a burst of simultaneous connection attempts.
41pub async fn bind_tcp_listener(addr: SocketAddr, backlog: u32) -> anyhow::Result<TcpListener> {
42    let socket = if addr.is_ipv6() {
43        tokio::net::TcpSocket::new_v6()
44    } else {
45        tokio::net::TcpSocket::new_v4()
46    }
47    .with_context(|| format!("could not create TCP socket for {addr}"))?;
48    let _ = socket.set_reuseaddr(true);
49    socket
50        .bind(addr)
51        .with_context(|| format!("could not bind to {addr}"))?;
52    socket
53        .listen(backlog.max(MIN_LISTEN_BACKLOG))
54        .with_context(|| format!("could not listen on {addr}"))
55}
56
57pub fn global_http_client() -> reqwest::Client {
58    static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
59    CLIENT.clone()
60}
61
62/// Download a file via IPFS HTTP gateway in trustless mode.
63/// See <https://github.com/ipfs/specs/blob/main/http-gateways/TRUSTLESS_GATEWAY.md>
64pub async fn download_ipfs_file_trustlessly(
65    cid: &Cid,
66    gateway: &Url,
67    destination: &Path,
68) -> anyhow::Result<()> {
69    let url = {
70        let mut url = gateway.join(&cid.to_string())?;
71        url.set_query(Some("format=car"));
72        Ok::<_, anyhow::Error>(url)
73    }?;
74
75    let tmp =
76        tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))?
77            .into_temp_path();
78    {
79        let mut reader = reader(url.as_str(), DownloadFileOption::Resumable, None)
80            .await?
81            .compat();
82        let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
83        rs_car_ipfs::single_file::read_single_file_seek(&mut reader, &mut writer, Some(cid))
84            .await?;
85        writer.flush().await?;
86        writer.close().await?;
87    }
88
89    tmp.persist(destination)?;
90
91    Ok(())
92}
93
94/// `location` may be:
95/// - a path to a local file
96/// - a URL to a web resource
97/// - compressed
98/// - uncompressed
99///
100/// This function returns a reader of uncompressed data.
101pub async fn reader(
102    location: &str,
103    option: DownloadFileOption,
104    callback: Option<Arc<dyn Fn(String) + Sync + Send>>,
105) -> anyhow::Result<impl AsyncBufRead> {
106    // This isn't the cleanest approach in terms of error-handling, but it works. If the URL is
107    // malformed it'll end up trying to treat it as a local filepath. If that fails - an error
108    // is thrown.
109    let (stream, content_length) = match Url::parse(location) {
110        Ok(url) => {
111            info!("Downloading file: {}", url);
112            match option {
113                DownloadFileOption::Resumable => {
114                    let resume_resp = reqwest_resume::get(url).await?;
115                    let resp = resume_resp.response().error_for_status_ref()?;
116                    let content_length = resp.content_length().unwrap_or_default();
117                    let stream = resume_resp
118                        .bytes_stream()
119                        .map_err(std::io::Error::other)
120                        .pipe(tokio_util::io::StreamReader::new);
121                    (Left(Left(stream)), content_length)
122                }
123                DownloadFileOption::NonResumable => {
124                    let resp = global_http_client().get(url).send().await?;
125                    let content_length = resp.content_length().unwrap_or_default();
126                    let stream = resp
127                        .bytes_stream()
128                        .map_err(std::io::Error::other)
129                        .pipe(tokio_util::io::StreamReader::new);
130                    (Left(Right(stream)), content_length)
131                }
132            }
133        }
134        Err(_) => {
135            info!("Reading file: {}", location);
136            let stream = tokio::fs::File::open(location).await?;
137            let content_length = stream.metadata().await?.len();
138            (Right(stream), content_length)
139        }
140    };
141
142    // Use a larger buffer (512KB) for better throughput on large files
143    const DOWNLOAD_BUFFER_SIZE: usize = 512 * 1024;
144    Ok(tokio::io::BufReader::with_capacity(
145        DOWNLOAD_BUFFER_SIZE,
146        WithProgress::wrap_sync_read_with_callback("Loading", stream, content_length, callback)
147            .bytes(),
148    ))
149}
150
151pub async fn http_get(url: &Url) -> anyhow::Result<Response> {
152    info!(%url, "GET");
153    Ok(global_http_client()
154        .get(url.clone())
155        .send()
156        .await?
157        .error_for_status()?)
158}