forest/utils/proofs_api/
paramfetch.rs1use std::{
11 io::{self, ErrorKind},
12 path::{Path, PathBuf},
13 sync::{Arc, LazyLock},
14};
15
16use crate::{
17 shim::sector::SectorSize,
18 utils::net::{download_ipfs_file_trustlessly, global_http_client},
19};
20use anyhow::{Context, bail};
21use backon::{ExponentialBuilder, Retryable};
22use futures::{AsyncWriteExt, TryStreamExt, stream::FuturesUnordered};
23use tokio::{
24 fs::{self},
25 sync::Mutex,
26};
27use tracing::{debug, info, warn};
28
29use super::parameters::{
30 DEFAULT_PARAMETERS, PROOFS_PARAMETER_CACHE_ENV, ParameterData, ParameterMap,
31 check_parameter_file, param_dir,
32};
33
/// IPFS gateway base URL used when the `IPFS_GATEWAY` environment variable
/// is not set.
const DEFAULT_IPFS_GATEWAY: &str = "https://proofs.filecoin.io/ipfs/";

/// Host name from which parameter files are downloaded over HTTPS
/// (`https://<domain>/<file name>`) before falling back to the IPFS gateway.
const CLOUDFLARE_PROOF_PARAMETER_DOMAIN: &str = "filecoin-proof-parameters.chainsafe.dev";

/// Name of the environment variable that, when truthy, makes the fetcher skip
/// the Cloudflare download and use the IPFS gateway only.
const PROOFS_ONLY_IPFS_GATEWAY_ENV: &str = "FOREST_PROOFS_ONLY_IPFS_GATEWAY";

