use std::io::{self, BufWriter, Write};
use std::path::{Component, Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use iroh_blobs::api::blobs::{AddPathOptions, ExportProgressItem, ImportMode as IrohImportMode};
use iroh_blobs::api::downloader::{DownloadProgressItem, Downloader, Shuffled};
use iroh_blobs::format::collection::Collection;
use iroh_blobs::store::fs::FsStore;
use iroh_blobs::util::connection_pool::Options as PoolOptions;
use iroh_blobs::{BlobFormat, Hash, HashAndFormat};
use n0_future::StreamExt;
use url::Url;
use crate::Error;
use radicle_artifact_core::cid::{self as cid_utils, ArtifactKind, Cid};
use radicle_artifact_core::keys::EndpointId;
use radicle_artifact_core::protocol::FetchProgress;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
fn http_agent() -> ureq::Agent {
let config = ureq::Agent::config_builder()
.timeout_connect(Some(CONNECT_TIMEOUT))
.timeout_recv_response(Some(CONNECT_TIMEOUT))
.build();
ureq::Agent::new_with_config(config)
}
fn fetch_http(agent: &ureq::Agent, url: &Url, dest: &mut dyn Write) -> Result<(), Error> {
let resp = agent
.get(url.as_str())
.call()
.map_err(|e| Error::Http(e.to_string()))?;
let mut reader = resp.into_body().into_reader();
io::copy(&mut reader, dest).map_err(Error::Io)?;
Ok(())
}
pub(crate) fn pool_options() -> PoolOptions {
PoolOptions {
connect_timeout: CONNECT_TIMEOUT,
..PoolOptions::default()
}
}
pub(crate) async fn download_iroh_to_store(
downloader: &Downloader,
store: &FsStore,
hash_and_format: HashAndFormat,
providers: Vec<EndpointId>,
mut on_progress: impl FnMut(FetchProgress),
) -> Result<(), Vec<Error>> {
let providers: Vec<iroh::EndpointId> =
providers.into_iter().map(EndpointId::into_inner).collect();
let progress = downloader.download(hash_and_format, Shuffled::new(providers));
let mut stream = match progress.stream().await {
Ok(s) => s,
Err(e) => return Err(vec![Error::Iroh(format!("downloader rpc: {e}"))]),
};
let mut errors: Vec<Error> = Vec::new();
let mut fatal: Option<Error> = None;
let mut deadline = tokio::time::Instant::now() + IDLE_TIMEOUT;
loop {
match tokio::time::timeout_at(deadline, stream.next()).await {
Err(_) => {
fatal = Some(Error::Iroh(format!(
"no progress for {}s",
IDLE_TIMEOUT.as_secs()
)));
break;
}
Ok(None) => break, Ok(Some(item)) => match item {
DownloadProgressItem::TryProvider { id, .. } => {
on_progress(FetchProgress::TryingLocation {
endpoint_id: EndpointId::from(id),
});
}
DownloadProgressItem::ProviderFailed { id, .. } => {
let endpoint_id = EndpointId::from(id);
on_progress(FetchProgress::LocationFailed { endpoint_id });
errors.push(Error::Iroh(format!(
"location {endpoint_id}: download failed"
)));
}
DownloadProgressItem::Progress(offset) => {
on_progress(FetchProgress::Downloading {
offset,
total: None,
});
deadline = tokio::time::Instant::now() + IDLE_TIMEOUT;
}
DownloadProgressItem::PartComplete { .. } => {
deadline = tokio::time::Instant::now() + IDLE_TIMEOUT;
}
DownloadProgressItem::DownloadError => {
fatal = Some(Error::Iroh("download error".into()));
break;
}
DownloadProgressItem::Error(cause) => {
fatal = Some(Error::Iroh(format!("{cause}")));
break;
}
},
}
}
let done = match store.blobs().has(hash_and_format.hash).await {
Ok(complete) => complete,
Err(e) => {
errors.push(Error::Iroh(format!("store completeness check: {e}")));
false
}
};
if done {
Ok(())
} else {
if let Some(e) = fatal {
errors.push(e);
}
Err(errors)
}
}
struct ScopedPath(PathBuf);
impl Drop for ScopedPath {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
let _ = std::fs::remove_dir_all(&self.0);
}
}
const HTTP_PROGRESS_INTERVAL: Duration = Duration::from_secs(1);
fn partial_sibling(dest: &Path) -> PathBuf {
let name = dest
.file_name()
.map(|n| n.to_string_lossy().into_owned())
.unwrap_or_else(|| "artifact".to_string());
let staged = format!(".{name}.rad-partial");
match dest.parent() {
Some(parent) if !parent.as_os_str().is_empty() => parent.join(staged),
_ => PathBuf::from(staged),
}
}
async fn export_streamed(
store: &FsStore,
hash: Hash,
target: &Path,
entry: Option<&str>,
on_progress: &mut impl FnMut(FetchProgress),
) -> Result<u64, Error> {
let mut stream = store.blobs().export(hash, target).stream().await;
let mut size: Option<u64> = None;
while let Some(item) = stream.next().await {
match item {
ExportProgressItem::Size(s) => size = Some(s),
ExportProgressItem::CopyProgress(offset) => on_progress(FetchProgress::Exporting {
offset,
total: size,
entry: entry.map(str::to_owned),
}),
ExportProgressItem::Done => {}
ExportProgressItem::Error(e) => return Err(Error::Iroh(format!("export: {e}"))),
}
}
let bytes = size.unwrap_or_else(|| std::fs::metadata(target).map(|m| m.len()).unwrap_or(0));
on_progress(FetchProgress::Exporting {
offset: bytes,
total: Some(bytes),
entry: entry.map(str::to_owned),
});
Ok(bytes)
}
pub(crate) async fn export_blob_to(
store: &FsStore,
hash: Hash,
dest: &Path,
mut on_progress: impl FnMut(FetchProgress),
) -> Result<u64, Error> {
let tmp = partial_sibling(dest);
let _tmp = ScopedPath(tmp.clone());
let bytes = export_streamed(store, hash, &tmp, None, &mut on_progress)
.await
.map_err(|e| {
Error::Iroh(format!(
"failed to export from the iroh store to '{}': {e}",
dest.display()
))
})?;
std::fs::rename(&tmp, dest).map_err(Error::Io)?;
Ok(bytes)
}
pub(crate) async fn export_collection_to(
store: &FsStore,
hash: Hash,
dest_dir: &Path,
on_progress: impl FnMut(FetchProgress),
) -> Result<u64, Error> {
let collection = Collection::load(hash, store.as_ref())
.await
.map_err(|e| Error::Iroh(format!("load collection: {e}")))?;
let staging = partial_sibling(dest_dir);
let _ = std::fs::remove_dir_all(&staging);
let _staging = ScopedPath(staging.clone());
std::fs::create_dir_all(&staging).map_err(Error::Io)?;
let total = export_members(store, &collection, &staging, on_progress)
.await
.map_err(|e| {
Error::Iroh(format!(
"failed to export from the iroh store to '{}': {e}",
dest_dir.display()
))
})?;
if dest_dir.exists() {
std::fs::remove_dir_all(dest_dir).map_err(Error::Io)?;
}
std::fs::rename(&staging, dest_dir).map_err(Error::Io)?;
Ok(total)
}
async fn export_members(
store: &FsStore,
collection: &Collection,
dir: &Path,
mut on_progress: impl FnMut(FetchProgress),
) -> Result<u64, Error> {
let mut total = 0u64;
for (name, entry_hash) in collection.iter() {
let target = safe_join(dir, name)
.ok_or_else(|| Error::Iroh(format!("unsafe collection member name: {name:?}")))?;
if let Some(parent) = target.parent() {
std::fs::create_dir_all(parent).map_err(Error::Io)?;
}
let name_ref: &str = name;
let bytes = export_streamed(
store,
*entry_hash,
&target,
Some(name_ref),
&mut on_progress,
)
.await
.map_err(|e| Error::Iroh(format!("export '{name}': {e}")))?;
total = total.saturating_add(bytes);
}
Ok(total)
}
fn safe_join(base: &Path, name: &str) -> Option<PathBuf> {
let rel = Path::new(name);
for component in rel.components() {
match component {
Component::Normal(_) | Component::CurDir => {}
Component::RootDir | Component::Prefix(_) | Component::ParentDir => return None,
}
}
Some(base.join(rel))
}
pub(crate) async fn http_to_store(
store: &FsStore,
url: &Url,
expected: &Cid,
mut on_progress: impl FnMut(FetchProgress),
) -> Result<Hash, Error> {
let expected_hash = cid_utils::cid_to_blake3_hash(expected)?;
static SEQ: AtomicU64 = AtomicU64::new(0);
let n = SEQ.fetch_add(1, Ordering::Relaxed);
let tmp = std::env::temp_dir().join(format!(
".rad-artifact-http-{}-{}-{n}",
expected_hash.to_hex(),
std::process::id(),
));
let _tmp = ScopedPath(tmp.clone());
let url_owned = url.clone();
let tmp_dl = tmp.clone();
let download = tokio::task::spawn_blocking(move || -> Result<(), Error> {
let agent = http_agent();
let file = std::fs::File::create(&tmp_dl).map_err(Error::Io)?;
let mut writer = BufWriter::new(file);
fetch_http(&agent, &url_owned, &mut writer)?;
writer.flush().map_err(Error::Io)
});
tokio::pin!(download);
let joined = loop {
tokio::select! {
res = &mut download => break res,
_ = tokio::time::sleep(HTTP_PROGRESS_INTERVAL) => {
let offset = std::fs::metadata(&tmp).map(|m| m.len()).unwrap_or(0);
on_progress(FetchProgress::Downloading { offset, total: None });
}
}
};
joined.map_err(|e| Error::Iroh(format!("http download task: {e}")))??;
let tt = store
.add_path_with_opts(AddPathOptions {
path: tmp.clone(),
format: BlobFormat::Raw,
mode: IrohImportMode::Copy,
})
.temp_tag()
.await
.map_err(|e| Error::Iroh(format!("import http blob: {e}")))?;
let hash = tt.hash();
if blake3::Hash::from(hash) != expected_hash {
let actual = cid_utils::blake3_hash_to_cid(hash.into(), ArtifactKind::Blob);
return Err(Error::CidMismatch {
expected: expected.to_string(),
actual: actual.to_string(),
});
}
Ok(hash)
}
#[cfg(test)]
mod tests {
use std::io::Read;
use std::net::TcpListener;
use super::*;
fn serve_once(body: Vec<u8>) -> (Url, std::thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let handle = std::thread::spawn(move || {
if let Ok((mut stream, _)) = listener.accept() {
let mut buf = [0u8; 1024];
let _ = stream.read(&mut buf);
let header = format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
);
let _ = stream.write_all(header.as_bytes());
let _ = stream.write_all(&body);
let _ = stream.flush();
}
});
let url = Url::parse(&format!("http://{addr}/blob")).unwrap();
(url, handle)
}
fn runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
}
#[test]
fn http_to_store_imports_matching_blob() {
runtime().block_on(async {
let tmp = tempfile::tempdir().unwrap();
let store = FsStore::load(tmp.path()).await.unwrap();
let body = b"hello over http".to_vec();
let expected = cid_utils::blake3_hash_to_cid(blake3::hash(&body), ArtifactKind::Blob);
let (url, server) = serve_once(body.clone());
let hash = http_to_store(&store, &url, &expected, |_| {})
.await
.unwrap();
assert_eq!(hash, Hash::new(&body));
assert!(store.blobs().has(hash).await.unwrap());
server.join().unwrap();
});
}
#[test]
fn http_to_store_rejects_mismatch() {
runtime().block_on(async {
let tmp = tempfile::tempdir().unwrap();
let store = FsStore::load(tmp.path()).await.unwrap();
let expected =
cid_utils::blake3_hash_to_cid(blake3::hash(b"expected"), ArtifactKind::Blob);
let (url, server) = serve_once(b"something else entirely".to_vec());
let err = http_to_store(&store, &url, &expected, |_| {})
.await
.unwrap_err();
assert!(matches!(err, Error::CidMismatch { .. }), "got {err:?}");
server.join().unwrap();
});
}
async fn store_collection(store: &FsStore, members: &[(&str, &[u8])]) -> Hash {
let mut pairs = Vec::new();
let mut tags = Vec::new();
for (name, data) in members {
let tt = store.add_bytes(data.to_vec()).temp_tag().await.unwrap();
pairs.push((name.to_string(), tt.hash()));
tags.push(tt);
}
let collection = Collection::from_iter(pairs);
let root = collection.store(store.as_ref()).await.unwrap();
root.hash()
}
#[test]
fn export_collection_writes_members() {
runtime().block_on(async {
let tmp = tempfile::tempdir().unwrap();
let store = FsStore::load(tmp.path()).await.unwrap();
let root =
store_collection(&store, &[("a.txt", b"alpha"), ("sub/b.txt", b"beta")]).await;
let dest = tmp.path().join("out");
let total = export_collection_to(&store, root, &dest, |_| {})
.await
.unwrap();
assert_eq!(total, (b"alpha".len() + b"beta".len()) as u64);
assert_eq!(std::fs::read(dest.join("a.txt")).unwrap(), b"alpha");
assert_eq!(std::fs::read(dest.join("sub/b.txt")).unwrap(), b"beta");
assert!(!partial_sibling(&dest).exists());
});
}
#[test]
fn export_collection_rejects_unsafe_member() {
runtime().block_on(async {
let tmp = tempfile::tempdir().unwrap();
let store = FsStore::load(tmp.path()).await.unwrap();
let root = store_collection(&store, &[("../escape", b"evil")]).await;
let dest = tmp.path().join("out");
let err = export_collection_to(&store, root, &dest, |_| {})
.await
.unwrap_err();
assert!(
matches!(&err, Error::Iroh(m) if m.contains("unsafe collection member name")),
"got {err:?}"
);
assert!(!dest.exists());
assert!(!partial_sibling(&dest).exists());
assert!(!tmp.path().join("escape").exists());
});
}
#[test]
fn scoped_path_removes_file_on_drop() {
let tmp = tempfile::tempdir().unwrap();
let file = tmp.path().join("staged");
std::fs::write(&file, b"partial").unwrap();
{
let _guard = ScopedPath(file.clone());
assert!(file.exists());
}
assert!(!file.exists());
}
#[test]
fn scoped_path_removes_dir_on_drop() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().join("staging");
std::fs::create_dir(&dir).unwrap();
std::fs::write(dir.join("inner"), b"x").unwrap();
{
let _guard = ScopedPath(dir.clone());
assert!(dir.exists());
}
assert!(!dir.exists());
}
#[test]
fn safe_join_allows_nested_paths() {
let base = Path::new("/dest");
assert_eq!(
safe_join(base, "a/b/c.txt"),
Some(PathBuf::from("/dest/a/b/c.txt"))
);
assert!(safe_join(base, "file.bin").is_some());
assert!(safe_join(base, "./file.bin").is_some());
}
#[test]
fn safe_join_rejects_traversal_and_absolute() {
let base = Path::new("/dest");
assert!(safe_join(base, "../escape").is_none());
assert!(safe_join(base, "a/../../escape").is_none());
assert!(safe_join(base, "..").is_none());
assert!(safe_join(base, "/etc/passwd").is_none());
}
}