forest/utils/proofs_api/
paramfetch.rs1use std::{
11 io::{self, ErrorKind},
12 path::{Path, PathBuf},
13 sync::Arc,
14};
15
16use crate::{
17 shim::sector::SectorSize,
18 utils::{
19 misc::env::is_env_truthy,
20 net::{download_ipfs_file_trustlessly, global_http_client},
21 },
22};
23use anyhow::{Context, bail};
24use backon::{ExponentialBuilder, Retryable};
25use futures::{AsyncWriteExt, TryStreamExt, stream::FuturesUnordered};
26use tokio::fs::{self};
27use tracing::{debug, info, warn};
28
29use super::parameters::{
30 DEFAULT_PARAMETERS, PROOFS_PARAMETER_CACHE_ENV, ParameterData, ParameterMap,
31 check_parameter_file, param_dir,
32};
33
/// Gateway used for IPFS downloads when the `IPFS_GATEWAY_ENV` variable is not set.
const DEFAULT_IPFS_GATEWAY: &str = "https://proofs.filecoin.io/ipfs/";
/// Host that parameter files are requested from (as `https://<host>/<name>`) before the
/// IPFS gateway is tried.
const CLOUDFLARE_PROOF_PARAMETER_DOMAIN: &str = "filecoin-proof-parameters.chainsafe.dev";

/// Environment variable that, when truthy, makes parameter fetching skip Cloudflare and use
/// the IPFS gateway only.
const PROOFS_ONLY_IPFS_GATEWAY_ENV: &str = "FOREST_PROOFS_ONLY_IPFS_GATEWAY";

