Skip to main content

radicle_artifact_node/
fetch.rs

1//! Reusable fetch + export core for the node.
2//!
3//! These are the building blocks the node's `Fetch`/`Export` handlers call
4//! against their persistent [`FsStore`] and shared
5//! [`Downloader`]: a multi-provider
6//! iroh download (`download_iroh_to_store`), an HTTP-into-store download
7//! (`http_to_store`, so HTTP blobs become seedable), and atomic export
8//! helpers (`export_blob_to`, `export_collection_to`). None of them own
9//! the endpoint, store, or runtime — the caller supplies those.
10//!
11//! Progress is reported through a [`FetchProgress`] callback so the caller
12//! decides how to surface it: the node forwards frames over the control
13//! socket, which the CLI then renders as a progress bar.
14//!
15//! Both transports bound how long an unreachable provider can tie up a
16//! fetch. HTTP sets connect and receive-response timeouts on the
17//! `ureq::Agent`; iroh applies a connect timeout on the connection pool
18//! (`pool_options`) and an idle-progress timeout around the Downloader
19//! stream. Mid-body HTTP stalls are intentionally not bounded — ureq 3.3
20//! only offers a total-body timeout, which would break large downloads.
21//!
22//! Per-provider iroh causes are not preserved (the [`iroh_blobs`]
23//! downloader drops them on `ProviderFailed`); set
24//! `RUST_LOG=iroh_blobs=debug` in the node to surface them
25//! (subscriber is installed in `rad-artifact node start --foreground`).
26
27use std::io::{self, BufWriter, Write};
28use std::path::{Component, Path, PathBuf};
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::Duration;
31
32use iroh_blobs::api::blobs::{AddPathOptions, ExportProgressItem, ImportMode as IrohImportMode};
33use iroh_blobs::api::downloader::{DownloadProgressItem, Downloader, Shuffled};
34use iroh_blobs::format::collection::Collection;
35use iroh_blobs::store::fs::FsStore;
36use iroh_blobs::util::connection_pool::Options as PoolOptions;
37use iroh_blobs::{BlobFormat, Hash, HashAndFormat};
38use n0_future::StreamExt;
39use url::Url;
40
41use crate::Error;
42use radicle_artifact_core::cid::{self as cid_utils, ArtifactKind, Cid};
43use radicle_artifact_core::keys::EndpointId;
44use radicle_artifact_core::protocol::FetchProgress;
45
/// Per-provider connect bound. A provider that cannot establish a usable
/// connection (HTTP TCP handshake or iroh QUIC+relay path) within this
/// window is abandoned so the next one is tried. More generous than
/// iroh's 1s default to accommodate slower relay paths without giving up
/// on reachable but cold providers.
///
/// Shared by both transports in this module: `http_agent` applies it to
/// the connect phase *and* to the wait for response headers, and
/// `pool_options` installs it as the iroh connection-pool connect timeout.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
52
/// Idle (no-progress) bound for an iroh download. Reset only on
/// `Progress` / `PartComplete` events — control events like `TryProvider`
/// or `ProviderFailed` do not count as progress, so a cascade of dead
/// providers can't keep the download alive past this window.
///
/// Enforced in `download_iroh_to_store`, which stops waiting on the
/// downloader stream once this much time passes without data movement.
const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
58
59/// Build a ureq agent with connect and response-header timeouts.
60///
61/// No total-body timeout: large artifact downloads must be allowed to
62/// stream for as long as they make progress, and ureq 3.3 does not offer
63/// a per-read socket timeout that would bound only stalls.
64fn http_agent() -> ureq::Agent {
65    let config = ureq::Agent::config_builder()
66        .timeout_connect(Some(CONNECT_TIMEOUT))
67        .timeout_recv_response(Some(CONNECT_TIMEOUT))
68        .build();
69    ureq::Agent::new_with_config(config)
70}
71
72/// Fetch HTTP content at `url` into `dest` using a pre-configured agent.
73fn fetch_http(agent: &ureq::Agent, url: &Url, dest: &mut dyn Write) -> Result<(), Error> {
74    let resp = agent
75        .get(url.as_str())
76        .call()
77        .map_err(|e| Error::Http(e.to_string()))?;
78    let mut reader = resp.into_body().into_reader();
79    io::copy(&mut reader, dest).map_err(Error::Io)?;
80    Ok(())
81}
82
83/// Build an iroh connection pool with our connect-timeout override.
84pub(crate) fn pool_options() -> PoolOptions {
85    PoolOptions {
86        connect_timeout: CONNECT_TIMEOUT,
87        ..PoolOptions::default()
88    }
89}
90
91/// Run a multi-provider iroh download into `store` using a caller-supplied
92/// [`Downloader`].
93///
94/// The reusable core behind the node's persistent-store fetch handler — it
95/// owns neither the endpoint nor the store, so partial progress persists
96/// across providers and across fetches. Progress is reported through
97/// `on_progress`; the node forwards [`FetchProgress`] frames over the
98/// control socket.
99///
100/// Returns per-provider errors on failure. `DownloadProgressItem::ProviderFailed`
101/// intentionally drops the underlying cause — the errors vector therefore
102/// records only which provider failed plus the final stream-level cause if
103/// the download terminates fatally.
104pub(crate) async fn download_iroh_to_store(
105    downloader: &Downloader,
106    store: &FsStore,
107    hash_and_format: HashAndFormat,
108    providers: Vec<EndpointId>,
109    mut on_progress: impl FnMut(FetchProgress),
110) -> Result<(), Vec<Error>> {
111    // Convert to iroh's bare type at the iroh-blobs API boundary.
112    let providers: Vec<iroh::EndpointId> =
113        providers.into_iter().map(EndpointId::into_inner).collect();
114
115    // below we shuffle to avoid overloading a single provider, but the
116    // trade-off is that we may lose freshness ordering if providers are prioritized by the caller.
117    let progress = downloader.download(hash_and_format, Shuffled::new(providers));
118    let mut stream = match progress.stream().await {
119        Ok(s) => s,
120        Err(e) => return Err(vec![Error::Iroh(format!("downloader rpc: {e}"))]),
121    };
122
123    let mut errors: Vec<Error> = Vec::new();
124    let mut fatal: Option<Error> = None;
125    // Idle deadline is only bumped by data-movement events (`Progress`,
126    // `PartComplete`). Control events from the downloader — `TryProvider`,
127    // `ProviderFailed` — do not reset it, so a stream of dead providers
128    // can't silently extend the wait beyond IDLE_TIMEOUT of real progress.
129    let mut deadline = tokio::time::Instant::now() + IDLE_TIMEOUT;
130    loop {
131        match tokio::time::timeout_at(deadline, stream.next()).await {
132            Err(_) => {
133                fatal = Some(Error::Iroh(format!(
134                    "no progress for {}s",
135                    IDLE_TIMEOUT.as_secs()
136                )));
137                break;
138            }
139            Ok(None) => break, // Download completed!
140            Ok(Some(item)) => match item {
141                DownloadProgressItem::TryProvider { id, .. } => {
142                    on_progress(FetchProgress::TryingLocation {
143                        endpoint_id: EndpointId::from(id),
144                    });
145                }
146                DownloadProgressItem::ProviderFailed { id, .. } => {
147                    let endpoint_id = EndpointId::from(id);
148                    on_progress(FetchProgress::LocationFailed { endpoint_id });
149                    errors.push(Error::Iroh(format!(
150                        "location {endpoint_id}: download failed"
151                    )));
152                }
153                DownloadProgressItem::Progress(offset) => {
154                    on_progress(FetchProgress::Downloading {
155                        offset,
156                        total: None,
157                    });
158                    deadline = tokio::time::Instant::now() + IDLE_TIMEOUT;
159                }
160                DownloadProgressItem::PartComplete { .. } => {
161                    deadline = tokio::time::Instant::now() + IDLE_TIMEOUT;
162                }
163                DownloadProgressItem::DownloadError => {
164                    fatal = Some(Error::Iroh("download error".into()));
165                    break;
166                }
167                DownloadProgressItem::Error(cause) => {
168                    fatal = Some(Error::Iroh(format!("{cause}")));
169                    break;
170                }
171            },
172        }
173    }
174
175    // Trust the store: if nothing is missing, we have the content — even
176    // if individual providers emitted `ProviderFailed` along the way. If
177    // the completeness check itself errors, treat the attempt as failed
178    // and surface the cause so it isn't silently swallowed.
179    let done = match store.blobs().has(hash_and_format.hash).await {
180        Ok(complete) => complete,
181        Err(e) => {
182            errors.push(Error::Iroh(format!("store completeness check: {e}")));
183            false
184        }
185    };
186    if done {
187        Ok(())
188    } else {
189        if let Some(e) = fatal {
190            errors.push(e);
191        }
192        Err(errors)
193    }
194}
195
/// Drop guard that deletes the wrapped path — file or directory tree — when
/// it goes out of scope.
///
/// Cleanup is best-effort (errors are ignored) and also runs when the owning
/// future is cancelled mid-operation, so an aborted export/download leaves no
/// stray temp file or staging directory behind.
struct ScopedPath(PathBuf);