/// Name of the environment variable that overrides [`DEFAULT_IPFS_GATEWAY`].
const IPFS_GATEWAY_ENV: &str = "IPFS_GATEWAY";
47
48pub enum SectorSizeOpt {
50 All,
52 Keys,
54 Size(SectorSize),
56}
57
58pub async fn ensure_proof_params_downloaded() -> anyhow::Result<()> {
60 #[cfg(test)]
61 if crate::utils::misc::env::is_env_truthy("FOREST_TEST_SKIP_PROOF_PARAM_CHECK") {
62 return Ok(());
63 }
64
65 let data_dir = std::env::var(PROOFS_PARAMETER_CACHE_ENV).unwrap_or_default();
66 if data_dir.is_empty() {
67 anyhow::bail!("Proof parameter data dir is not set");
68 }
69 static RUN_ONCE: LazyLock<Mutex<bool>> = LazyLock::new(|| Mutex::new(false));
70 let mut run_once = RUN_ONCE.lock().await;
71 if *run_once {
72 Ok(())
73 } else {
74 get_params_default(Path::new(&data_dir), SectorSizeOpt::Keys, false).await?;
75 *run_once = true;
76 Ok(())
77 }
78}
79
80pub async fn get_params(
83 data_dir: &Path,
84 param_json: &str,
85 storage_size: SectorSizeOpt,
86 dry_run: bool,
87) -> anyhow::Result<()> {
88 if dry_run {
90 println!("{}", param_dir(data_dir).to_string_lossy());
91 return Ok(());
92 }
93
94 fs::create_dir_all(param_dir(data_dir)).await?;
95
96 let params: ParameterMap = serde_json::from_str(param_json)?;
97
98 FuturesUnordered::from_iter(
99 params
100 .into_iter()
101 .filter(|(name, info)| match storage_size {
102 SectorSizeOpt::Keys => !name.ends_with("params"),
103 SectorSizeOpt::Size(size) => {
104 size as u64 == info.sector_size || !name.ends_with(".params")
105 }
106 SectorSizeOpt::All => true,
107 })
108 .map(|(name, info)| async move {
109 let data_dir_clone = data_dir.to_owned();
110 fetch_verify_params(&data_dir_clone, &name, Arc::new(info)).await
111 }),
112 )
113 .try_collect::<Vec<_>>()
114 .await?;
115
116 Ok(())
117}
118
119#[inline]
122pub async fn get_params_default(
123 data_dir: &Path,
124 storage_size: SectorSizeOpt,
125 dry_run: bool,
126) -> anyhow::Result<()> {
127 get_params(data_dir, DEFAULT_PARAMETERS, storage_size, dry_run).await
128}
129
130async fn fetch_verify_params(
131 data_dir: &Path,
132 name: &str,
133 info: Arc<ParameterData>,
134) -> anyhow::Result<()> {
135 crate::def_is_env_truthy!(force_ipfs_gateway, PROOFS_ONLY_IPFS_GATEWAY_ENV);
136
137 let path: PathBuf = param_dir(data_dir).join(name);
138
139 match check_parameter_file(&path, &info).await {
140 Ok(()) => return Ok(()),
141 Err(e) => {
142 if let Some(e) = e.downcast_ref::<io::Error>() {
143 if e.kind() == ErrorKind::NotFound {
144 }
146 } else {
147 warn!("Error checking file: {e:?}");
148 }
149 }
150 }
151
152 if force_ipfs_gateway() {
153 fetch_params_ipfs_gateway(&path, &info).await?;
154 } else if let Err(e) = fetch_params_cloudflare(name, &path).await {
155 warn!("Failed to fetch param file from Cloudflare R2: {e:?}. Falling back to IPFS gateway",);
156 fetch_params_ipfs_gateway(&path, &info).await?;
157 }
158
159 check_parameter_file(&path, &info).await?;
160 Ok(())
161}
162
163async fn fetch_params_ipfs_gateway(path: &Path, info: &ParameterData) -> anyhow::Result<()> {
164 let gateway = std::env::var(IPFS_GATEWAY_ENV)
165 .unwrap_or_else(|_| DEFAULT_IPFS_GATEWAY.to_owned())
166 .parse()?;
167 info!(
168 "Fetching param file {path} from {gateway}",
169 path = path.display()
170 );
171 let result = (|| download_ipfs_file_trustlessly(&info.cid, &gateway, path))
172 .retry(ExponentialBuilder::default())
173 .notify(|err, dur| {
174 debug!(
175 "retrying download_ipfs_file_trustlessly {err} after {}",
176 humantime::format_duration(dur)
177 );
178 })
179 .await;
180
181 debug!(
182 "Done fetching param file {path} from {gateway}",
183 path = path.display(),
184 );
185 result
186}
187
188async fn fetch_params_cloudflare(name: &str, path: &Path) -> anyhow::Result<()> {
190 info!("Fetching param file {name} from Cloudflare R2 {CLOUDFLARE_PROOF_PARAMETER_DOMAIN}");
191 let result = (|| download_from_cloudflare(name, path))
192 .retry(ExponentialBuilder::default())
193 .notify(|err, dur| {
194 debug!(
195 "retrying download_from_cloudflare {err} after {}",
196 humantime::format_duration(dur)
197 );
198 })
199 .await;
200 debug!(
201 "Done fetching param file {} from Cloudflare",
202 path.display()
203 );
204 result
205}
206
207async fn download_from_cloudflare(name: &str, path: &Path) -> anyhow::Result<()> {
210 let response = global_http_client()
211 .get(format!(
212 "https://{CLOUDFLARE_PROOF_PARAMETER_DOMAIN}/{name}"
213 ))
214 .send()
215 .await
216 .context("Failed to fetch param file from Cloudflare R2")?;
217
218 if !response.status().is_success() {
219 bail!(
220 "Failed to fetch param file from Cloudflare R2: {:?}",
221 response
222 );
223 }
224 let tmp = tempfile::NamedTempFile::new_in(path.parent().context("No parent dir")?)
230 .context("Failed to create temp file")?
231 .into_temp_path();
232
233 let reader = response
234 .bytes_stream()
235 .map_err(std::io::Error::other)
236 .into_async_read();
237
238 let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
239 futures::io::copy(reader, &mut writer)
240 .await
241 .context("Failed to write to temp file")?;
242
243 writer.flush().await.context("Failed to flush temp file")?;
244 writer.close().await.context("Failed to close temp file")?;
245
246 tmp.persist(path).context("Failed to persist temp file")?;
247 Ok(())
248}