/// Environment variable holding the IPFS gateway to use instead of `DEFAULT_IPFS_GATEWAY`.
const IPFS_GATEWAY_ENV: &str = "IPFS_GATEWAY";
47
/// Selects which entries of the parameter manifest `get_params` fetches.
pub enum SectorSizeOpt {
    /// Every entry in the manifest.
    All,
    /// Only the entries that the name-suffix filter in `get_params` does not treat as
    /// parameter files.
    Keys,
    /// Entries whose sector size equals the given size, plus every entry that the
    /// name-suffix filter in `get_params` does not treat as a parameter file.
    Size(SectorSize),
}
57
58pub async fn ensure_proof_params_downloaded() -> anyhow::Result<()> {
60 let data_dir = std::env::var(PROOFS_PARAMETER_CACHE_ENV).unwrap_or_default();
61 if data_dir.is_empty() {
62 anyhow::bail!("Proof parameter data dir is not set");
63 }
64 get_params_default(Path::new(&data_dir), SectorSizeOpt::Keys, false).await?;
65 Ok(())
66}
67
68pub async fn get_params(
71 data_dir: &Path,
72 param_json: &str,
73 storage_size: SectorSizeOpt,
74 dry_run: bool,
75) -> Result<(), anyhow::Error> {
76 if dry_run {
78 println!("{}", param_dir(data_dir).to_string_lossy());
79 return Ok(());
80 }
81
82 fs::create_dir_all(param_dir(data_dir)).await?;
83
84 let params: ParameterMap = serde_json::from_str(param_json)?;
85
86 FuturesUnordered::from_iter(
87 params
88 .into_iter()
89 .filter(|(name, info)| match storage_size {
90 SectorSizeOpt::Keys => !name.ends_with("params"),
91 SectorSizeOpt::Size(size) => {
92 size as u64 == info.sector_size || !name.ends_with(".params")
93 }
94 SectorSizeOpt::All => true,
95 })
96 .map(|(name, info)| async move {
97 let data_dir_clone = data_dir.to_owned();
98 fetch_verify_params(&data_dir_clone, &name, Arc::new(info)).await
99 }),
100 )
101 .try_collect::<Vec<_>>()
102 .await?;
103
104 Ok(())
105}
106
107#[inline]
110pub async fn get_params_default(
111 data_dir: &Path,
112 storage_size: SectorSizeOpt,
113 dry_run: bool,
114) -> Result<(), anyhow::Error> {
115 get_params(data_dir, DEFAULT_PARAMETERS, storage_size, dry_run).await
116}
117
118async fn fetch_verify_params(
119 data_dir: &Path,
120 name: &str,
121 info: Arc<ParameterData>,
122) -> Result<(), anyhow::Error> {
123 let path: PathBuf = param_dir(data_dir).join(name);
124
125 match check_parameter_file(&path, &info).await {
126 Ok(()) => return Ok(()),
127 Err(e) => {
128 if let Some(e) = e.downcast_ref::<io::Error>() {
129 if e.kind() == ErrorKind::NotFound {
130 }
132 } else {
133 warn!("Error checking file: {e:?}");
134 }
135 }
136 }
137
138 if is_env_truthy(PROOFS_ONLY_IPFS_GATEWAY_ENV) {
139 fetch_params_ipfs_gateway(&path, &info).await?;
140 } else if let Err(e) = fetch_params_cloudflare(name, &path).await {
141 warn!("Failed to fetch param file from Cloudflare R2: {e:?}. Falling back to IPFS gateway",);
142 fetch_params_ipfs_gateway(&path, &info).await?;
143 }
144
145 check_parameter_file(&path, &info).await?;
146 Ok(())
147}
148
149async fn fetch_params_ipfs_gateway(path: &Path, info: &ParameterData) -> anyhow::Result<()> {
150 let gateway = std::env::var(IPFS_GATEWAY_ENV)
151 .unwrap_or_else(|_| DEFAULT_IPFS_GATEWAY.to_owned())
152 .parse()?;
153 info!(
154 "Fetching param file {path} from {gateway}",
155 path = path.display()
156 );
157 let result = (|| download_ipfs_file_trustlessly(&info.cid, &gateway, path))
158 .retry(ExponentialBuilder::default())
159 .notify(|err, dur| {
160 debug!(
161 "retrying download_ipfs_file_trustlessly {err} after {}",
162 humantime::format_duration(dur)
163 );
164 })
165 .await;
166
167 debug!(
168 "Done fetching param file {path} from {gateway}",
169 path = path.display(),
170 );
171 result
172}
173
174async fn fetch_params_cloudflare(name: &str, path: &Path) -> anyhow::Result<()> {
176 info!("Fetching param file {name} from Cloudflare R2 {CLOUDFLARE_PROOF_PARAMETER_DOMAIN}");
177 let result = (|| download_from_cloudflare(name, path))
178 .retry(ExponentialBuilder::default())
179 .notify(|err, dur| {
180 debug!(
181 "retrying download_from_cloudflare {err} after {}",
182 humantime::format_duration(dur)
183 );
184 })
185 .await;
186 debug!(
187 "Done fetching param file {} from Cloudflare",
188 path.display()
189 );
190 result
191}
192
193async fn download_from_cloudflare(name: &str, path: &Path) -> anyhow::Result<()> {
196 let response = global_http_client()
197 .get(format!(
198 "https://{CLOUDFLARE_PROOF_PARAMETER_DOMAIN}/{name}"
199 ))
200 .send()
201 .await
202 .context("Failed to fetch param file from Cloudflare R2")?;
203
204 if !response.status().is_success() {
205 bail!(
206 "Failed to fetch param file from Cloudflare R2: {:?}",
207 response
208 );
209 }
210 let tmp = tempfile::NamedTempFile::new_in(path.parent().context("No parent dir")?)
216 .context("Failed to create temp file")?
217 .into_temp_path();
218
219 let reader = response
220 .bytes_stream()
221 .map_err(std::io::Error::other)
222 .into_async_read();
223
224 let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
225 futures::io::copy(reader, &mut writer)
226 .await
227 .context("Failed to write to temp file")?;
228
229 writer.flush().await.context("Failed to flush temp file")?;
230 writer.close().await.context("Failed to close temp file")?;
231
232 tmp.persist(path).context("Failed to persist temp file")?;
233 Ok(())
234}