1use std::io;
14use std::os::unix::fs::PermissionsExt;
15use std::path::{Path, PathBuf};
16use std::str::FromStr;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
21use iroh_blobs::api::downloader::Downloader;
22use iroh_blobs::store::fs::FsStore;
23use iroh_blobs::HashAndFormat;
24use radicle::git::Oid;
25use radicle::identity::RepoId;
26use radicle_artifact_core::cid::Cid;
27use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
28use tokio::net::{UnixListener, UnixStream};
29use tokio::signal::unix::{signal, SignalKind};
30use tokio::sync::{broadcast, mpsc};
31use url::Url;
32
33use crate::seeder;
34use crate::{fetch, Error as ShareError};
35use radicle_artifact_client::tokio::Client;
36use radicle_artifact_core::cid::{self as cid_utils, ArtifactKind};
37use radicle_artifact_core::keys::EndpointId;
38use radicle_artifact_core::protocol::{
39 Command, CommandError, CommandResult, DownloadReceipt, ErrorCode, ExportReceipt, FetchLocation,
40 FetchProgress, FetchReceipt, HasResult, ImportMode, SeedReceipt, SeededEntry, Status,
41 StreamEvent, UnseedReceipt,
42};
43use radicle_artifact_core::ARTIFACTS_DIR;
44
/// Upper bound on how long [`run`] waits for in-flight connection handlers to
/// finish once shutdown has been requested.
const DRAIN_TIMEOUT: Duration = Duration::from_secs(300);

/// Maximum time [`handle_connection`] waits for a client to send its command line.
const READ_TIMEOUT: Duration = Duration::from_secs(30);

