Skip to main content

hypha/visitor/
spawn.rs

1use super::*;
2
3enum ArchiveEndpointAttemptError {
4    Retry(String),
5    Fatal(crate::HyphaError),
6}
7
8async fn try_archive_endpoints<F, Fut>(
9    sink: &dyn crate::EventSink,
10    hash: &str,
11    endpoints: &[substrate::CmnEndpoint],
12    mut attempt: F,
13) -> Result<(), crate::HyphaError>
14where
15    F: FnMut(substrate::CmnEndpoint, String) -> Fut,
16    Fut: std::future::Future<Output = Result<(), ArchiveEndpointAttemptError>>,
17{
18    let mut last_error = "no archive endpoints configured".to_string();
19
20    for archive_ep in endpoints
21        .iter()
22        .filter(|endpoint| endpoint.kind == "archive")
23    {
24        let resolved_url = build_archive_url_from_endpoint(archive_ep, hash)?;
25        match attempt(archive_ep.clone(), resolved_url.clone()).await {
26            Ok(()) => return Ok(()),
27            Err(ArchiveEndpointAttemptError::Retry(message)) => {
28                last_error = message.clone();
29                sink.emit(crate::HyphaEvent::Warn { message });
30            }
31            Err(ArchiveEndpointAttemptError::Fatal(err)) => return Err(err),
32        }
33    }
34
35    Err(crate::HyphaError::new(
36        "fetch_failed",
37        format!("Failed to download archive: {}", last_error),
38    ))
39}
40
41/// Handle the `spore spawn` command - create a working copy of a spore
42///
43/// Spawn flow for archive sources (default):
44/// 1. Download and extract archive to user directory
45/// 2. If --vcs git: initialize git repo with initial commit
46/// 3. Update spore.core.json: add spawn reference, clear domain
47///
48/// Spawn flow for git sources (--dist git):
49/// 1. Clone remote repo to cache bare repo (if not exists)
50/// 2. Clone from cache bare repo to user directory
51/// 3. If --vcs not specified with git dist: keep .git from clone
52/// 4. Update spore.core.json: add spawn reference, clear domain
53///
54/// Spawn a spore to a local directory — library level.
55pub async fn spawn(
56    uri_str: &str,
57    path: Option<&str>,
58    vcs: Option<&str>,
59    dist_preference: Option<&str>,
60    bond: bool,
61    sink: &dyn crate::EventSink,
62) -> Result<crate::output::SpawnOutput, crate::HyphaError> {
63    let uri = CmnUri::parse(uri_str).map_err(|e| crate::HyphaError::new("invalid_uri", e))?;
64
65    let hash = uri
66        .hash
67        .as_deref()
68        .ok_or_else(|| crate::HyphaError::new("invalid_uri", "spore URI must include a hash"))?;
69
70    let cache = CacheDir::new()?;
71    check_taste(sink, &cache, uri_str, &uri.domain, hash)?;
72
73    if let Some(p) = path {
74        let target = std::path::PathBuf::from(p);
75        if target.exists() {
76            return Err(crate::HyphaError::new(
77                "DIR_EXISTS",
78                format!("Target path already exists: {}", target.display()),
79            ));
80        }
81    }
82    let domain_cache = cache.domain(&uri.domain);
83
84    let entry = get_cmn_entry(sink, &domain_cache, cache.cmn_ttl_ms).await?;
85
86    let capsule = primary_capsule(&entry)?;
87    let ep = &capsule.endpoints;
88
89    let (manifest, spore) =
90        fetch_verified_spore(sink, capsule, hash, &domain_cache, cache.cmn_ttl_ms).await?;
91
92    let id_opt = (!spore.capsule.core.id.is_empty()).then_some(spore.capsule.core.id.as_str());
93    let name = spore.capsule.core.name.as_str();
94    let raw_id = id_opt.filter(|id| !id.is_empty());
95    let default_dir_name = substrate::local_dir_name(raw_id, Some(name), hash);
96
97    let dist_array = spore.distributions();
98    if dist_array.is_empty() {
99        return Err(crate::HyphaError::new(
100            "manifest_failed",
101            "No distribution options in spore manifest",
102        ));
103    }
104
105    let archive_dist = dist_array.iter().find(|d| dist_has_type(d, "archive"));
106    let git_dist = dist_array.iter().find(|d| dist_has_type(d, "git"));
107
108    let target_path = match path {
109        Some(p) => std::path::PathBuf::from(p),
110        None => {
111            let cwd = std::env::current_dir().map_err(|e| {
112                crate::HyphaError::new(
113                    "dir_error",
114                    format!("Failed to get current directory: {}", e),
115                )
116            })?;
117            let auto_dir_name = if raw_id.is_some()
118                && default_dir_name != hash
119                && cwd.join(&default_dir_name).exists()
120            {
121                hash.to_string()
122            } else {
123                default_dir_name.clone()
124            };
125            cwd.join(auto_dir_name)
126        }
127    };
128
129    if target_path.exists() {
130        return Err(crate::HyphaError::new(
131            "DIR_EXISTS",
132            format!(
133                "Target path already exists: {}. Remove it first.",
134                target_path.display()
135            ),
136        ));
137    }
138
139    let prefer_git = matches!(dist_preference, Some("git"));
140
141    let output = if prefer_git {
142        if let Some(git_d) = git_dist {
143            if !crate::git::is_available() {
144                return Err(crate::HyphaError::new(
145                    "git_not_found",
146                    "git is not installed. Install git or use --dist archive",
147                ));
148            }
149            spawn_from_git_lib(
150                sink,
151                uri_str,
152                hash,
153                name,
154                git_d,
155                &target_path,
156                &domain_cache,
157                &cache,
158                vcs,
159                &manifest,
160            )
161            .await?
162        } else if let Some(archive_d) = archive_dist {
163            spawn_from_archive_lib(
164                sink,
165                uri_str,
166                hash,
167                name,
168                archive_d,
169                &target_path,
170                &domain_cache,
171                ep,
172                vcs,
173                &manifest,
174            )
175            .await?
176        } else {
177            return Err(crate::HyphaError::new(
178                "manifest_failed",
179                "No distribution found in spore manifest",
180            ));
181        }
182    } else if let Some(archive_d) = archive_dist {
183        spawn_from_archive_lib(
184            sink,
185            uri_str,
186            hash,
187            name,
188            archive_d,
189            &target_path,
190            &domain_cache,
191            ep,
192            vcs,
193            &manifest,
194        )
195        .await?
196    } else if let Some(git_d) = git_dist {
197        if !crate::git::is_available() {
198            return Err(crate::HyphaError::new(
199                "git_not_found",
200                "No archive distribution and git is not installed",
201            ));
202        }
203        spawn_from_git_lib(
204            sink,
205            uri_str,
206            hash,
207            name,
208            git_d,
209            &target_path,
210            &domain_cache,
211            &cache,
212            vcs,
213            &manifest,
214        )
215        .await?
216    } else {
217        return Err(crate::HyphaError::new(
218            "manifest_failed",
219            "No distribution found in spore manifest",
220        ));
221    };
222
223    // Auto-bond after successful spawn
224    if bond {
225        if let Err(e) = bond_in_dir(&target_path, false, false, sink).await {
226            sink.emit(crate::HyphaEvent::Warn {
227                message: format!("Bond failed after spawn: {}", e),
228            });
229        }
230    }
231
232    Ok(output)
233}
234
235pub async fn handle_spawn(
236    out: &Output,
237    uri_str: &str,
238    path: Option<&str>,
239    vcs: Option<&str>,
240    dist_preference: Option<&str>,
241    bond: bool,
242) -> ExitCode {
243    let sink = crate::api::OutSink(out);
244    match spawn(uri_str, path, vcs, dist_preference, bond, &sink).await {
245        Ok(output) => out.ok(serde_json::to_value(output).unwrap_or_default()),
246        Err(e) => out.error_hypha(&e),
247    }
248}
249
250#[allow(clippy::too_many_arguments)]
251async fn spawn_from_git_lib(
252    sink: &dyn crate::EventSink,
253    uri_str: &str,
254    hash: &str,
255    name: &str,
256    git_dist: &substrate::SporeDist,
257    target_path: &std::path::Path,
258    domain_cache: &DomainCache,
259    cache: &CacheDir,
260    vcs: Option<&str>,
261    manifest: &serde_json::Value,
262) -> Result<crate::output::SpawnOutput, crate::HyphaError> {
263    let git_url = dist_git_url(git_dist).unwrap_or("");
264    let git_ref = dist_git_ref(git_dist);
265    let git_limits = crate::git::GitSizeLimits::new(
266        cache.spore_max_extract_bytes,
267        cache.spore_max_extract_files,
268    );
269
270    if git_url.is_empty() {
271        return Err(crate::HyphaError::new(
272            "spawn_error",
273            "Empty git URL in distribution",
274        ));
275    }
276
277    let temp_bare_name = format!(".spawn-bare-tmp-{}", std::process::id());
278    let temp_bare_path = domain_cache.repos_dir().join(&temp_bare_name);
279
280    if temp_bare_path.exists() {
281        let _ = std::fs::remove_dir_all(&temp_bare_path);
282    }
283
284    std::fs::create_dir_all(domain_cache.repos_dir()).map_err(|e| {
285        crate::HyphaError::new(
286            "spawn_error",
287            format!("Failed to create repos cache directory: {}", e),
288        )
289    })?;
290
291    crate::git::clone_bare_repo(git_url, &temp_bare_path).map_err(|e| {
292        warn_remove_dir(sink, &temp_bare_path);
293        crate::HyphaError::new("spawn_error", format!("Failed to clone bare repo: {}", e))
294    })?;
295    crate::git::enforce_size_budget(&temp_bare_path, git_limits).map_err(|e| {
296        warn_remove_dir(sink, &temp_bare_path);
297        crate::HyphaError::new(
298            "spawn_error",
299            format!("Git repo exceeds size budget: {}", e),
300        )
301    })?;
302
303    let root_commit = crate::git::get_root_commit_bare(&temp_bare_path).map_err(|e| {
304        warn_remove_dir(sink, &temp_bare_path);
305        crate::HyphaError::new("spawn_error", format!("Failed to get root commit: {}", e))
306    })?;
307
308    let bare_repo_path = domain_cache.repo_path(&root_commit);
309    if !bare_repo_path.exists() {
310        std::fs::rename(&temp_bare_path, &bare_repo_path).map_err(|e| {
311            warn_remove_dir(sink, &temp_bare_path);
312            crate::HyphaError::new(
313                "spawn_error",
314                format!("Failed to move bare repo to cache: {}", e),
315            )
316        })?;
317    } else {
318        let _ = std::fs::remove_dir_all(&temp_bare_path);
319    }
320
321    crate::git::enforce_size_budget(&bare_repo_path, git_limits).map_err(|e| {
322        crate::HyphaError::new(
323            "spawn_error",
324            format!("Git repo exceeds size budget: {}", e),
325        )
326    })?;
327
328    crate::git::clone_from_local_no_checkout(&bare_repo_path, target_path).map_err(|e| {
329        crate::HyphaError::new("spawn_error", format!("Failed to clone from cache: {}", e))
330    })?;
331    crate::git::configure_blobless_promisor_remote(
332        target_path,
333        crate::git::CMN_PROMISOR_REMOTE,
334        git_url,
335    )
336    .map_err(|e| {
337        warn_remove_dir(sink, target_path);
338        crate::HyphaError::new(
339            "spawn_error",
340            format!("Failed to configure git promisor remote: {}", e),
341        )
342    })?;
343
344    let checkout_ref = git_ref.unwrap_or("HEAD");
345    crate::git::checkout_ref(target_path, checkout_ref).map_err(|e| {
346        warn_remove_dir(sink, target_path);
347        crate::HyphaError::new(
348            "spawn_error",
349            format!("Failed to checkout ref {}: {}", checkout_ref, e),
350        )
351    })?;
352    crate::git::enforce_size_budget(target_path, git_limits).map_err(|e| {
353        warn_remove_dir(sink, target_path);
354        crate::HyphaError::new(
355            "spawn_error",
356            format!("Git checkout exceeds size budget: {}", e),
357        )
358    })?;
359
360    verify_downloaded_content(sink, target_path, target_path, manifest, hash, domain_cache)?;
361
362    let use_vcs = vcs == Some("git");
363    if use_vcs {
364        let _ = crate::git::set_remote_url(target_path, "origin", uri_str);
365        let _ = crate::git::add_remote(
366            target_path,
367            "spawn",
368            &format!("file://{}", bare_repo_path.display()),
369        );
370    } else {
371        let git_dir = target_path.join(".git");
372        if git_dir.exists() {
373            let _ = std::fs::remove_dir_all(&git_dir);
374        }
375    }
376
377    let spore_core_path = target_path.join("spore.core.json");
378    if spore_core_path.exists() {
379        if let Err(e) = save_spawned_from_manifest(&spore_core_path, manifest) {
380            sink.emit(crate::HyphaEvent::Warn {
381                message: format!("Failed to save spawned-from: {}", e),
382            });
383        }
384    }
385
386    let abs_path = target_path
387        .canonicalize()
388        .unwrap_or_else(|_| target_path.to_path_buf());
389
390    Ok(crate::output::SpawnOutput {
391        uri: uri_str.to_string(),
392        name: name.to_string(),
393        path: abs_path.display().to_string(),
394        source_type: "git".to_string(),
395        vcs: vcs.map(|v| v.to_string()),
396    })
397}
398
399#[allow(clippy::too_many_arguments)]
400async fn spawn_from_archive_lib(
401    sink: &dyn crate::EventSink,
402    uri_str: &str,
403    hash: &str,
404    name: &str,
405    _archive_dist: &substrate::SporeDist,
406    target_path: &std::path::Path,
407    domain_cache: &DomainCache,
408    endpoints: &[substrate::CmnEndpoint],
409    vcs: Option<&str>,
410    manifest: &serde_json::Value,
411) -> Result<crate::output::SpawnOutput, crate::HyphaError> {
412    let temp_dir = tempfile::tempdir().map_err(|e| {
413        crate::HyphaError::new("spawn_error", format!("Failed to create temp dir: {}", e))
414    })?;
415
416    let archive_path = temp_dir.path().join("archive");
417    let cache = CacheDir::new()?;
418    let limits = ExtractLimits::from_cache(&cache);
419
420    try_archive_endpoints(sink, hash, endpoints, |archive_ep, resolved_url| {
421        let archive_path = archive_path.clone();
422        let limits = &limits;
423        let max_download_bytes = cache.spore_max_download_bytes;
424
425        async move {
426            if let Err(e) = download_file(&resolved_url, &archive_path, max_download_bytes).await {
427                return Err(ArchiveEndpointAttemptError::Retry(format!(
428                    "Failed to download from {}: {}",
429                    resolved_url, e
430                )));
431            }
432
433            if target_path.exists() {
434                std::fs::remove_dir_all(target_path).map_err(|e| {
435                    ArchiveEndpointAttemptError::Fatal(crate::HyphaError::new(
436                        "spawn_error",
437                        format!(
438                            "Failed to clear target directory {} before retry: {}",
439                            target_path.display(),
440                            e
441                        ),
442                    ))
443                })?;
444            }
445            std::fs::create_dir_all(target_path).map_err(|e| {
446                ArchiveEndpointAttemptError::Fatal(crate::HyphaError::new(
447                    "spawn_error",
448                    format!("Failed to create target directory: {}", e),
449                ))
450            })?;
451
452            extract_archive(
453                &archive_path,
454                target_path,
455                &resolved_url,
456                archive_ep.format.as_deref(),
457                limits,
458            )
459            .map_err(|e| {
460                warn_remove_dir(sink, target_path);
461                if e.is_policy_rejected() {
462                    ArchiveEndpointAttemptError::Fatal(crate::HyphaError::new(
463                        "spore_security_rejected",
464                        e.to_string(),
465                    ))
466                } else if e.is_malicious() {
467                    ArchiveEndpointAttemptError::Retry(format!(
468                        "Unverified content from {} was rejected: {}",
469                        resolved_url, e
470                    ))
471                } else {
472                    ArchiveEndpointAttemptError::Retry(format!(
473                        "Failed to extract archive from {}: {}",
474                        resolved_url, e
475                    ))
476                }
477            })?;
478
479            verify_downloaded_content(sink, target_path, target_path, manifest, hash, domain_cache)
480                .map_err(|e| {
481                    ArchiveEndpointAttemptError::Retry(format!(
482                        "Failed to verify content from {}: {}",
483                        resolved_url, e
484                    ))
485                })?;
486
487            Ok(())
488        }
489    })
490    .await?;
491
492    if vcs == Some("git") {
493        crate::git::init_repo(target_path).map_err(|e| {
494            warn_remove_dir(sink, target_path);
495            crate::HyphaError::new(
496                "spawn_error",
497                format!("Failed to initialize git repo: {}", e),
498            )
499        })?;
500
501        let commit_message = format!("Spawned from {}", uri_str);
502        crate::git::add_all_and_commit(target_path, &commit_message).map_err(|e| {
503            warn_remove_dir(sink, target_path);
504            crate::HyphaError::new(
505                "spawn_error",
506                format!("Failed to create initial commit: {}", e),
507            )
508        })?;
509        let _ = crate::git::add_remote(target_path, "origin", uri_str);
510    }
511
512    let spore_core_path = target_path.join("spore.core.json");
513    if spore_core_path.exists() {
514        if let Err(e) = save_spawned_from_manifest(&spore_core_path, manifest) {
515            sink.emit(crate::HyphaEvent::Warn {
516                message: format!("Failed to save spawned-from: {}", e),
517            });
518        }
519    }
520
521    let abs_path = target_path
522        .canonicalize()
523        .unwrap_or_else(|_| target_path.to_path_buf());
524
525    Ok(crate::output::SpawnOutput {
526        uri: uri_str.to_string(),
527        name: name.to_string(),
528        path: abs_path.display().to_string(),
529        source_type: "archive".to_string(),
530        vcs: vcs.map(|v| v.to_string()),
531    })
532}
533
534/// Resolve an archive entry path against `dest`, rejecting anything that could
535/// escape the destination. This is defense-in-depth at the actual write site —
536/// substrate's tar extractor already rejects traversal, but the filesystem
537/// boundary should never trust the entry path unconditionally.
538fn safe_entry_target(
539    dest: &std::path::Path,
540    rel: &str,
541) -> Result<std::path::PathBuf, ExtractError> {
542    use std::path::Component;
543
544    if rel.is_empty() {
545        return Err(ExtractError::Failed(
546            "archive entry has empty path".to_string(),
547        ));
548    }
549    for component in std::path::Path::new(rel).components() {
550        match component {
551            Component::Normal(_) | Component::CurDir => {}
552            // RootDir, ParentDir (`..`), or a drive Prefix would escape `dest`.
553            _ => {
554                return Err(ExtractError::Failed(format!(
555                    "archive entry escapes destination: {}",
556                    rel
557                )))
558            }
559        }
560    }
561    Ok(dest.join(rel))
562}
563
564fn reject_archive_entries(
565    entries: &[substrate::archive::ArchiveEntry],
566    reject_path_components: &[String],
567) -> Result<(), ExtractError> {
568    for entry in entries {
569        if let Some(component) =
570            rejected_path_component(std::path::Path::new(&entry.path), reject_path_components)
571        {
572            return Err(ExtractError::PolicyRejected(format!(
573                "received spore content contains protected path component '{}': {}",
574                component, entry.path
575            )));
576        }
577    }
578    Ok(())
579}
580
581/// Write extracted archive entries to disk.
582fn write_entries_to_disk(
583    entries: &[substrate::archive::ArchiveEntry],
584    dest: &std::path::Path,
585    reject_path_components: &[String],
586) -> Result<(), ExtractError> {
587    reject_archive_entries(entries, reject_path_components)?;
588
589    for entry in entries {
590        let target = safe_entry_target(dest, &entry.path)?;
591        match entry.kind {
592            substrate::archive::EntryKind::Directory => {
593                std::fs::create_dir_all(&target).map_err(|e| {
594                    ExtractError::Failed(format!("Failed to create dir {}: {}", entry.path, e))
595                })?;
596            }
597            substrate::archive::EntryKind::File => {
598                if let Some(parent) = target.parent() {
599                    std::fs::create_dir_all(parent).map_err(|e| {
600                        ExtractError::Failed(format!("Failed to create parent dir: {}", e))
601                    })?;
602                }
603                std::fs::write(&target, &entry.data).map_err(|e| {
604                    ExtractError::Failed(format!("Failed to write file {}: {}", entry.path, e))
605                })?;
606                #[cfg(unix)]
607                if entry.executable {
608                    use std::os::unix::fs::PermissionsExt;
609                    std::fs::set_permissions(&target, std::fs::Permissions::from_mode(0o755))
610                        .map_err(|e| {
611                            ExtractError::Failed(format!(
612                                "Failed to set executable bit on {}: {}",
613                                entry.path, e
614                            ))
615                        })?;
616                }
617            }
618        }
619    }
620    Ok(())
621}
622
623/// Extract an archive to destination directory.
624/// Requires endpoint-declared archive format; no legacy suffix fallback.
625pub(super) fn extract_archive(
626    archive_path: &std::path::Path,
627    dest: &std::path::Path,
628    url: &str,
629    format_hint: Option<&str>,
630    limits: &ExtractLimits,
631) -> Result<(), ExtractError> {
632    std::fs::create_dir_all(dest).map_err(|e| {
633        ExtractError::Failed(format!("Failed to create destination directory: {}", e))
634    })?;
635
636    if format_hint == Some("tar+zstd") {
637        let compressed = std::fs::read(archive_path)
638            .map_err(|e| ExtractError::Failed(format!("Failed to read archive: {}", e)))?;
639        let sub_limits = substrate::archive::ExtractLimits {
640            max_bytes: limits.max_bytes,
641            max_files: limits.max_files,
642            max_file_bytes: limits.max_file_bytes,
643        };
644        let entries = substrate::archive::extract_tar_zstd(&compressed, &sub_limits)?;
645        write_entries_to_disk(&entries, dest, &limits.reject_path_components)
646    } else {
647        Err(ExtractError::Failed(format!(
648            "Unsupported archive format for {}: {:?}. Expected format tar+zstd",
649            url, format_hint
650        )))
651    }
652}
653
654/// Save source spore manifest to `.cmn/spawned-from/spore.json` after spawn.
655/// spore.core.json is left untouched — hatch handles domain/key changes,
656/// release checks spawned_from at publish time.
657pub(super) fn save_spawned_from_manifest(
658    spore_core_path: &Path,
659    manifest: &serde_json::Value,
660) -> Result<(), crate::HyphaError> {
661    let cmn_dir = spore_core_path.parent().unwrap_or(spore_core_path);
662    let spawned_from_dir = cmn_dir.join(".cmn").join("spawned-from");
663    std::fs::create_dir_all(&spawned_from_dir).map_err(|e| {
664        crate::HyphaError::new(
665            "spawn_error",
666            format!("Failed to create .cmn/spawned-from: {}", e),
667        )
668    })?;
669
670    let spore = substrate::decode_spore(manifest).map_err(|e| {
671        crate::HyphaError::new(
672            "spawn_error",
673            format!("Invalid source spore manifest: {}", e),
674        )
675    })?;
676    let pretty = spore.to_pretty_json().map_err(|e| {
677        crate::HyphaError::new(
678            "spawn_error",
679            format!("Failed to format source spore manifest: {}", e),
680        )
681    })?;
682    std::fs::write(spawned_from_dir.join("spore.json"), pretty).map_err(|e| {
683        crate::HyphaError::new(
684            "spawn_error",
685            format!("Failed to write .cmn/spawned-from/spore.json: {}", e),
686        )
687    })?;
688
689    Ok(())
690}
691
692/// Download and extract tarball with byte-level progress events. but emits `DownloadProgress` events
693/// for byte-level progress tracking (speed / ETA).
694pub(super) async fn download_and_extract_tarball_cached_with_progress(
695    url: &str,
696    dest: &std::path::Path,
697    cache: &CacheDir,
698    _domain: &str,
699    _hash: &str,
700    format_hint: Option<&str>,
701    sink: &dyn crate::EventSink,
702) -> Result<tempfile::NamedTempFile, ExtractError> {
703    let client = substrate::client::http_client(300)
704        .map_err(|e| format!("Failed to create HTTP client: {}", e))?;
705
706    let response = client
707        .get(url)
708        .send()
709        .await
710        .map_err(|e| format!("Failed to download: {}", e))?;
711
712    if !response.status().is_success() {
713        return Err(ExtractError::Failed(format!("HTTP {}", response.status())));
714    }
715
716    let max_download = cache.spore_max_download_bytes;
717    if let Some(content_length) = response.content_length() {
718        if content_length > max_download {
719            return Err(ExtractError::Malicious(format!(
720                "Remote payload too large: {} bytes exceeds limit {}",
721                content_length, max_download
722            )));
723        }
724    }
725
726    let mut archive_file = tempfile::NamedTempFile::new()
727        .map_err(|e| format!("Failed to create temp archive: {}", e))?;
728    let archive_path = archive_file.path().to_path_buf();
729    let (result, exceeded) = {
730        let mut limited = LimitedWriter::new(archive_file.as_file_mut(), max_download);
731        let result = substrate::client::download_response_to_writer(
732            response,
733            url,
734            u64::MAX,
735            &mut limited,
736            |downloaded_bytes, total_bytes| {
737                sink.emit(crate::HyphaEvent::DownloadProgress {
738                    downloaded_bytes,
739                    total_bytes,
740                });
741            },
742        )
743        .await;
744        (result, limited.exceeded)
745    };
746    if let Err(e) = result {
747        if exceeded {
748            return Err(ExtractError::Malicious(format!(
749                "Download exceeded size limit of {} bytes",
750                max_download
751            )));
752        }
753        return Err(ExtractError::Failed(e.to_string()));
754    }
755    archive_file
756        .as_file()
757        .sync_all()
758        .map_err(|e| format!("Failed to sync temp archive: {}", e))?;
759
760    let content_dir = dest.join("content");
761    std::fs::create_dir_all(&content_dir)
762        .map_err(|e| format!("Failed to create content directory: {}", e))?;
763
764    // Detect format from URL and extract
765    let limits = ExtractLimits::from_cache(cache);
766    extract_archive(&archive_path, &content_dir, url, format_hint, &limits)?;
767
768    Ok(archive_file)
769}
770
771/// Cache already-compressed archive file for future delta downloads.
772pub(super) fn cache_archive_compressed_file(
773    cache: &CacheDir,
774    domain: &str,
775    hash: &str,
776    archive_path: &std::path::Path,
777) {
778    use std::io::Write;
779
780    let cache_dir = cache.domain(domain).spore_path(hash);
781    if !cache_dir.is_dir() {
782        return;
783    }
784    let cache_path = cache_dir.join("archive.tar.zst");
785    let mut src = match std::fs::File::open(archive_path) {
786        Ok(file) => file,
787        Err(_) => return,
788    };
789    let mut tmp = match tempfile::NamedTempFile::new_in(&cache_dir) {
790        Ok(file) => file,
791        Err(_) => return,
792    };
793    if std::io::copy(&mut src, &mut tmp).is_err() {
794        return;
795    }
796    if tmp.as_file_mut().flush().is_err() || tmp.as_file().sync_all().is_err() {
797        return;
798    }
799    let _ = tmp.persist(&cache_path);
800}
801
802/// Cache decoded raw tar file as compressed archive for future delta downloads.
803pub(super) fn cache_archive_raw_file(
804    cache: &CacheDir,
805    domain: &str,
806    hash: &str,
807    raw_tar_path: &std::path::Path,
808    _spore_max_extract_bytes: u64,
809) {
810    use std::io::Write;
811
812    let cache_dir = cache.domain(domain).spore_path(hash);
813    if std::fs::create_dir_all(&cache_dir).is_err() {
814        return;
815    }
816    let cache_path = cache_dir.join("archive.tar.zst");
817
818    let raw_data = match std::fs::read(raw_tar_path) {
819        Ok(d) => d,
820        Err(_) => return,
821    };
822    let compressed = match substrate::archive::encode_zstd(&raw_data, 3) {
823        Ok(c) => c,
824        Err(_) => return,
825    };
826    let mut tmp = match tempfile::NamedTempFile::new_in(&cache_dir) {
827        Ok(file) => file,
828        Err(_) => return,
829    };
830    if tmp.as_file_mut().write_all(&compressed).is_err() {
831        return;
832    }
833    if tmp.as_file_mut().flush().is_err() || tmp.as_file().sync_all().is_err() {
834        return;
835    }
836    let _ = tmp.persist(&cache_path);
837}
838
839/// Download a delta archive, apply it, and extract directly to dest (no content subdir).
840/// Used by grow/pull_from_archive. Returns decoded raw tar file for cache reuse.
841pub(super) async fn download_and_apply_delta(
842    delta_url: &str,
843    old_archive_path: &std::path::Path,
844    dest: &std::path::Path,
845    limits: &ExtractLimits,
846    spore_max_download_bytes: u64,
847) -> Result<tempfile::NamedTempFile, ExtractError> {
848    let budget = DeltaByteBudget::new(spore_max_download_bytes, limits);
849    let delta_file = tempfile::NamedTempFile::new()
850        .map_err(|e| format!("Failed to create temp delta file: {}", e))?;
851    download_file(
852        delta_url,
853        delta_file.path(),
854        budget.spore_max_download_bytes,
855    )
856    .await
857    .map_err(|e| match e {
858        ExtractError::Malicious(msg) => {
859            ExtractError::Malicious(format!("Failed to download delta: {}", msg))
860        }
861        ExtractError::PolicyRejected(msg) => {
862            ExtractError::PolicyRejected(format!("Failed to download delta: {}", msg))
863        }
864        ExtractError::Failed(msg) => {
865            ExtractError::Failed(format!("Failed to download delta: {}", msg))
866        }
867    })?;
868
869    let old_raw_tar = load_old_archive_dictionary(old_archive_path, &budget)?;
870    let raw_tar_file = tempfile::NamedTempFile::new()
871        .map_err(|e| format!("Failed to create temp decoded delta file: {}", e))?;
872    decode_delta_to_raw_tar_file(
873        delta_file.path(),
874        &old_raw_tar,
875        raw_tar_file.path(),
876        &budget,
877    )?;
878
879    std::fs::create_dir_all(dest).map_err(|e| format!("Failed to create directory: {}", e))?;
880    let raw_tar_bytes = std::fs::read(raw_tar_file.path())
881        .map_err(|e| format!("Failed to read decoded delta archive: {}", e))?;
882    let sub_limits = substrate::archive::ExtractLimits {
883        max_bytes: limits.max_bytes,
884        max_files: limits.max_files,
885        max_file_bytes: limits.max_file_bytes,
886    };
887    let entries = substrate::archive::extract_tar(&raw_tar_bytes, &sub_limits)?;
888    write_entries_to_disk(&entries, dest, &limits.reject_path_components)?;
889
890    Ok(raw_tar_file)
891}
892
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Build a regular-file archive entry for the fixtures below.
    fn file_entry(path: &str, data: &[u8], executable: bool) -> substrate::archive::ArchiveEntry {
        substrate::archive::ArchiveEntry {
            path: path.to_string(),
            kind: substrate::archive::EntryKind::File,
            data: data.to_vec(),
            executable,
        }
    }

    /// Path components the writer is asked to refuse in these tests.
    fn control_dirs() -> Vec<String> {
        vec![".git".to_string(), ".cmn".to_string()]
    }

    /// Return only the execute permission bits (`0o111`) of `path`.
    #[cfg(unix)]
    fn exec_bits(path: &std::path::Path) -> u32 {
        use std::os::unix::fs::PermissionsExt;

        std::fs::metadata(path).unwrap().permissions().mode() & 0o111
    }

    /// An entry flagged executable lands on disk with an execute bit set;
    /// a non-executable entry lands with none.
    #[cfg(unix)]
    #[test]
    fn write_entries_to_disk_preserves_executable_bit() {
        let scratch = tempfile::tempdir().unwrap();
        let entries = vec![
            file_entry("bin/run.sh", b"#!/bin/sh\n", true),
            file_entry("README.md", b"hello\n", false),
        ];

        write_entries_to_disk(&entries, scratch.path(), &[]).unwrap();

        let script_bits = exec_bits(&scratch.path().join("bin/run.sh"));
        let readme_bits = exec_bits(&scratch.path().join("README.md"));
        assert_ne!(script_bits, 0);
        assert_eq!(readme_bits, 0);
    }

    /// A 0o755 file packed into a tar+zstd archive keeps an execute bit after
    /// going through `extract_archive`.
    #[cfg(unix)]
    #[test]
    fn extract_archive_tar_zstd_preserves_executable_bit() {
        let scratch = tempfile::tempdir().unwrap();
        let archive_path = scratch.path().join("archive.tar.zst");
        let dest = scratch.path().join("dest");

        // Assemble a one-file tar in memory, then compress it to disk.
        let script = b"#!/bin/sh\necho ok\n";
        let mut header = tar::Header::new_gnu();
        header.set_size(script.len() as u64);
        header.set_mode(0o755);
        header.set_cksum();

        let mut tar_bytes = Vec::new();
        {
            // Scoped so the builder's borrow of `tar_bytes` ends before encoding.
            let mut builder = tar::Builder::new(&mut tar_bytes);
            builder
                .append_data(&mut header, "bin/run.sh", &script[..])
                .unwrap();
            builder.finish().unwrap();
        }
        let compressed = substrate::archive::encode_zstd(&tar_bytes, 3).unwrap();
        std::fs::write(&archive_path, compressed).unwrap();

        let limits = ExtractLimits {
            max_bytes: 1 << 20,
            max_files: 10,
            max_file_bytes: 1 << 20,
            reject_path_components: control_dirs(),
        };
        extract_archive(
            &archive_path,
            &dest,
            "https://example.com/archive.tar.zst",
            Some("tar+zstd"),
            &limits,
        )
        .unwrap();

        assert_ne!(exec_bits(&dest.join("bin/run.sh")), 0);
    }

    /// Any path with a rejected component, at the root or nested, is refused
    /// as a policy rejection and is not classified as malicious.
    #[test]
    fn write_entries_to_disk_rejects_protected_control_components() {
        const BLOCKED_PATHS: [&str; 4] = [
            ".git/config",
            "src/.git/config",
            ".cmn/state.json",
            "src/.cmn/state.json",
        ];

        let scratch = tempfile::tempdir().unwrap();
        let reject = control_dirs();

        for path in BLOCKED_PATHS {
            let entries = vec![file_entry(path, b"blocked\n", false)];

            let err = write_entries_to_disk(&entries, scratch.path(), &reject).unwrap_err();
            assert!(
                err.is_policy_rejected(),
                "{} should be policy rejected, got {}",
                path,
                err
            );
            assert!(!err.is_malicious(), "{} must not be toxic", path);
        }
    }

    /// Names that merely resemble the rejected components are still written.
    #[test]
    fn write_entries_to_disk_allows_similar_non_control_names() {
        let scratch = tempfile::tempdir().unwrap();
        let reject = control_dirs();
        let fixtures: [(&str, &str); 4] = [
            (".gitignore", "target/\n"),
            (".gitattributes", "* text=auto\n"),
            ("src/legit-cmn-file.txt", "ok\n"),
            ("docs/git-notes.md", "ok\n"),
        ];
        let entries: Vec<substrate::archive::ArchiveEntry> = fixtures
            .iter()
            .map(|(path, body)| file_entry(path, body.as_bytes(), false))
            .collect();

        write_entries_to_disk(&entries, scratch.path(), &reject).unwrap();

        let root = scratch.path();
        assert!(root.join(".gitignore").exists());
        assert!(root.join(".gitattributes").exists());
        assert!(root.join("src/legit-cmn-file.txt").exists());
        assert!(root.join("docs/git-notes.md").exists());
    }

    /// A first endpoint serving content that fails verification is retried on
    /// the next endpoint, and no taste verdict is stored for the hash.
    // The ENV_LOCK guard is held across the await on purpose: CMN_HOME must stay
    // fixed for the whole test, and the runtime is single-threaded.
    #[tokio::test(flavor = "current_thread")]
    #[allow(clippy::await_holding_lock)]
    async fn archive_endpoint_retry_bad_then_good_does_not_mark_toxic() {
        let _env_guard = crate::config::ENV_LOCK.lock().unwrap();
        let home = tempfile::tempdir().unwrap();
        std::env::set_var("CMN_HOME", home.path());
        let cache = crate::cache::CacheDir::new().unwrap();
        let domain_cache = cache.domain("example.com");

        let manifest = serde_json::json!({
            "$schema": "https://cmn.dev/schemas/v1/spore.json",
            "capsule": {
                "uri": "cmn://example.com/b3.placeholder",
                "core": {
                    "name": "retry-test",
                    "domain": "example.com",
                    "key": "ed25519.5XmkQ9vZP8nL",
                    "synopsis": "Retry test",
                    "intent": ["Testing"],
                    "license": "MIT",
                    "mutations": [],
                    "size_bytes": 8,
                    "updated_at_epoch_ms": 1234567890000_u64,
                    "bonds": [],
                    "tree": {
                        "algorithm": "blob_tree_blake3_nfc",
                        "exclude_names": [],
                        "follow_rules": []
                    }
                },
                "core_signature": "ed25519.fakesig",
                "dist": [{"type": "archive"}]
            },
            "capsule_signature": "ed25519.fakesig"
        });

        // Derive the hash that the "expected" content must verify against.
        let good_content = tempfile::tempdir().unwrap();
        std::fs::write(good_content.path().join("README.md"), "expected").unwrap();
        let spore = substrate::decode_spore(&manifest).unwrap();
        let tree = spore.tree();
        let walked =
            crate::tree::walk_dir(good_content.path(), &tree.exclude_names, &tree.follow_rules)
                .unwrap();
        let tree_hash = spore.tree().compute_hash(&walked).unwrap();
        let valid_hash = spore.computed_uri_hash_from_tree_hash(&tree_hash).unwrap();

        // Two archive endpoints that differ only in their URL template.
        let archive_endpoint = |url: &str| substrate::CmnEndpoint {
            kind: "archive".to_string(),
            url: url.to_string(),
            hash: String::new(),
            hashes: vec![],
            format: Some("tar+zstd".to_string()),
            delta_url: None,
        };
        let endpoints = vec![
            archive_endpoint("https://bad.example.com/archive/{hash}.tar.zst"),
            archive_endpoint("https://good.example.com/archive/{hash}.tar.zst"),
        ];

        let target_parent = tempfile::tempdir().unwrap();
        let target = target_parent.path().join("spawned");
        let mut attempts = 0usize;
        let mut seen_urls = Vec::new();

        try_archive_endpoints(
            &crate::NoopSink,
            &valid_hash,
            &endpoints,
            |_, resolved| {
                seen_urls.push(resolved.clone());
                attempts += 1;
                let attempt_no = attempts;
                let expected_hash = valid_hash.clone();
                let out_dir = target.clone();
                let manifest_ref = &manifest;
                let cache_ref = &domain_cache;

                async move {
                    std::fs::create_dir_all(&out_dir).unwrap();
                    // Only the very first attempt writes content that cannot verify.
                    let content = match attempt_no {
                        1 => "tampered",
                        _ => "expected",
                    };
                    std::fs::write(out_dir.join("README.md"), content).unwrap();

                    verify_downloaded_content(
                        &crate::NoopSink,
                        &out_dir,
                        &out_dir,
                        manifest_ref,
                        &expected_hash,
                        cache_ref,
                    )
                    .map_err(|e| {
                        ArchiveEndpointAttemptError::Retry(format!(
                            "Failed to verify content from {}: {}",
                            resolved, e
                        ))
                    })?;

                    Ok(())
                }
            },
        )
        .await
        .unwrap();

        assert_eq!(attempts, 2);
        assert_eq!(
            seen_urls,
            vec![
                format!("https://bad.example.com/archive/{}.tar.zst", valid_hash),
                format!("https://good.example.com/archive/{}.tar.zst", valid_hash)
            ]
        );
        assert_eq!(
            std::fs::read_to_string(target.join("README.md")).unwrap(),
            "expected"
        );
        assert!(
            domain_cache.load_taste(&valid_hash).is_none(),
            "bad unverified endpoint must not persist a toxic verdict"
        );

        std::env::remove_var("CMN_HOME");
    }
}