use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
use crate::iroh::EndpointConfig;
use crate::Error;
use iroh::protocol::Router;
use iroh_blobs::api::blobs::{AddPathOptions, ImportMode as IrohImportMode};
use iroh_blobs::api::{Store, TempTag};
use iroh_blobs::format::collection::Collection;
use iroh_blobs::store::fs::{options::Options as FsStoreOptions, FsStore};
use iroh_blobs::store::GcConfig;
use iroh_blobs::{BlobFormat, BlobsProtocol, Hash, HashAndFormat};
use n0_future::StreamExt;
use radicle::git::Oid;
use radicle::identity::RepoId;
use radicle_artifact_core::cid::{self as cid_utils, ArtifactKind, Cid};
pub use radicle_artifact_core::protocol::ImportMode;
/// Translates this crate's [`ImportMode`] into the import mode type used by
/// `iroh_blobs`.
///
/// `Copy` maps to the variant of the same name; `Reference` maps to
/// `TryReference`.
fn to_iroh_import_mode(mode: ImportMode) -> IrohImportMode {
    match mode {
        ImportMode::Reference => IrohImportMode::TryReference,
        ImportMode::Copy => IrohImportMode::Copy,
    }
}
/// Name of the directory that `bootstrap` creates under the `home` path it is given.
pub const ARTIFACTS_DIR: &str = "artifacts";
/// Name of the blob-store directory located inside [`ARTIFACTS_DIR`].
pub const STORE_DIR: &str = "store";
/// Leading byte of every seeded-tag name; written by `seeded_rid_prefix` and
/// required by `parse_seeded_tag`.
const SEEDED_TAG_V1: u8 = 0x01;
/// Longest time `bootstrap` waits on `endpoint.online()` before it logs a
/// warning and continues anyway.
const ONLINE_TIMEOUT: Duration = Duration::from_secs(10);
/// Value passed as `GcConfig::interval` when the store is opened (one hour).
const GC_INTERVAL: Duration = Duration::from_secs(60 * 60);
/// Handles produced by `bootstrap`: the blob store and the router serving it.
pub struct Seeder {
/// The filesystem-backed blob store loaded by `bootstrap`.
pub blobs: FsStore,
/// The router spawned by `bootstrap`; it accepts `iroh_blobs::ALPN` connections.
pub router: Router,
}
pub async fn bootstrap(home: &Path, secret: iroh::SecretKey) -> Result<Seeder, Error> {
let dir = home.join(ARTIFACTS_DIR);
std::fs::create_dir_all(&dir).map_err(Error::Io)?;
let store_dir = dir.join(STORE_DIR);
let db_path = store_dir.join("blobs.db");
let mut options = FsStoreOptions::new(&store_dir);
options.gc = Some(GcConfig {
interval: GC_INTERVAL,
add_protected: None,
});
let blobs = FsStore::load_with_opts(db_path, options)
.await
.map_err(|e| Error::Iroh(format!("FsStore load: {e}")))?;
let preset = EndpointConfig::from_env()?;
tracing::info!("iroh endpoint config: {preset}");
let endpoint = iroh::Endpoint::builder(preset)
.secret_key(secret)
.bind()
.await
.map_err(|e| Error::Iroh(format!("endpoint bind: {e}")))?;
if tokio::time::timeout(ONLINE_TIMEOUT, endpoint.online())
.await
.is_err()
{
tracing::warn!("endpoint not relay-connected after {ONLINE_TIMEOUT:?}; continuing anyway");
}
let blobs_protocol = BlobsProtocol::new(&blobs, None);
let router = Router::builder(endpoint)
.accept(iroh_blobs::ALPN, blobs_protocol)
.spawn();
Ok(Seeder { blobs, router })
}
/// Builds the full tag name for one seeded artifact: the release prefix from
/// [`seeded_release_prefix`] followed by the raw bytes of `cid`.
fn seeded_tag(rid: &RepoId, release: &Oid, cid: &Cid) -> Vec<u8> {
    let cid_bytes = cid.as_inner().to_bytes();
    let mut tag = seeded_release_prefix(rid, release);
    tag.extend_from_slice(&cid_bytes);
    tag
}
/// Builds the tag-name prefix shared by every seeded tag of one repository:
/// the version byte, a one-byte length, then the repository id bytes.
fn seeded_rid_prefix(rid: &RepoId) -> Vec<u8> {
    let id = rid_bytes(rid);
    let mut prefix = Vec::with_capacity(id.len() + 2);
    // NOTE(review): the length is stored in a single byte, so this assumes the
    // id is at most 255 bytes long; a longer id would be truncated by the cast.
    prefix.extend_from_slice(&[SEEDED_TAG_V1, id.len() as u8]);
    prefix.extend_from_slice(id);
    prefix
}
/// Builds the tag-name prefix shared by every seeded tag of one release:
/// the repository prefix from [`seeded_rid_prefix`], a one-byte length, then
/// the release id bytes.
fn seeded_release_prefix(rid: &RepoId, release: &Oid) -> Vec<u8> {
    let release_bytes = <Oid as AsRef<[u8]>>::as_ref(release);
    let mut prefix = seeded_rid_prefix(rid);
    // NOTE(review): same single-byte length field as the repository part, so
    // the release id is assumed to be at most 255 bytes long.
    prefix.push(release_bytes.len() as u8);
    prefix.extend_from_slice(release_bytes);
    prefix
}
/// Borrows the raw bytes of the value that `rid` dereferences to.
fn rid_bytes(rid: &RepoId) -> &[u8] {
    let inner = &**rid;
    AsRef::<[u8]>::as_ref(inner)
}
/// Rebuilds an [`Oid`] from raw digest bytes.
///
/// Only 20-byte (SHA-1 sized) input is accepted; any other length yields
/// `None`.
fn oid_from_bytes(b: &[u8]) -> Option<Oid> {
    if b.len() != 20 {
        return None;
    }
    let digest = b.try_into().ok()?;
    Some(Oid::from_sha1(digest))
}
/// Decodes a tag name produced by [`seeded_tag`] back into its parts.
///
/// Expected layout: the [`SEEDED_TAG_V1`] byte, a length-prefixed repository
/// id, a length-prefixed release id, and finally the CID bytes, which run to
/// the end of the name. Returns `None` when the version byte is missing, a
/// field is shorter than its length byte claims, either id is not a valid
/// 20-byte object id, or the trailing bytes do not parse as a CID.
fn parse_seeded_tag(name: &[u8]) -> Option<(RepoId, Oid, Cid)> {
    let body = name.strip_prefix(&[SEEDED_TAG_V1])?;
    let (rid_field, body) = take_len_prefixed(body)?;
    let (release_field, cid_field) = take_len_prefixed(body)?;

    let rid = oid_from_bytes(rid_field).map(RepoId::from)?;
    let release = oid_from_bytes(release_field)?;
    let raw_cid = cid::Cid::try_from(cid_field).ok()?;
    Some((rid, release, Cid::from(raw_cid)))
}
/// Splits one length-prefixed field off the front of `buf`.
///
/// The first byte of `buf` is the field length `n`. On success the result is
/// `(field, remainder)`, where `field` is the next `n` bytes and `remainder`
/// is everything after them. Returns `None` when `buf` is empty or holds
/// fewer than `n` bytes after the length byte.
fn take_len_prefixed(buf: &[u8]) -> Option<(&[u8], &[u8])> {
    let (len_byte, tail) = buf.split_first()?;
    let wanted = usize::from(*len_byte);
    (tail.len() >= wanted).then(|| tail.split_at(wanted))
}
/// Builds the `AddPathOptions` used for every file import in this module:
/// raw blob format, with `mode` translated by [`to_iroh_import_mode`].
fn add_opts(path: std::path::PathBuf, mode: ImportMode) -> AddPathOptions {
    let format = BlobFormat::Raw;
    let mode = to_iroh_import_mode(mode);
    AddPathOptions { path, format, mode }
}
pub async fn import_blob(
store: &Store,
path: &Path,
expected: &Cid,
mode: ImportMode,
) -> Result<(Hash, TempTag), Error> {
let abs = dunce::canonicalize(path).map_err(|e| Error::Iroh(format!("canonicalize: {e}")))?;
let tt = store
.add_path_with_opts(add_opts(abs, mode))
.temp_tag()
.await
.map_err(|e| Error::Iroh(format!("import blob: {e}")))?;
let hash = tt.hash();
let actual = cid_utils::blake3_hash_to_cid(hash.into(), ArtifactKind::Blob);
if actual != *expected {
return Err(Error::CidMismatch {
expected: expected.to_string(),
actual: actual.to_string(),
});
}
Ok((hash, tt))
}
pub async fn import_collection(
store: &Store,
dir: &Path,
expected: &Cid,
mode: ImportMode,
) -> Result<(Hash, TempTag), Error> {
let entries = cid_utils::canonical_walk(dir).map_err(Error::Io)?;
let mut pairs: Vec<(String, Hash)> = Vec::new();
let mut file_tags = Vec::with_capacity(entries.len());
for (name, abs) in entries {
let tt = store
.add_path_with_opts(add_opts(abs, mode))
.temp_tag()
.await
.map_err(|e| Error::Iroh(format!("import file {name}: {e}")))?;
pairs.push((name, tt.hash()));
file_tags.push(tt);
}
let collection = Collection::from_iter(pairs);
let root_tag = collection
.store(store)
.await
.map_err(|e| Error::Iroh(format!("store collection: {e}")))?;
drop(file_tags);
let hash = root_tag.hash();
let actual = cid_utils::blake3_hash_to_cid(hash.into(), ArtifactKind::Collection);
if actual != *expected {
return Err(Error::CidMismatch {
expected: expected.to_string(),
actual: actual.to_string(),
});
}
Ok((hash, root_tag))
}
/// Records that `cid` is seeded for `release` of repository `rid` by setting
/// the tag named by [`seeded_tag`].
///
/// The tag value is `hash` in raw format when `cid` denotes a blob, and in
/// hash-sequence format when it denotes a collection.
///
/// # Errors
///
/// Propagates the error from `cid_utils::artifact_kind`, and returns
/// `Error::Iroh` when the store rejects the tag.
pub async fn tag_seeded(
    store: &Store,
    rid: &RepoId,
    release: &Oid,
    cid: &Cid,
    hash: Hash,
) -> Result<(), Error> {
    let value = match cid_utils::artifact_kind(cid)? {
        ArtifactKind::Collection => HashAndFormat::hash_seq(hash),
        ArtifactKind::Blob => HashAndFormat::raw(hash),
    };
    let tag_name = seeded_tag(rid, release, cid);
    store
        .tags()
        .set(tag_name, value)
        .await
        .map_err(|e| Error::Iroh(format!("set seeded tag: {e}")))?;
    Ok(())
}
/// Imports the artifact at `path` and tags it as seeded in one step.
///
/// `kind` selects between [`import_blob`] and [`import_collection`]; both
/// verify the imported data against `cid`. On success the seeded tag for
/// `(rid, release, cid)` is written via [`tag_seeded`].
///
/// # Returns
///
/// The hash of the imported blob or collection root.
///
/// # Errors
///
/// Propagates any error from the import or from tagging.
pub async fn seed_artifact(
    store: &Store,
    rid: &RepoId,
    release: &Oid,
    cid: &Cid,
    path: &Path,
    kind: ArtifactKind,
    mode: ImportMode,
) -> Result<Hash, Error> {
    let (hash, temp_tag) = match kind {
        ArtifactKind::Collection => import_collection(store, path, cid, mode).await?,
        ArtifactKind::Blob => import_blob(store, path, cid, mode).await?,
    };
    tag_seeded(store, rid, release, cid, hash).await?;
    // The import's temp tag is held until the persistent tag has been
    // written, and released only afterwards.
    drop(temp_tag);
    Ok(hash)
}
pub async fn untag_seeded(
store: &Store,
rid: &RepoId,
release: &Oid,
cid: &Cid,
) -> Result<bool, Error> {
let removed = store
.tags()
.delete(seeded_tag(rid, release, cid))
.await
.map_err(|e| Error::Iroh(format!("delete seeded tag: {e}")))?;
Ok(removed > 0)
}
pub async fn untag_all(store: &Store, rid: &RepoId, cid: &Cid) -> Result<usize, Error> {
let prefix = seeded_rid_prefix(rid);
let mut stream = store
.tags()
.list_prefix(&prefix)
.await
.map_err(|e| Error::Iroh(format!("list seeded tags: {e}")))?;
let mut names = Vec::new();
while let Some(item) = stream.next().await {
let info = item.map_err(|e| Error::Iroh(format!("seeded tag stream: {e}")))?;
if let Some((_, _, tag_cid)) = parse_seeded_tag(info.name.as_ref()) {
if &tag_cid == cid {
names.push(info.name);
}
}
}
let mut removed = 0;
for name in names {
removed += store
.tags()
.delete(name)
.await
.map_err(|e| Error::Iroh(format!("delete seeded tag: {e}")))?
as usize;
}
Ok(removed)
}
pub async fn is_seeded(
store: &Store,
rid: &RepoId,
release: &Oid,
cid: &Cid,
) -> Result<bool, Error> {
let info = store
.tags()
.get(seeded_tag(rid, release, cid))
.await
.map_err(|e| Error::Iroh(format!("get seeded tag: {e}")))?;
Ok(info.is_some())
}
/// Reports whether `cid` is seeded for repository `rid` under any release.
///
/// Lists the tags sharing the repository prefix and stops at the first one
/// whose name decodes to `cid`. Tag names that fail to decode are skipped.
///
/// # Errors
///
/// `Error::Iroh` when listing or reading the listing stream fails.
pub async fn is_seeded_any(store: &Store, rid: &RepoId, cid: &Cid) -> Result<bool, Error> {
    let prefix = seeded_rid_prefix(rid);
    let mut stream = store
        .tags()
        .list_prefix(&prefix)
        .await
        .map_err(|e| Error::Iroh(format!("list seeded tags: {e}")))?;

    loop {
        let Some(item) = stream.next().await else {
            return Ok(false);
        };
        let info = item.map_err(|e| Error::Iroh(format!("seeded tag stream: {e}")))?;
        let same_cid = matches!(
            parse_seeded_tag(info.name.as_ref()),
            Some((_, _, ref tag_cid)) if tag_cid == cid
        );
        if same_cid {
            return Ok(true);
        }
    }
}
pub async fn seeded_cids(store: &Store, rid: &RepoId) -> Result<HashMap<Cid, Hash>, Error> {
let prefix = seeded_rid_prefix(rid);
let mut stream = store
.tags()
.list_prefix(&prefix)
.await
.map_err(|e| Error::Iroh(format!("list seeded tags: {e}")))?;
let mut out = HashMap::new();
while let Some(item) = stream.next().await {
let info = item.map_err(|e| Error::Iroh(format!("seeded tag stream: {e}")))?;
if let Some((_, _, cid)) = parse_seeded_tag(info.name.as_ref()) {
out.insert(cid, info.hash);
}
}
Ok(out)
}
pub async fn all_seeded(store: &Store) -> Result<Vec<(RepoId, Oid, Cid, Hash)>, Error> {
let mut stream = store
.tags()
.list_prefix([SEEDED_TAG_V1])
.await
.map_err(|e| Error::Iroh(format!("list seeded tags: {e}")))?;
let mut out = Vec::new();
while let Some(item) = stream.next().await {
let info = item.map_err(|e| Error::Iroh(format!("seeded tag stream: {e}")))?;
if let Some((rid, release, cid)) = parse_seeded_tag(info.name.as_ref()) {
out.push((rid, release, cid, info.hash));
}
}
Ok(out)
}
/// Computes the size in bytes of the artifact identified by `cid` / `hash`,
/// as reported by [`blob_size`].
///
/// For a blob this is the size of `hash` itself. For a collection it is the
/// saturating sum of the sizes of the entries yielded by iterating the
/// loaded collection.
///
/// This is best effort: `0` is returned when the artifact kind cannot be
/// determined from `cid` or when the collection fails to load.
pub async fn artifact_size_for(store: &Store, cid: &Cid, hash: Hash) -> u64 {
    let kind = match cid_utils::artifact_kind(cid) {
        Ok(kind) => kind,
        Err(_) => return 0,
    };

    let collection = match kind {
        ArtifactKind::Blob => return blob_size(store, hash).await,
        ArtifactKind::Collection => match Collection::load(hash, store).await {
            Ok(collection) => collection,
            Err(_) => return 0,
        },
    };

    let mut total = 0u64;
    for (_, child) in collection.iter() {
        let child_size = blob_size(store, *child).await;
        total = total.saturating_add(child_size);
    }
    total
}
/// Looks up the size of the blob `hash` from the store's status report.
///
/// A complete blob yields its size; a partial blob yields its reported size
/// when there is one. Every other outcome, including a failed status
/// request, yields `0`.
async fn blob_size(store: &Store, hash: Hash) -> u64 {
    use iroh_blobs::api::proto::BlobStatus;

    let Ok(status) = store.blobs().status(hash).await else {
        return 0;
    };
    match status {
        BlobStatus::Complete { size } => size,
        BlobStatus::Partial { size } => size.unwrap_or_default(),
        _ => 0,
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::str::FromStr;
use super::*;
/// Builds a CIDv1 with the raw codec over the BLAKE3 digest of `data`.
fn blob_cid(data: &[u8]) -> Cid {
use cid::multihash::Multihash;
let digest = blake3::hash(data);
let mh = Multihash::<64>::wrap(cid_utils::HASH_CODE_BLAKE3, digest.as_bytes()).unwrap();
Cid::from(cid::Cid::new_v1(cid_utils::RAW_CODEC, mh))
}
/// Returns two distinct, fixed repository ids.
fn rid_pair() -> (RepoId, RepoId) {
let a = RepoId::from_str("rad:z2u2CP3ZJzB7ZqE8jHrau19yjpdip").unwrap();
let b = RepoId::from_str("rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5").unwrap();
assert_ne!(a, b);
(a, b)
}
/// Builds a release id from `n` formatted as 40 zero-padded hex digits.
fn release(n: u8) -> Oid {
Oid::from_str(&format!("{n:040x}")).unwrap()
}
/// The same CID tagged under two repositories is tracked per repository:
/// untagging it for one leaves the other untouched.
#[test]
fn per_repo_tags_isolate() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let tmp = tempfile::tempdir().unwrap();
let store = FsStore::load(tmp.path()).await.unwrap();
let (rid_a, rid_b) = rid_pair();
let rel = release(1);
let cid = blob_cid(b"shared bytes");
let hash = Hash::new(b"shared bytes");
// Tag the same CID and release under both repositories.
tag_seeded(&store, &rid_a, &rel, &cid, hash).await.unwrap();
tag_seeded(&store, &rid_b, &rel, &cid, hash).await.unwrap();
assert!(is_seeded(&store, &rid_a, &rel, &cid).await.unwrap());
assert!(is_seeded(&store, &rid_b, &rel, &cid).await.unwrap());
let cids_a = seeded_cids(&store, &rid_a).await.unwrap();
let cids_b = seeded_cids(&store, &rid_b).await.unwrap();
assert_eq!(cids_a.len(), 1);
assert_eq!(cids_b.len(), 1);
assert!(cids_a.contains_key(&cid));
assert!(cids_b.contains_key(&cid));
// Removing the tag for repository A must not affect repository B.
untag_seeded(&store, &rid_a, &rel, &cid).await.unwrap();
assert!(!is_seeded(&store, &rid_a, &rel, &cid).await.unwrap());
assert!(is_seeded(&store, &rid_b, &rel, &cid).await.unwrap());
let cids_a = seeded_cids(&store, &rid_a).await.unwrap();
let cids_b = seeded_cids(&store, &rid_b).await.unwrap();
assert!(cids_a.is_empty());
assert_eq!(cids_b.len(), 1);
});
}
/// The same CID tagged under two releases of one repository is tracked per
/// release, while `is_seeded_any` / `untag_all` operate across releases.
#[test]
fn per_release_tags_isolate() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let tmp = tempfile::tempdir().unwrap();
let store = FsStore::load(tmp.path()).await.unwrap();
let (rid, _) = rid_pair();
let (rel_a, rel_b) = (release(1), release(2));
let cid = blob_cid(b"shared across releases");
let hash = Hash::new(b"shared across releases");
tag_seeded(&store, &rid, &rel_a, &cid, hash).await.unwrap();
tag_seeded(&store, &rid, &rel_b, &cid, hash).await.unwrap();
// Two tags exist, but they collapse to a single CID for the repository.
assert_eq!(all_seeded(&store).await.unwrap().len(), 2);
assert_eq!(seeded_cids(&store, &rid).await.unwrap().len(), 1);
// Dropping release A keeps the CID seeded through release B.
untag_seeded(&store, &rid, &rel_a, &cid).await.unwrap();
assert!(!is_seeded(&store, &rid, &rel_a, &cid).await.unwrap());
assert!(is_seeded(&store, &rid, &rel_b, &cid).await.unwrap());
assert!(is_seeded_any(&store, &rid, &cid).await.unwrap());
// Re-tag release A so that `untag_all` has two tags to remove.
tag_seeded(&store, &rid, &rel_a, &cid, hash).await.unwrap();
assert_eq!(untag_all(&store, &rid, &cid).await.unwrap(), 2);
assert!(!is_seeded_any(&store, &rid, &cid).await.unwrap());
});
}
/// Untagging something that was never tagged succeeds and changes nothing.
#[test]
fn unregister_unknown_is_noop() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let tmp = tempfile::tempdir().unwrap();
let store = FsStore::load(tmp.path()).await.unwrap();
let (rid_a, _) = rid_pair();
let rel = release(1);
let cid = blob_cid(b"never seeded");
untag_seeded(&store, &rid_a, &rel, &cid).await.unwrap();
assert!(!is_seeded(&store, &rid_a, &rel, &cid).await.unwrap());
});
}
/// `all_seeded` returns exactly the (repository, release, cid, hash) tuples
/// that were tagged, independent of order.
#[test]
fn all_seeded_round_trip() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let tmp = tempfile::tempdir().unwrap();
let store = FsStore::load(tmp.path()).await.unwrap();
let (rid_a, rid_b) = rid_pair();
let (rel_1, rel_2) = (release(1), release(2));
let cid_x = blob_cid(b"x");
let cid_y = blob_cid(b"y");
let cid_z = blob_cid(b"z");
let hash = Hash::new(b"value");
// Mix of shared and distinct repositories, releases and CIDs.
let triples = [
(rid_a, rel_1, cid_x),
(rid_a, rel_2, cid_y),
(rid_b, rel_1, cid_x),
(rid_b, rel_1, cid_z),
];
for (rid, rel, cid) in &triples {
tag_seeded(&store, rid, rel, cid, hash).await.unwrap();
}
// Compare as sets: the listing order is not part of the assertion.
let got: HashSet<(RepoId, Oid, Cid, Hash)> =
all_seeded(&store).await.unwrap().into_iter().collect();
let want: HashSet<(RepoId, Oid, Cid, Hash)> = triples
.into_iter()
.map(|(rid, rel, cid)| (rid, rel, cid, hash))
.collect();
assert_eq!(got, want);
});
}
/// Pins the byte layout of a seeded tag name:
/// `[version][rid len][rid][release len][release][cid bytes]`,
/// and checks that `parse_seeded_tag` reverses `seeded_tag`.
#[test]
fn seeded_tag_layout() {
const SHA1_LEN: usize = 20;
let (rid, _) = rid_pair();
let rel = release(7);
let cid = blob_cid(b"layout");
let tag = seeded_tag(&rid, &rel, &cid);
// Offsets of each field, derived from the two leading header bytes.
let rid_off = 2;
let rel_len_off = rid_off + SHA1_LEN;
let rel_off = rel_len_off + 1;
let cid_off = rel_off + SHA1_LEN;
assert_eq!(tag.len(), cid_off + cid.as_inner().to_bytes().len());
assert_eq!(tag[0], SEEDED_TAG_V1);
assert_eq!(usize::from(tag[1]), SHA1_LEN);
assert_eq!(&tag[rid_off..rel_len_off], AsRef::<[u8]>::as_ref(&*rid));
assert_eq!(usize::from(tag[rel_len_off]), SHA1_LEN);
assert_eq!(&tag[rel_off..cid_off], AsRef::<[u8]>::as_ref(&rel));
assert_eq!(Cid::from(cid::Cid::try_from(&tag[cid_off..]).unwrap()), cid);
// Round trip through the parser.
let (rid_back, rel_back, cid_back) = parse_seeded_tag(&tag).expect("decodes");
assert_eq!(rid_back, rid);
assert_eq!(rel_back, rel);
assert_eq!(cid_back, cid);
}
}