Skip to main content

radicle_artifact_node/
node.rs

1//! Long-running `rad-artifact` node: owns the seeder, exposes a
2//! control socket, drives graceful shutdown on signals.
3//!
//! [`run`] bootstraps the [`Seeder`](crate::seeder::Seeder), binds
//! `<home>/artifacts/control.sock` and reads one [`Command`] per
//! connection. One-shot commands get back a single [`CommandResult`]
//! line; the streaming commands (`Fetch`, `Export`, `Download`) write
//! progress frames followed by one terminal okay/error frame. Parent-side
//! helpers (detached spawn, passphrase resolution, log rotation,
//! liveness polling) live in the `radicle-artifact` crate's `lifecycle` module.
9//!
10//! - tags survive shutdown — restart resumes seeding what was previously
11//!   tagged
12
13use std::io;
14use std::os::unix::fs::PermissionsExt;
15use std::path::{Path, PathBuf};
16use std::str::FromStr;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
21use iroh_blobs::api::downloader::Downloader;
22use iroh_blobs::store::fs::FsStore;
23use iroh_blobs::HashAndFormat;
24use radicle::git::Oid;
25use radicle::identity::RepoId;
26use radicle_artifact_core::cid::Cid;
27use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
28use tokio::net::{UnixListener, UnixStream};
29use tokio::signal::unix::{signal, SignalKind};
30use tokio::sync::{broadcast, mpsc};
31use url::Url;
32
33use crate::seeder;
34use crate::{fetch, Error as ShareError};
35use radicle_artifact_client::tokio::Client;
36use radicle_artifact_core::cid::{self as cid_utils, ArtifactKind};
37use radicle_artifact_core::keys::EndpointId;
38use radicle_artifact_core::protocol::{
39    Command, CommandError, CommandResult, DownloadReceipt, ErrorCode, ExportReceipt, FetchLocation,
40    FetchProgress, FetchReceipt, HasResult, ImportMode, SeedReceipt, SeededEntry, Status,
41    StreamEvent, UnseedReceipt,
42};
43use radicle_artifact_core::ARTIFACTS_DIR;
44
/// Upper bound on how long shutdown lets in-flight connection handlers
/// finish before the router is torn down regardless. Generous enough that
/// a large collection import completes instead of being cut off mid-write,
/// while still guaranteeing a wedged handler can't hold shutdown forever.
const DRAIN_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Deadline for a freshly accepted connection to deliver its command line.
/// Silent or half-open clients are dropped once it passes, so they can't
/// keep a handler task and its file descriptor alive indefinitely.
const READ_TIMEOUT: Duration = Duration::from_secs(30);