/// Maximum time [`run`] waits for the seeder's router to shut down before moving on.
const ROUTER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(2);
59
/// Errors returned by [`run`].
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
    /// A node is already answering on the control socket at the given path.
    #[error("another rad-artifact node is already running at {0}")]
    AlreadyRunning(PathBuf),
    /// An error from the share layer; its message is displayed unchanged.
    #[error(transparent)]
    Share(#[from] ShareError),
    /// An I/O failure, e.g. while preparing the socket directory or binding the socket.
    #[error("node I/O error: {0}")]
    Io(#[from] io::Error),
}
74
/// State shared (behind an `Arc`) by every connection handler spawned from [`run`].
struct NodeCtx {
    /// Blob store, cloned from the bootstrapped seeder.
    store: FsStore,
    /// Downloader built over the store and the router's endpoint.
    downloader: Downloader,
    /// The router's iroh endpoint; read for metrics and relay status.
    endpoint: iroh::Endpoint,
    /// This node's endpoint id, echoed back in receipts and status.
    endpoint_id: EndpointId,
    /// Node start time in whole seconds since the UNIX epoch (0 if the clock
    /// reads earlier than the epoch).
    started_at_unix: i64,
}
94
95pub async fn run(home: &Path, secret: iroh::SecretKey) -> Result<(), NodeError> {
112 let socket_path = home.join(ARTIFACTS_DIR).join("control.sock");
113 if let Some(parent) = socket_path.parent() {
115 std::fs::create_dir_all(parent)?;
116 }
117
118 if socket_path.exists() {
122 let probe = Client::new(socket_path.clone());
123 if probe.is_running().await {
124 return Err(NodeError::AlreadyRunning(socket_path));
125 }
126 std::fs::remove_file(&socket_path)?;
127 }
128
129 let seeder = seeder::bootstrap(home, secret).await?;
130
131 let listener = UnixListener::bind(&socket_path)?;
132 std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o600))?;
133
134 let started_at_unix = SystemTime::now()
135 .duration_since(UNIX_EPOCH)
136 .map(|d| d.as_secs() as i64)
137 .unwrap_or(0);
138
139 let endpoint_id = EndpointId::from(seeder.router.endpoint().id());
140
141 tracing::info!(
142 endpoint_id = %endpoint_id,
143 socket = %socket_path.display(),
144 "rad-artifact node ready"
145 );
146
147 let downloader = Downloader::new_with_opts(
150 seeder.blobs.as_ref(),
151 seeder.router.endpoint(),
152 fetch::pool_options(),
153 );
154 let ctx = Arc::new(NodeCtx {
155 store: seeder.blobs.clone(),
156 downloader,
157 endpoint: seeder.router.endpoint().clone(),
158 endpoint_id,
159 started_at_unix,
160 });
161
162 let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(8);
167 spawn_signal_handler(shutdown_tx.clone());
168
169 let in_flight = Arc::new(AtomicUsize::new(0));
170
171 loop {
172 tokio::select! {
173 biased;
174 res = shutdown_rx.recv() => {
175 if res.is_ok() { break; }
177 }
178 accept = listener.accept() => {
179 let (stream, _addr) = match accept {
180 Ok(v) => v,
181 Err(e) => {
182 tracing::warn!("accept error: {e}");
183 continue;
184 }
185 };
186 let ctx = ctx.clone();
187 let shutdown_tx = shutdown_tx.clone();
188 let in_flight = in_flight.clone();
189 in_flight.fetch_add(1, Ordering::SeqCst);
190 tokio::spawn(async move {
191 if let Err(e) = handle_connection(stream, &ctx, &shutdown_tx).await {
192 tracing::warn!("handler error: {e}");
193 }
194 in_flight.fetch_sub(1, Ordering::SeqCst);
195 });
196 }
197 }
198 }
199
200 drop(listener);
202 let _ = std::fs::remove_file(&socket_path);
203
204 let drain_deadline = Instant::now() + DRAIN_TIMEOUT;
206 while in_flight.load(Ordering::SeqCst) > 0 && Instant::now() < drain_deadline {
207 tokio::time::sleep(Duration::from_millis(50)).await;
208 }
209
210 let _ = tokio::time::timeout(ROUTER_SHUTDOWN_TIMEOUT, seeder.router.shutdown()).await;
211 tracing::info!("rad-artifact node stopped");
212 Ok(())
213}
214
215fn spawn_signal_handler(shutdown_tx: broadcast::Sender<()>) {
217 tokio::spawn(async move {
218 let Ok(mut term) = signal(SignalKind::terminate()) else {
219 return;
220 };
221 let Ok(mut int) = signal(SignalKind::interrupt()) else {
222 return;
223 };
224 tokio::select! {
225 _ = term.recv() => {}
226 _ = int.recv() => {}
227 }
228 let _ = shutdown_tx.send(());
229 });
230}
231
232async fn handle_connection(
235 stream: UnixStream,
236 ctx: &NodeCtx,
237 shutdown_tx: &broadcast::Sender<()>,
238) -> io::Result<()> {
239 let (read, mut write) = stream.into_split();
240 let mut reader = BufReader::new(read);
241 let mut line = String::new();
242 let n = match tokio::time::timeout(READ_TIMEOUT, reader.read_line(&mut line)).await {
245 Ok(res) => res?,
246 Err(_) => return Ok(()),
247 };
248 if n == 0 {
249 return Ok(());
250 }
251
252 match parse_command(line.trim_end()) {
253 Ok(Command::Export { cid, dest }) => {
256 stream_export(ctx, &mut reader, &mut write, cid, dest).await
257 }
258 Ok(Command::Fetch {
259 rid,
260 cid,
261 locations,
262 seed,
263 }) => stream_fetch(ctx, &mut reader, &mut write, rid, cid, locations, seed).await,
264 Ok(Command::Download {
265 rid,
266 cid,
267 locations,
268 dest,
269 seed,
270 }) => {
271 stream_download(
272 ctx,
273 &mut reader,
274 &mut write,
275 rid,
276 cid,
277 locations,
278 dest,
279 seed,
280 )
281 .await
282 }
283 Ok(cmd) => write_line(&mut write, dispatch(cmd, ctx, shutdown_tx).await).await,
285 Err((code, msg)) => write_line(&mut write, err_json::<()>(code, msg)).await,
286 }
287}
288
289async fn write_line(
291 write: &mut (impl AsyncWriteExt + Unpin),
292 mut response: String,
293) -> io::Result<()> {
294 response.push('\n');
295 write.write_all(response.as_bytes()).await?;
296 write.flush().await
297}
298
299fn parse_command(line: &str) -> Result<Command, (ErrorCode, String)> {
303 let value: serde_json::Value = serde_json::from_str(line).map_err(|e| {
304 (
305 ErrorCode::InvalidRequest,
306 format!("invalid command JSON: {e}"),
307 )
308 })?;
309 if let Some(rid_s) = value.get("rid").and_then(|v| v.as_str()) {
310 if let Err(e) = RepoId::from_str(rid_s) {
311 return Err((
312 ErrorCode::InvalidRequest,
313 format!("invalid rid {rid_s:?}: {e}"),
314 ));
315 }
316 }
317 if let Some(cid_s) = value.get("cid").and_then(|v| v.as_str()) {
318 if let Err(e) = Cid::from_str(cid_s) {
319 return Err((
320 ErrorCode::InvalidRequest,
321 format!("invalid cid {cid_s:?}: {e}"),
322 ));
323 }
324 }
325 serde_json::from_value(value)
326 .map_err(|e| (ErrorCode::InvalidRequest, format!("invalid command: {e}")))
327}
328
329async fn dispatch(cmd: Command, ctx: &NodeCtx, shutdown_tx: &broadcast::Sender<()>) -> String {
333 let store = &ctx.store;
334 match cmd {
335 Command::Alive => ok_json(()),
337 Command::Status => {
338 match build_status(store, &ctx.endpoint, ctx.endpoint_id, ctx.started_at_unix).await {
339 Ok(status) => ok_json(status),
340 Err(e) => err_from_share::<Status>(e),
341 }
342 }
343 Command::Seed {
344 rid,
345 release,
346 cid,
347 path,
348 kind,
349 mode,
350 } => seed_response(store, rid, release, cid, &path, kind, mode, ctx.endpoint_id).await,
351 Command::Unseed { rid, release, cid } => unseed_response(store, rid, release, cid).await,
352 Command::IsSeeding { rid, cid } => is_seeding_response(store, &rid, &cid).await,
353 Command::ListSeeded { rid } => list_seeded_response(store, rid).await,
354 Command::Has { cid } => has_response(store, &cid).await,
355 Command::Export { .. } | Command::Fetch { .. } | Command::Download { .. } => {
357 unreachable!("streaming commands are handled before dispatch")
358 }
359 Command::Shutdown => {
360 let resp = ok_json(());
363 let _ = shutdown_tx.send(());
364 resp
365 }
366 }
367}
368
369fn hash_and_format(cid: &Cid) -> Result<HashAndFormat, (ErrorCode, String)> {
371 let hash = cid_utils::cid_to_blake3_hash(cid)
372 .map_err(|e| (ErrorCode::InvalidRequest, e.to_string()))?;
373 match cid_utils::artifact_kind(cid) {
374 Ok(ArtifactKind::Blob) => Ok(HashAndFormat::raw(hash.into())),
375 Ok(ArtifactKind::Collection) => Ok(HashAndFormat::hash_seq(hash.into())),
376 Err(e) => Err((ErrorCode::InvalidRequest, e.to_string())),
377 }
378}
379
380async fn has_response(store: &FsStore, cid: &Cid) -> String {
383 let haf = match hash_and_format(cid) {
384 Ok(h) => h,
385 Err((code, msg)) => return err_json::<HasResult>(code, msg),
386 };
387 match store.remote().local(haf).await {
388 Ok(info) => {
389 let bytes = info.local_bytes();
390 ok_json(HasResult {
391 present: bytes > 0,
392 complete: info.is_complete(),
393 bytes,
394 })
395 }
396 Err(e) => err_json::<HasResult>(ErrorCode::Iroh, format!("local lookup: {e}")),
397 }
398}
399
400async fn write_frame<T: serde::Serialize>(
402 write: &mut (impl AsyncWriteExt + Unpin),
403 event: &StreamEvent<T>,
404) -> io::Result<()> {
405 let mut line = serde_json::to_string(event).unwrap_or_else(|e| {
406 format!(r#"{{"error":{{"code":"internal","message":"encode: {e}"}}}}"#)
407 });
408 line.push('\n');
409 write.write_all(line.as_bytes()).await?;
410 write.flush().await
411}
412
413async fn stream_error<T: serde::Serialize>(
415 write: &mut (impl AsyncWriteExt + Unpin),
416 code: ErrorCode,
417 message: String,
418) -> io::Result<()> {
419 write_frame::<T>(write, &StreamEvent::Error(CommandError { code, message })).await
420}
421
/// Drives a streaming operation and relays its events to the client as frames.
///
/// `op` receives the sending half of an unbounded progress channel. While it
/// runs, every [`FetchProgress`] it sends is written as a `Progress` frame.
/// When it finishes, progress still queued is flushed and a terminal `Okay`
/// or `Error` frame is written.
///
/// If a read on `read` completes first (with data, EOF, or an error), the
/// function returns `Ok(())` without writing a terminal frame; returning
/// drops the still-pending `op` future.
/// NOTE(review): presumably this read serves as a client-disconnect probe — confirm.
///
/// # Errors
///
/// Returns any I/O error raised while writing a frame.
async fn run_stream<T: serde::Serialize>(
    read: &mut (impl AsyncReadExt + Unpin),
    write: &mut (impl AsyncWriteExt + Unpin),
    op: impl AsyncFnOnce(mpsc::UnboundedSender<FetchProgress>) -> Result<T, (ErrorCode, String)>,
) -> io::Result<()> {
    let (tx, mut rx) = mpsc::unbounded_channel::<FetchProgress>();
    let fut = op(tx);
    tokio::pin!(fut);
    // One-byte buffer for the read that watches the client side.
    let mut probe = [0u8; 1];
    loop {
        tokio::select! {
            // Branches are polled in declaration order: completion of `op` first.
            biased;
            res = &mut fut => {
                // Flush progress that was queued but not yet forwarded.
                while let Ok(p) = rx.try_recv() {
                    write_frame(write, &StreamEvent::<T>::Progress(p)).await?;
                }
                let event = match res {
                    Ok(payload) => StreamEvent::Okay(payload),
                    Err((code, message)) => StreamEvent::Error(CommandError { code, message }),
                };
                return write_frame(write, &event).await;
            }
            Some(p) = rx.recv() => {
                write_frame(write, &StreamEvent::<T>::Progress(p)).await?;
            }
            _ = read.read(&mut probe) => {
                // Any completed read ends the stream without a terminal frame.
                return Ok(());
            }
        }
    }
}
466
467async fn stream_export(
470 ctx: &NodeCtx,
471 read: &mut (impl AsyncReadExt + Unpin),
472 write: &mut (impl AsyncWriteExt + Unpin),
473 cid: Cid,
474 dest: PathBuf,
475) -> io::Result<()> {
476 let kind = match cid_utils::artifact_kind(&cid) {
477 Ok(k) => k,
478 Err(e) => {
479 return stream_error::<ExportReceipt>(write, ErrorCode::InvalidRequest, e.to_string())
480 .await
481 }
482 };
483 let haf = match hash_and_format(&cid) {
484 Ok(h) => h,
485 Err((code, msg)) => return stream_error::<ExportReceipt>(write, code, msg).await,
486 };
487 let hash = haf.hash;
488
489 match ctx.store.blobs().has(hash).await {
491 Ok(true) => {}
492 Ok(false) => {
493 return stream_error::<ExportReceipt>(
494 write,
495 ErrorCode::NotLocal,
496 format!("content for {cid} is not complete in the store"),
497 )
498 .await
499 }
500 Err(e) => {
501 return stream_error::<ExportReceipt>(
502 write,
503 ErrorCode::Iroh,
504 format!("local lookup: {e}"),
505 )
506 .await
507 }
508 }
509
510 run_stream(read, write, async move |tx| {
511 let on_progress = move |p| {
512 let _ = tx.send(p);
513 };
514 let bytes = match kind {
515 ArtifactKind::Blob => fetch::export_blob_to(&ctx.store, hash, &dest, on_progress).await,
516 ArtifactKind::Collection => {
517 fetch::export_collection_to(&ctx.store, hash, &dest, on_progress).await
518 }
519 }
520 .map_err(|e| (share_error_to_code(&e), e.to_string()))?;
521 Ok(ExportReceipt { cid, dest, bytes })
522 })
523 .await
524}
525
526fn partition(locations: &[FetchLocation]) -> (Vec<EndpointId>, Vec<Url>) {
528 let mut iroh = Vec::new();
529 let mut urls = Vec::new();
530 for loc in locations {
531 match loc {
532 FetchLocation::Iroh(id) => iroh.push(*id),
533 FetchLocation::Url(u) => urls.push(u.clone()),
534 }
535 }
536 (iroh, urls)
537}
538
539async fn export_to_dest(
541 store: &FsStore,
542 hash: iroh_blobs::Hash,
543 kind: ArtifactKind,
544 dest: &Path,
545 on_progress: impl FnMut(FetchProgress),
546) -> Result<u64, (ErrorCode, String)> {
547 match kind {
548 ArtifactKind::Blob => fetch::export_blob_to(store, hash, dest, on_progress).await,
549 ArtifactKind::Collection => {
550 fetch::export_collection_to(store, hash, dest, on_progress).await
551 }
552 }
553 .map_err(|e| (share_error_to_code(&e), e.to_string()))
554}
555
/// Result of [`fetch_into_store`]: the content is complete in the local store.
struct Fetched {
    /// Whether the CID addresses a blob or a collection.
    kind: ArtifactKind,
    /// Hash of the content in the store.
    hash: iroh_blobs::Hash,
    /// `true` when the store already had the content and nothing was downloaded.
    from_cache: bool,
    /// Temp tag created before the fetch; it lives as long as this value.
    /// NOTE(review): presumably this keeps the content from being collected
    /// until the caller has tagged or exported it — confirm.
    _tt: iroh_blobs::api::TempTag,
}
571
572async fn fetch_into_store(
582 ctx: &NodeCtx,
583 cid: &Cid,
584 locations: &[FetchLocation],
585 on_progress: &mut impl FnMut(FetchProgress),
586) -> Result<Fetched, (ErrorCode, String)> {
587 let kind =
588 cid_utils::artifact_kind(cid).map_err(|e| (ErrorCode::InvalidRequest, e.to_string()))?;
589 let haf = hash_and_format(cid)?;
590 let hash = haf.hash;
591 let store = &ctx.store;
592
593 let tt = store
600 .tags()
601 .temp_tag(haf)
602 .await
603 .map_err(|e| (ErrorCode::Iroh, format!("temp tag: {e}")))?;
604
605 let already = store
607 .blobs()
608 .has(hash)
609 .await
610 .map_err(|e| (ErrorCode::Iroh, format!("local lookup: {e}")))?;
611 if already {
612 return Ok(Fetched {
613 kind,
614 hash,
615 from_cache: true,
616 _tt: tt,
617 });
618 }
619
620 on_progress(FetchProgress::Connecting);
621
622 let (iroh_ids, urls) = partition(locations);
623 let no_locations = iroh_ids.is_empty() && urls.is_empty();
624 let mut errors: Vec<String> = Vec::new();
625 let mut got = false;
626
627 if !iroh_ids.is_empty() {
628 match fetch::download_iroh_to_store(
629 &ctx.downloader,
630 store,
631 haf,
632 iroh_ids,
633 &mut *on_progress,
634 )
635 .await
636 {
637 Ok(()) => got = true,
638 Err(errs) => errors.extend(errs.into_iter().map(|e| e.to_string())),
639 }
640 }
641 if !got {
642 match kind {
643 ArtifactKind::Blob => {
646 for url in &urls {
647 match fetch::http_to_store(store, url, cid, &mut *on_progress).await {
648 Ok(_) => break,
649 Err(e) => errors.push(e.to_string()),
650 }
651 }
652 }
653 ArtifactKind::Collection => {
654 for url in &urls {
655 errors.push(format!("HTTP fetch unsupported for collection: {url}"));
656 }
657 }
658 }
659 }
660
661 let complete = store
664 .remote()
665 .local(haf)
666 .await
667 .map(|i| i.is_complete())
668 .unwrap_or(false);
669 if !complete {
670 drop(tt); let code = if no_locations {
672 ErrorCode::NoLocations
673 } else {
674 ErrorCode::AllFailed
675 };
676 let msg = if errors.is_empty() {
677 "no locations succeeded".to_string()
678 } else {
679 errors.join("; ")
680 };
681 return Err((code, msg));
682 }
683
684 Ok(Fetched {
685 kind,
686 hash,
687 from_cache: false,
688 _tt: tt,
689 })
690}
691
692async fn stream_fetch(
701 ctx: &NodeCtx,
702 read: &mut (impl AsyncReadExt + Unpin),
703 write: &mut (impl AsyncWriteExt + Unpin),
704 rid: RepoId,
705 cid: Cid,
706 locations: Vec<FetchLocation>,
707 seed: Option<Oid>,
708) -> io::Result<()> {
709 let endpoint_id = ctx.endpoint_id;
710
711 run_stream(read, write, async move |tx| {
712 let mut on_progress = move |p| {
713 let _ = tx.send(p);
714 };
715 let store = &ctx.store;
716
717 let fetched = fetch_into_store(ctx, &cid, &locations, &mut on_progress).await?;
718 let seeded = tag_if_seeding(store, &rid, seed.as_ref(), &cid, fetched.hash)
719 .await
720 .map_err(|e| (share_error_to_code(&e), e.to_string()))?;
721
722 let bytes = seeder::artifact_size_for(store, &cid, fetched.hash).await;
724 Ok(FetchReceipt {
725 rid,
726 cid,
727 bytes,
728 from_cache: fetched.from_cache,
729 seeded,
730 endpoint_id,
731 })
732 })
734 .await
735}
736
737#[allow(clippy::too_many_arguments)]
745async fn stream_download(
746 ctx: &NodeCtx,
747 read: &mut (impl AsyncReadExt + Unpin),
748 write: &mut (impl AsyncWriteExt + Unpin),
749 rid: RepoId,
750 cid: Cid,
751 locations: Vec<FetchLocation>,
752 dest: PathBuf,
753 seed: Option<Oid>,
754) -> io::Result<()> {
755 let endpoint_id = ctx.endpoint_id;
756
757 run_stream(read, write, async move |tx| {
758 let mut on_progress = move |p| {
759 let _ = tx.send(p);
760 };
761 let store = &ctx.store;
762
763 let fetched = fetch_into_store(ctx, &cid, &locations, &mut on_progress).await?;
764
765 let bytes =
767 export_to_dest(store, fetched.hash, fetched.kind, &dest, &mut on_progress).await?;
768 let seeded = tag_if_seeding(store, &rid, seed.as_ref(), &cid, fetched.hash)
769 .await
770 .map_err(|e| (share_error_to_code(&e), e.to_string()))?;
771
772 Ok(DownloadReceipt {
773 rid,
774 cid,
775 dest,
776 bytes,
777 from_cache: fetched.from_cache,
778 seeded,
779 endpoint_id,
780 })
781 })
783 .await
784}
785
786async fn tag_if_seeding(
793 store: &iroh_blobs::api::Store,
794 rid: &RepoId,
795 seed: Option<&Oid>,
796 cid: &Cid,
797 hash: iroh_blobs::Hash,
798) -> Result<bool, ShareError> {
799 let Some(release) = seed else {
800 return Ok(false);
801 };
802 seeder::tag_seeded(store, rid, release, cid, hash).await?;
803 Ok(true)
804}
805
806#[allow(clippy::too_many_arguments)]
807async fn seed_response(
808 store: &FsStore,
809 rid: RepoId,
810 release: Oid,
811 cid: Cid,
812 path: &Path,
813 kind: ArtifactKind,
814 mode: ImportMode,
815 endpoint_id: EndpointId,
816) -> String {
817 if !path.exists() {
818 return err_json::<SeedReceipt>(
819 ErrorCode::PathNotFound,
820 format!("path not found: {}", path.display()),
821 );
822 }
823
824 let was_already = match seeder::is_seeded(store, &rid, &release, &cid).await {
825 Ok(v) => v,
826 Err(e) => return err_from_share::<SeedReceipt>(e),
827 };
828 let hash = match seeder::seed_artifact(store, &rid, &release, &cid, path, kind, mode).await {
829 Ok(hash) => hash,
830 Err(e) => return err_from_share::<SeedReceipt>(e),
831 };
832 let bytes = seeder::artifact_size_for(store, &cid, hash).await;
833 let receipt = SeedReceipt {
834 rid,
835 cid,
836 endpoint_id,
837 bytes,
838 was_new: !was_already,
839 };
840 ok_json(receipt)
841}
842
843async fn unseed_response(store: &FsStore, rid: RepoId, release: Option<Oid>, cid: Cid) -> String {
848 let was_removed = match &release {
849 Some(release) => match seeder::untag_seeded(store, &rid, release, &cid).await {
850 Ok(removed) => removed,
851 Err(e) => return err_from_share::<UnseedReceipt>(e),
852 },
853 None => match seeder::untag_all(store, &rid, &cid).await {
854 Ok(removed) => removed > 0,
855 Err(e) => return err_from_share::<UnseedReceipt>(e),
856 },
857 };
858 ok_json(UnseedReceipt {
859 rid,
860 cid,
861 was_removed,
862 })
863}
864
865async fn is_seeding_response(store: &FsStore, rid: &RepoId, cid: &Cid) -> String {
866 match seeder::is_seeded_any(store, rid, cid).await {
867 Ok(v) => ok_json(v),
868 Err(e) => err_from_share::<bool>(e),
869 }
870}
871
872async fn list_seeded_response(store: &FsStore, rid: RepoId) -> String {
873 let cids = match seeder::seeded_cids(store, &rid).await {
874 Ok(v) => v,
875 Err(e) => return err_from_share::<Vec<SeededEntry>>(e),
876 };
877 let mut out = Vec::with_capacity(cids.len());
878 for (cid, hash) in cids {
879 let bytes = seeder::artifact_size_for(store, &cid, hash).await;
880 out.push(SeededEntry { cid, bytes });
881 }
882 ok_json(out)
883}
884
885async fn build_status(
886 store: &FsStore,
887 endpoint: &iroh::Endpoint,
888 endpoint_id: EndpointId,
889 started_at_unix: i64,
890) -> Result<Status, ShareError> {
891 let metrics = endpoint.metrics();
892 let distinct: std::collections::HashMap<iroh_blobs::Hash, Cid> = seeder::all_seeded(store)
895 .await?
896 .into_iter()
897 .map(|(_rid, _release, cid, hash)| (hash, cid))
898 .collect();
899 let count = distinct.len();
900 let mut bytes_logical = 0u64;
901 for (hash, cid) in &distinct {
902 bytes_logical =
903 bytes_logical.saturating_add(seeder::artifact_size_for(store, cid, *hash).await);
904 }
905 let s = &metrics.socket;
909 let opened_total = s.num_conns_opened.get();
910 let closed_total = s.num_conns_closed.get();
911 let connections = radicle_artifact_core::protocol::ConnectionStats {
912 active: opened_total.saturating_sub(closed_total) as u32,
913 opened_total,
914 closed_total,
915 direct_total: s.num_conns_direct.get(),
916 holepunch_attempts: s.holepunch_attempts.get(),
917 paths_direct: s.paths_direct.get(),
918 paths_relayed: s.paths_relay.get(),
919 };
920 let traffic = radicle_artifact_core::protocol::TrafficStats {
921 out_bytes: s
922 .send_ipv4
923 .get()
924 .saturating_add(s.send_ipv6.get())
925 .saturating_add(s.send_relay.get()),
926 in_bytes: s
927 .recv_data_ipv4
928 .get()
929 .saturating_add(s.recv_data_ipv6.get())
930 .saturating_add(s.recv_data_relay.get())
931 .saturating_add(s.recv_data_custom.get()),
932 };
933 let relay = relay_stats(endpoint);
934 let relay_unreachable = !relay.relays.iter().any(|r| r.connected);
937 Ok(Status {
938 endpoint_id,
939 started_at_unix,
940 seeded: radicle_artifact_core::protocol::SeededStats {
941 count,
942 bytes_logical,
943 },
944 connections,
945 traffic,
946 relay,
947 warnings: radicle_artifact_core::protocol::Warnings { relay_unreachable },
948 })
949}
950
951fn relay_stats(endpoint: &iroh::Endpoint) -> radicle_artifact_core::protocol::RelayStats {
957 use iroh::Watcher;
958
959 let report = endpoint.net_report().get();
960 let mut latency_ms: std::collections::HashMap<String, u64> = std::collections::HashMap::new();
962 if let Some(r) = &report {
963 for (_probe, url, dur) in r.relay_latency.iter() {
964 let ms = dur.as_millis() as u64;
965 latency_ms
966 .entry(url.to_string())
967 .and_modify(|m| *m = (*m).min(ms))
968 .or_insert(ms);
969 }
970 }
971
972 let relays = endpoint
973 .home_relay_status()
974 .get()
975 .into_iter()
976 .map(|s| {
977 let url = s.url().to_string();
978 radicle_artifact_core::protocol::RelayHealth {
979 latency_ms: latency_ms.get(&url).copied(),
980 connected: s.is_connected(),
981 last_error: s.last_error().map(|e| e.to_string()),
982 url,
983 }
984 })
985 .collect();
986
987 radicle_artifact_core::protocol::RelayStats {
988 relays,
989 preferred: report
990 .as_ref()
991 .and_then(|r| r.preferred_relay.as_ref().map(|u| u.to_string())),
992 udp_v4: report.as_ref().map(|r| r.udp_v4).unwrap_or(false),
993 udp_v6: report.as_ref().map(|r| r.udp_v6).unwrap_or(false),
994 }
995}
996
997fn ok_json<T: serde::Serialize>(v: T) -> String {
998 serde_json::to_string(&CommandResult::Okay(v))
999 .unwrap_or_else(|e| format!(r#"{{"error":{{"code":"internal","message":"encode: {e}"}}}}"#))
1000}
1001
1002fn err_json<T>(code: ErrorCode, message: String) -> String
1003where
1004 T: serde::Serialize,
1005{
1006 serde_json::to_string(&CommandResult::<T>::Error(CommandError { code, message }))
1007 .unwrap_or_else(|e| format!(r#"{{"error":{{"code":"internal","message":"encode: {e}"}}}}"#))
1008}
1009
1010fn err_from_share<T>(e: ShareError) -> String
1011where
1012 T: serde::Serialize,
1013{
1014 let code = share_error_to_code(&e);
1015 err_json::<T>(code, e.to_string())
1016}
1017
/// Maps a share-layer error to the protocol error code reported to clients.
///
/// CID mismatches, I/O errors and iroh errors keep their own codes; every
/// other variant is reported as `Internal`.
fn share_error_to_code(e: &ShareError) -> ErrorCode {
    match e {
        ShareError::CidMismatch { .. } => ErrorCode::CidMismatch,
        ShareError::Io(_) => ErrorCode::Io,
        ShareError::Iroh(_) => ErrorCode::Iroh,
        // Catch-all for variants without a dedicated protocol code.
        _ => ErrorCode::Internal,
    }
}
1026
1027#[cfg(test)]
1028mod tests {
1029 use std::fs;
1030
1031 use cid::multihash::Multihash;
1032
1033 use super::*;
1034 use radicle_artifact_core::cid::{
1035 self as cid_utils, ArtifactKind, HASH_CODE_BLAKE3, RAW_CODEC,
1036 };
1037
1038 fn fake_blob_cid(data: &[u8]) -> Cid {
1041 let digest = blake3::hash(data);
1042 let mh = Multihash::<64>::wrap(HASH_CODE_BLAKE3, digest.as_bytes()).unwrap();
1043 Cid::from(cid::Cid::new_v1(RAW_CODEC, mh))
1044 }
1045
1046 fn rid_a() -> RepoId {
1047 RepoId::from_str("rad:z2u2CP3ZJzB7ZqE8jHrau19yjpdip").unwrap()
1048 }
1049
1050 fn release_a() -> Oid {
1051 Oid::from_str("0123456789abcdef0123456789abcdef01234567").unwrap()
1052 }
1053
1054 async fn start_node(
1057 home: &Path,
1058 secret: iroh::SecretKey,
1059 ) -> (PathBuf, tokio::task::JoinHandle<Result<(), NodeError>>) {
1060 let home_path = home.to_path_buf();
1061 let handle = tokio::spawn(async move { run(&home_path, secret).await });
1062 let socket = home.join(ARTIFACTS_DIR).join("control.sock");
1063 for _ in 0..200 {
1064 if socket.exists() {
1065 break;
1066 }
1067 tokio::time::sleep(Duration::from_millis(50)).await;
1068 }
1069 assert!(socket.exists(), "control socket never appeared");
1070 (socket, handle)
1071 }
1072
/// End-to-end exercise of the non-streaming commands against a live node:
/// status, seed (new, repeated, and with a mismatching CID), is-seeding,
/// list-seeded, unseed, and shutdown.
///
/// Node startup goes through the shared `start_node` helper.
#[test]
fn node_round_trip() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let home = tempfile::tempdir().unwrap();

        let blob_path = home.path().join("payload.bin");
        let payload = b"hello rad-artifact";
        fs::write(&blob_path, payload).unwrap();
        let real_cid = cid_utils::compute_blob_cid(&blob_path).unwrap();
        let rid = rid_a();

        let secret = iroh::SecretKey::from_bytes(&[1u8; 32]);
        // Derive the expected id before the key is moved into the node.
        let expected_endpoint_id = EndpointId::from(secret.public());

        let (socket, node_handle) = start_node(home.path(), secret).await;

        let client = Client::new(socket.clone());

        assert!(client.is_running().await);

        let status = client.status().await.unwrap();
        assert_eq!(status.endpoint_id, expected_endpoint_id);
        assert_eq!(status.seeded.count, 0);

        // First seed is new.
        let receipt = client
            .seed(
                rid,
                release_a(),
                real_cid,
                &blob_path,
                ArtifactKind::Blob,
                ImportMode::Copy,
            )
            .await
            .unwrap();
        assert!(receipt.was_new);
        assert_eq!(receipt.endpoint_id, expected_endpoint_id);
        assert_eq!(receipt.bytes, payload.len() as u64);

        // Seeding the same artifact again succeeds but is not new.
        let receipt2 = client
            .seed(
                rid,
                release_a(),
                real_cid,
                &blob_path,
                ArtifactKind::Blob,
                ImportMode::Copy,
            )
            .await
            .unwrap();
        assert!(!receipt2.was_new);

        // A CID that does not match the file contents must be rejected.
        let bad_path = home.path().join("tampered.bin");
        fs::write(&bad_path, b"different bytes").unwrap();
        let bad_cid = fake_blob_cid(b"not the right preimage");
        let err = client
            .seed(
                rid,
                release_a(),
                bad_cid,
                &bad_path,
                ArtifactKind::Blob,
                ImportMode::Copy,
            )
            .await
            .expect_err("CID mismatch must error");
        match err {
            radicle_artifact_client::ClientError::Remote(CommandError { code, .. }) => {
                assert_eq!(code, ErrorCode::CidMismatch);
            }
            other => panic!("expected CidMismatch error, got {other:?}"),
        }

        assert!(client.is_seeding(rid, real_cid).await.unwrap());

        let entries = client.list_seeded(rid).await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].cid, real_cid);
        assert_eq!(entries[0].bytes, payload.len() as u64);

        let status = client.status().await.unwrap();
        assert_eq!(status.seeded.count, 1);
        assert_eq!(status.seeded.bytes_logical, payload.len() as u64);

        // Unseed removes the tag once; a second unseed finds nothing to remove.
        let r1 = client
            .unseed(rid, Some(release_a()), real_cid)
            .await
            .unwrap();
        assert!(r1.was_removed);
        let r2 = client
            .unseed(rid, Some(release_a()), real_cid)
            .await
            .unwrap();
        assert!(!r2.was_removed);
        assert!(!client.is_seeding(rid, real_cid).await.unwrap());

        client.shutdown().await.unwrap();
        tokio::time::timeout(Duration::from_secs(10), node_handle)
            .await
            .expect("node did not exit within 10s")
            .expect("join error")
            .expect("node returned error");

        // The node removes its socket file on shutdown.
        assert!(!socket.exists());
    });
}
1214
/// Sends raw frames with a malformed `rid` or `cid` and checks that the node
/// answers with `InvalidRequest` and a message naming the offending field.
///
/// Node startup goes through the shared `start_node` helper.
#[test]
fn invalid_typed_fields_surface_as_invalid_request() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let home = tempfile::tempdir().unwrap();
        let secret = iroh::SecretKey::from_bytes(&[4u8; 32]);
        let (socket, node_handle) = start_node(home.path(), secret).await;

        for (frame, expected_field) in [
            (
                br#"{"command":"list-seeded","rid":"not-a-real-rid"}"#.as_slice(),
                "rid",
            ),
            (
                br#"{"command":"is-seeding","rid":"rad:z2u2CP3ZJzB7ZqE8jHrau19yjpdip","cid":"not-a-real-cid"}"#.as_slice(),
                "cid",
            ),
        ] {
            let mut stream = tokio::net::UnixStream::connect(&socket).await.unwrap();
            tokio::io::AsyncWriteExt::write_all(&mut stream, frame)
                .await
                .unwrap();
            tokio::io::AsyncWriteExt::write_all(&mut stream, b"\n")
                .await
                .unwrap();
            // The node closes the connection after its single response line.
            let mut buf = String::new();
            tokio::io::AsyncReadExt::read_to_string(&mut stream, &mut buf)
                .await
                .unwrap();
            let parsed: CommandResult<serde_json::Value> =
                serde_json::from_str(buf.trim()).unwrap();
            match parsed {
                CommandResult::Error(CommandError { code, message }) => {
                    assert_eq!(code, ErrorCode::InvalidRequest);
                    assert!(
                        message.contains(expected_field),
                        "message should name {expected_field}: {message}"
                    );
                }
                CommandResult::Okay(_) => panic!("expected error, got ok"),
            }
        }

        let client = Client::new(socket);
        client.shutdown().await.unwrap();
        node_handle.await.unwrap().unwrap();
    });
}
1282
/// Starting a second node on a home whose control socket is already served
/// must fail with `NodeError::AlreadyRunning`.
///
/// The first node is started through the shared `start_node` helper.
#[test]
fn double_start_errors() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let home = tempfile::tempdir().unwrap();
        let secret = iroh::SecretKey::from_bytes(&[2u8; 32]);
        let (socket, first) = start_node(home.path(), secret).await;

        // Second start on the same home, with a different key, must be refused.
        let secret2 = iroh::SecretKey::from_bytes(&[3u8; 32]);
        let err = run(home.path(), secret2).await.expect_err("must fail");
        assert!(matches!(err, NodeError::AlreadyRunning(_)));

        let client = Client::new(socket);
        client.shutdown().await.unwrap();
        first.await.unwrap().unwrap();
    });
}
1317
1318 async fn oneshot<T: serde::de::DeserializeOwned>(
1320 socket: &Path,
1321 cmd: &Command,
1322 ) -> CommandResult<T> {
1323 let mut stream = UnixStream::connect(socket).await.unwrap();
1324 let mut line = serde_json::to_string(cmd).unwrap();
1325 line.push('\n');
1326 tokio::io::AsyncWriteExt::write_all(&mut stream, line.as_bytes())
1327 .await
1328 .unwrap();
1329 let mut buf = String::new();
1330 tokio::io::AsyncReadExt::read_to_string(&mut stream, &mut buf)
1331 .await
1332 .unwrap();
1333 serde_json::from_str(buf.trim()).unwrap()
1334 }
1335
1336 async fn streaming<T: serde::de::DeserializeOwned>(
1338 socket: &Path,
1339 cmd: &Command,
1340 ) -> (usize, StreamEvent<T>) {
1341 let mut stream = UnixStream::connect(socket).await.unwrap();
1342 let mut line = serde_json::to_string(cmd).unwrap();
1343 line.push('\n');
1344 tokio::io::AsyncWriteExt::write_all(&mut stream, line.as_bytes())
1345 .await
1346 .unwrap();
1347 let mut buf = String::new();
1348 tokio::io::AsyncReadExt::read_to_string(&mut stream, &mut buf)
1349 .await
1350 .unwrap();
1351 let mut progress = 0usize;
1352 let mut terminal = None;
1353 for l in buf.lines().filter(|l| !l.trim().is_empty()) {
1354 match serde_json::from_str::<StreamEvent<T>>(l).unwrap() {
1355 StreamEvent::Progress(_) => progress += 1,
1356 term => terminal = Some(term),
1357 }
1358 }
1359 (progress, terminal.expect("a terminal frame"))
1360 }
1361
#[test]
/// Seeds a blob into a running node and then checks, over the raw socket:
/// `Has` reports the seeded CID as present and complete with the payload
/// length; `Has` reports an unknown CID as neither present nor complete;
/// `Export` writes the payload to the destination and echoes byte count and
/// path; `Export` of an unknown CID ends with `ErrorCode::NotLocal`.
fn has_and_export_round_trip() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    runtime.block_on(async {
        let home = tempfile::tempdir().unwrap();
        let payload = b"hello has/export";
        let expected_len = payload.len() as u64;
        let blob_path = home.path().join("payload.bin");
        fs::write(&blob_path, payload).unwrap();
        let cid = cid_utils::compute_blob_cid(&blob_path).unwrap();
        let rid = rid_a();

        let node_secret = iroh::SecretKey::from_bytes(&[5u8; 32]);
        let node_home = home.path().to_path_buf();
        let node_handle = tokio::spawn(async move { run(&node_home, node_secret).await });

        // Wait (at most 200 x 50ms) for the control socket file to show up.
        let socket = home.path().join(ARTIFACTS_DIR).join("control.sock");
        let mut polls = 0;
        while polls < 200 && !socket.exists() {
            tokio::time::sleep(Duration::from_millis(50)).await;
            polls += 1;
        }
        assert!(socket.exists(), "control socket never appeared");

        let client = Client::new(socket.clone());
        client
            .seed(
                rid,
                release_a(),
                cid,
                &blob_path,
                ArtifactKind::Blob,
                ImportMode::Copy,
            )
            .await
            .unwrap();

        // `Has` on the seeded CID.
        let known = match oneshot::<HasResult>(&socket, &Command::Has { cid }).await {
            CommandResult::Okay(h) => h,
            CommandResult::Error(e) => panic!("has errored: {e:?}"),
        };
        assert!(known.present);
        assert!(known.complete);
        assert_eq!(known.bytes, expected_len);

        // `Has` on a CID that was never stored.
        let unknown = fake_blob_cid(b"never stored");
        let absent = match oneshot::<HasResult>(&socket, &Command::Has { cid: unknown }).await {
            CommandResult::Okay(h) => h,
            CommandResult::Error(e) => panic!("has errored: {e:?}"),
        };
        assert!(!absent.present);
        assert!(!absent.complete);

        // `Export` of the seeded CID.
        let dest = home.path().join("exported.bin");
        let export_cmd = Command::Export {
            cid,
            dest: dest.clone(),
        };
        let (_progress, term) = streaming::<ExportReceipt>(&socket, &export_cmd).await;
        let receipt = match term {
            StreamEvent::Okay(r) => r,
            other => panic!("expected okay, got {other:?}"),
        };
        assert_eq!(receipt.bytes, expected_len);
        assert_eq!(receipt.dest, dest);
        assert_eq!(fs::read(&dest).unwrap(), payload);

        // `Export` of the unknown CID.
        let missing_cmd = Command::Export {
            cid: unknown,
            dest: home.path().join("nope.bin"),
        };
        let (_p, term) = streaming::<ExportReceipt>(&socket, &missing_cmd).await;
        let failure = match term {
            StreamEvent::Error(e) => e,
            other => panic!("expected NotLocal error, got {other:?}"),
        };
        assert_eq!(failure.code, ErrorCode::NotLocal);

        client.shutdown().await.unwrap();
        node_handle.await.unwrap().unwrap();
    });
}
1462
#[test]
/// With a blob already seeded into the node and no fetch locations given:
/// `Fetch` and `Download` both succeed with `from_cache` set and `seeded`
/// unset, and `Download` writes the payload to its destination; `Fetch`
/// with `seed: Some(..)` under a second repository id reports `seeded` and
/// the client then sees that (rid, cid) pair as seeding; `Fetch` of an
/// unknown CID ends with `ErrorCode::NoLocations`.
fn fetch_and_download_fast_path_and_no_locations() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    runtime.block_on(async {
        let home = tempfile::tempdir().unwrap();
        let payload = b"hello fetch fast-path";
        let expected_len = payload.len() as u64;
        let blob_path = home.path().join("payload.bin");
        fs::write(&blob_path, payload).unwrap();
        let cid = cid_utils::compute_blob_cid(&blob_path).unwrap();
        let rid = rid_a();

        let node_secret = iroh::SecretKey::from_bytes(&[6u8; 32]);
        let node_home = home.path().to_path_buf();
        let node_handle = tokio::spawn(async move { run(&node_home, node_secret).await });

        // Wait (at most 200 x 50ms) for the control socket file to show up.
        let socket = home.path().join(ARTIFACTS_DIR).join("control.sock");
        let mut polls = 0;
        while polls < 200 && !socket.exists() {
            tokio::time::sleep(Duration::from_millis(50)).await;
            polls += 1;
        }
        assert!(socket.exists(), "control socket never appeared");

        let client = Client::new(socket.clone());
        client
            .seed(
                rid,
                release_a(),
                cid,
                &blob_path,
                ArtifactKind::Blob,
                ImportMode::Copy,
            )
            .await
            .unwrap();

        // Fetch of the seeded CID, no locations, no seeding requested.
        let fetch_cached = Command::Fetch {
            rid,
            cid,
            locations: Vec::new(),
            seed: None,
        };
        let (_p, term) = streaming::<FetchReceipt>(&socket, &fetch_cached).await;
        let fetched = match term {
            StreamEvent::Okay(r) => r,
            other => panic!("expected okay, got {other:?}"),
        };
        assert!(fetched.from_cache);
        assert!(!fetched.seeded);
        assert_eq!(fetched.bytes, expected_len);

        // Download of the seeded CID to a file, no locations.
        let dest = home.path().join("downloaded.bin");
        let download_cached = Command::Download {
            rid,
            cid,
            locations: Vec::new(),
            dest: dest.clone(),
            seed: None,
        };
        let (_p, term) = streaming::<DownloadReceipt>(&socket, &download_cached).await;
        let downloaded = match term {
            StreamEvent::Okay(r) => r,
            other => panic!("expected okay, got {other:?}"),
        };
        assert!(downloaded.from_cache);
        assert!(!downloaded.seeded);
        assert_eq!(downloaded.bytes, expected_len);
        assert_eq!(downloaded.dest, dest);
        assert_eq!(fs::read(&dest).unwrap(), payload);

        // Fetch under a different repository id with seeding requested.
        let rid2 = RepoId::from_str("rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5").unwrap();
        let fetch_and_seed = Command::Fetch {
            rid: rid2,
            cid,
            locations: Vec::new(),
            seed: Some(release_a()),
        };
        let (_p, term) = streaming::<FetchReceipt>(&socket, &fetch_and_seed).await;
        let reseeded = match term {
            StreamEvent::Okay(r) => r,
            other => panic!("expected okay, got {other:?}"),
        };
        assert!(reseeded.from_cache);
        assert!(reseeded.seeded);
        assert!(client.is_seeding(rid2, cid).await.unwrap());

        // Fetch of a CID the node does not have, with nowhere to get it from.
        let unknown = fake_blob_cid(b"absent");
        let fetch_absent = Command::Fetch {
            rid,
            cid: unknown,
            locations: Vec::new(),
            seed: None,
        };
        let (_p, term) = streaming::<FetchReceipt>(&socket, &fetch_absent).await;
        let failure = match term {
            StreamEvent::Error(e) => e,
            other => panic!("expected NoLocations error, got {other:?}"),
        };
        assert_eq!(failure.code, ErrorCode::NoLocations);

        client.shutdown().await.unwrap();
        node_handle.await.unwrap().unwrap();
    });
}
1598
#[test]
/// When the client side of the connection goes away while a streaming
/// operation is still pending, `run_stream` must return `Ok` promptly and
/// the operation's future must be dropped (aborted) rather than run to
/// completion.
fn run_stream_aborts_on_client_disconnect() {
    use std::sync::atomic::AtomicBool;

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        // In-memory stand-in for the control connection.
        let (client_end, server_end) = tokio::io::duplex(64);
        let (mut srv_read, mut srv_write) = tokio::io::split(server_end);

        // Sets its flag when dropped, so the test can observe that the op
        // future was dropped.
        struct DropFlag(Arc<AtomicBool>);
        impl Drop for DropFlag {
            fn drop(&mut self) {
                self.0.store(true, Ordering::SeqCst);
            }
        }
        let aborted = Arc::new(AtomicBool::new(false));
        let completed = Arc::new(AtomicBool::new(false));
        let aborted_op = aborted.clone();
        let completed_op = completed.clone();
        // Handshake so the test disconnects only once the op is actually
        // running, instead of guessing with a fixed sleep.
        let (started_tx, started_rx) = tokio::sync::oneshot::channel::<()>();

        let op = async move |_tx: mpsc::UnboundedSender<FetchProgress>| {
            let _guard = DropFlag(aborted_op);
            // The guard is armed before signalling, so a disconnect that
            // arrives right after the signal still trips the flag.
            let _ = started_tx.send(());
            // Never resolves on its own: the only way out is being dropped.
            std::future::pending::<()>().await;
            completed_op.store(true, Ordering::SeqCst);
            Ok::<(), (ErrorCode, String)>(())
        };

        let server =
            tokio::spawn(
                async move { run_stream::<()>(&mut srv_read, &mut srv_write, op).await },
            );

        tokio::time::timeout(Duration::from_secs(5), started_rx)
            .await
            .expect("run_stream never started the op")
            .expect("op was dropped before it started running");

        // Simulate the client disconnecting mid-operation.
        drop(client_end);

        let res = tokio::time::timeout(Duration::from_secs(5), server)
            .await
            .expect("run_stream did not return after disconnect")
            .expect("join error");
        assert!(res.is_ok());
        assert!(
            aborted.load(Ordering::SeqCst),
            "op future was not dropped on disconnect"
        );
        assert!(
            !completed.load(Ordering::SeqCst),
            "op should have been aborted, not completed"
        );
    });
}
1663
#[test]
/// A control socket file left behind by a listener that no longer exists
/// must not prevent a node from starting: the node has to come up, answer
/// `is_running`, and shut down cleanly.
fn stale_socket_is_reclaimed() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    runtime.block_on(async {
        let home = tempfile::tempdir().unwrap();
        let socket = home.path().join(ARTIFACTS_DIR).join("control.sock");
        fs::create_dir_all(socket.parent().unwrap()).unwrap();
        {
            // Bind and immediately drop a listener; the test then checks
            // that the socket path is still present on disk.
            let _stale = UnixListener::bind(&socket).unwrap();
        }
        assert!(socket.exists());

        let node_secret = iroh::SecretKey::from_bytes(&[7u8; 32]);
        let node_home = home.path().to_path_buf();
        let handle = tokio::spawn(async move { run(&node_home, node_secret).await });

        // The path exists from the start, so readiness is probed through the
        // client (at most 200 probes, 50ms apart) rather than by file check.
        let client = Client::new(socket.clone());
        let mut probes_left = 200;
        let ready = loop {
            if probes_left == 0 {
                break false;
            }
            if client.is_running().await {
                break true;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
            probes_left -= 1;
        };
        assert!(ready, "node never came up after reclaiming stale socket");

        client.shutdown().await.unwrap();
        handle.await.unwrap().unwrap();
    });
}
1701
#[test]
/// Seeding from a path that does not exist must be rejected by the node
/// with a remote `ErrorCode::PathNotFound` error.
fn seed_missing_path_errors() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    runtime.block_on(async {
        let home = tempfile::tempdir().unwrap();
        let node_secret = iroh::SecretKey::from_bytes(&[8u8; 32]);
        let (socket, handle) = start_node(home.path(), node_secret).await;

        let client = Client::new(socket.clone());
        let missing = home.path().join("does-not-exist.bin");
        let outcome = client
            .seed(
                rid_a(),
                release_a(),
                fake_blob_cid(b"whatever"),
                &missing,
                ArtifactKind::Blob,
                ImportMode::Copy,
            )
            .await;

        // Only an error reported by the node itself is acceptable here.
        let code = match outcome.expect_err("missing path must error") {
            radicle_artifact_client::ClientError::Remote(CommandError { code, .. }) => code,
            other => panic!("expected PathNotFound, got {other:?}"),
        };
        assert_eq!(code, ErrorCode::PathNotFound);

        client.shutdown().await.unwrap();
        handle.await.unwrap().unwrap();
    });
}
1739
#[test]
/// A request line that is not JSON must be answered with an
/// `ErrorCode::InvalidRequest` error whose message mentions
/// "invalid command JSON", and the node must remain able to shut down
/// cleanly afterwards.
fn malformed_json_surfaces_as_invalid_request() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    runtime.block_on(async {
        let home = tempfile::tempdir().unwrap();
        let node_secret = iroh::SecretKey::from_bytes(&[9u8; 32]);
        let (socket, handle) = start_node(home.path(), node_secret).await;

        // Talk to the socket directly with a raw, non-JSON request line.
        let mut conn = UnixStream::connect(&socket).await.unwrap();
        tokio::io::AsyncWriteExt::write_all(&mut conn, b"this is not json\n")
            .await
            .unwrap();
        let mut reply = String::new();
        tokio::io::AsyncReadExt::read_to_string(&mut conn, &mut reply)
            .await
            .unwrap();

        let parsed: CommandResult<serde_json::Value> =
            serde_json::from_str(reply.trim()).unwrap();
        let CommandError { code, message } = match parsed {
            CommandResult::Error(err) => err,
            CommandResult::Okay(_) => panic!("expected error, got ok"),
        };
        assert_eq!(code, ErrorCode::InvalidRequest);
        assert!(
            message.contains("invalid command JSON"),
            "message should name the JSON failure: {message}"
        );

        let client = Client::new(socket);
        client.shutdown().await.unwrap();
        handle.await.unwrap().unwrap();
    });
}
1779}