1use std::io::{self, BufWriter, Write};
28use std::path::{Component, Path, PathBuf};
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::Duration;
31
32use iroh_blobs::api::blobs::{AddPathOptions, ExportProgressItem, ImportMode as IrohImportMode};
33use iroh_blobs::api::downloader::{DownloadProgressItem, Downloader, Shuffled};
34use iroh_blobs::format::collection::Collection;
35use iroh_blobs::store::fs::FsStore;
36use iroh_blobs::util::connection_pool::Options as PoolOptions;
37use iroh_blobs::{BlobFormat, Hash, HashAndFormat};
38use n0_future::StreamExt;
39use url::Url;
40
41use crate::Error;
42use radicle_artifact_core::cid::{self as cid_utils, ArtifactKind, Cid};
43use radicle_artifact_core::keys::EndpointId;
44use radicle_artifact_core::protocol::FetchProgress;
45
46const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
52
53const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
58
59fn http_agent() -> ureq::Agent {
65 let config = ureq::Agent::config_builder()
66 .timeout_connect(Some(CONNECT_TIMEOUT))
67 .timeout_recv_response(Some(CONNECT_TIMEOUT))
68 .build();
69 ureq::Agent::new_with_config(config)
70}
71
72fn fetch_http(agent: &ureq::Agent, url: &Url, dest: &mut dyn Write) -> Result<(), Error> {
74 let resp = agent
75 .get(url.as_str())
76 .call()
77 .map_err(|e| Error::Http(e.to_string()))?;
78 let mut reader = resp.into_body().into_reader();
79 io::copy(&mut reader, dest).map_err(Error::Io)?;
80 Ok(())
81}
82
83pub(crate) fn pool_options() -> PoolOptions {
85 PoolOptions {
86 connect_timeout: CONNECT_TIMEOUT,
87 ..PoolOptions::default()
88 }
89}
90
91pub(crate) async fn download_iroh_to_store(
105 downloader: &Downloader,
106 store: &FsStore,
107 hash_and_format: HashAndFormat,
108 providers: Vec<EndpointId>,
109 mut on_progress: impl FnMut(FetchProgress),
110) -> Result<(), Vec<Error>> {
111 let providers: Vec<iroh::EndpointId> =
113 providers.into_iter().map(EndpointId::into_inner).collect();
114
115 let progress = downloader.download(hash_and_format, Shuffled::new(providers));
118 let mut stream = match progress.stream().await {
119 Ok(s) => s,
120 Err(e) => return Err(vec![Error::Iroh(format!("downloader rpc: {e}"))]),
121 };
122
123 let mut errors: Vec<Error> = Vec::new();
124 let mut fatal: Option<Error> = None;
125 let mut deadline = tokio::time::Instant::now() + IDLE_TIMEOUT;
130 loop {
131 match tokio::time::timeout_at(deadline, stream.next()).await {
132 Err(_) => {
133 fatal = Some(Error::Iroh(format!(
134 "no progress for {}s",
135 IDLE_TIMEOUT.as_secs()
136 )));
137 break;
138 }
139 Ok(None) => break, Ok(Some(item)) => match item {
141 DownloadProgressItem::TryProvider { id, .. } => {
142 on_progress(FetchProgress::TryingLocation {
143 endpoint_id: EndpointId::from(id),
144 });
145 }
146 DownloadProgressItem::ProviderFailed { id, .. } => {
147 let endpoint_id = EndpointId::from(id);
148 on_progress(FetchProgress::LocationFailed { endpoint_id });
149 errors.push(Error::Iroh(format!(
150 "location {endpoint_id}: download failed"
151 )));
152 }
153 DownloadProgressItem::Progress(offset) => {
154 on_progress(FetchProgress::Downloading {
155 offset,
156 total: None,
157 });
158 deadline = tokio::time::Instant::now() + IDLE_TIMEOUT;
159 }
160 DownloadProgressItem::PartComplete { .. } => {
161 deadline = tokio::time::Instant::now() + IDLE_TIMEOUT;
162 }
163 DownloadProgressItem::DownloadError => {
164 fatal = Some(Error::Iroh("download error".into()));
165 break;
166 }
167 DownloadProgressItem::Error(cause) => {
168 fatal = Some(Error::Iroh(format!("{cause}")));
169 break;
170 }
171 },
172 }
173 }
174
175 let done = match store.blobs().has(hash_and_format.hash).await {
180 Ok(complete) => complete,
181 Err(e) => {
182 errors.push(Error::Iroh(format!("store completeness check: {e}")));
183 false
184 }
185 };
186 if done {
187 Ok(())
188 } else {
189 if let Some(e) = fatal {
190 errors.push(e);
191 }
192 Err(errors)
193 }
194}
195
196struct ScopedPath(PathBuf);
200
201impl Drop for ScopedPath {
202 fn drop(&mut self) {
203 let _ = std::fs::remove_file(&self.0);
205 let _ = std::fs::remove_dir_all(&self.0);
206 }
207}
208
209const HTTP_PROGRESS_INTERVAL: Duration = Duration::from_secs(1);
213
214fn partial_sibling(dest: &Path) -> PathBuf {
219 let name = dest
220 .file_name()
221 .map(|n| n.to_string_lossy().into_owned())
222 .unwrap_or_else(|| "artifact".to_string());
223 let staged = format!(".{name}.rad-partial");
224 match dest.parent() {
225 Some(parent) if !parent.as_os_str().is_empty() => parent.join(staged),
226 _ => PathBuf::from(staged),
227 }
228}
229
230async fn export_streamed(
239 store: &FsStore,
240 hash: Hash,
241 target: &Path,
242 entry: Option<&str>,
243 on_progress: &mut impl FnMut(FetchProgress),
244) -> Result<u64, Error> {
245 let mut stream = store.blobs().export(hash, target).stream().await;
246 let mut size: Option<u64> = None;
247 while let Some(item) = stream.next().await {
248 match item {
249 ExportProgressItem::Size(s) => size = Some(s),
250 ExportProgressItem::CopyProgress(offset) => on_progress(FetchProgress::Exporting {
251 offset,
252 total: size,
253 entry: entry.map(str::to_owned),
254 }),
255 ExportProgressItem::Done => {}
256 ExportProgressItem::Error(e) => return Err(Error::Iroh(format!("export: {e}"))),
257 }
258 }
259 let bytes = size.unwrap_or_else(|| std::fs::metadata(target).map(|m| m.len()).unwrap_or(0));
260 on_progress(FetchProgress::Exporting {
263 offset: bytes,
264 total: Some(bytes),
265 entry: entry.map(str::to_owned),
266 });
267 Ok(bytes)
268}
269
270pub(crate) async fn export_blob_to(
277 store: &FsStore,
278 hash: Hash,
279 dest: &Path,
280 mut on_progress: impl FnMut(FetchProgress),
281) -> Result<u64, Error> {
282 let tmp = partial_sibling(dest);
283 let _tmp = ScopedPath(tmp.clone());
284 let bytes = export_streamed(store, hash, &tmp, None, &mut on_progress)
285 .await
286 .map_err(|e| {
287 Error::Iroh(format!(
288 "failed to export from the iroh store to '{}': {e}",
289 dest.display()
290 ))
291 })?;
292 std::fs::rename(&tmp, dest).map_err(Error::Io)?;
293 Ok(bytes)
294}
295
296pub(crate) async fn export_collection_to(
305 store: &FsStore,
306 hash: Hash,
307 dest_dir: &Path,
308 on_progress: impl FnMut(FetchProgress),
309) -> Result<u64, Error> {
310 let collection = Collection::load(hash, store.as_ref())
311 .await
312 .map_err(|e| Error::Iroh(format!("load collection: {e}")))?;
313
314 let staging = partial_sibling(dest_dir);
315 let _ = std::fs::remove_dir_all(&staging);
318 let _staging = ScopedPath(staging.clone());
319 std::fs::create_dir_all(&staging).map_err(Error::Io)?;
320
321 let total = export_members(store, &collection, &staging, on_progress)
322 .await
323 .map_err(|e| {
324 Error::Iroh(format!(
325 "failed to export from the iroh store to '{}': {e}",
326 dest_dir.display()
327 ))
328 })?;
329 if dest_dir.exists() {
333 std::fs::remove_dir_all(dest_dir).map_err(Error::Io)?;
334 }
335 std::fs::rename(&staging, dest_dir).map_err(Error::Io)?;
336 Ok(total)
337}
338
339async fn export_members(
341 store: &FsStore,
342 collection: &Collection,
343 dir: &Path,
344 mut on_progress: impl FnMut(FetchProgress),
345) -> Result<u64, Error> {
346 let mut total = 0u64;
347 for (name, entry_hash) in collection.iter() {
348 let target = safe_join(dir, name)
349 .ok_or_else(|| Error::Iroh(format!("unsafe collection member name: {name:?}")))?;
350 if let Some(parent) = target.parent() {
351 std::fs::create_dir_all(parent).map_err(Error::Io)?;
352 }
353 let name_ref: &str = name;
354 let bytes = export_streamed(
355 store,
356 *entry_hash,
357 &target,
358 Some(name_ref),
359 &mut on_progress,
360 )
361 .await
362 .map_err(|e| Error::Iroh(format!("export '{name}': {e}")))?;
363 total = total.saturating_add(bytes);
364 }
365 Ok(total)
366}
367
368fn safe_join(base: &Path, name: &str) -> Option<PathBuf> {
374 let rel = Path::new(name);
375 for component in rel.components() {
376 match component {
377 Component::Normal(_) | Component::CurDir => {}
378 Component::RootDir | Component::Prefix(_) | Component::ParentDir => return None,
379 }
380 }
381 Some(base.join(rel))
382}
383
384pub(crate) async fn http_to_store(
408 store: &FsStore,
409 url: &Url,
410 expected: &Cid,
411 mut on_progress: impl FnMut(FetchProgress),
412) -> Result<Hash, Error> {
413 let expected_hash = cid_utils::cid_to_blake3_hash(expected)?;
414 static SEQ: AtomicU64 = AtomicU64::new(0);
417 let n = SEQ.fetch_add(1, Ordering::Relaxed);
418 let tmp = std::env::temp_dir().join(format!(
419 ".rad-artifact-http-{}-{}-{n}",
420 expected_hash.to_hex(),
421 std::process::id(),
422 ));
423 let _tmp = ScopedPath(tmp.clone());
424
425 let url_owned = url.clone();
428 let tmp_dl = tmp.clone();
429 let download = tokio::task::spawn_blocking(move || -> Result<(), Error> {
430 let agent = http_agent();
431 let file = std::fs::File::create(&tmp_dl).map_err(Error::Io)?;
432 let mut writer = BufWriter::new(file);
433 fetch_http(&agent, &url_owned, &mut writer)?;
434 writer.flush().map_err(Error::Io)
435 });
436 tokio::pin!(download);
437 let joined = loop {
438 tokio::select! {
439 res = &mut download => break res,
440 _ = tokio::time::sleep(HTTP_PROGRESS_INTERVAL) => {
441 let offset = std::fs::metadata(&tmp).map(|m| m.len()).unwrap_or(0);
442 on_progress(FetchProgress::Downloading { offset, total: None });
443 }
444 }
445 };
446 joined.map_err(|e| Error::Iroh(format!("http download task: {e}")))??;
447
448 let tt = store
450 .add_path_with_opts(AddPathOptions {
451 path: tmp.clone(),
452 format: BlobFormat::Raw,
453 mode: IrohImportMode::Copy,
454 })
455 .temp_tag()
456 .await
457 .map_err(|e| Error::Iroh(format!("import http blob: {e}")))?;
458 let hash = tt.hash();
459 if blake3::Hash::from(hash) != expected_hash {
460 let actual = cid_utils::blake3_hash_to_cid(hash.into(), ArtifactKind::Blob);
461 return Err(Error::CidMismatch {
462 expected: expected.to_string(),
463 actual: actual.to_string(),
464 });
465 }
466 Ok(hash)
467}
468
469#[cfg(test)]
470mod tests {
471 use std::io::Read;
472 use std::net::TcpListener;
473
474 use super::*;
475
476 fn serve_once(body: Vec<u8>) -> (Url, std::thread::JoinHandle<()>) {
479 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
480 let addr = listener.local_addr().unwrap();
481 let handle = std::thread::spawn(move || {
482 if let Ok((mut stream, _)) = listener.accept() {
483 let mut buf = [0u8; 1024];
485 let _ = stream.read(&mut buf);
486 let header = format!(
487 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
488 body.len()
489 );
490 let _ = stream.write_all(header.as_bytes());
491 let _ = stream.write_all(&body);
492 let _ = stream.flush();
493 }
494 });
495 let url = Url::parse(&format!("http://{addr}/blob")).unwrap();
496 (url, handle)
497 }
498
499 fn runtime() -> tokio::runtime::Runtime {
500 tokio::runtime::Builder::new_multi_thread()
501 .enable_all()
502 .build()
503 .unwrap()
504 }
505
506 #[test]
509 fn http_to_store_imports_matching_blob() {
510 runtime().block_on(async {
511 let tmp = tempfile::tempdir().unwrap();
512 let store = FsStore::load(tmp.path()).await.unwrap();
513
514 let body = b"hello over http".to_vec();
515 let expected = cid_utils::blake3_hash_to_cid(blake3::hash(&body), ArtifactKind::Blob);
516 let (url, server) = serve_once(body.clone());
517
518 let hash = http_to_store(&store, &url, &expected, |_| {})
519 .await
520 .unwrap();
521 assert_eq!(hash, Hash::new(&body));
522 assert!(store.blobs().has(hash).await.unwrap());
523 server.join().unwrap();
524 });
525 }
526
527 #[test]
530 fn http_to_store_rejects_mismatch() {
531 runtime().block_on(async {
532 let tmp = tempfile::tempdir().unwrap();
533 let store = FsStore::load(tmp.path()).await.unwrap();
534
535 let expected =
537 cid_utils::blake3_hash_to_cid(blake3::hash(b"expected"), ArtifactKind::Blob);
538 let (url, server) = serve_once(b"something else entirely".to_vec());
539
540 let err = http_to_store(&store, &url, &expected, |_| {})
541 .await
542 .unwrap_err();
543 assert!(matches!(err, Error::CidMismatch { .. }), "got {err:?}");
544 server.join().unwrap();
545 });
546 }
547
548 async fn store_collection(store: &FsStore, members: &[(&str, &[u8])]) -> Hash {
552 let mut pairs = Vec::new();
553 let mut tags = Vec::new();
555 for (name, data) in members {
556 let tt = store.add_bytes(data.to_vec()).temp_tag().await.unwrap();
557 pairs.push((name.to_string(), tt.hash()));
558 tags.push(tt);
559 }
560 let collection = Collection::from_iter(pairs);
561 let root = collection.store(store.as_ref()).await.unwrap();
562 root.hash()
563 }
564
565 #[test]
568 fn export_collection_writes_members() {
569 runtime().block_on(async {
570 let tmp = tempfile::tempdir().unwrap();
571 let store = FsStore::load(tmp.path()).await.unwrap();
572 let root =
573 store_collection(&store, &[("a.txt", b"alpha"), ("sub/b.txt", b"beta")]).await;
574
575 let dest = tmp.path().join("out");
576 let total = export_collection_to(&store, root, &dest, |_| {})
577 .await
578 .unwrap();
579
580 assert_eq!(total, (b"alpha".len() + b"beta".len()) as u64);
581 assert_eq!(std::fs::read(dest.join("a.txt")).unwrap(), b"alpha");
582 assert_eq!(std::fs::read(dest.join("sub/b.txt")).unwrap(), b"beta");
583 assert!(!partial_sibling(&dest).exists());
585 });
586 }
587
588 #[test]
591 fn export_collection_rejects_unsafe_member() {
592 runtime().block_on(async {
593 let tmp = tempfile::tempdir().unwrap();
594 let store = FsStore::load(tmp.path()).await.unwrap();
595 let root = store_collection(&store, &[("../escape", b"evil")]).await;
596
597 let dest = tmp.path().join("out");
598 let err = export_collection_to(&store, root, &dest, |_| {})
599 .await
600 .unwrap_err();
601 assert!(
602 matches!(&err, Error::Iroh(m) if m.contains("unsafe collection member name")),
603 "got {err:?}"
604 );
605 assert!(!dest.exists());
607 assert!(!partial_sibling(&dest).exists());
608 assert!(!tmp.path().join("escape").exists());
610 });
611 }
612
613 #[test]
616 fn scoped_path_removes_file_on_drop() {
617 let tmp = tempfile::tempdir().unwrap();
618 let file = tmp.path().join("staged");
619 std::fs::write(&file, b"partial").unwrap();
620 {
621 let _guard = ScopedPath(file.clone());
622 assert!(file.exists());
623 }
624 assert!(!file.exists());
625 }
626
627 #[test]
629 fn scoped_path_removes_dir_on_drop() {
630 let tmp = tempfile::tempdir().unwrap();
631 let dir = tmp.path().join("staging");
632 std::fs::create_dir(&dir).unwrap();
633 std::fs::write(dir.join("inner"), b"x").unwrap();
634 {
635 let _guard = ScopedPath(dir.clone());
636 assert!(dir.exists());
637 }
638 assert!(!dir.exists());
639 }
640
641 #[test]
642 fn safe_join_allows_nested_paths() {
643 let base = Path::new("/dest");
644 assert_eq!(
645 safe_join(base, "a/b/c.txt"),
646 Some(PathBuf::from("/dest/a/b/c.txt"))
647 );
648 assert!(safe_join(base, "file.bin").is_some());
649 assert!(safe_join(base, "./file.bin").is_some());
650 }
651
652 #[test]
653 fn safe_join_rejects_traversal_and_absolute() {
654 let base = Path::new("/dest");
655 assert!(safe_join(base, "../escape").is_none());
657 assert!(safe_join(base, "a/../../escape").is_none());
658 assert!(safe_join(base, "..").is_none());
659 assert!(safe_join(base, "/etc/passwd").is_none());
661 }
662}