forest/utils/proofs_api/
paramfetch.rs1use std::{
11 io::{self, ErrorKind},
12 path::{Path, PathBuf},
13 sync::{Arc, LazyLock},
14};
15
16use crate::{
17 shim::sector::SectorSize,
18 utils::{
19 misc::env::is_env_truthy,
20 net::{download_ipfs_file_trustlessly, global_http_client},
21 },
22};
23use anyhow::{Context, bail};
24use backon::{ExponentialBuilder, Retryable};
25use futures::{AsyncWriteExt, TryStreamExt, stream::FuturesUnordered};
26use tokio::{
27 fs::{self},
28 sync::Mutex,
29};
30use tracing::{debug, info, warn};
31
32use super::parameters::{
33 DEFAULT_PARAMETERS, PROOFS_PARAMETER_CACHE_ENV, ParameterData, ParameterMap,
34 check_parameter_file, param_dir,
35};
36
/// Gateway URL used by `fetch_params_ipfs_gateway` when the environment
/// variable named by `IPFS_GATEWAY_ENV` cannot be read.
const DEFAULT_IPFS_GATEWAY: &str = "https://proofs.filecoin.io/ipfs/";
/// Host that `download_from_cloudflare` requests parameter files from, as
/// `https://<domain>/<file name>`.
const CLOUDFLARE_PROOF_PARAMETER_DOMAIN: &str = "filecoin-proof-parameters.chainsafe.dev";

/// Name of the environment variable that, when truthy, makes
/// `fetch_verify_params` skip Cloudflare and download only through the IPFS
/// gateway.
const PROOFS_ONLY_IPFS_GATEWAY_ENV: &str = "FOREST_PROOFS_ONLY_IPFS_GATEWAY";

