Skip to main content

hypha/visitor/
mod.rs

1//! Visitor module for tasting and resolving spores from the network.
2//!
3//! Resolution flow:
4//! 1. Parse CMN URI (cmn://domain/hash)
5//! 2. Get cmn.json (from cache or fetch)
6//! 3. Use endpoint template to build actual URL
7//! 4. Fetch and verify spore manifest
8//! 5. Verify signature against public key from cmn.json
9//! 6. Download content and verify hash matches URI
10
11use serde::Serialize;
12use serde_json::json;
13use std::path::Path;
14use std::process::ExitCode;
15
16use crate::api::Output;
17use crate::cache::{CacheDir, DomainCache, TasteVerdictCache};
18use substrate::{CmnCapsuleEntry, CmnEndpoint, CmnEntry, CmnUri, PrettyJson};
19
20mod absorb;
21mod bond;
22mod common;
23mod crypto;
24pub(crate) mod extract;
25mod grow;
26mod lineage;
27mod search;
28mod sense;
29mod spawn;
30pub(crate) mod steps;
31mod taste;
32
33use common::*;
34
35/// Structured error for archive extraction and file copy operations.
36#[derive(Debug, thiserror::Error)]
37pub enum ExtractError {
38    /// Content is actively dangerous (symlinks, path traversal, zip bombs).
39    /// Triggers automatic toxic verdict + cleanup.
40    #[error("MALICIOUS: {0}")]
41    Malicious(String),
42    /// Non-malicious failure (I/O error, unsupported format, etc.).
43    #[error("{0}")]
44    Failed(String),
45}
46
47impl ExtractError {
48    pub fn is_malicious(&self) -> bool {
49        matches!(self, Self::Malicious(_))
50    }
51}
52
53impl From<String> for ExtractError {
54    fn from(s: String) -> Self {
55        Self::Failed(s)
56    }
57}
58
59impl From<substrate::archive::ExtractError> for ExtractError {
60    fn from(e: substrate::archive::ExtractError) -> Self {
61        match e {
62            substrate::archive::ExtractError::Malicious(msg) => Self::Malicious(msg),
63            substrate::archive::ExtractError::Failed(msg) => Self::Failed(msg),
64        }
65    }
66}
67
68// Re-export extract module items for internal use
69pub(crate) use extract::{
70    decode_delta_to_raw_tar_file, download_and_extract_to_dir, download_file,
71    load_old_archive_dictionary, DeltaByteBudget, ExtractLimits,
72};
73
74// Re-export all public items so external callers don't break
75pub use absorb::{absorb, handle_absorb};
76pub use bond::{bond_fetch, handle_bond_fetch};
77pub(crate) use common::decode_spore_manifest;
78pub use crypto::{
79    embedded_spore_author_key, fetch_spore_manifest, get_cmn_entry, verify_content_hash,
80    verify_manifest_both_signatures, verify_manifest_two_key_signatures,
81    verify_spore_with_key_trust,
82};
83pub use grow::{grow, handle_grow};
84pub use lineage::{handle_lineage, lineage_in, lineage_out};
85pub use search::{handle_search, search, search_with_bond};
86pub use sense::{handle_sense, sense};
87pub use spawn::{handle_spawn, spawn};
88pub use taste::{check_taste, check_taste_verdict_for_replicate, handle_taste, taste};
89
90// Cross-submodule imports: these are brought into scope here so that
91// submodules using `use super::*` can access sibling module functions.
92use bond::bond_in_dir;
93use crypto::{verify_manifest_capsule_signature, verify_manifest_core_signature};
94use spawn::{
95    cache_archive_raw_file, download_and_apply_delta,
96    download_and_extract_tarball_cached_with_progress, extract_archive,
97};
98use substrate::client::BondNode;
99
100/// Thin wrapper around substrate::client::fetch_lineage for internal callers.
101async fn fetch_bonds(
102    synapse_url: &str,
103    hash: &str,
104    direction: &str,
105    max_depth: u32,
106    token: Option<&str>,
107) -> Result<substrate::client::BondsResponse, crate::HyphaError> {
108    let client = substrate::client::http_client(30).map_err(|e| {
109        crate::HyphaError::new(
110            "synapse_error",
111            format!("Failed to create HTTP client: {}", e),
112        )
113    })?;
114    substrate::client::fetch_lineage(
115        &client,
116        synapse_url,
117        hash,
118        direction,
119        max_depth,
120        fetch_opts(token),
121    )
122    .await
123    .map_err(|e| crate::HyphaError::new("synapse_error", e.to_string()))
124}
125
126/// Clone git repository to a directory (shallow)
127pub async fn clone_git_to_dir(
128    url: &str,
129    git_ref: Option<&str>,
130    dest: &std::path::Path,
131) -> Result<(), crate::git::GitError> {
132    std::fs::create_dir_all(dest)?;
133
134    let url = url.to_string();
135    let git_ref = git_ref.map(|s| s.to_string());
136    let dest = dest.to_path_buf();
137    tokio::task::spawn_blocking(move || {
138        crate::git::clone_repo(&url, &dest, true)?;
139        if let Some(r) = git_ref.as_deref() {
140            crate::git::checkout_ref(&dest, r)?;
141        }
142        Ok::<(), crate::git::GitError>(())
143    })
144    .await
145    .map_err(|e| crate::git::GitError::Command(format!("Git clone task failed: {}", e)))??;
146
147    Ok(())
148}
149
150/// Mark a spore as toxic in the local taste cache.
151/// Called automatically when malicious archive content is detected.
152fn mark_toxic(domain_cache: &crate::cache::DomainCache, hash: &str, reason: &str) {
153    let verdict = TasteVerdictCache {
154        verdict: substrate::TasteVerdict::Toxic,
155        notes: Some(format!("Auto-detected: {}", reason)),
156        tasted_at_epoch_ms: crate::time::now_epoch_ms(),
157    };
158    let _ = domain_cache.save_taste(hash, &verdict);
159}
160
161/// Remove a directory, emitting a warning if the removal itself fails.
162fn warn_remove_dir(sink: &dyn crate::EventSink, path: &std::path::Path) {
163    if let Err(e) = std::fs::remove_dir_all(path) {
164        sink.emit(crate::HyphaEvent::Warn {
165            message: format!("Failed to clean up directory {}: {}", path.display(), e),
166        });
167    }
168}
169
170/// Fetch a spore to cache — library-level helper.
171///
172/// Ensures the spore is downloaded, verified, and cached.
173/// If already cached, returns immediately.
174async fn fetch_spore_to_cache(
175    sink: &dyn crate::EventSink,
176    cache: &CacheDir,
177    uri_str: &str,
178) -> Result<(), crate::HyphaError> {
179    let uri = CmnUri::parse(uri_str).map_err(|e| crate::HyphaError::new("invalid_uri", e))?;
180
181    let hash = uri
182        .hash
183        .as_deref()
184        .ok_or_else(|| crate::HyphaError::new("invalid_uri", "spore URI must include a hash"))?;
185
186    let domain_cache = cache.domain(&uri.domain);
187    let target_path = cache.spore_path(&uri.domain, hash);
188
189    // Already cached — requires both the directory and content/ to exist
190    if target_path.exists() {
191        if target_path.join("content").exists() {
192            sink.emit(crate::HyphaEvent::Progress {
193                current: 6,
194                total: 6,
195                message: "Cached".to_string(),
196            });
197            return Ok(());
198        }
199        // Partial cache (e.g. spore.json saved but content download failed) — clean up
200        let _ = std::fs::remove_dir_all(&target_path);
201    }
202
203    // Step 1: cmn.json
204    sink.emit(crate::HyphaEvent::Progress {
205        current: 1,
206        total: 6,
207        message: "Fetching cmn.json".to_string(),
208    });
209    let entry = get_cmn_entry(sink, &domain_cache, cache.cmn_ttl_ms).await?;
210
211    let capsule = primary_capsule(&entry)?;
212    let public_key = capsule.key.clone();
213    let ep = &capsule.endpoints;
214
215    // Step 2: Fetching manifest (domain → synapse fallback)
216    sink.emit(crate::HyphaEvent::Progress {
217        current: 2,
218        total: 6,
219        message: "Fetching spore manifest".to_string(),
220    });
221    let cfg = crate::config::HyphaConfig::load();
222    let manifest = match fetch_spore_manifest(capsule, hash).await {
223        Ok(m) => m,
224        Err(domain_err) if can_synapse_fallback(&domain_cache, &public_key, &cfg.cache) => {
225            if let Some((synapse_url, synapse_token)) = resolve_default_synapse_url(&cfg) {
226                sink.emit(crate::HyphaEvent::Warn {
227                    message: format!(
228                        "Domain unreachable for spore manifest, trying synapse: {}",
229                        domain_err
230                    ),
231                });
232                let client = substrate::client::http_client(30).map_err(|e| {
233                    crate::HyphaError::new("manifest_failed", format!("HTTP client error: {e}"))
234                })?;
235                let resp = substrate::client::fetch_synapse_spore(
236                    &client,
237                    &synapse_url,
238                    hash,
239                    fetch_opts(synapse_token.as_deref()),
240                )
241                .await
242                .map_err(|e| {
243                    crate::HyphaError::new(
244                        "manifest_failed",
245                        format!("Domain: {domain_err}; Synapse: {e}"),
246                    )
247                })?;
248                resp.result.spore
249            } else {
250                return Err(domain_err);
251            }
252        }
253        Err(e) => return Err(e),
254    };
255
256    // Step 3: Verifying spore with key trust
257    sink.emit(crate::HyphaEvent::Progress {
258        current: 3,
259        total: 6,
260        message: "Verifying spore".to_string(),
261    });
262    let key_trust_ttl_ms = cfg.cache.key_trust_ttl_s * 1000;
263    let clock_skew_tolerance_ms = cfg.cache.clock_skew_tolerance_s * 1000;
264    let key_trust_refresh_mode = cfg.cache.key_trust_refresh_mode;
265    let key_trust_synapse_witness_mode = cfg.cache.key_trust_synapse_witness_mode;
266    let resolved_synapse = resolve_default_synapse_url(&cfg);
267    let synapse_url = resolved_synapse.as_ref().map(|(url, _)| url.as_str());
268    let synapse_token = resolved_synapse
269        .as_ref()
270        .and_then(|(_, tok)| tok.as_deref());
271    verify_spore_with_key_trust(
272        sink,
273        &manifest,
274        &public_key,
275        &domain_cache,
276        cache.cmn_ttl_ms,
277        key_trust_ttl_ms,
278        clock_skew_tolerance_ms,
279        key_trust_refresh_mode,
280        key_trust_synapse_witness_mode,
281        false,
282        synapse_url,
283        synapse_token,
284    )
285    .await?;
286    let spore = decode_spore_manifest(&manifest)?;
287
288    let dist = spore.distributions();
289    if dist.is_empty() {
290        return Err(crate::HyphaError::new(
291            "manifest_failed",
292            "No distribution options in spore manifest",
293        ));
294    }
295
296    // Create target directory
297    std::fs::create_dir_all(&target_path).map_err(|e| {
298        crate::HyphaError::new("dir_error", format!("Failed to create directory: {}", e))
299    })?;
300
301    // Save manifest
302    let manifest_path = target_path.join("spore.json");
303    std::fs::write(
304        &manifest_path,
305        serde_json::to_string_pretty(&spore).unwrap_or_default(),
306    )
307    .map_err(|e| {
308        crate::HyphaError::new("write_error", format!("Failed to save manifest: {}", e))
309    })?;
310
311    // Step 5: Downloading content
312    sink.emit(crate::HyphaEvent::Progress {
313        current: 5,
314        total: 6,
315        message: "Downloading content".to_string(),
316    });
317    let domain_cache = cache.domain(&uri.domain);
318
319    let archive_endpoints = ep
320        .iter()
321        .filter(|endpoint| endpoint.kind == "archive")
322        .collect::<Vec<_>>();
323    let mut downloaded = false;
324    for dist_entry in dist {
325        if dist_has_type(dist_entry, "archive") {
326            for archive_ep in &archive_endpoints {
327                let archive_url = build_archive_url_from_endpoint(archive_ep, hash)?;
328                match download_and_extract_tarball_cached_with_progress(
329                    &archive_url,
330                    &target_path,
331                    cache,
332                    &uri.domain,
333                    hash,
334                    archive_ep.format.as_deref(),
335                    sink,
336                )
337                .await
338                {
339                    Ok(_) => {
340                        downloaded = true;
341                        break;
342                    }
343                    Err(e) if e.is_malicious() => {
344                        warn_remove_dir(sink, &target_path);
345                        let msg = e.to_string();
346                        mark_toxic(&domain_cache, hash, &msg);
347                        return Err(crate::HyphaError::new("TOXIC", msg));
348                    }
349                    Err(e) => {
350                        sink.emit(crate::HyphaEvent::Warn {
351                            message: format!("Failed to download from {}: {}", archive_url, e),
352                        });
353                    }
354                }
355            }
356            if downloaded {
357                break;
358            }
359        } else if let Some(git_url) = dist_git_url(dist_entry) {
360            let git_ref = dist_git_ref(dist_entry);
361            match clone_git_repo(git_url, git_ref, &target_path).await {
362                Ok(_) => {
363                    downloaded = true;
364                    break;
365                }
366                Err(e) => {
367                    sink.emit(crate::HyphaEvent::Warn {
368                        message: format!("Failed to clone from {}: {}", git_url, e),
369                    });
370                }
371            }
372        }
373    }
374
375    if !downloaded {
376        warn_remove_dir(sink, &target_path);
377        return Err(crate::HyphaError::new(
378            "fetch_failed",
379            "Failed to download from any distribution source",
380        ));
381    }
382
383    // Step 6: Verifying content hash
384    sink.emit(crate::HyphaEvent::Progress {
385        current: 6,
386        total: 6,
387        message: "Verifying content hash".to_string(),
388    });
389    let content_path = target_path.join("content");
390    if let Err(e) = verify_content_hash(&content_path, hash, &manifest) {
391        warn_remove_dir(sink, &target_path);
392        let msg = e.to_string();
393        mark_toxic(&domain_cache, hash, &msg);
394        return Err(crate::HyphaError::new("TOXIC", msg));
395    }
396
397    Ok(())
398}
399
400/// Clone a git repository to the cache path (shallow clone for fetch)
401async fn clone_git_repo(
402    url: &str,
403    git_ref: Option<&str>,
404    dest: &std::path::Path,
405) -> Result<(), crate::git::GitError> {
406    let content_dir = dest.join("content");
407    std::fs::create_dir_all(&content_dir)?;
408
409    let url = url.to_string();
410    let git_ref = git_ref.map(|s| s.to_string());
411    tokio::task::spawn_blocking(move || {
412        crate::git::clone_repo(&url, &content_dir, true)?;
413        if let Some(r) = git_ref.as_deref() {
414            crate::git::checkout_ref(&content_dir, r)?;
415        }
416        Ok::<(), crate::git::GitError>(())
417    })
418    .await
419    .map_err(|e| crate::git::GitError::Command(format!("Git clone task failed: {}", e)))??;
420
421    Ok(())
422}
423
424// URI parsing tests are in substrate/src/uri.rs
425
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {

    use super::*;

    /// Loopback address on port 1, where no synapse is expected to be
    /// listening, so every HTTP call against it fails quickly.
    const UNREACHABLE_SYNAPSE: &str = "http://127.0.0.1:1";

    /// Directory-name sanitizer under test, with `"spore"` as the fallback name.
    fn sanitize_for_path(input: &str) -> String {
        substrate::local_dir_name(None, Some(input), "spore")
    }

    /// Check a table of `(input, expected)` pairs against `sanitize_for_path`.
    fn assert_sanitized(cases: &[(&str, &str)]) {
        for (input, expected) in cases {
            assert_eq!(sanitize_for_path(input), *expected, "input: {:?}", input);
        }
    }

    #[test]
    fn test_sanitize_for_path_basic() {
        assert_sanitized(&[("cmn-spec", "cmn-spec"), ("my_project", "my_project")]);
    }

    #[test]
    fn test_sanitize_for_path_spaces() {
        assert_sanitized(&[
            ("CMN Protocol Specification", "CMN-Protocol-Specification"),
            ("a  b", "a--b"),
        ]);
    }

    #[test]
    fn test_sanitize_for_path_forbidden_chars() {
        assert_sanitized(&[("foo/bar", "foo-bar"), ("a:b*c?d", "a-b-c-d")]);
    }

    #[test]
    fn test_sanitize_for_path_unicode_preserved() {
        assert_sanitized(&[
            ("CMN协议规范", "CMN协议规范"),
            ("数据库工具", "数据库工具"),
            ("cafe\u{301}-utils", "cafe\u{301}-utils"),
        ]);
    }

    #[test]
    fn test_sanitize_for_path_empty_fallback() {
        assert_sanitized(&[("", "spore"), ("---", "spore")]);
    }

    #[test]
    fn test_sanitize_for_path_traversal_safe() {
        assert_sanitized(&[
            ("..", "spore"),
            (".", "spore"),
            ("../etc", "-etc"),
            (".git", "git"),
            (".cmn", "cmn"),
            ("...hidden", "hidden"),
        ]);
    }

    #[test]
    fn test_sanitize_for_path_control_chars() {
        assert_sanitized(&[
            ("foo\0bar", "foo-bar"),
            ("\x01\x02", "spore"),
            ("ok\x7f", "ok"),
        ]);
    }

    /// Complete spore manifest fixture whose `capsule.core.bonds` is `bonds`.
    fn manifest_with_bonds(bonds: serde_json::Value) -> serde_json::Value {
        serde_json::json!({
            "$schema": "https://cmn.dev/schemas/v1/spore.json",
            "capsule": {
                "uri": "cmn://example.com/b3.child",
                "core": {
                    "name": "test",
                    "domain": "example.com",
                    "key": "ed25519.5XmkQ9vZP8nL",
                    "synopsis": "Test",
                    "intent": ["Testing"],
                    "license": "MIT",
                    "mutations": [],
                    "size_bytes": 512,
                    "updated_at_epoch_ms": 1700000000000_u64,
                    "bonds": bonds,
                    "tree": { "algorithm": "blob_tree_blake3_nfc", "exclude_names": [], "follow_rules": [] }
                },
                "core_signature": "sig",
                "dist": [{"type": "archive"}]
            },
            "capsule_signature": "sig"
        })
    }

    #[test]
    fn test_spawned_from_hash_present() {
        let manifest = manifest_with_bonds(serde_json::json!([
            {"uri": "cmn://example.com/b3.3yMR7vZQ9hL", "relation": "spawned_from"}
        ]));
        assert_eq!(
            grow::spawned_from_hash(&manifest),
            Some("b3.3yMR7vZQ9hL".to_string())
        );
    }

    #[test]
    fn test_spawned_from_hash_missing() {
        // A bond exists, but with a relation other than `spawned_from`.
        let manifest = manifest_with_bonds(serde_json::json!([
            {"uri": "cmn://example.com/b3.8cQnH4xPmZ2v", "relation": "depends_on"}
        ]));
        assert_eq!(grow::spawned_from_hash(&manifest), None);
    }

    #[test]
    fn test_spawned_from_hash_no_bonds() {
        // Reduced manifest: `core` has no `bonds` key at all.
        let manifest = serde_json::json!({
            "$schema": "https://cmn.dev/schemas/v1/spore.json",
            "capsule": {
                "uri": "cmn://example.com/b3.child",
                "core": {
                    "name": "test",
                    "domain": "example.com",
                    "synopsis": "Test",
                    "intent": ["Testing"],
                    "license": "MIT"
                },
                "core_signature": "sig"
            },
            "capsule_signature": "sig"
        });
        assert_eq!(grow::spawned_from_hash(&manifest), None);
    }

    #[test]
    fn test_spawned_from_hash_empty_manifest() {
        let manifest = serde_json::json!({});
        assert_eq!(grow::spawned_from_hash(&manifest), None);
    }

    /// HTTP client with a one-second timeout so failing calls return quickly.
    fn test_client() -> reqwest::Client {
        let one_second = std::time::Duration::from_secs(1);
        reqwest::Client::builder()
            .timeout(one_second)
            .build()
            .unwrap()
    }

    /// substrate::client::search accepts a bond_filter argument; the call is
    /// expected to fail because the synapse address is unreachable.
    #[tokio::test]
    async fn test_fetch_search_with_bond() {
        let client = test_client();
        let outcome = substrate::client::search(
            &client,
            UNREACHABLE_SYNAPSE,
            "test",
            None,
            None,
            Some("spawned_from:cmn://d.dev/b3.3yMR7vZQ9hL"),
            5,
            Default::default(),
        )
        .await;
        assert!(outcome.is_err());
    }

    /// substrate::client::search also accepts a call with no bond_filter.
    #[tokio::test]
    async fn test_fetch_search_without_bond() {
        let client = test_client();
        let outcome = substrate::client::search(
            &client,
            UNREACHABLE_SYNAPSE,
            "test",
            Some("cmn.dev"),
            Some("MIT"),
            None,
            10,
            Default::default(),
        )
        .await;
        assert!(outcome.is_err());
    }

    /// substrate::client::search with a comma-separated list of bond filters.
    #[tokio::test]
    async fn test_fetch_search_with_multi_bond() {
        let client = test_client();
        let outcome = substrate::client::search(
            &client,
            UNREACHABLE_SYNAPSE,
            "tools",
            None,
            None,
            Some("spawned_from:cmn://a.dev/b3.3yMR7vZQ9hL,follows:cmn://b.dev/b3.8cQnH4xPmZ2v"),
            20,
            Default::default(),
        )
        .await;
        assert!(outcome.is_err());
    }

    /// search_with_bond with bond_filter=None and plain search() must both fail
    /// when pointed at an unreachable synapse.
    #[tokio::test]
    async fn test_search_with_bond_none_delegates() {
        let via_bond_api = search_with_bond(
            "test",
            Some(UNREACHABLE_SYNAPSE),
            None,
            None,
            None,
            None,
            20,
            &crate::NoopSink,
        )
        .await;
        let via_plain_api = search(
            "test",
            Some(UNREACHABLE_SYNAPSE),
            None,
            None,
            None,
            20,
            &crate::NoopSink,
        )
        .await;
        assert!(via_bond_api.is_err());
        assert!(via_plain_api.is_err());
    }

    /// search_with_bond with a bond_filter must fail at the HTTP level
    /// (not at argument handling).
    #[tokio::test]
    async fn test_search_with_bond_passes_bond_through() {
        let outcome = search_with_bond(
            "http client",
            Some(UNREACHABLE_SYNAPSE),
            None,
            Some("cmn.dev"),
            Some("MIT"),
            Some("spawned_from:cmn://cmn.dev/b3.3yMR7vZQ9hL"),
            10,
            &crate::NoopSink,
        )
        .await;
        assert!(outcome.is_err());
        let rendered = outcome.unwrap_err().to_string();
        // A bond-parsing failure would not carry the synapse error code.
        assert!(
            rendered.contains("synapse_error"),
            "should fail at HTTP level: {}",
            rendered
        );
    }
}