1use std::collections::HashMap;
15use std::path::Path;
16use std::time::Duration;
17
18use crate::iroh::EndpointConfig;
19use crate::Error;
20use iroh::protocol::Router;
21use iroh_blobs::api::blobs::{AddPathOptions, ImportMode as IrohImportMode};
22use iroh_blobs::api::{Store, TempTag};
23use iroh_blobs::format::collection::Collection;
24use iroh_blobs::store::fs::{options::Options as FsStoreOptions, FsStore};
25use iroh_blobs::store::GcConfig;
26use iroh_blobs::{BlobFormat, BlobsProtocol, Hash, HashAndFormat};
27use n0_future::StreamExt;
28use radicle::git::Oid;
29use radicle::identity::RepoId;
30use radicle_artifact_core::cid::{self as cid_utils, ArtifactKind, Cid};
31
32pub use radicle_artifact_core::protocol::ImportMode;
33
34fn to_iroh_import_mode(m: ImportMode) -> IrohImportMode {
37 match m {
38 ImportMode::Copy => IrohImportMode::Copy,
39 ImportMode::Reference => IrohImportMode::TryReference,
40 }
41}
42
43pub const ARTIFACTS_DIR: &str = "artifacts";
45
46pub const STORE_DIR: &str = "store";
48
49const SEEDED_TAG_V1: u8 = 0x01;
52
53const ONLINE_TIMEOUT: Duration = Duration::from_secs(10);
55
56const GC_INTERVAL: Duration = Duration::from_secs(60 * 60);
62
63pub struct Seeder {
66 pub blobs: FsStore,
68 pub router: Router,
70}
71
72pub async fn bootstrap(home: &Path, secret: iroh::SecretKey) -> Result<Seeder, Error> {
83 let dir = home.join(ARTIFACTS_DIR);
84 std::fs::create_dir_all(&dir).map_err(Error::Io)?;
85
86 let store_dir = dir.join(STORE_DIR);
90 let db_path = store_dir.join("blobs.db");
91 let mut options = FsStoreOptions::new(&store_dir);
92 options.gc = Some(GcConfig {
93 interval: GC_INTERVAL,
94 add_protected: None,
95 });
96 let blobs = FsStore::load_with_opts(db_path, options)
97 .await
98 .map_err(|e| Error::Iroh(format!("FsStore load: {e}")))?;
99
100 let preset = EndpointConfig::from_env()?;
101 tracing::info!("iroh endpoint config: {preset}");
102 let endpoint = iroh::Endpoint::builder(preset)
103 .secret_key(secret)
104 .bind()
105 .await
106 .map_err(|e| Error::Iroh(format!("endpoint bind: {e}")))?;
107
108 if tokio::time::timeout(ONLINE_TIMEOUT, endpoint.online())
113 .await
114 .is_err()
115 {
116 tracing::warn!("endpoint not relay-connected after {ONLINE_TIMEOUT:?}; continuing anyway");
117 }
118
119 let blobs_protocol = BlobsProtocol::new(&blobs, None);
120 let router = Router::builder(endpoint)
121 .accept(iroh_blobs::ALPN, blobs_protocol)
122 .spawn();
123
124 Ok(Seeder { blobs, router })
125}
126
127fn seeded_tag(rid: &RepoId, release: &Oid, cid: &Cid) -> Vec<u8> {
139 let mut out = seeded_release_prefix(rid, release);
140 out.extend_from_slice(&cid.as_inner().to_bytes());
141 out
142}
143
144fn seeded_rid_prefix(rid: &RepoId) -> Vec<u8> {
146 let rid_b = rid_bytes(rid);
147 let mut out = Vec::with_capacity(2 + rid_b.len());
148 out.push(SEEDED_TAG_V1);
149 out.push(rid_b.len() as u8);
150 out.extend_from_slice(rid_b);
151 out
152}
153
154fn seeded_release_prefix(rid: &RepoId, release: &Oid) -> Vec<u8> {
156 let mut out = seeded_rid_prefix(rid);
157 let rel_b = AsRef::<[u8]>::as_ref(release);
158 out.push(rel_b.len() as u8);
159 out.extend_from_slice(rel_b);
160 out
161}
162
163fn rid_bytes(rid: &RepoId) -> &[u8] {
166 AsRef::<[u8]>::as_ref(&**rid)
167}
168
169fn oid_from_bytes(b: &[u8]) -> Option<Oid> {
172 match b.len() {
173 20 => Some(Oid::from_sha1(b.try_into().ok()?)),
174 _ => None,
177 }
178}
179
180fn parse_seeded_tag(name: &[u8]) -> Option<(RepoId, Oid, Cid)> {
182 let rest = name.strip_prefix(&[SEEDED_TAG_V1])?;
183 let (rid_b, rest) = take_len_prefixed(rest)?;
184 let (rel_b, cid_b) = take_len_prefixed(rest)?;
185 let rid = RepoId::from(oid_from_bytes(rid_b)?);
186 let release = oid_from_bytes(rel_b)?;
187 let cid = Cid::from(cid::Cid::try_from(cid_b).ok()?);
188 Some((rid, release, cid))
189}
190
191fn take_len_prefixed(buf: &[u8]) -> Option<(&[u8], &[u8])> {
193 let (&len, rest) = buf.split_first()?;
194 let len = usize::from(len);
195 if rest.len() < len {
196 return None;
197 }
198 Some(rest.split_at(len))
199}
200
201fn add_opts(path: std::path::PathBuf, mode: ImportMode) -> AddPathOptions {
203 AddPathOptions {
204 path,
205 format: BlobFormat::Raw,
206 mode: to_iroh_import_mode(mode),
207 }
208}
209
210pub async fn import_blob(
219 store: &Store,
220 path: &Path,
221 expected: &Cid,
222 mode: ImportMode,
223) -> Result<(Hash, TempTag), Error> {
224 let abs = dunce::canonicalize(path).map_err(|e| Error::Iroh(format!("canonicalize: {e}")))?;
226 let tt = store
227 .add_path_with_opts(add_opts(abs, mode))
228 .temp_tag()
229 .await
230 .map_err(|e| Error::Iroh(format!("import blob: {e}")))?;
231 let hash = tt.hash();
232
233 let actual = cid_utils::blake3_hash_to_cid(hash.into(), ArtifactKind::Blob);
234 if actual != *expected {
235 return Err(Error::CidMismatch {
236 expected: expected.to_string(),
237 actual: actual.to_string(),
238 });
239 }
240 Ok((hash, tt))
241}
242
243pub async fn import_collection(
254 store: &Store,
255 dir: &Path,
256 expected: &Cid,
257 mode: ImportMode,
258) -> Result<(Hash, TempTag), Error> {
259 let entries = cid_utils::canonical_walk(dir).map_err(Error::Io)?;
260
261 let mut pairs: Vec<(String, Hash)> = Vec::new();
262 let mut file_tags = Vec::with_capacity(entries.len());
265 for (name, abs) in entries {
266 let tt = store
267 .add_path_with_opts(add_opts(abs, mode))
268 .temp_tag()
269 .await
270 .map_err(|e| Error::Iroh(format!("import file {name}: {e}")))?;
271 pairs.push((name, tt.hash()));
272 file_tags.push(tt);
273 }
274
275 let collection = Collection::from_iter(pairs);
276 let root_tag = collection
277 .store(store)
278 .await
279 .map_err(|e| Error::Iroh(format!("store collection: {e}")))?;
280 drop(file_tags);
282
283 let hash = root_tag.hash();
284 let actual = cid_utils::blake3_hash_to_cid(hash.into(), ArtifactKind::Collection);
285 if actual != *expected {
286 return Err(Error::CidMismatch {
287 expected: expected.to_string(),
288 actual: actual.to_string(),
289 });
290 }
291 Ok((hash, root_tag))
292}
293
294pub async fn tag_seeded(
300 store: &Store,
301 rid: &RepoId,
302 release: &Oid,
303 cid: &Cid,
304 hash: Hash,
305) -> Result<(), Error> {
306 let kind = cid_utils::artifact_kind(cid)?;
307 let value = match kind {
308 ArtifactKind::Blob => HashAndFormat::raw(hash),
309 ArtifactKind::Collection => HashAndFormat::hash_seq(hash),
310 };
311 store
312 .tags()
313 .set(seeded_tag(rid, release, cid), value)
314 .await
315 .map_err(|e| Error::Iroh(format!("set seeded tag: {e}")))?;
316 Ok(())
317}
318
319pub async fn seed_artifact(
326 store: &Store,
327 rid: &RepoId,
328 release: &Oid,
329 cid: &Cid,
330 path: &Path,
331 kind: ArtifactKind,
332 mode: ImportMode,
333) -> Result<Hash, Error> {
334 let (hash, _tt) = match kind {
335 ArtifactKind::Blob => import_blob(store, path, cid, mode).await?,
336 ArtifactKind::Collection => import_collection(store, path, cid, mode).await?,
337 };
338 tag_seeded(store, rid, release, cid, hash).await?;
339 Ok(hash)
341}
342
343pub async fn untag_seeded(
352 store: &Store,
353 rid: &RepoId,
354 release: &Oid,
355 cid: &Cid,
356) -> Result<bool, Error> {
357 let removed = store
358 .tags()
359 .delete(seeded_tag(rid, release, cid))
360 .await
361 .map_err(|e| Error::Iroh(format!("delete seeded tag: {e}")))?;
362 Ok(removed > 0)
363}
364
365pub async fn untag_all(store: &Store, rid: &RepoId, cid: &Cid) -> Result<usize, Error> {
378 let prefix = seeded_rid_prefix(rid);
379 let mut stream = store
380 .tags()
381 .list_prefix(&prefix)
382 .await
383 .map_err(|e| Error::Iroh(format!("list seeded tags: {e}")))?;
384
385 let mut names = Vec::new();
388 while let Some(item) = stream.next().await {
389 let info = item.map_err(|e| Error::Iroh(format!("seeded tag stream: {e}")))?;
390 if let Some((_, _, tag_cid)) = parse_seeded_tag(info.name.as_ref()) {
391 if &tag_cid == cid {
392 names.push(info.name);
393 }
394 }
395 }
396
397 let mut removed = 0;
400 for name in names {
401 removed += store
402 .tags()
403 .delete(name)
404 .await
405 .map_err(|e| Error::Iroh(format!("delete seeded tag: {e}")))?
406 as usize;
407 }
408 Ok(removed)
409}
410
411pub async fn is_seeded(
413 store: &Store,
414 rid: &RepoId,
415 release: &Oid,
416 cid: &Cid,
417) -> Result<bool, Error> {
418 let info = store
419 .tags()
420 .get(seeded_tag(rid, release, cid))
421 .await
422 .map_err(|e| Error::Iroh(format!("get seeded tag: {e}")))?;
423 Ok(info.is_some())
424}
425
426pub async fn is_seeded_any(store: &Store, rid: &RepoId, cid: &Cid) -> Result<bool, Error> {
432 let prefix = seeded_rid_prefix(rid);
433 let mut stream = store
434 .tags()
435 .list_prefix(&prefix)
436 .await
437 .map_err(|e| Error::Iroh(format!("list seeded tags: {e}")))?;
438
439 while let Some(item) = stream.next().await {
440 let info = item.map_err(|e| Error::Iroh(format!("seeded tag stream: {e}")))?;
441 if let Some((_, _, tag_cid)) = parse_seeded_tag(info.name.as_ref()) {
442 if &tag_cid == cid {
443 return Ok(true);
444 }
445 }
446 }
447 Ok(false)
448}
449
450pub async fn seeded_cids(store: &Store, rid: &RepoId) -> Result<HashMap<Cid, Hash>, Error> {
458 let prefix = seeded_rid_prefix(rid);
459 let mut stream = store
460 .tags()
461 .list_prefix(&prefix)
462 .await
463 .map_err(|e| Error::Iroh(format!("list seeded tags: {e}")))?;
464
465 let mut out = HashMap::new();
466 while let Some(item) = stream.next().await {
467 let info = item.map_err(|e| Error::Iroh(format!("seeded tag stream: {e}")))?;
468 if let Some((_, _, cid)) = parse_seeded_tag(info.name.as_ref()) {
469 out.insert(cid, info.hash);
470 }
471 }
472 Ok(out)
473}
474
475pub async fn all_seeded(store: &Store) -> Result<Vec<(RepoId, Oid, Cid, Hash)>, Error> {
484 let mut stream = store
485 .tags()
486 .list_prefix([SEEDED_TAG_V1])
487 .await
488 .map_err(|e| Error::Iroh(format!("list seeded tags: {e}")))?;
489
490 let mut out = Vec::new();
491 while let Some(item) = stream.next().await {
492 let info = item.map_err(|e| Error::Iroh(format!("seeded tag stream: {e}")))?;
493 if let Some((rid, release, cid)) = parse_seeded_tag(info.name.as_ref()) {
494 out.push((rid, release, cid, info.hash));
495 }
496 }
497 Ok(out)
498}
499
500pub async fn artifact_size_for(store: &Store, cid: &Cid, hash: Hash) -> u64 {
507 let Ok(kind) = cid_utils::artifact_kind(cid) else {
508 return 0;
509 };
510 match kind {
511 ArtifactKind::Blob => blob_size(store, hash).await,
512 ArtifactKind::Collection => match Collection::load(hash, store).await {
513 Ok(collection) => {
514 let mut total = 0u64;
515 for (_, child) in collection.iter() {
516 total = total.saturating_add(blob_size(store, *child).await);
517 }
518 total
519 }
520 Err(_) => 0,
521 },
522 }
523}
524
525async fn blob_size(store: &Store, hash: Hash) -> u64 {
526 use iroh_blobs::api::proto::BlobStatus;
527 match store.blobs().status(hash).await {
528 Ok(BlobStatus::Complete { size }) => size,
529 Ok(BlobStatus::Partial { size }) => size.unwrap_or(0),
530 _ => 0,
531 }
532}
533
534#[cfg(test)]
535mod tests {
536 use std::collections::HashSet;
537 use std::str::FromStr;
538
539 use super::*;
540
541 fn blob_cid(data: &[u8]) -> Cid {
546 use cid::multihash::Multihash;
547 let digest = blake3::hash(data);
548 let mh = Multihash::<64>::wrap(cid_utils::HASH_CODE_BLAKE3, digest.as_bytes()).unwrap();
549 Cid::from(cid::Cid::new_v1(cid_utils::RAW_CODEC, mh))
550 }
551
552 fn rid_pair() -> (RepoId, RepoId) {
554 let a = RepoId::from_str("rad:z2u2CP3ZJzB7ZqE8jHrau19yjpdip").unwrap();
556 let b = RepoId::from_str("rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5").unwrap();
557 assert_ne!(a, b);
558 (a, b)
559 }
560
561 fn release(n: u8) -> Oid {
563 Oid::from_str(&format!("{n:040x}")).unwrap()
564 }
565
566 #[test]
570 fn per_repo_tags_isolate() {
571 let rt = tokio::runtime::Runtime::new().unwrap();
572 rt.block_on(async {
573 let tmp = tempfile::tempdir().unwrap();
574 let store = FsStore::load(tmp.path()).await.unwrap();
575
576 let (rid_a, rid_b) = rid_pair();
577 let rel = release(1);
578 let cid = blob_cid(b"shared bytes");
579 let hash = Hash::new(b"shared bytes");
582
583 tag_seeded(&store, &rid_a, &rel, &cid, hash).await.unwrap();
584 tag_seeded(&store, &rid_b, &rel, &cid, hash).await.unwrap();
585
586 assert!(is_seeded(&store, &rid_a, &rel, &cid).await.unwrap());
587 assert!(is_seeded(&store, &rid_b, &rel, &cid).await.unwrap());
588
589 let cids_a = seeded_cids(&store, &rid_a).await.unwrap();
590 let cids_b = seeded_cids(&store, &rid_b).await.unwrap();
591 assert_eq!(cids_a.len(), 1);
592 assert_eq!(cids_b.len(), 1);
593 assert!(cids_a.contains_key(&cid));
594 assert!(cids_b.contains_key(&cid));
595
596 untag_seeded(&store, &rid_a, &rel, &cid).await.unwrap();
597 assert!(!is_seeded(&store, &rid_a, &rel, &cid).await.unwrap());
598 assert!(is_seeded(&store, &rid_b, &rel, &cid).await.unwrap());
599
600 let cids_a = seeded_cids(&store, &rid_a).await.unwrap();
602 let cids_b = seeded_cids(&store, &rid_b).await.unwrap();
603 assert!(cids_a.is_empty());
604 assert_eq!(cids_b.len(), 1);
605 });
606 }
607
608 #[test]
612 fn per_release_tags_isolate() {
613 let rt = tokio::runtime::Runtime::new().unwrap();
614 rt.block_on(async {
615 let tmp = tempfile::tempdir().unwrap();
616 let store = FsStore::load(tmp.path()).await.unwrap();
617
618 let (rid, _) = rid_pair();
619 let (rel_a, rel_b) = (release(1), release(2));
620 let cid = blob_cid(b"shared across releases");
621 let hash = Hash::new(b"shared across releases");
622
623 tag_seeded(&store, &rid, &rel_a, &cid, hash).await.unwrap();
624 tag_seeded(&store, &rid, &rel_b, &cid, hash).await.unwrap();
625
626 assert_eq!(all_seeded(&store).await.unwrap().len(), 2);
628 assert_eq!(seeded_cids(&store, &rid).await.unwrap().len(), 1);
629
630 untag_seeded(&store, &rid, &rel_a, &cid).await.unwrap();
632 assert!(!is_seeded(&store, &rid, &rel_a, &cid).await.unwrap());
633 assert!(is_seeded(&store, &rid, &rel_b, &cid).await.unwrap());
634 assert!(is_seeded_any(&store, &rid, &cid).await.unwrap());
635
636 tag_seeded(&store, &rid, &rel_a, &cid, hash).await.unwrap();
638 assert_eq!(untag_all(&store, &rid, &cid).await.unwrap(), 2);
639 assert!(!is_seeded_any(&store, &rid, &cid).await.unwrap());
640 });
641 }
642
643 #[test]
645 fn unregister_unknown_is_noop() {
646 let rt = tokio::runtime::Runtime::new().unwrap();
647 rt.block_on(async {
648 let tmp = tempfile::tempdir().unwrap();
649 let store = FsStore::load(tmp.path()).await.unwrap();
650 let (rid_a, _) = rid_pair();
651 let rel = release(1);
652 let cid = blob_cid(b"never seeded");
653
654 untag_seeded(&store, &rid_a, &rel, &cid).await.unwrap();
656 assert!(!is_seeded(&store, &rid_a, &rel, &cid).await.unwrap());
657 });
658 }
659
660 #[test]
664 fn all_seeded_round_trip() {
665 let rt = tokio::runtime::Runtime::new().unwrap();
666 rt.block_on(async {
667 let tmp = tempfile::tempdir().unwrap();
668 let store = FsStore::load(tmp.path()).await.unwrap();
669 let (rid_a, rid_b) = rid_pair();
670 let (rel_1, rel_2) = (release(1), release(2));
671 let cid_x = blob_cid(b"x");
672 let cid_y = blob_cid(b"y");
673 let cid_z = blob_cid(b"z");
674 let hash = Hash::new(b"value");
675
676 let triples = [
677 (rid_a, rel_1, cid_x),
678 (rid_a, rel_2, cid_y),
679 (rid_b, rel_1, cid_x),
680 (rid_b, rel_1, cid_z),
681 ];
682 for (rid, rel, cid) in &triples {
683 tag_seeded(&store, rid, rel, cid, hash).await.unwrap();
684 }
685
686 let got: HashSet<(RepoId, Oid, Cid, Hash)> =
689 all_seeded(&store).await.unwrap().into_iter().collect();
690 let want: HashSet<(RepoId, Oid, Cid, Hash)> = triples
691 .into_iter()
692 .map(|(rid, rel, cid)| (rid, rel, cid, hash))
693 .collect();
694 assert_eq!(got, want);
695 });
696 }
697
698 #[test]
702 fn seeded_tag_layout() {
703 const SHA1_LEN: usize = 20;
706
707 let (rid, _) = rid_pair();
708 let rel = release(7);
709 let cid = blob_cid(b"layout");
710 let tag = seeded_tag(&rid, &rel, &cid);
711
712 let rid_off = 2;
713 let rel_len_off = rid_off + SHA1_LEN;
714 let rel_off = rel_len_off + 1;
715 let cid_off = rel_off + SHA1_LEN;
716
717 assert_eq!(tag.len(), cid_off + cid.as_inner().to_bytes().len());
718 assert_eq!(tag[0], SEEDED_TAG_V1);
719 assert_eq!(usize::from(tag[1]), SHA1_LEN);
720 assert_eq!(&tag[rid_off..rel_len_off], AsRef::<[u8]>::as_ref(&*rid));
721 assert_eq!(usize::from(tag[rel_len_off]), SHA1_LEN);
722 assert_eq!(&tag[rel_off..cid_off], AsRef::<[u8]>::as_ref(&rel));
723 assert_eq!(Cid::from(cid::Cid::try_from(&tag[cid_off..]).unwrap()), cid);
724
725 let (rid_back, rel_back, cid_back) = parse_seeded_tag(&tag).expect("decodes");
726 assert_eq!(rid_back, rid);
727 assert_eq!(rel_back, rel);
728 assert_eq!(cid_back, cid);
729 }
730}