/// Cap on how long shutdown waits for
/// [`iroh::protocol::Router::shutdown`] before returning anyway.
const ROUTER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(2);
59
60/// Failure modes for [`run`].
61#[derive(Debug, thiserror::Error)]
62pub enum NodeError {
63    /// Another node is already bound to the control socket — bail out
64    /// instead of stealing it.
65    #[error("another rad-artifact node is already running at {0}")]
66    AlreadyRunning(PathBuf),
67    /// Seeder bootstrap or runtime error.
68    #[error(transparent)]
69    Share(#[from] ShareError),
70    /// Local I/O (mkdir, bind, chmod) failure.
71    #[error("node I/O error: {0}")]
72    Io(#[from] io::Error),
73}
74
75/// Shared per-node state passed to every connection handler.
76///
77/// Built once at startup and wrapped in an `Arc` so each spawned handler
78/// gets a cheap clone. Holds the single `FsStore` and the single
79/// [`Downloader`] (built on the seeder endpoint, pooling connections
80/// across all fetches), plus identity/uptime fields for `Status`.
81struct NodeCtx {
82    /// The persistent blob store.
83    store: FsStore,
84    /// Downloader bound to the seeder endpoint, reused across fetches.
85    downloader: Downloader,
86    /// The seeder endpoint, kept for its live connection/traffic metrics
87    /// (a clone shares the underlying counters with the router's endpoint).
88    endpoint: iroh::Endpoint,
89    /// Endpoint id the node serves on.
90    endpoint_id: EndpointId,
91    /// Unix timestamp (seconds) when the node bound its socket.
92    started_at_unix: i64,
93}
94
95/// Run the node in the foreground until it receives a shutdown signal
96/// or a [`Command::Shutdown`].
97///
98/// On entry the function:
99/// 1. bootstraps the seeder under `<home>/artifacts/`
100/// 2. probes the control socket — if a live owner answers, returns
101///    [`NodeError::AlreadyRunning`]; if the socket file exists but
102///    nothing answers, unlinks it
103/// 3. binds the socket with 0600 perms
104/// 4. installs SIGTERM/SIGINT handlers and runs the accept loop
105///
106/// On exit the function drains in-flight handlers (up to
107/// `DRAIN_TIMEOUT`) and shuts the iroh router down (capped by
108/// `ROUTER_SHUTDOWN_TIMEOUT`). The socket file is unlinked. Seeded
109/// tags are intentionally left in place so a restart resumes the prior
110/// set.
111pub async fn run(home: &Path, secret: iroh::SecretKey) -> Result<(), NodeError> {
112    let socket_path = home.join(ARTIFACTS_DIR).join("control.sock");
113    // ensure the parent directory exists before we probe / bind
114    if let Some(parent) = socket_path.parent() {
115        std::fs::create_dir_all(parent)?;
116    }
117
118    // Probe the socket BEFORE opening the FsStore: a live owner means
119    // we'd block on its single-writer lock, so cheap-fail here with a
120    // friendly error. A stale file is unlinked so bind() can succeed.
121    if socket_path.exists() {
122        let probe = Client::new(socket_path.clone());
123        if probe.is_running().await {
124            return Err(NodeError::AlreadyRunning(socket_path));
125        }
126        std::fs::remove_file(&socket_path)?;
127    }
128
129    let seeder = seeder::bootstrap(home, secret).await?;
130
131    let listener = UnixListener::bind(&socket_path)?;
132    std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o600))?;
133
134    let started_at_unix = SystemTime::now()
135        .duration_since(UNIX_EPOCH)
136        .map(|d| d.as_secs() as i64)
137        .unwrap_or(0);
138
139    let endpoint_id = EndpointId::from(seeder.router.endpoint().id());
140
141    tracing::info!(
142        endpoint_id = %endpoint_id,
143        socket = %socket_path.display(),
144        "rad-artifact node ready"
145    );
146
147    // One Downloader on the seeder endpoint, reused across all fetches so
148    // the connection pool is shared (no per-fetch endpoint bind).
149    let downloader = Downloader::new_with_opts(
150        seeder.blobs.as_ref(),
151        seeder.router.endpoint(),
152        fetch::pool_options(),
153    );
154    let ctx = Arc::new(NodeCtx {
155        store: seeder.blobs.clone(),
156        downloader,
157        endpoint: seeder.router.endpoint().clone(),
158        endpoint_id,
159        started_at_unix,
160    });
161
162    // Subscribe before installing the signal handler: a signal that
163    // arrives during startup must land in a live receiver, otherwise the
164    // broadcast (which doesn't buffer for absent receivers) drops it and
165    // the accept loop never sees the shutdown.
166    let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(8);
167    spawn_signal_handler(shutdown_tx.clone());
168
169    let in_flight = Arc::new(AtomicUsize::new(0));
170
171    loop {
172        tokio::select! {
173            biased;
174            res = shutdown_rx.recv() => {
175                // any send (signal or Command::Shutdown) -> stop accepting
176                if res.is_ok() { break; }
177            }
178            accept = listener.accept() => {
179                let (stream, _addr) = match accept {
180                    Ok(v) => v,
181                    Err(e) => {
182                        tracing::warn!("accept error: {e}");
183                        continue;
184                    }
185                };
186                let ctx = ctx.clone();
187                let shutdown_tx = shutdown_tx.clone();
188                let in_flight = in_flight.clone();
189                in_flight.fetch_add(1, Ordering::SeqCst);
190                tokio::spawn(async move {
191                    if let Err(e) = handle_connection(stream, &ctx, &shutdown_tx).await {
192                        tracing::warn!("handler error: {e}");
193                    }
194                    in_flight.fetch_sub(1, Ordering::SeqCst);
195                });
196            }
197        }
198    }
199
200    // stop accepting new connections (file inode still alive until unlink)
201    drop(listener);
202    let _ = std::fs::remove_file(&socket_path);
203
204    // wait for in-flight handlers; bounded so a stuck handler can't pin us
205    let drain_deadline = Instant::now() + DRAIN_TIMEOUT;
206    while in_flight.load(Ordering::SeqCst) > 0 && Instant::now() < drain_deadline {
207        tokio::time::sleep(Duration::from_millis(50)).await;
208    }
209
210    let _ = tokio::time::timeout(ROUTER_SHUTDOWN_TIMEOUT, seeder.router.shutdown()).await;
211    tracing::info!("rad-artifact node stopped");
212    Ok(())
213}
214
215/// Watch SIGTERM/SIGINT in the background and broadcast a shutdown signal.
216fn spawn_signal_handler(shutdown_tx: broadcast::Sender<()>) {
217    tokio::spawn(async move {
218        let Ok(mut term) = signal(SignalKind::terminate()) else {
219            return;
220        };
221        let Ok(mut int) = signal(SignalKind::interrupt()) else {
222            return;
223        };
224        tokio::select! {
225            _ = term.recv() => {}
226            _ = int.recv() => {}
227        }
228        let _ = shutdown_tx.send(());
229    });
230}
231
232/// Read one command, then either write one response (one-shot commands)
233/// or stream frames (`Fetch`/`Export`), and close.
234async fn handle_connection(
235    stream: UnixStream,
236    ctx: &NodeCtx,
237    shutdown_tx: &broadcast::Sender<()>,
238) -> io::Result<()> {
239    let (read, mut write) = stream.into_split();
240    let mut reader = BufReader::new(read);
241    let mut line = String::new();
242    // Bound the wait for the command line so a connected-but-silent
243    // client can't pin this handler (and its fd) forever.
244    let n = match tokio::time::timeout(READ_TIMEOUT, reader.read_line(&mut line)).await {
245        Ok(res) => res?,
246        Err(_) => return Ok(()),
247    };
248    if n == 0 {
249        return Ok(());
250    }
251
252    match parse_command(line.trim_end()) {
253        // Streaming commands write their own frames directly. They also get
254        // the read half to watch for client disconnect during silent phases.
255        Ok(Command::Export { cid, dest }) => {
256            stream_export(ctx, &mut reader, &mut write, cid, dest).await
257        }
258        Ok(Command::Fetch {
259            rid,
260            cid,
261            locations,
262            seed,
263        }) => stream_fetch(ctx, &mut reader, &mut write, rid, cid, locations, seed).await,
264        Ok(Command::Download {
265            rid,
266            cid,
267            locations,
268            dest,
269            seed,
270        }) => {
271            stream_download(
272                ctx,
273                &mut reader,
274                &mut write,
275                rid,
276                cid,
277                locations,
278                dest,
279                seed,
280            )
281            .await
282        }
283        // One-shot commands return a single JSON line.
284        Ok(cmd) => write_line(&mut write, dispatch(cmd, ctx, shutdown_tx).await).await,
285        Err((code, msg)) => write_line(&mut write, err_json::<()>(code, msg)).await,
286    }
287}
288
289/// Write a single JSON response line (with trailing newline) and flush.
290async fn write_line(
291    write: &mut (impl AsyncWriteExt + Unpin),
292    mut response: String,
293) -> io::Result<()> {
294    response.push('\n');
295    write.write_all(response.as_bytes()).await?;
296    write.flush().await
297}
298
299/// Two-step wire decode: catch malformed `rid` (and other typed fields)
300/// with a message that names the field, instead of leaking the
301/// underlying serde error verbatim (`Unknown base code: n` etc.).
302fn parse_command(line: &str) -> Result<Command, (ErrorCode, String)> {
303    let value: serde_json::Value = serde_json::from_str(line).map_err(|e| {
304        (
305            ErrorCode::InvalidRequest,
306            format!("invalid command JSON: {e}"),
307        )
308    })?;
309    if let Some(rid_s) = value.get("rid").and_then(|v| v.as_str()) {
310        if let Err(e) = RepoId::from_str(rid_s) {
311            return Err((
312                ErrorCode::InvalidRequest,
313                format!("invalid rid {rid_s:?}: {e}"),
314            ));
315        }
316    }
317    if let Some(cid_s) = value.get("cid").and_then(|v| v.as_str()) {
318        if let Err(e) = Cid::from_str(cid_s) {
319            return Err((
320                ErrorCode::InvalidRequest,
321                format!("invalid cid {cid_s:?}: {e}"),
322            ));
323        }
324    }
325    serde_json::from_value(value)
326        .map_err(|e| (ErrorCode::InvalidRequest, format!("invalid command: {e}")))
327}
328
329/// Dispatch a one-shot command, returning the JSON line to send back
330/// (without trailing newline). Streaming commands (`Fetch`, `Export`) are
331/// handled in [`handle_connection`] before reaching here.
332async fn dispatch(cmd: Command, ctx: &NodeCtx, shutdown_tx: &broadcast::Sender<()>) -> String {
333    let store = &ctx.store;
334    match cmd {
335        // Cheap liveness probe: touch no state, just ack.
336        Command::Alive => ok_json(()),
337        Command::Status => {
338            match build_status(store, &ctx.endpoint, ctx.endpoint_id, ctx.started_at_unix).await {
339                Ok(status) => ok_json(status),
340                Err(e) => err_from_share::<Status>(e),
341            }
342        }
343        Command::Seed {
344            rid,
345            release,
346            cid,
347            path,
348            kind,
349            mode,
350        } => seed_response(store, rid, release, cid, &path, kind, mode, ctx.endpoint_id).await,
351        Command::Unseed { rid, release, cid } => unseed_response(store, rid, release, cid).await,
352        Command::IsSeeding { rid, cid } => is_seeding_response(store, &rid, &cid).await,
353        Command::ListSeeded { rid } => list_seeded_response(store, rid).await,
354        Command::Has { cid } => has_response(store, &cid).await,
355        // Intercepted in handle_connection; never reaches dispatch.
356        Command::Export { .. } | Command::Fetch { .. } | Command::Download { .. } => {
357            unreachable!("streaming commands are handled before dispatch")
358        }
359        Command::Shutdown => {
360            // ack first, then broadcast so the loop tears down after the
361            // response makes it to the wire
362            let resp = ok_json(());
363            let _ = shutdown_tx.send(());
364            resp
365        }
366    }
367}
368
369/// Resolve a CID to the iroh hash+format pair, or a wire error.
370fn hash_and_format(cid: &Cid) -> Result<HashAndFormat, (ErrorCode, String)> {
371    let hash = cid_utils::cid_to_blake3_hash(cid)
372        .map_err(|e| (ErrorCode::InvalidRequest, e.to_string()))?;
373    match cid_utils::artifact_kind(cid) {
374        Ok(ArtifactKind::Blob) => Ok(HashAndFormat::raw(hash.into())),
375        Ok(ArtifactKind::Collection) => Ok(HashAndFormat::hash_seq(hash.into())),
376        Err(e) => Err((ErrorCode::InvalidRequest, e.to_string())),
377    }
378}
379
380/// `Has`: report local presence/completeness for a CID. Hash-keyed, so it
381/// answers regardless of which repo (if any) tagged the content.
382async fn has_response(store: &FsStore, cid: &Cid) -> String {
383    let haf = match hash_and_format(cid) {
384        Ok(h) => h,
385        Err((code, msg)) => return err_json::<HasResult>(code, msg),
386    };
387    match store.remote().local(haf).await {
388        Ok(info) => {
389            let bytes = info.local_bytes();
390            ok_json(HasResult {
391                present: bytes > 0,
392                complete: info.is_complete(),
393                bytes,
394            })
395        }
396        Err(e) => err_json::<HasResult>(ErrorCode::Iroh, format!("local lookup: {e}")),
397    }
398}
399
400/// Serialize and write one stream frame as a JSON line, then flush.
401async fn write_frame<T: serde::Serialize>(
402    write: &mut (impl AsyncWriteExt + Unpin),
403    event: &StreamEvent<T>,
404) -> io::Result<()> {
405    let mut line = serde_json::to_string(event).unwrap_or_else(|e| {
406        format!(r#"{{"error":{{"code":"internal","message":"encode: {e}"}}}}"#)
407    });
408    line.push('\n');
409    write.write_all(line.as_bytes()).await?;
410    write.flush().await
411}
412
413/// Shorthand for writing a terminal error frame.
414async fn stream_error<T: serde::Serialize>(
415    write: &mut (impl AsyncWriteExt + Unpin),
416    code: ErrorCode,
417    message: String,
418) -> io::Result<()> {
419    write_frame::<T>(write, &StreamEvent::Error(CommandError { code, message })).await
420}
421
422/// Drive a streaming operation: forward `FetchProgress` frames as they
423/// arrive on the channel, then write the terminal okay/error frame.
424///
425/// Disconnect-to-abort works two ways, so a vanished client never leaves
426/// `op` running: a failed frame write propagates its error, and `read` is
427/// polled for EOF (or any unexpected inbound byte) even during silent
428/// phases that emit no frames (a long export or HTTP body). Either path
429/// returns, dropping `op` and aborting the in-flight download/export.
430async fn run_stream<T: serde::Serialize>(
431    read: &mut (impl AsyncReadExt + Unpin),
432    write: &mut (impl AsyncWriteExt + Unpin),
433    op: impl AsyncFnOnce(mpsc::UnboundedSender<FetchProgress>) -> Result<T, (ErrorCode, String)>,
434) -> io::Result<()> {
435    let (tx, mut rx) = mpsc::unbounded_channel::<FetchProgress>();
436    let fut = op(tx);
437    tokio::pin!(fut);
438    let mut probe = [0u8; 1];
439    loop {
440        tokio::select! {
441            biased;
442            res = &mut fut => {
443                // Flush any progress buffered before completion.
444                while let Ok(p) = rx.try_recv() {
445                    write_frame(write, &StreamEvent::<T>::Progress(p)).await?;
446                }
447                let event = match res {
448                    Ok(payload) => StreamEvent::Okay(payload),
449                    Err((code, message)) => StreamEvent::Error(CommandError { code, message }),
450                };
451                return write_frame(write, &event).await;
452            }
453            Some(p) = rx.recv() => {
454                write_frame(write, &StreamEvent::<T>::Progress(p)).await?;
455            }
456            // The client should stay silent until the terminal frame; a read
457            // readiness means it closed (EOF) or went away. Stop and drop
458            // `op`. read() is cancel-safe here: with no inbound bytes it just
459            // stays pending, so being dropped by another arm loses nothing.
460            _ = read.read(&mut probe) => {
461                return Ok(());
462            }
463        }
464    }
465}
466
467/// `Export`: stream already-local bytes to `dest`. Errors with `NotLocal`
468/// if the content isn't complete in the store.
469async fn stream_export(
470    ctx: &NodeCtx,
471    read: &mut (impl AsyncReadExt + Unpin),
472    write: &mut (impl AsyncWriteExt + Unpin),
473    cid: Cid,
474    dest: PathBuf,
475) -> io::Result<()> {
476    let kind = match cid_utils::artifact_kind(&cid) {
477        Ok(k) => k,
478        Err(e) => {
479            return stream_error::<ExportReceipt>(write, ErrorCode::InvalidRequest, e.to_string())
480                .await
481        }
482    };
483    let haf = match hash_and_format(&cid) {
484        Ok(h) => h,
485        Err((code, msg)) => return stream_error::<ExportReceipt>(write, code, msg).await,
486    };
487    let hash = haf.hash;
488
489    // Export needs the bytes already complete locally.
490    match ctx.store.blobs().has(hash).await {
491        Ok(true) => {}
492        Ok(false) => {
493            return stream_error::<ExportReceipt>(
494                write,
495                ErrorCode::NotLocal,
496                format!("content for {cid} is not complete in the store"),
497            )
498            .await
499        }
500        Err(e) => {
501            return stream_error::<ExportReceipt>(
502                write,
503                ErrorCode::Iroh,
504                format!("local lookup: {e}"),
505            )
506            .await
507        }
508    }
509
510    run_stream(read, write, async move |tx| {
511        let on_progress = move |p| {
512            let _ = tx.send(p);
513        };
514        let bytes = match kind {
515            ArtifactKind::Blob => fetch::export_blob_to(&ctx.store, hash, &dest, on_progress).await,
516            ArtifactKind::Collection => {
517                fetch::export_collection_to(&ctx.store, hash, &dest, on_progress).await
518            }
519        }
520        .map_err(|e| (share_error_to_code(&e), e.to_string()))?;
521        Ok(ExportReceipt { cid, dest, bytes })
522    })
523    .await
524}
525
526/// Split resolved locations into iroh providers and HTTP URLs.
527fn partition(locations: &[FetchLocation]) -> (Vec<EndpointId>, Vec<Url>) {
528    let mut iroh = Vec::new();
529    let mut urls = Vec::new();
530    for loc in locations {
531        match loc {
532            FetchLocation::Iroh(id) => iroh.push(*id),
533            FetchLocation::Url(u) => urls.push(u.clone()),
534        }
535    }
536    (iroh, urls)
537}
538
539/// Export `hash` to `dest` per `kind`, mapping errors to a wire code.
540async fn export_to_dest(
541    store: &FsStore,
542    hash: iroh_blobs::Hash,
543    kind: ArtifactKind,
544    dest: &Path,
545    on_progress: impl FnMut(FetchProgress),
546) -> Result<u64, (ErrorCode, String)> {
547    match kind {
548        ArtifactKind::Blob => fetch::export_blob_to(store, hash, dest, on_progress).await,
549        ArtifactKind::Collection => {
550            fetch::export_collection_to(store, hash, dest, on_progress).await
551        }
552    }
553    .map_err(|e| (share_error_to_code(&e), e.to_string()))
554}
555
556/// An Artifact fetched complete into the store: the bytes are present and
557/// held alive by a [Temp Tag](iroh_blobs::api::TempTag) until this value
558/// drops. The caller keeps it until any Seeded Tag is set, so there's no GC
559/// window between releasing the transient protection and the persistent tag
560/// landing; an error or client disconnect drops it and GC reclaims the bytes.
561struct Fetched {
562    /// Artifact kind derived from the CID (for export dispatch).
563    kind: ArtifactKind,
564    /// BLAKE3 hash of the content.
565    hash: iroh_blobs::Hash,
566    /// `true` if the bytes were already complete locally (no network used).
567    from_cache: bool,
568    /// Transient GC protection; released on drop, never read directly.
569    _tt: iroh_blobs::api::TempTag,
570}
571
572/// Fetch `cid` into the store: a no-op if the bytes are already complete
573/// locally, otherwise download from `locations` (iroh providers batched into
574/// one multi-provider download, HTTP URLs tried in sequence as a blob-only
575/// fallback) and re-check completeness against the store. The kind and hash
576/// are derived from `cid`.
577///
578/// Returns the complete, protected bytes — hold the result until any Seeded
579/// Tag is set, then let it drop. On failure the temp tag is dropped here so
580/// GC reclaims any partial.
581async fn fetch_into_store(
582    ctx: &NodeCtx,
583    cid: &Cid,
584    locations: &[FetchLocation],
585    on_progress: &mut impl FnMut(FetchProgress),
586) -> Result<Fetched, (ErrorCode, String)> {
587    let kind =
588        cid_utils::artifact_kind(cid).map_err(|e| (ErrorCode::InvalidRequest, e.to_string()))?;
589    let haf = hash_and_format(cid)?;
590    let hash = haf.hash;
591    let store = &ctx.store;
592
593    // Tag the CID to prevent GC before doing anything else. This covers both paths:
594    // cached bytes that exist only as an untagged GC cache (a prior non-seed
595    // fetch) can't be GCed between the completeness check and export/seed
596    // on the fast path, and in-flight bytes are protected on the download
597    // path. If the returned `Fetched` is dropped (disconnect) the temp tag
598    // drops with it.
599    let tt = store
600        .tags()
601        .temp_tag(haf)
602        .await
603        .map_err(|e| (ErrorCode::Iroh, format!("temp tag: {e}")))?;
604
605    // Fast path: bytes already complete locally.
606    let already = store
607        .blobs()
608        .has(hash)
609        .await
610        .map_err(|e| (ErrorCode::Iroh, format!("local lookup: {e}")))?;
611    if already {
612        return Ok(Fetched {
613            kind,
614            hash,
615            from_cache: true,
616            _tt: tt,
617        });
618    }
619
620    on_progress(FetchProgress::Connecting);
621
622    let (iroh_ids, urls) = partition(locations);
623    let no_locations = iroh_ids.is_empty() && urls.is_empty();
624    let mut errors: Vec<String> = Vec::new();
625    let mut got = false;
626
627    if !iroh_ids.is_empty() {
628        match fetch::download_iroh_to_store(
629            &ctx.downloader,
630            store,
631            haf,
632            iroh_ids,
633            &mut *on_progress,
634        )
635        .await
636        {
637            Ok(()) => got = true,
638            Err(errs) => errors.extend(errs.into_iter().map(|e| e.to_string())),
639        }
640    }
641    if !got {
642        match kind {
643            // HTTP is a blob-only fallback; stop at the first success
644            // (completeness is re-checked against the store below).
645            ArtifactKind::Blob => {
646                for url in &urls {
647                    match fetch::http_to_store(store, url, cid, &mut *on_progress).await {
648                        Ok(_) => break,
649                        Err(e) => errors.push(e.to_string()),
650                    }
651                }
652            }
653            ArtifactKind::Collection => {
654                for url in &urls {
655                    errors.push(format!("HTTP fetch unsupported for collection: {url}"));
656                }
657            }
658        }
659    }
660
661    // Trust the store: complete means we have it, whatever individual
662    // providers reported.
663    let complete = store
664        .remote()
665        .local(haf)
666        .await
667        .map(|i| i.is_complete())
668        .unwrap_or(false);
669    if !complete {
670        drop(tt); // release protection so GC reclaims the partial
671        let code = if no_locations {
672            ErrorCode::NoLocations
673        } else {
674            ErrorCode::AllFailed
675        };
676        let msg = if errors.is_empty() {
677            "no locations succeeded".to_string()
678        } else {
679            errors.join("; ")
680        };
681        return Err((code, msg));
682    }
683
684    Ok(Fetched {
685        kind,
686        hash,
687        from_cache: false,
688        _tt: tt,
689    })
690}
691
692/// `Fetch`: pull the artifact complete into the store (no-op if already
693/// local, else download from `locations`), tagging as seeded if requested.
694/// Writes nothing to disk — see [`stream_download`].
695///
696/// Abort-safe: the [`Fetched`] temp tag protects the content until the Seeded
697/// Tag is set and the value drops at the end of the closure; a disconnect
698/// mid-stream drops this future via [`run_stream`], releasing the tag so GC
699/// reclaims any partial.
700async fn stream_fetch(
701    ctx: &NodeCtx,
702    read: &mut (impl AsyncReadExt + Unpin),
703    write: &mut (impl AsyncWriteExt + Unpin),
704    rid: RepoId,
705    cid: Cid,
706    locations: Vec<FetchLocation>,
707    seed: Option<Oid>,
708) -> io::Result<()> {
709    let endpoint_id = ctx.endpoint_id;
710
711    run_stream(read, write, async move |tx| {
712        let mut on_progress = move |p| {
713            let _ = tx.send(p);
714        };
715        let store = &ctx.store;
716
717        let fetched = fetch_into_store(ctx, &cid, &locations, &mut on_progress).await?;
718        let seeded = tag_if_seeding(store, &rid, seed.as_ref(), &cid, fetched.hash)
719            .await
720            .map_err(|e| (share_error_to_code(&e), e.to_string()))?;
721
722        // Logical size now complete in the store; no disk write.
723        let bytes = seeder::artifact_size_for(store, &cid, fetched.hash).await;
724        Ok(FetchReceipt {
725            rid,
726            cid,
727            bytes,
728            from_cache: fetched.from_cache,
729            seeded,
730            endpoint_id,
731        })
732        // `fetched` (and its temp tag) drops here, after any Seeded Tag is set.
733    })
734    .await
735}
736
737/// `Download`: [`Fetch`](stream_fetch) into the store, then export to `dest`.
738/// Fast-path export if already local; tags as seeded if requested.
739///
740/// Abort-safe identically to [`stream_fetch`]: the [`Fetched`] temp tag
741/// protects the content across the download AND the export (the export runs
742/// before the Seeded Tag is set), so the bytes can't be reclaimed mid-export;
743/// a disconnect drops this future and the protection.
744#[allow(clippy::too_many_arguments)]
745async fn stream_download(
746    ctx: &NodeCtx,
747    read: &mut (impl AsyncReadExt + Unpin),
748    write: &mut (impl AsyncWriteExt + Unpin),
749    rid: RepoId,
750    cid: Cid,
751    locations: Vec<FetchLocation>,
752    dest: PathBuf,
753    seed: Option<Oid>,
754) -> io::Result<()> {
755    let endpoint_id = ctx.endpoint_id;
756
757    run_stream(read, write, async move |tx| {
758        let mut on_progress = move |p| {
759            let _ = tx.send(p);
760        };
761        let store = &ctx.store;
762
763        let fetched = fetch_into_store(ctx, &cid, &locations, &mut on_progress).await?;
764
765        // Export inside the protected window, before the Seeded Tag is set.
766        let bytes =
767            export_to_dest(store, fetched.hash, fetched.kind, &dest, &mut on_progress).await?;
768        let seeded = tag_if_seeding(store, &rid, seed.as_ref(), &cid, fetched.hash)
769            .await
770            .map_err(|e| (share_error_to_code(&e), e.to_string()))?;
771
772        Ok(DownloadReceipt {
773            rid,
774            cid,
775            dest,
776            bytes,
777            from_cache: fetched.from_cache,
778            seeded,
779            endpoint_id,
780        })
781        // `fetched` (and its temp tag) drops here, after any Seeded Tag is set.
782    })
783    .await
784}
785
786/// Tag `(rid, release, cid)` as seeded when a fetch/download asked to seed
787/// under `release`, returning whether a tag was set.
788///
789/// `seed: None` leaves the bytes untagged; the caller already has them in the
790/// store. Pairing seed intent with its release in one `Option` means there's
791/// no unscoped-seed case to handle here.
792async fn tag_if_seeding(
793    store: &iroh_blobs::api::Store,
794    rid: &RepoId,
795    seed: Option<&Oid>,
796    cid: &Cid,
797    hash: iroh_blobs::Hash,
798) -> Result<bool, ShareError> {
799    let Some(release) = seed else {
800        return Ok(false);
801    };
802    seeder::tag_seeded(store, rid, release, cid, hash).await?;
803    Ok(true)
804}
805
806#[allow(clippy::too_many_arguments)]
807async fn seed_response(
808    store: &FsStore,
809    rid: RepoId,
810    release: Oid,
811    cid: Cid,
812    path: &Path,
813    kind: ArtifactKind,
814    mode: ImportMode,
815    endpoint_id: EndpointId,
816) -> String {
817    if !path.exists() {
818        return err_json::<SeedReceipt>(
819            ErrorCode::PathNotFound,
820            format!("path not found: {}", path.display()),
821        );
822    }
823
824    let was_already = match seeder::is_seeded(store, &rid, &release, &cid).await {
825        Ok(v) => v,
826        Err(e) => return err_from_share::<SeedReceipt>(e),
827    };
828    let hash = match seeder::seed_artifact(store, &rid, &release, &cid, path, kind, mode).await {
829        Ok(hash) => hash,
830        Err(e) => return err_from_share::<SeedReceipt>(e),
831    };
832    let bytes = seeder::artifact_size_for(store, &cid, hash).await;
833    let receipt = SeedReceipt {
834        rid,
835        cid,
836        endpoint_id,
837        bytes,
838        was_new: !was_already,
839    };
840    ok_json(receipt)
841}
842
843/// `release: Some(id)` drops that one release's tag; `None` stops seeding the
844/// CID across every release of `rid`. `was_removed` reports whether the
845/// removal actually dropped a tag, read from the delete itself so a
846/// concurrent unseed can't make it lie.
847async fn unseed_response(store: &FsStore, rid: RepoId, release: Option<Oid>, cid: Cid) -> String {
848    let was_removed = match &release {
849        Some(release) => match seeder::untag_seeded(store, &rid, release, &cid).await {
850            Ok(removed) => removed,
851            Err(e) => return err_from_share::<UnseedReceipt>(e),
852        },
853        None => match seeder::untag_all(store, &rid, &cid).await {
854            Ok(removed) => removed > 0,
855            Err(e) => return err_from_share::<UnseedReceipt>(e),
856        },
857    };
858    ok_json(UnseedReceipt {
859        rid,
860        cid,
861        was_removed,
862    })
863}
864
865async fn is_seeding_response(store: &FsStore, rid: &RepoId, cid: &Cid) -> String {
866    match seeder::is_seeded_any(store, rid, cid).await {
867        Ok(v) => ok_json(v),
868        Err(e) => err_from_share::<bool>(e),
869    }
870}
871
872async fn list_seeded_response(store: &FsStore, rid: RepoId) -> String {
873    let cids = match seeder::seeded_cids(store, &rid).await {
874        Ok(v) => v,
875        Err(e) => return err_from_share::<Vec<SeededEntry>>(e),
876    };
877    let mut out = Vec::with_capacity(cids.len());
878    for (cid, hash) in cids {
879        let bytes = seeder::artifact_size_for(store, &cid, hash).await;
880        out.push(SeededEntry { cid, bytes });
881    }
882    ok_json(out)
883}
884
885async fn build_status(
886    store: &FsStore,
887    endpoint: &iroh::Endpoint,
888    endpoint_id: EndpointId,
889    started_at_unix: i64,
890) -> Result<Status, ShareError> {
891    let metrics = endpoint.metrics();
892    // A blob shared across releases carries one tag per release; collapse to
893    // distinct blobs so the count and byte total reflect what's on disk.
894    let distinct: std::collections::HashMap<iroh_blobs::Hash, Cid> = seeder::all_seeded(store)
895        .await?
896        .into_iter()
897        .map(|(_rid, _release, cid, hash)| (hash, cid))
898        .collect();
899    let count = distinct.len();
900    let mut bytes_logical = 0u64;
901    for (hash, cid) in &distinct {
902        bytes_logical =
903            bytes_logical.saturating_add(seeder::artifact_size_for(store, cid, *hash).await);
904    }
905    // Socket counters are byte totals (sends include disco frames; recv
906    // counts data only) and connection/path tallies — see the field docs
907    // on `ConnectionStats`/`TrafficStats` for the disco-vs-data split.
908    let s = &metrics.socket;
909    let opened_total = s.num_conns_opened.get();
910    let closed_total = s.num_conns_closed.get();
911    let connections = radicle_artifact_core::protocol::ConnectionStats {
912        active: opened_total.saturating_sub(closed_total) as u32,
913        opened_total,
914        closed_total,
915        direct_total: s.num_conns_direct.get(),
916        holepunch_attempts: s.holepunch_attempts.get(),
917        paths_direct: s.paths_direct.get(),
918        paths_relayed: s.paths_relay.get(),
919    };
920    let traffic = radicle_artifact_core::protocol::TrafficStats {
921        out_bytes: s
922            .send_ipv4
923            .get()
924            .saturating_add(s.send_ipv6.get())
925            .saturating_add(s.send_relay.get()),
926        in_bytes: s
927            .recv_data_ipv4
928            .get()
929            .saturating_add(s.recv_data_ipv6.get())
930            .saturating_add(s.recv_data_relay.get())
931            .saturating_add(s.recv_data_custom.get()),
932    };
933    let relay = relay_stats(endpoint);
934    // A bound socket with no connected home relay is reachable only by peers
935    // that can holepunch a direct path; flag it as advice.
936    let relay_unreachable = !relay.relays.iter().any(|r| r.connected);
937    Ok(Status {
938        endpoint_id,
939        started_at_unix,
940        seeded: radicle_artifact_core::protocol::SeededStats {
941            count,
942            bytes_logical,
943        },
944        connections,
945        traffic,
946        relay,
947        warnings: radicle_artifact_core::protocol::Warnings { relay_unreachable },
948    })
949}
950
951/// Snapshot home-relay connectivity and latency from the live endpoint.
952///
953/// `home_relay_status()` gives connection state per relay; `net_report()`
954/// (best-effort, may be empty before the first probe lands) supplies the
955/// preferred relay, UDP reachability, and per-relay round-trip latency.
956fn relay_stats(endpoint: &iroh::Endpoint) -> radicle_artifact_core::protocol::RelayStats {
957    use iroh::Watcher;
958
959    let report = endpoint.net_report().get();
960    // Lowest measured latency per relay URL across probe types (v4/v6/https).
961    let mut latency_ms: std::collections::HashMap<String, u64> = std::collections::HashMap::new();
962    if let Some(r) = &report {
963        for (_probe, url, dur) in r.relay_latency.iter() {
964            let ms = dur.as_millis() as u64;
965            latency_ms
966                .entry(url.to_string())
967                .and_modify(|m| *m = (*m).min(ms))
968                .or_insert(ms);
969        }
970    }
971
972    let relays = endpoint
973        .home_relay_status()
974        .get()
975        .into_iter()
976        .map(|s| {
977            let url = s.url().to_string();
978            radicle_artifact_core::protocol::RelayHealth {
979                latency_ms: latency_ms.get(&url).copied(),
980                connected: s.is_connected(),
981                last_error: s.last_error().map(|e| e.to_string()),
982                url,
983            }
984        })
985        .collect();
986
987    radicle_artifact_core::protocol::RelayStats {
988        relays,
989        preferred: report
990            .as_ref()
991            .and_then(|r| r.preferred_relay.as_ref().map(|u| u.to_string())),
992        udp_v4: report.as_ref().map(|r| r.udp_v4).unwrap_or(false),
993        udp_v6: report.as_ref().map(|r| r.udp_v6).unwrap_or(false),
994    }
995}
996
997fn ok_json<T: serde::Serialize>(v: T) -> String {
998    serde_json::to_string(&CommandResult::Okay(v))
999        .unwrap_or_else(|e| format!(r#"{{"error":{{"code":"internal","message":"encode: {e}"}}}}"#))
1000}
1001
1002fn err_json<T>(code: ErrorCode, message: String) -> String
1003where
1004    T: serde::Serialize,
1005{
1006    serde_json::to_string(&CommandResult::<T>::Error(CommandError { code, message }))
1007        .unwrap_or_else(|e| format!(r#"{{"error":{{"code":"internal","message":"encode: {e}"}}}}"#))
1008}
1009
1010fn err_from_share<T>(e: ShareError) -> String
1011where
1012    T: serde::Serialize,
1013{
1014    let code = share_error_to_code(&e);
1015    err_json::<T>(code, e.to_string())
1016}
1017
1018fn share_error_to_code(e: &ShareError) -> ErrorCode {
1019    match e {
1020        ShareError::CidMismatch { .. } => ErrorCode::CidMismatch,
1021        ShareError::Io(_) => ErrorCode::Io,
1022        ShareError::Iroh(_) => ErrorCode::Iroh,
1023        _ => ErrorCode::Internal,
1024    }
1025}
1026
1027#[cfg(test)]
1028mod tests {
1029    use std::fs;
1030
1031    use cid::multihash::Multihash;
1032
1033    use super::*;
1034    use radicle_artifact_core::cid::{
1035        self as cid_utils, ArtifactKind, HASH_CODE_BLAKE3, RAW_CODEC,
1036    };
1037
1038    /// Build a fake but well-formed blob CID over `data` so the
1039    /// `tag_seeded` path picks `HashAndFormat::raw`.
1040    fn fake_blob_cid(data: &[u8]) -> Cid {
1041        let digest = blake3::hash(data);
1042        let mh = Multihash::<64>::wrap(HASH_CODE_BLAKE3, digest.as_bytes()).unwrap();
1043        Cid::from(cid::Cid::new_v1(RAW_CODEC, mh))
1044    }
1045
1046    fn rid_a() -> RepoId {
1047        RepoId::from_str("rad:z2u2CP3ZJzB7ZqE8jHrau19yjpdip").unwrap()
1048    }
1049
1050    fn release_a() -> Oid {
1051        Oid::from_str("0123456789abcdef0123456789abcdef01234567").unwrap()
1052    }
1053
1054    /// Spawn a node under `home`, wait for its control socket to appear, and
1055    /// return the socket path plus the join handle for asserting clean exit.
1056    async fn start_node(
1057        home: &Path,
1058        secret: iroh::SecretKey,
1059    ) -> (PathBuf, tokio::task::JoinHandle<Result<(), NodeError>>) {
1060        let home_path = home.to_path_buf();
1061        let handle = tokio::spawn(async move { run(&home_path, secret).await });
1062        let socket = home.join(ARTIFACTS_DIR).join("control.sock");
1063        for _ in 0..200 {
1064            if socket.exists() {
1065                break;
1066            }
1067            tokio::time::sleep(Duration::from_millis(50)).await;
1068        }
1069        assert!(socket.exists(), "control socket never appeared");
1070        (socket, handle)
1071    }
1072
1073    /// End-to-end client↔node round-trip covering Status, Seed (new +
1074    /// duplicate + CID mismatch), IsSeeding, ListSeeded, Unseed (new +
1075    /// duplicate), and Shutdown.
1076    #[test]
1077    fn node_round_trip() {
1078        let rt = tokio::runtime::Builder::new_multi_thread()
1079            .enable_all()
1080            .build()
1081            .unwrap();
1082        rt.block_on(async {
1083            let home = tempfile::tempdir().unwrap();
1084
1085            // Compute a real blob CID from a real file so the import
1086            // verification passes.
1087            let blob_path = home.path().join("payload.bin");
1088            let payload = b"hello rad-artifact";
1089            fs::write(&blob_path, payload).unwrap();
1090            let real_cid = cid_utils::compute_blob_cid(&blob_path).unwrap();
1091            let rid = rid_a();
1092
1093            // Pin the secret so the test is reproducible.
1094            let secret = iroh::SecretKey::from_bytes(&[1u8; 32]);
1095            let expected_endpoint_id = EndpointId::from(secret.public());
1096
1097            // Run the node on a tokio task; capture the join handle so
1098            // we can assert clean exit.
1099            let home_path = home.path().to_path_buf();
1100            let node_handle = tokio::spawn(async move { run(&home_path, secret).await });
1101
1102            // Wait for the socket to appear (bootstrap involves iroh
1103            // endpoint bind, which is async).
1104            let socket = home.path().join(ARTIFACTS_DIR).join("control.sock");
1105            for _ in 0..200 {
1106                if socket.exists() {
1107                    break;
1108                }
1109                tokio::time::sleep(Duration::from_millis(50)).await;
1110            }
1111            assert!(socket.exists(), "control socket never appeared");
1112
1113            let client = Client::new(socket.clone());
1114
1115            // is_running succeeds.
1116            assert!(client.is_running().await);
1117
1118            // Status round-trips and reports the expected endpoint id.
1119            let status = client.status().await.unwrap();
1120            assert_eq!(status.endpoint_id, expected_endpoint_id);
1121            assert_eq!(status.seeded.count, 0);
1122
1123            // Seed (new).
1124            let receipt = client
1125                .seed(
1126                    rid,
1127                    release_a(),
1128                    real_cid,
1129                    &blob_path,
1130                    ArtifactKind::Blob,
1131                    ImportMode::Copy,
1132                )
1133                .await
1134                .unwrap();
1135            assert!(receipt.was_new);
1136            assert_eq!(receipt.endpoint_id, expected_endpoint_id);
1137            assert_eq!(receipt.bytes, payload.len() as u64);
1138
1139            // Seed again — idempotent: was_new false.
1140            let receipt2 = client
1141                .seed(
1142                    rid,
1143                    release_a(),
1144                    real_cid,
1145                    &blob_path,
1146                    ArtifactKind::Blob,
1147                    ImportMode::Copy,
1148                )
1149                .await
1150                .unwrap();
1151            assert!(!receipt2.was_new);
1152
1153            // Seed with a tampered path that does not match the CID.
1154            let bad_path = home.path().join("tampered.bin");
1155            fs::write(&bad_path, b"different bytes").unwrap();
1156            let bad_cid = fake_blob_cid(b"not the right preimage");
1157            let err = client
1158                .seed(
1159                    rid,
1160                    release_a(),
1161                    bad_cid,
1162                    &bad_path,
1163                    ArtifactKind::Blob,
1164                    ImportMode::Copy,
1165                )
1166                .await
1167                .expect_err("CID mismatch must error");
1168            match err {
1169                radicle_artifact_client::ClientError::Remote(CommandError { code, .. }) => {
1170                    assert_eq!(code, ErrorCode::CidMismatch);
1171                }
1172                other => panic!("expected CidMismatch error, got {other:?}"),
1173            }
1174
1175            // IsSeeding reflects the tag we set above.
1176            assert!(client.is_seeding(rid, real_cid).await.unwrap());
1177
1178            // ListSeeded returns exactly the one entry.
1179            let entries = client.list_seeded(rid).await.unwrap();
1180            assert_eq!(entries.len(), 1);
1181            assert_eq!(entries[0].cid, real_cid);
1182            assert_eq!(entries[0].bytes, payload.len() as u64);
1183
1184            // Status now reports one seeded artifact.
1185            let status = client.status().await.unwrap();
1186            assert_eq!(status.seeded.count, 1);
1187            assert_eq!(status.seeded.bytes_logical, payload.len() as u64);
1188
1189            // Unseed once removes the tag; second call is idempotent.
1190            let r1 = client
1191                .unseed(rid, Some(release_a()), real_cid)
1192                .await
1193                .unwrap();
1194            assert!(r1.was_removed);
1195            let r2 = client
1196                .unseed(rid, Some(release_a()), real_cid)
1197                .await
1198                .unwrap();
1199            assert!(!r2.was_removed);
1200            assert!(!client.is_seeding(rid, real_cid).await.unwrap());
1201
1202            // Shutdown — the node acks then exits cleanly.
1203            client.shutdown().await.unwrap();
1204            tokio::time::timeout(Duration::from_secs(10), node_handle)
1205                .await
1206                .expect("node did not exit within 10s")
1207                .expect("join error")
1208                .expect("node returned error");
1209
1210            // Socket file is gone after shutdown.
1211            assert!(!socket.exists());
1212        });
1213    }
1214
1215    /// Malformed `rid`/`cid` on the wire must surface as `InvalidRequest`
1216    /// with the offending field named in the message — so callers don't
1217    /// have to grep for "invalid command JSON" to recognize bad input.
1218    #[test]
1219    fn invalid_typed_fields_surface_as_invalid_request() {
1220        let rt = tokio::runtime::Builder::new_multi_thread()
1221            .enable_all()
1222            .build()
1223            .unwrap();
1224        rt.block_on(async {
1225            let home = tempfile::tempdir().unwrap();
1226            let secret = iroh::SecretKey::from_bytes(&[4u8; 32]);
1227            let home_path = home.path().to_path_buf();
1228            let node_handle = tokio::spawn(async move { run(&home_path, secret).await });
1229
1230            let socket = home.path().join(ARTIFACTS_DIR).join("control.sock");
1231            for _ in 0..200 {
1232                if socket.exists() {
1233                    break;
1234                }
1235                tokio::time::sleep(Duration::from_millis(50)).await;
1236            }
1237            assert!(socket.exists());
1238
1239            // The typed Client can't construct these frames since the
1240            // fields are now RepoId / Cid, so we hand-roll the wire.
1241            for (frame, expected_field) in [
1242                (
1243                    br#"{"command":"list-seeded","rid":"not-a-real-rid"}"#.as_slice(),
1244                    "rid",
1245                ),
1246                (
1247                    br#"{"command":"is-seeding","rid":"rad:z2u2CP3ZJzB7ZqE8jHrau19yjpdip","cid":"not-a-real-cid"}"#.as_slice(),
1248                    "cid",
1249                ),
1250            ] {
1251                let mut stream = tokio::net::UnixStream::connect(&socket).await.unwrap();
1252                tokio::io::AsyncWriteExt::write_all(&mut stream, frame)
1253                    .await
1254                    .unwrap();
1255                tokio::io::AsyncWriteExt::write_all(&mut stream, b"\n")
1256                    .await
1257                    .unwrap();
1258                let mut buf = String::new();
1259                tokio::io::AsyncReadExt::read_to_string(&mut stream, &mut buf)
1260                    .await
1261                    .unwrap();
1262                let parsed: CommandResult<serde_json::Value> =
1263                    serde_json::from_str(buf.trim()).unwrap();
1264                match parsed {
1265                    CommandResult::Error(CommandError { code, message }) => {
1266                        assert_eq!(code, ErrorCode::InvalidRequest);
1267                        assert!(
1268                            message.contains(expected_field),
1269                            "message should name {expected_field}: {message}"
1270                        );
1271                    }
1272                    CommandResult::Okay(_) => panic!("expected error, got ok"),
1273                }
1274            }
1275
1276            // Clean shutdown.
1277            let client = Client::new(socket);
1278            client.shutdown().await.unwrap();
1279            node_handle.await.unwrap().unwrap();
1280        });
1281    }
1282
1283    /// A second `run` on the same home while the first is alive must
1284    /// fail with `AlreadyRunning`, not silently steal the socket.
1285    #[test]
1286    fn double_start_errors() {
1287        let rt = tokio::runtime::Builder::new_multi_thread()
1288            .enable_all()
1289            .build()
1290            .unwrap();
1291        rt.block_on(async {
1292            let home = tempfile::tempdir().unwrap();
1293            let secret = iroh::SecretKey::from_bytes(&[2u8; 32]);
1294            let home_path = home.path().to_path_buf();
1295            let first = tokio::spawn(async move { run(&home_path, secret).await });
1296
1297            let socket = home.path().join(ARTIFACTS_DIR).join("control.sock");
1298            for _ in 0..200 {
1299                if socket.exists() {
1300                    break;
1301                }
1302                tokio::time::sleep(Duration::from_millis(50)).await;
1303            }
1304            assert!(socket.exists());
1305
1306            // Same home, different secret to keep keys distinct.
1307            let secret2 = iroh::SecretKey::from_bytes(&[3u8; 32]);
1308            let err = run(home.path(), secret2).await.expect_err("must fail");
1309            assert!(matches!(err, NodeError::AlreadyRunning(_)));
1310
1311            // Tear down the first one so the test cleans up.
1312            let client = Client::new(socket);
1313            client.shutdown().await.unwrap();
1314            first.await.unwrap().unwrap();
1315        });
1316    }
1317
1318    /// Send a one-shot command on a fresh connection and decode the reply.
1319    async fn oneshot<T: serde::de::DeserializeOwned>(
1320        socket: &Path,
1321        cmd: &Command,
1322    ) -> CommandResult<T> {
1323        let mut stream = UnixStream::connect(socket).await.unwrap();
1324        let mut line = serde_json::to_string(cmd).unwrap();
1325        line.push('\n');
1326        tokio::io::AsyncWriteExt::write_all(&mut stream, line.as_bytes())
1327            .await
1328            .unwrap();
1329        let mut buf = String::new();
1330        tokio::io::AsyncReadExt::read_to_string(&mut stream, &mut buf)
1331            .await
1332            .unwrap();
1333        serde_json::from_str(buf.trim()).unwrap()
1334    }
1335
1336    /// Send a streaming command; return (progress frame count, terminal frame).
1337    async fn streaming<T: serde::de::DeserializeOwned>(
1338        socket: &Path,
1339        cmd: &Command,
1340    ) -> (usize, StreamEvent<T>) {
1341        let mut stream = UnixStream::connect(socket).await.unwrap();
1342        let mut line = serde_json::to_string(cmd).unwrap();
1343        line.push('\n');
1344        tokio::io::AsyncWriteExt::write_all(&mut stream, line.as_bytes())
1345            .await
1346            .unwrap();
1347        let mut buf = String::new();
1348        tokio::io::AsyncReadExt::read_to_string(&mut stream, &mut buf)
1349            .await
1350            .unwrap();
1351        let mut progress = 0usize;
1352        let mut terminal = None;
1353        for l in buf.lines().filter(|l| !l.trim().is_empty()) {
1354            match serde_json::from_str::<StreamEvent<T>>(l).unwrap() {
1355                StreamEvent::Progress(_) => progress += 1,
1356                term => terminal = Some(term),
1357            }
1358        }
1359        (progress, terminal.expect("a terminal frame"))
1360    }
1361
1362    /// `Has` reflects store presence; `Export` streams local bytes to disk
1363    /// and reports `NotLocal` for content the store doesn't hold.
1364    #[test]
1365    fn has_and_export_round_trip() {
1366        let rt = tokio::runtime::Builder::new_multi_thread()
1367            .enable_all()
1368            .build()
1369            .unwrap();
1370        rt.block_on(async {
1371            let home = tempfile::tempdir().unwrap();
1372            let payload = b"hello has/export";
1373            let blob_path = home.path().join("payload.bin");
1374            fs::write(&blob_path, payload).unwrap();
1375            let cid = cid_utils::compute_blob_cid(&blob_path).unwrap();
1376            let rid = rid_a();
1377
1378            let secret = iroh::SecretKey::from_bytes(&[5u8; 32]);
1379            let home_path = home.path().to_path_buf();
1380            let node_handle = tokio::spawn(async move { run(&home_path, secret).await });
1381
1382            let socket = home.path().join(ARTIFACTS_DIR).join("control.sock");
1383            for _ in 0..200 {
1384                if socket.exists() {
1385                    break;
1386                }
1387                tokio::time::sleep(Duration::from_millis(50)).await;
1388            }
1389            assert!(socket.exists(), "control socket never appeared");
1390
1391            // Seed the blob so its bytes live in the store.
1392            let client = Client::new(socket.clone());
1393            client
1394                .seed(
1395                    rid,
1396                    release_a(),
1397                    cid,
1398                    &blob_path,
1399                    ArtifactKind::Blob,
1400                    ImportMode::Copy,
1401                )
1402                .await
1403                .unwrap();
1404
1405            // Has: present and complete with the right size.
1406            match oneshot::<HasResult>(&socket, &Command::Has { cid }).await {
1407                CommandResult::Okay(h) => {
1408                    assert!(h.present);
1409                    assert!(h.complete);
1410                    assert_eq!(h.bytes, payload.len() as u64);
1411                }
1412                CommandResult::Error(e) => panic!("has errored: {e:?}"),
1413            }
1414
1415            // Has on content the store doesn't hold: absent.
1416            let unknown = fake_blob_cid(b"never stored");
1417            match oneshot::<HasResult>(&socket, &Command::Has { cid: unknown }).await {
1418                CommandResult::Okay(h) => {
1419                    assert!(!h.present);
1420                    assert!(!h.complete);
1421                }
1422                CommandResult::Error(e) => panic!("has errored: {e:?}"),
1423            }
1424
1425            // Export streams the bytes to disk; the file matches the payload.
1426            let dest = home.path().join("exported.bin");
1427            let (_progress, term) = streaming::<ExportReceipt>(
1428                &socket,
1429                &Command::Export {
1430                    cid,
1431                    dest: dest.clone(),
1432                },
1433            )
1434            .await;
1435            match term {
1436                StreamEvent::Okay(r) => {
1437                    assert_eq!(r.bytes, payload.len() as u64);
1438                    assert_eq!(r.dest, dest);
1439                }
1440                other => panic!("expected okay, got {other:?}"),
1441            }
1442            assert_eq!(fs::read(&dest).unwrap(), payload);
1443
1444            // Export of absent content reports NotLocal.
1445            let (_p, term) = streaming::<ExportReceipt>(
1446                &socket,
1447                &Command::Export {
1448                    cid: unknown,
1449                    dest: home.path().join("nope.bin"),
1450                },
1451            )
1452            .await;
1453            match term {
1454                StreamEvent::Error(e) => assert_eq!(e.code, ErrorCode::NotLocal),
1455                other => panic!("expected NotLocal error, got {other:?}"),
1456            }
1457
1458            client.shutdown().await.unwrap();
1459            node_handle.await.unwrap().unwrap();
1460        });
1461    }
1462
1463    /// `Fetch` (store-only) and `Download` (to disk) fast paths over
1464    /// already-local bytes: `Fetch` writes no file and reports the logical
1465    /// store size; `Download` writes the file; `Fetch --seed` tags a second
1466    /// repo by shared hash; an empty location set on absent content reports
1467    /// `NoLocations`. (The networked download path needs two endpoints and is
1468    /// exercised via the shared core, not here.)
1469    #[test]
1470    fn fetch_and_download_fast_path_and_no_locations() {
1471        let rt = tokio::runtime::Builder::new_multi_thread()
1472            .enable_all()
1473            .build()
1474            .unwrap();
1475        rt.block_on(async {
1476            let home = tempfile::tempdir().unwrap();
1477            let payload = b"hello fetch fast-path";
1478            let blob_path = home.path().join("payload.bin");
1479            fs::write(&blob_path, payload).unwrap();
1480            let cid = cid_utils::compute_blob_cid(&blob_path).unwrap();
1481            let rid = rid_a();
1482
1483            let secret = iroh::SecretKey::from_bytes(&[6u8; 32]);
1484            let home_path = home.path().to_path_buf();
1485            let node_handle = tokio::spawn(async move { run(&home_path, secret).await });
1486
1487            let socket = home.path().join(ARTIFACTS_DIR).join("control.sock");
1488            for _ in 0..200 {
1489                if socket.exists() {
1490                    break;
1491                }
1492                tokio::time::sleep(Duration::from_millis(50)).await;
1493            }
1494            assert!(socket.exists(), "control socket never appeared");
1495
1496            // Seed so the bytes live in the store.
1497            let client = Client::new(socket.clone());
1498            client
1499                .seed(
1500                    rid,
1501                    release_a(),
1502                    cid,
1503                    &blob_path,
1504                    ArtifactKind::Blob,
1505                    ImportMode::Copy,
1506                )
1507                .await
1508                .unwrap();
1509
1510            // Fetch fast path: complete locally, no locations, no seed, no
1511            // disk write. Reports the logical store size.
1512            let (_p, term) = streaming::<FetchReceipt>(
1513                &socket,
1514                &Command::Fetch {
1515                    rid,
1516                    cid,
1517                    locations: vec![],
1518                    seed: None,
1519                },
1520            )
1521            .await;
1522            match term {
1523                StreamEvent::Okay(r) => {
1524                    assert!(r.from_cache);
1525                    assert!(!r.seeded);
1526                    assert_eq!(r.bytes, payload.len() as u64);
1527                }
1528                other => panic!("expected okay, got {other:?}"),
1529            }
1530
1531            // Download fast path: same bytes, but exported to disk.
1532            let dest = home.path().join("downloaded.bin");
1533            let (_p, term) = streaming::<DownloadReceipt>(
1534                &socket,
1535                &Command::Download {
1536                    rid,
1537                    cid,
1538                    locations: vec![],
1539                    dest: dest.clone(),
1540                    seed: None,
1541                },
1542            )
1543            .await;
1544            match term {
1545                StreamEvent::Okay(r) => {
1546                    assert!(r.from_cache);
1547                    assert!(!r.seeded);
1548                    assert_eq!(r.bytes, payload.len() as u64);
1549                    assert_eq!(r.dest, dest);
1550                }
1551                other => panic!("expected okay, got {other:?}"),
1552            }
1553            assert_eq!(fs::read(&dest).unwrap(), payload);
1554
1555            // Fetch with seed under a second repo: the bytes are shared by
1556            // hash, so a (rid2, release, cid) tag is set without re-downloading.
1557            let rid2 = RepoId::from_str("rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5").unwrap();
1558            let (_p, term) = streaming::<FetchReceipt>(
1559                &socket,
1560                &Command::Fetch {
1561                    rid: rid2,
1562                    cid,
1563                    locations: vec![],
1564                    seed: Some(release_a()),
1565                },
1566            )
1567            .await;
1568            match term {
1569                StreamEvent::Okay(r) => {
1570                    assert!(r.from_cache);
1571                    assert!(r.seeded);
1572                }
1573                other => panic!("expected okay, got {other:?}"),
1574            }
1575            assert!(client.is_seeding(rid2, cid).await.unwrap());
1576
1577            // Absent content with no locations to try: NoLocations.
1578            let unknown = fake_blob_cid(b"absent");
1579            let (_p, term) = streaming::<FetchReceipt>(
1580                &socket,
1581                &Command::Fetch {
1582                    rid,
1583                    cid: unknown,
1584                    locations: vec![],
1585                    seed: None,
1586                },
1587            )
1588            .await;
1589            match term {
1590                StreamEvent::Error(e) => assert_eq!(e.code, ErrorCode::NoLocations),
1591                other => panic!("expected NoLocations error, got {other:?}"),
1592            }
1593
1594            client.shutdown().await.unwrap();
1595            node_handle.await.unwrap().unwrap();
1596        });
1597    }
1598
1599    /// A client that vanishes mid-stream must drop the in-flight `op`, not
1600    /// leave it running. Driven directly over an in-memory pipe: the op parks
1601    /// forever, so the only way `run_stream` returns is the disconnect arm
1602    /// seeing EOF on the read half — at which point the op future is dropped.
1603    #[test]
1604    fn run_stream_aborts_on_client_disconnect() {
1605        use std::sync::atomic::AtomicBool;
1606
1607        let rt = tokio::runtime::Builder::new_multi_thread()
1608            .enable_all()
1609            .build()
1610            .unwrap();
1611        rt.block_on(async {
1612            // Duplex pipe standing in for the connection: we keep the client
1613            // end, run_stream reads/writes the server end.
1614            let (client_end, server_end) = tokio::io::duplex(64);
1615            let (mut srv_read, mut srv_write) = tokio::io::split(server_end);
1616
1617            // Guard whose Drop flips a flag, so we can prove the op future was
1618            // dropped (aborted) rather than allowed to complete.
1619            struct DropFlag(Arc<AtomicBool>);
1620            impl Drop for DropFlag {
1621                fn drop(&mut self) {
1622                    self.0.store(true, Ordering::SeqCst);
1623                }
1624            }
1625            let aborted = Arc::new(AtomicBool::new(false));
1626            let completed = Arc::new(AtomicBool::new(false));
1627            let aborted_op = aborted.clone();
1628            let completed_op = completed.clone();
1629
1630            let op = async move |_tx: mpsc::UnboundedSender<FetchProgress>| {
1631                let _guard = DropFlag(aborted_op);
1632                // Never resolves on its own; only a drop ends it.
1633                std::future::pending::<()>().await;
1634                completed_op.store(true, Ordering::SeqCst);
1635                Ok::<(), (ErrorCode, String)>(())
1636            };
1637
1638            let server =
1639                tokio::spawn(
1640                    async move { run_stream::<()>(&mut srv_read, &mut srv_write, op).await },
1641                );
1642
1643            // Let run_stream enter its select loop (op parked) before the
1644            // client disconnects, so we exercise a genuinely in-flight op.
1645            tokio::time::sleep(Duration::from_millis(50)).await;
1646            drop(client_end); // EOF on the server read half
1647
1648            let res = tokio::time::timeout(Duration::from_secs(5), server)
1649                .await
1650                .expect("run_stream did not return after disconnect")
1651                .expect("join error");
1652            assert!(res.is_ok());
1653            assert!(
1654                aborted.load(Ordering::SeqCst),
1655                "op future was not dropped on disconnect"
1656            );
1657            assert!(
1658                !completed.load(Ordering::SeqCst),
1659                "op should have been aborted, not completed"
1660            );
1661        });
1662    }
1663
1664    /// A leftover socket file with no live owner must be reclaimed: `run`
1665    /// probes it, finds nothing answering, unlinks it, and binds its own.
1666    #[test]
1667    fn stale_socket_is_reclaimed() {
1668        let rt = tokio::runtime::Builder::new_multi_thread()
1669            .enable_all()
1670            .build()
1671            .unwrap();
1672        rt.block_on(async {
1673            let home = tempfile::tempdir().unwrap();
1674            let socket = home.path().join(ARTIFACTS_DIR).join("control.sock");
1675            fs::create_dir_all(socket.parent().unwrap()).unwrap();
1676            // Bind then drop: the socket file lingers but nothing listens.
1677            drop(UnixListener::bind(&socket).unwrap());
1678            assert!(socket.exists());
1679
1680            let secret = iroh::SecretKey::from_bytes(&[7u8; 32]);
1681            let home_path = home.path().to_path_buf();
1682            let handle = tokio::spawn(async move { run(&home_path, secret).await });
1683
1684            // Poll is_running (not just file existence) since the stale file
1685            // is replaced in place — only a live answer means we're up.
1686            let client = Client::new(socket.clone());
1687            let mut ready = false;
1688            for _ in 0..200 {
1689                if client.is_running().await {
1690                    ready = true;
1691                    break;
1692                }
1693                tokio::time::sleep(Duration::from_millis(50)).await;
1694            }
1695            assert!(ready, "node never came up after reclaiming stale socket");
1696
1697            client.shutdown().await.unwrap();
1698            handle.await.unwrap().unwrap();
1699        });
1700    }
1701
1702    /// `Seed` against a path that doesn't exist must report `PathNotFound`
1703    /// before any import work.
1704    #[test]
1705    fn seed_missing_path_errors() {
1706        let rt = tokio::runtime::Builder::new_multi_thread()
1707            .enable_all()
1708            .build()
1709            .unwrap();
1710        rt.block_on(async {
1711            let home = tempfile::tempdir().unwrap();
1712            let secret = iroh::SecretKey::from_bytes(&[8u8; 32]);
1713            let (socket, handle) = start_node(home.path(), secret).await;
1714
1715            let client = Client::new(socket.clone());
1716            let missing = home.path().join("does-not-exist.bin");
1717            let err = client
1718                .seed(
1719                    rid_a(),
1720                    release_a(),
1721                    fake_blob_cid(b"whatever"),
1722                    &missing,
1723                    ArtifactKind::Blob,
1724                    ImportMode::Copy,
1725                )
1726                .await
1727                .expect_err("missing path must error");
1728            match err {
1729                radicle_artifact_client::ClientError::Remote(CommandError { code, .. }) => {
1730                    assert_eq!(code, ErrorCode::PathNotFound);
1731                }
1732                other => panic!("expected PathNotFound, got {other:?}"),
1733            }
1734
1735            client.shutdown().await.unwrap();
1736            handle.await.unwrap().unwrap();
1737        });
1738    }
1739
1740    /// Bare malformed JSON (not just a bad typed field) must surface as
1741    /// `InvalidRequest` naming the JSON parse failure.
1742    #[test]
1743    fn malformed_json_surfaces_as_invalid_request() {
1744        let rt = tokio::runtime::Builder::new_multi_thread()
1745            .enable_all()
1746            .build()
1747            .unwrap();
1748        rt.block_on(async {
1749            let home = tempfile::tempdir().unwrap();
1750            let secret = iroh::SecretKey::from_bytes(&[9u8; 32]);
1751            let (socket, handle) = start_node(home.path(), secret).await;
1752
1753            let mut stream = UnixStream::connect(&socket).await.unwrap();
1754            tokio::io::AsyncWriteExt::write_all(&mut stream, b"this is not json\n")
1755                .await
1756                .unwrap();
1757            let mut buf = String::new();
1758            tokio::io::AsyncReadExt::read_to_string(&mut stream, &mut buf)
1759                .await
1760                .unwrap();
1761            let parsed: CommandResult<serde_json::Value> =
1762                serde_json::from_str(buf.trim()).unwrap();
1763            match parsed {
1764                CommandResult::Error(CommandError { code, message }) => {
1765                    assert_eq!(code, ErrorCode::InvalidRequest);
1766                    assert!(
1767                        message.contains("invalid command JSON"),
1768                        "message should name the JSON failure: {message}"
1769                    );
1770                }
1771                CommandResult::Okay(_) => panic!("expected error, got ok"),
1772            }
1773
1774            let client = Client::new(socket);
1775            client.shutdown().await.unwrap();
1776            handle.await.unwrap().unwrap();
1777        });
1778    }
1779}