/// Name of the environment variable that overrides `DEFAULT_IPFS_GATEWAY`.
const IPFS_GATEWAY_ENV: &str = "IPFS_GATEWAY";
50
/// Selects which entries of the parameter map `get_params` fetches.
pub enum SectorSizeOpt {
    /// Every entry of the parameter map, without filtering.
    All,
    /// Only entries whose file name does not end with `params`.
    /// NOTE(review): presumably these are the verification keys — confirm
    /// against the parameter manifest.
    Keys,
    /// Entries whose `sector_size` equals the given size, plus every entry
    /// whose file name does not end with `.params`.
    Size(SectorSize),
}
60
61pub async fn ensure_proof_params_downloaded() -> anyhow::Result<()> {
63 #[cfg(test)]
64 if is_env_truthy("FOREST_TEST_SKIP_PROOF_PARAM_CHECK") {
65 return Ok(());
66 }
67
68 let data_dir = std::env::var(PROOFS_PARAMETER_CACHE_ENV).unwrap_or_default();
69 if data_dir.is_empty() {
70 anyhow::bail!("Proof parameter data dir is not set");
71 }
72 static RUN_ONCE: LazyLock<Mutex<bool>> = LazyLock::new(|| Mutex::new(false));
73 let mut run_once = RUN_ONCE.lock().await;
74 if *run_once {
75 Ok(())
76 } else {
77 get_params_default(Path::new(&data_dir), SectorSizeOpt::Keys, false).await?;
78 *run_once = true;
79 Ok(())
80 }
81}
82
83pub async fn get_params(
86 data_dir: &Path,
87 param_json: &str,
88 storage_size: SectorSizeOpt,
89 dry_run: bool,
90) -> Result<(), anyhow::Error> {
91 if dry_run {
93 println!("{}", param_dir(data_dir).to_string_lossy());
94 return Ok(());
95 }
96
97 fs::create_dir_all(param_dir(data_dir)).await?;
98
99 let params: ParameterMap = serde_json::from_str(param_json)?;
100
101 FuturesUnordered::from_iter(
102 params
103 .into_iter()
104 .filter(|(name, info)| match storage_size {
105 SectorSizeOpt::Keys => !name.ends_with("params"),
106 SectorSizeOpt::Size(size) => {
107 size as u64 == info.sector_size || !name.ends_with(".params")
108 }
109 SectorSizeOpt::All => true,
110 })
111 .map(|(name, info)| async move {
112 let data_dir_clone = data_dir.to_owned();
113 fetch_verify_params(&data_dir_clone, &name, Arc::new(info)).await
114 }),
115 )
116 .try_collect::<Vec<_>>()
117 .await?;
118
119 Ok(())
120}
121
/// Calls `get_params` with the built-in `DEFAULT_PARAMETERS` manifest as the
/// parameter JSON; `data_dir`, `storage_size` and `dry_run` are forwarded
/// unchanged.
#[inline]
pub async fn get_params_default(
    data_dir: &Path,
    storage_size: SectorSizeOpt,
    dry_run: bool,
) -> Result<(), anyhow::Error> {
    get_params(data_dir, DEFAULT_PARAMETERS, storage_size, dry_run).await
}
132
133async fn fetch_verify_params(
134 data_dir: &Path,
135 name: &str,
136 info: Arc<ParameterData>,
137) -> Result<(), anyhow::Error> {
138 let path: PathBuf = param_dir(data_dir).join(name);
139
140 match check_parameter_file(&path, &info).await {
141 Ok(()) => return Ok(()),
142 Err(e) => {
143 if let Some(e) = e.downcast_ref::<io::Error>() {
144 if e.kind() == ErrorKind::NotFound {
145 }
147 } else {
148 warn!("Error checking file: {e:?}");
149 }
150 }
151 }
152
153 if is_env_truthy(PROOFS_ONLY_IPFS_GATEWAY_ENV) {
154 fetch_params_ipfs_gateway(&path, &info).await?;
155 } else if let Err(e) = fetch_params_cloudflare(name, &path).await {
156 warn!("Failed to fetch param file from Cloudflare R2: {e:?}. Falling back to IPFS gateway",);
157 fetch_params_ipfs_gateway(&path, &info).await?;
158 }
159
160 check_parameter_file(&path, &info).await?;
161 Ok(())
162}
163
164async fn fetch_params_ipfs_gateway(path: &Path, info: &ParameterData) -> anyhow::Result<()> {
165 let gateway = std::env::var(IPFS_GATEWAY_ENV)
166 .unwrap_or_else(|_| DEFAULT_IPFS_GATEWAY.to_owned())
167 .parse()?;
168 info!(
169 "Fetching param file {path} from {gateway}",
170 path = path.display()
171 );
172 let result = (|| download_ipfs_file_trustlessly(&info.cid, &gateway, path))
173 .retry(ExponentialBuilder::default())
174 .notify(|err, dur| {
175 debug!(
176 "retrying download_ipfs_file_trustlessly {err} after {}",
177 humantime::format_duration(dur)
178 );
179 })
180 .await;
181
182 debug!(
183 "Done fetching param file {path} from {gateway}",
184 path = path.display(),
185 );
186 result
187}
188
189async fn fetch_params_cloudflare(name: &str, path: &Path) -> anyhow::Result<()> {
191 info!("Fetching param file {name} from Cloudflare R2 {CLOUDFLARE_PROOF_PARAMETER_DOMAIN}");
192 let result = (|| download_from_cloudflare(name, path))
193 .retry(ExponentialBuilder::default())
194 .notify(|err, dur| {
195 debug!(
196 "retrying download_from_cloudflare {err} after {}",
197 humantime::format_duration(dur)
198 );
199 })
200 .await;
201 debug!(
202 "Done fetching param file {} from Cloudflare",
203 path.display()
204 );
205 result
206}
207
208async fn download_from_cloudflare(name: &str, path: &Path) -> anyhow::Result<()> {
211 let response = global_http_client()
212 .get(format!(
213 "https://{CLOUDFLARE_PROOF_PARAMETER_DOMAIN}/{name}"
214 ))
215 .send()
216 .await
217 .context("Failed to fetch param file from Cloudflare R2")?;
218
219 if !response.status().is_success() {
220 bail!(
221 "Failed to fetch param file from Cloudflare R2: {:?}",
222 response
223 );
224 }
225 let tmp = tempfile::NamedTempFile::new_in(path.parent().context("No parent dir")?)
231 .context("Failed to create temp file")?
232 .into_temp_path();
233
234 let reader = response
235 .bytes_stream()
236 .map_err(std::io::Error::other)
237 .into_async_read();
238
239 let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
240 futures::io::copy(reader, &mut writer)
241 .await
242 .context("Failed to write to temp file")?;
243
244 writer.flush().await.context("Failed to flush temp file")?;
245 writer.close().await.context("Failed to close temp file")?;
246
247 tmp.persist(path).context("Failed to persist temp file")?;
248 Ok(())
249}