impl Drop for ScopedPath {
    fn drop(&mut self) {
        let target: &Path = &self.0;
        // The guard does not know whether it wraps a file or a directory, so
        // it attempts both removals; whichever does not apply simply fails
        // and is ignored.
        let _file_outcome = std::fs::remove_file(target);
        let _dir_outcome = std::fs::remove_dir_all(target);
    }
}
208
/// How often [`http_to_store`] emits a `Downloading` frame while the
/// blocking body transfer runs — frequent enough for responsive progress
/// and to keep the caller's idle timer alive on large blobs.
///
/// Used as the sleep period of the polling loop that reads the temp file's
/// current size and reports it as the download offset.
const HTTP_PROGRESS_INTERVAL: Duration = Duration::from_secs(1);
213
/// Compute the staging path for an atomic export to `dest`.
///
/// The result is the hidden sibling `.<name>.rad-partial` in `dest`'s own
/// directory, so renaming it onto `dest` is a same-filesystem move. The
/// suffix is appended to the full file name (not swapped for the extension)
/// so it won't clobber a user file that happens to share `dest`'s stem.
///
/// When `dest` has no final component the name `artifact` is used, and when
/// it has no (non-empty) parent the staging path is relative.
fn partial_sibling(dest: &Path) -> PathBuf {
    let base_name = match dest.file_name() {
        Some(os_name) => os_name.to_string_lossy().into_owned(),
        None => String::from("artifact"),
    };
    let staged_name = format!(".{base_name}.rad-partial");

    // An empty parent (e.g. for a bare `out`) is treated like no parent.
    let directory = dest.parent().filter(|dir| !dir.as_os_str().is_empty());
    if let Some(dir) = directory {
        dir.join(staged_name)
    } else {
        PathBuf::from(staged_name)
    }
}
229
230/// Export one blob to `target`, forwarding byte progress as
231/// [`FetchProgress::Exporting`] frames so a long copy keeps the caller's
232/// idle timer alive. `entry` names the collection member (None for a single
233/// blob). Returns the blob size.
234///
235/// Copy-on-write filesystems complete instantly and emit no
236/// `CopyProgress` (so no intermediate frames), which is fine — there is no
237/// slow window to report. A plain cross-filesystem copy does emit them.
238async fn export_streamed(
239    store: &FsStore,
240    hash: Hash,
241    target: &Path,
242    entry: Option<&str>,
243    on_progress: &mut impl FnMut(FetchProgress),
244) -> Result<u64, Error> {
245    let mut stream = store.blobs().export(hash, target).stream().await;
246    let mut size: Option<u64> = None;
247    while let Some(item) = stream.next().await {
248        match item {
249            ExportProgressItem::Size(s) => size = Some(s),
250            ExportProgressItem::CopyProgress(offset) => on_progress(FetchProgress::Exporting {
251                offset,
252                total: size,
253                entry: entry.map(str::to_owned),
254            }),
255            ExportProgressItem::Done => {}
256            ExportProgressItem::Error(e) => return Err(Error::Iroh(format!("export: {e}"))),
257        }
258    }
259    let bytes = size.unwrap_or_else(|| std::fs::metadata(target).map(|m| m.len()).unwrap_or(0));
260    // Always emit a terminal frame for this entry so completion is visible
261    // even when the copy was instant (no CopyProgress events).
262    on_progress(FetchProgress::Exporting {
263        offset: bytes,
264        total: Some(bytes),
265        entry: entry.map(str::to_owned),
266    });
267    Ok(bytes)
268}
269
270/// Export a single blob from `store` to `dest`, atomically.
271///
272/// Writes to a sibling staging file and renames on success, so a kill or
273/// cancellation mid-export never leaves a truncated file at `dest`. The
274/// staging file is removed on every exit path (including future-drop and
275/// a failed rename). Returns the bytes written and streams progress.
276pub(crate) async fn export_blob_to(
277    store: &FsStore,
278    hash: Hash,
279    dest: &Path,
280    mut on_progress: impl FnMut(FetchProgress),
281) -> Result<u64, Error> {
282    let tmp = partial_sibling(dest);
283    let _tmp = ScopedPath(tmp.clone());
284    let bytes = export_streamed(store, hash, &tmp, None, &mut on_progress)
285        .await
286        .map_err(|e| {
287            Error::Iroh(format!(
288                "failed to export from the iroh store to '{}': {e}",
289                dest.display()
290            ))
291        })?;
292    std::fs::rename(&tmp, dest).map_err(Error::Io)?;
293    Ok(bytes)
294}
295
296/// Export a hashseq collection from `store`, atomically.
297///
298/// Members are written into a sibling staging directory; the whole
299/// directory is renamed onto `dest_dir` only once every member is exported,
300/// so a killed or disconnected export never leaves a partially-populated
301/// destination. Member names are sanitized — a name that is absolute or
302/// escapes the directory (a `..` component) is rejected rather than written
303/// outside `dest_dir`. Returns the total bytes written.
304pub(crate) async fn export_collection_to(
305    store: &FsStore,
306    hash: Hash,
307    dest_dir: &Path,
308    on_progress: impl FnMut(FetchProgress),
309) -> Result<u64, Error> {
310    let collection = Collection::load(hash, store.as_ref())
311        .await
312        .map_err(|e| Error::Iroh(format!("load collection: {e}")))?;
313
314    let staging = partial_sibling(dest_dir);
315    // Clear any leftover staging from a previously-crashed export, then
316    // guard it so a cancelled export cleans up too.
317    let _ = std::fs::remove_dir_all(&staging);
318    let _staging = ScopedPath(staging.clone());
319    std::fs::create_dir_all(&staging).map_err(Error::Io)?;
320
321    let total = export_members(store, &collection, &staging, on_progress)
322        .await
323        .map_err(|e| {
324            Error::Iroh(format!(
325                "failed to export from the iroh store to '{}': {e}",
326                dest_dir.display()
327            ))
328        })?;
329    // Swap staging into place. staging is a sibling of dest_dir, so the
330    // rename is a same-filesystem move; replace any existing destination
331    // first (rename onto a non-empty dir fails).
332    if dest_dir.exists() {
333        std::fs::remove_dir_all(dest_dir).map_err(Error::Io)?;
334    }
335    std::fs::rename(&staging, dest_dir).map_err(Error::Io)?;
336    Ok(total)
337}
338
339/// Export each collection member into `dir`, rejecting unsafe names.
340async fn export_members(
341    store: &FsStore,
342    collection: &Collection,
343    dir: &Path,
344    mut on_progress: impl FnMut(FetchProgress),
345) -> Result<u64, Error> {
346    let mut total = 0u64;
347    for (name, entry_hash) in collection.iter() {
348        let target = safe_join(dir, name)
349            .ok_or_else(|| Error::Iroh(format!("unsafe collection member name: {name:?}")))?;
350        if let Some(parent) = target.parent() {
351            std::fs::create_dir_all(parent).map_err(Error::Io)?;
352        }
353        let name_ref: &str = name;
354        let bytes = export_streamed(
355            store,
356            *entry_hash,
357            &target,
358            Some(name_ref),
359            &mut on_progress,
360        )
361        .await
362        .map_err(|e| Error::Iroh(format!("export '{name}': {e}")))?;
363        total = total.saturating_add(bytes);
364    }
365    Ok(total)
366}
367
/// Join `name` under `base`, returning `None` when the result would not be
/// a path strictly inside `base`.
///
/// Collection member names are untrusted (a malicious provider controls
/// them), so this guards against writing outside the destination. A name is
/// rejected when:
///
/// * it contains a root, prefix, or `..` component (absolute, or escaping
///   `base`), or
/// * it contains no normal component at all — the empty string, or a name
///   made only of `.` — because joining it would yield `base` itself rather
///   than a file within it.
///
/// Normal nested paths (`a/b/c`) are allowed, and `.` components are
/// tolerated alongside normal ones (`./file.bin`).
fn safe_join(base: &Path, name: &str) -> Option<PathBuf> {
    let rel = Path::new(name);
    let mut names_an_entry = false;
    for component in rel.components() {
        match component {
            Component::Normal(_) => names_an_entry = true,
            Component::CurDir => {}
            Component::RootDir | Component::Prefix(_) | Component::ParentDir => return None,
        }
    }
    names_an_entry.then(|| base.join(rel))
}
383
384/// Download an HTTP(S) blob into `store`, verifying it matches `expected`.
385///
386/// Routes HTTP content through the store (rather than straight to disk) so
387/// an HTTP-fetched blob becomes a first-class, seedable blob — identical to
388/// one fetched over iroh. ureq is blocking, so the network read runs on a
389/// blocking thread; the file is then imported (copied) into the store and
390/// its hash checked against the CID. Blob-only: collections require iroh.
391///
392/// The returned bytes are protected only by the import's temp tag, which is
393/// dropped here — the caller must already hold a tag covering the expected
394/// hash (it does: the fetch handler tags before downloading).
395///
396/// While the body transfers, `Downloading` frames are emitted from the
397/// async side every [`HTTP_PROGRESS_INTERVAL`] (offset = the temp file's
398/// current size), keeping the caller's idle timer alive on large blobs. The
399/// caller emits the initial `Connecting` frame.
400///
401/// Cancellation note: the blocking download cannot be aborted mid-flight
402/// (dropping a `spawn_blocking` handle detaches the thread), so on a client
403/// disconnect the transfer runs to its bounded end. The temp file is always
404/// cleaned up via [`ScopedPath`], so no stray file is leaked. A stalled body
405/// keeps emitting frames (the file size stops growing), so an HTTP stall is
406/// not surfaced as an idle timeout — HTTP has no body-stall bound anyway.
407pub(crate) async fn http_to_store(
408    store: &FsStore,
409    url: &Url,
410    expected: &Cid,
411    mut on_progress: impl FnMut(FetchProgress),
412) -> Result<Hash, Error> {
413    let expected_hash = cid_utils::cid_to_blake3_hash(expected)?;
414    // Per-operation unique temp name: process id + a monotonic counter so
415    // concurrent fetches of the same CID don't share (and clobber) a path.
416    static SEQ: AtomicU64 = AtomicU64::new(0);
417    let n = SEQ.fetch_add(1, Ordering::Relaxed);
418    let tmp = std::env::temp_dir().join(format!(
419        ".rad-artifact-http-{}-{}-{n}",
420        expected_hash.to_hex(),
421        std::process::id(),
422    ));
423    let _tmp = ScopedPath(tmp.clone());
424
425    // ureq is blocking; run the download off the async runtime and report
426    // progress from here by polling the temp file's size on an interval.
427    let url_owned = url.clone();
428    let tmp_dl = tmp.clone();
429    let download = tokio::task::spawn_blocking(move || -> Result<(), Error> {
430        let agent = http_agent();
431        let file = std::fs::File::create(&tmp_dl).map_err(Error::Io)?;
432        let mut writer = BufWriter::new(file);
433        fetch_http(&agent, &url_owned, &mut writer)?;
434        writer.flush().map_err(Error::Io)
435    });
436    tokio::pin!(download);
437    let joined = loop {
438        tokio::select! {
439            res = &mut download => break res,
440            _ = tokio::time::sleep(HTTP_PROGRESS_INTERVAL) => {
441                let offset = std::fs::metadata(&tmp).map(|m| m.len()).unwrap_or(0);
442                on_progress(FetchProgress::Downloading { offset, total: None });
443            }
444        }
445    };
446    joined.map_err(|e| Error::Iroh(format!("http download task: {e}")))??;
447
448    // Import the file into the store (copy), then verify the hash.
449    let tt = store
450        .add_path_with_opts(AddPathOptions {
451            path: tmp.clone(),
452            format: BlobFormat::Raw,
453            mode: IrohImportMode::Copy,
454        })
455        .temp_tag()
456        .await
457        .map_err(|e| Error::Iroh(format!("import http blob: {e}")))?;
458    let hash = tt.hash();
459    if blake3::Hash::from(hash) != expected_hash {
460        let actual = cid_utils::blake3_hash_to_cid(hash.into(), ArtifactKind::Blob);
461        return Err(Error::CidMismatch {
462            expected: expected.to_string(),
463            actual: actual.to_string(),
464        });
465    }
466    Ok(hash)
467}
468
#[cfg(test)]
mod tests {
    use std::io::Read;
    use std::net::TcpListener;

    use super::*;

    /// Spawn a one-shot HTTP/1.1 server that serves `body` once and exits.
    /// Returns the URL to GET and the server thread's join handle.
    ///
    /// The listener binds to an ephemeral loopback port, so tests can run
    /// in parallel without port clashes.
    fn serve_once(body: Vec<u8>) -> (Url, std::thread::JoinHandle<()>) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let handle = std::thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                // Drain the request so the client's write side completes.
                let mut buf = [0u8; 1024];
                let _ = stream.read(&mut buf);
                let header = format!(
                    "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
                    body.len()
                );
                let _ = stream.write_all(header.as_bytes());
                let _ = stream.write_all(&body);
                let _ = stream.flush();
            }
        });
        let url = Url::parse(&format!("http://{addr}/blob")).unwrap();
        (url, handle)
    }

    /// Multi-threaded tokio runtime with all drivers enabled, used to drive
    /// the async functions under test from plain `#[test]` functions.
    fn runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap()
    }

    /// `http_to_store` downloads the body, imports it, and returns the hash
    /// when the bytes match the expected CID.
    #[test]
    fn http_to_store_imports_matching_blob() {
        runtime().block_on(async {
            let tmp = tempfile::tempdir().unwrap();
            let store = FsStore::load(tmp.path()).await.unwrap();

            let body = b"hello over http".to_vec();
            let expected = cid_utils::blake3_hash_to_cid(blake3::hash(&body), ArtifactKind::Blob);
            let (url, server) = serve_once(body.clone());

            let hash = http_to_store(&store, &url, &expected, |_| {})
                .await
                .unwrap();
            assert_eq!(hash, Hash::new(&body));
            assert!(store.blobs().has(hash).await.unwrap());
            server.join().unwrap();
        });
    }

    /// A body whose bytes don't hash to `expected` surfaces `CidMismatch`
    /// rather than reporting success for the wrong content.
    #[test]
    fn http_to_store_rejects_mismatch() {
        runtime().block_on(async {
            let tmp = tempfile::tempdir().unwrap();
            let store = FsStore::load(tmp.path()).await.unwrap();

            // Expect one thing, serve another.
            let expected =
                cid_utils::blake3_hash_to_cid(blake3::hash(b"expected"), ArtifactKind::Blob);
            let (url, server) = serve_once(b"something else entirely".to_vec());

            let err = http_to_store(&store, &url, &expected, |_| {})
                .await
                .unwrap_err();
            assert!(matches!(err, Error::CidMismatch { .. }), "got {err:?}");
            server.join().unwrap();
        });
    }

    /// Build a small collection in the store, adding one child blob per
    /// entry in `members`. Returns the root hash. Member names are
    /// caller-supplied so tests can inject unsafe ones.
    async fn store_collection(store: &FsStore, members: &[(&str, &[u8])]) -> Hash {
        let mut pairs = Vec::new();
        // Hold child tags until the root tag covers them.
        let mut tags = Vec::new();
        for (name, data) in members {
            let tt = store.add_bytes(data.to_vec()).temp_tag().await.unwrap();
            pairs.push((name.to_string(), tt.hash()));
            tags.push(tt);
        }
        let collection = Collection::from_iter(pairs);
        let root = collection.store(store.as_ref()).await.unwrap();
        root.hash()
    }

    /// `export_collection_to` writes every member to disk, creating nested
    /// directories, and renames the staging tree onto the destination.
    #[test]
    fn export_collection_writes_members() {
        runtime().block_on(async {
            let tmp = tempfile::tempdir().unwrap();
            let store = FsStore::load(tmp.path()).await.unwrap();
            let root =
                store_collection(&store, &[("a.txt", b"alpha"), ("sub/b.txt", b"beta")]).await;

            let dest = tmp.path().join("out");
            let total = export_collection_to(&store, root, &dest, |_| {})
                .await
                .unwrap();

            assert_eq!(total, (b"alpha".len() + b"beta".len()) as u64);
            assert_eq!(std::fs::read(dest.join("a.txt")).unwrap(), b"alpha");
            assert_eq!(std::fs::read(dest.join("sub/b.txt")).unwrap(), b"beta");
            // Staging sibling is gone once the swap completes.
            assert!(!partial_sibling(&dest).exists());
        });
    }

    /// A member name that escapes the destination (`..`) aborts the export;
    /// the destination is never created and the staging tree is cleaned up.
    #[test]
    fn export_collection_rejects_unsafe_member() {
        runtime().block_on(async {
            let tmp = tempfile::tempdir().unwrap();
            let store = FsStore::load(tmp.path()).await.unwrap();
            let root = store_collection(&store, &[("../escape", b"evil")]).await;

            let dest = tmp.path().join("out");
            let err = export_collection_to(&store, root, &dest, |_| {})
                .await
                .unwrap_err();
            assert!(
                matches!(&err, Error::Iroh(m) if m.contains("unsafe collection member name")),
                "got {err:?}"
            );
            // Neither the destination nor the staging tree survives.
            assert!(!dest.exists());
            assert!(!partial_sibling(&dest).exists());
            // And nothing was written outside the destination.
            assert!(!tmp.path().join("escape").exists());
        });
    }

    /// `ScopedPath` removes a file when dropped — the cancellation-cleanup
    /// guarantee the export/download paths rely on.
    #[test]
    fn scoped_path_removes_file_on_drop() {
        let tmp = tempfile::tempdir().unwrap();
        let file = tmp.path().join("staged");
        std::fs::write(&file, b"partial").unwrap();
        {
            let _guard = ScopedPath(file.clone());
            assert!(file.exists());
        }
        assert!(!file.exists());
    }

    /// `ScopedPath` also removes a staging directory when dropped.
    #[test]
    fn scoped_path_removes_dir_on_drop() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path().join("staging");
        std::fs::create_dir(&dir).unwrap();
        std::fs::write(dir.join("inner"), b"x").unwrap();
        {
            let _guard = ScopedPath(dir.clone());
            assert!(dir.exists());
        }
        assert!(!dir.exists());
    }

    /// `safe_join` accepts plain, nested, and `./`-prefixed member names and
    /// places them under the base directory.
    #[test]
    fn safe_join_allows_nested_paths() {
        let base = Path::new("/dest");
        assert_eq!(
            safe_join(base, "a/b/c.txt"),
            Some(PathBuf::from("/dest/a/b/c.txt"))
        );
        assert!(safe_join(base, "file.bin").is_some());
        assert!(safe_join(base, "./file.bin").is_some());
    }

    /// `safe_join` refuses names that would land outside the base: any `..`
    /// component, or an absolute path.
    #[test]
    fn safe_join_rejects_traversal_and_absolute() {
        let base = Path::new("/dest");
        // Leading, interior, and bare parent-dir escapes are all rejected.
        assert!(safe_join(base, "../escape").is_none());
        assert!(safe_join(base, "a/../../escape").is_none());
        assert!(safe_join(base, "..").is_none());
        // Absolute paths must not replace the base.
        assert!(safe_join(base, "/etc/passwd").is_none());
    }
}