Skip to main content

heldar_kernel/services/
registry.rs

1//! Plugin registry service (Phase C): fetch + verify + cache remote catalogs, and merge the bundled +
2//! remote catalogs with live module state into the store view.
3//!
4//! Remote catalogs are fetched with a dedicated SSRF-guarded client (no redirects, scheme + literal-IP
5//! checks, size cap, timeout) and Ed25519-verified against the pinned + operator keys. An unverified
6//! source contributes ZERO entries unless `registry_allow_unverified` is set. Everything is cached
7//! in-memory so the store renders offline (and instantly) between refreshes; the bundled catalog is
8//! always present. The merge cross-references [`AppState::modules`] (compiled) and the
9//! `module_registrations` table (installed sidecars) to compute each entry's shelf + state.
10
11use std::collections::HashSet;
12use std::net::IpAddr;
13use std::sync::RwLock;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17
18use crate::config::Config;
19use crate::modules::{ModuleManifest, ModuleRegistration, MountKind};
20use crate::registry::{
21    bundled_catalog, verify_detached, CatalogDoc, CatalogEntry, EntryState, InstallSpec, Keyset,
22    RegistryEntryView, RegistrySourceView, RegistryView, Shelf, SignatureDoc,
23};
24
/// Hard cap (2 MiB) on a fetched catalog/signature body (defense-in-depth against a hostile/huge
/// response). The cap applies to each response body separately: the catalog and its `.sig` are
/// fetched with independent requests, each limited to this many bytes.
const MAX_BODY_BYTES: usize = 2 * 1024 * 1024;
27
/// One remote source's last-known state (kept in memory so the store renders offline).
///
/// There is one slot per configured registry URL. A slot is replaced wholesale by a successful
/// fetch; a failed fetch only updates `error` and `fetched_at`, leaving the rest untouched.
struct CachedSource {
    /// The configured registry URL; `refresh` uses it as the key to find this slot.
    url: String,
    /// Display name: the catalog's own name, or the URL when the catalog has none (and before the
    /// first successful fetch).
    name: String,
    /// Whether the detached signature verified on the last successful fetch.
    verified: bool,
    /// First-party flag reported by signature verification.
    first_party: bool,
    /// Id of the key reported by signature verification, if any.
    key_id: Option<String>,
    /// `None` only for a verified fetch; otherwise the fetch error, an `unverified (...)` reason,
    /// or the initial "not yet fetched" placeholder.
    error: Option<String>,
    /// Time of the last fetch attempt (successful or not); `None` until the first attempt.
    fetched_at: Option<DateTime<Utc>>,
    /// Entries this source contributes to the store view. Empty for an unverified source unless
    /// unverified sources are explicitly allowed.
    entries: Vec<CatalogEntry>,
}
39
/// The plugin store's catalog engine, held in [`crate::state::AppState`].
pub struct CatalogService {
    /// Master switch: when false, `refresh` does nothing and `view` omits remote sources.
    enabled: bool,
    /// Configured remote registry URLs, fetched in this order.
    urls: Vec<String>,
    /// Seconds between background refreshes (clamped to at least 30 at construction).
    refresh_s: u64,
    /// When true, entries from a source whose signature did not verify are still kept.
    allow_unverified: bool,
    /// When true, the URL guard permits `http` and skips the literal-IP check.
    allow_private: bool,
    /// Trusted keys the detached catalog signatures are verified against.
    keyset: Keyset,
    /// HTTP client used for every registry fetch (built with a timeout and redirects disabled).
    client: reqwest::Client,
    /// Last-known state of each remote source, one slot per configured URL.
    remote: RwLock<Vec<CachedSource>>,
}
51
52impl CatalogService {
53    pub fn new(cfg: &Config) -> Self {
54        let client = reqwest::Client::builder()
55            .timeout(Duration::from_secs(cfg.registry_fetch_timeout_s.max(1)))
56            .redirect(reqwest::redirect::Policy::none())
57            .build()
58            .unwrap_or_default();
59        let remote = cfg
60            .registry_urls
61            .iter()
62            .map(|u| CachedSource {
63                url: u.clone(),
64                name: u.clone(),
65                verified: false,
66                first_party: false,
67                key_id: None,
68                error: Some("not yet fetched".into()),
69                fetched_at: None,
70                entries: Vec::new(),
71            })
72            .collect();
73        Self {
74            enabled: cfg.registry_enabled,
75            urls: cfg.registry_urls.clone(),
76            refresh_s: cfg.registry_refresh_s.max(30),
77            allow_unverified: cfg.registry_allow_unverified,
78            allow_private: cfg.registry_allow_private,
79            keyset: Keyset::load(&cfg.registry_trusted_keys),
80            client,
81            remote: RwLock::new(remote),
82        }
83    }
84
85    /// Re-fetch + verify every configured remote registry, updating the in-memory cache. Best-effort:
86    /// a failing source keeps its prior cached entries and records the error.
87    pub async fn refresh(&self) {
88        if !self.enabled {
89            return;
90        }
91        for url in &self.urls {
92            let result = self.fetch_one(url).await;
93            let mut guard = self.remote.write().unwrap();
94            if let Some(slot) = guard.iter_mut().find(|s| &s.url == url) {
95                match result {
96                    Ok(fresh) => *slot = fresh,
97                    Err(e) => {
98                        slot.error = Some(e);
99                        slot.fetched_at = Some(Utc::now());
100                        // keep last-good entries + verified flag for offline resilience
101                    }
102                }
103            }
104        }
105    }
106
107    async fn fetch_one(&self, url: &str) -> Result<CachedSource, String> {
108        validate_registry_url(url, self.allow_private)?;
109        let raw = self.get_capped(url).await?;
110        let sig_raw = self.get_capped(&format!("{url}.sig")).await.ok();
111
112        let doc: CatalogDoc =
113            serde_json::from_slice(&raw).map_err(|e| format!("catalog parse: {e}"))?;
114        if doc.format != "heldar-catalog/v1" {
115            return Err(format!("unsupported catalog format `{}`", doc.format));
116        }
117
118        // Verify the detached signature over the EXACT bytes. No .sig => unverified.
119        let verification = match sig_raw
120            .as_deref()
121            .and_then(|b| serde_json::from_slice::<SignatureDoc>(b).ok())
122        {
123            Some(sig) => verify_detached(&raw, &sig, &self.keyset, doc.expires_at, Utc::now()),
124            None => crate::registry::Verification {
125                verified: false,
126                key_id: None,
127                publisher: None,
128                first_party: false,
129                reason: Some("no_signature".into()),
130            },
131        };
132
133        // Fail-closed: drop entries from an unverified source unless explicitly allowed.
134        let entries = if verification.verified || self.allow_unverified {
135            doc.entries
136        } else {
137            Vec::new()
138        };
139        let error = if verification.verified {
140            None
141        } else {
142            Some(format!(
143                "unverified ({})",
144                verification.reason.as_deref().unwrap_or("unknown")
145            ))
146        };
147        Ok(CachedSource {
148            url: url.to_string(),
149            name: if doc.name.is_empty() {
150                url.to_string()
151            } else {
152                doc.name
153            },
154            verified: verification.verified,
155            first_party: verification.first_party,
156            key_id: verification.key_id,
157            error,
158            fetched_at: Some(Utc::now()),
159            entries,
160        })
161    }
162
163    /// GET with a STREAMING size cap: bytes are accumulated chunk-by-chunk and the fetch aborts the
164    /// instant the running total exceeds `MAX_BODY_BYTES`, so a chunked / length-omitting hostile body
165    /// can never force an unbounded allocation (the content-length pre-check is only an early-out).
166    async fn get_capped(&self, url: &str) -> Result<Vec<u8>, String> {
167        let mut resp = self
168            .client
169            .get(url)
170            .send()
171            .await
172            .map_err(|e| format!("fetch {url}: {e}"))?;
173        if !resp.status().is_success() {
174            return Err(format!("fetch {url}: HTTP {}", resp.status()));
175        }
176        if let Some(len) = resp.content_length() {
177            if len as usize > MAX_BODY_BYTES {
178                return Err(format!("fetch {url}: body too large ({len} bytes)"));
179            }
180        }
181        let mut buf: Vec<u8> = Vec::new();
182        while let Some(chunk) = resp.chunk().await.map_err(|e| format!("read {url}: {e}"))? {
183            if buf.len() + chunk.len() > MAX_BODY_BYTES {
184                return Err(format!("fetch {url}: body exceeds {MAX_BODY_BYTES} bytes"));
185            }
186            buf.extend_from_slice(&chunk);
187        }
188        Ok(buf)
189    }
190
191    /// Merge bundled + remote catalogs with live state into the store view. `modules` = AppState.modules
192    /// (compiled-in), `registrations` = installed sidecars.
193    pub fn view(
194        &self,
195        modules: &[ModuleManifest],
196        registrations: &[ModuleRegistration],
197    ) -> RegistryView {
198        let compiled_ids: HashSet<&str> = modules
199            .iter()
200            .filter(|m| m.mount == MountKind::Bundled)
201            .map(|m| m.id.as_str())
202            .collect();
203        let installed: std::collections::HashMap<&str, &str> = registrations
204            .iter()
205            .map(|r| (r.id.as_str(), r.health.as_str()))
206            .collect();
207
208        let mut seen: HashSet<String> = HashSet::new();
209        let mut entries: Vec<RegistryEntryView> = Vec::new();
210        let mut sources: Vec<RegistrySourceView> = Vec::new();
211
212        // 1) bundled — trusted by construction.
213        let bundled = bundled_catalog();
214        sources.push(RegistrySourceView {
215            source: "bundled".into(),
216            name: if bundled.name.is_empty() {
217                "bundled".into()
218            } else {
219                bundled.name.clone()
220            },
221            verified: true,
222            first_party: true,
223            key_id: None,
224            error: None,
225            fetched_at: None,
226            entry_count: bundled.entries.len(),
227        });
228        for e in &bundled.entries {
229            if seen.insert(e.id.clone()) {
230                entries.push(make_view(
231                    e.clone(),
232                    "bundled",
233                    true,
234                    &compiled_ids,
235                    &installed,
236                ));
237            }
238        }
239
240        // 2) remote sources (entries only when verified / allowed; fail-closed otherwise).
241        if self.enabled {
242            let guard = self.remote.read().unwrap();
243            for cs in guard.iter() {
244                sources.push(RegistrySourceView {
245                    source: cs.url.clone(),
246                    name: cs.name.clone(),
247                    verified: cs.verified,
248                    first_party: cs.first_party,
249                    key_id: cs.key_id.clone(),
250                    error: cs.error.clone(),
251                    fetched_at: cs.fetched_at,
252                    entry_count: cs.entries.len(),
253                });
254                for e in &cs.entries {
255                    if seen.insert(e.id.clone()) {
256                        entries.push(make_view(
257                            e.clone(),
258                            &cs.url,
259                            cs.verified,
260                            &compiled_ids,
261                            &installed,
262                        ));
263                    }
264                }
265            }
266        }
267
268        // 3) bring-your-own: installed sidecars present in no catalog -> Import shelf.
269        for reg in registrations {
270            if seen.insert(reg.id.clone()) {
271                let e = CatalogEntry {
272                    id: reg.id.clone(),
273                    name: reg.name.clone(),
274                    publisher: reg.publisher.clone(),
275                    kind: crate::modules::ModuleKind::Imported,
276                    summary: if reg.description.is_empty() {
277                        "Self-installed sidecar plugin.".into()
278                    } else {
279                        reg.description.clone()
280                    },
281                    description: None,
282                    version: Some(reg.version.clone()),
283                    icon: reg.nav.0.first().map(|n| n.icon.clone()),
284                    homepage: None,
285                    categories: Vec::new(),
286                    install: InstallSpec::Sidecar {
287                        image: None,
288                        default_base_url: reg.base_url.clone(),
289                        subscribes: reg.subscribes.0.clone(),
290                        role: Some(reg.role.clone()),
291                        nav: reg.nav.0.clone(),
292                        docs: None,
293                    },
294                };
295                // BYO sidecars have no signed catalog listing -> never "verified".
296                entries.push(make_view(e, "local", false, &compiled_ids, &installed));
297            }
298        }
299
300        // 4) headless plugins loaded from disk (e.g. Wasm DetectionConsumers): not catalog entries and
301        // not store-installable in v1, but surfaced so the operator sees what's running. State=Loaded,
302        // mount=Headless (the dashboard shows a compute treatment, no Open/Install).
303        for m in modules.iter().filter(|m| m.mount == MountKind::Headless) {
304            if seen.insert(m.id.clone()) {
305                let e = CatalogEntry {
306                    id: m.id.clone(),
307                    name: m.name.clone(),
308                    publisher: m.publisher.clone(),
309                    kind: m.kind,
310                    summary: m.description.clone(),
311                    description: None,
312                    version: Some(m.version.clone()).filter(|v| !v.is_empty()),
313                    icon: m.nav.first().map(|n| n.icon.clone()),
314                    homepage: None,
315                    categories: vec!["headless".into()],
316                    install: InstallSpec::Builtin {
317                        availability: Some("loaded".into()),
318                        contact: None,
319                    },
320                };
321                entries.push(RegistryEntryView {
322                    shelf: Shelf::from(e.kind),
323                    state: EntryState::Loaded,
324                    verified: false,
325                    source: "local".into(),
326                    mount: Some(MountKind::Headless),
327                    entry: e,
328                });
329            }
330        }
331
332        RegistryView {
333            enabled: self.enabled,
334            sources,
335            entries,
336        }
337    }
338}
339
340/// Compute an entry's shelf + state + render it.
341fn make_view(
342    entry: CatalogEntry,
343    source: &str,
344    verified: bool,
345    compiled_ids: &HashSet<&str>,
346    installed: &std::collections::HashMap<&str, &str>,
347) -> RegistryEntryView {
348    let shelf = Shelf::from(entry.kind);
349    let state = match &entry.install {
350        InstallSpec::Builtin { .. } => {
351            if compiled_ids.contains(entry.id.as_str()) {
352                EntryState::Included
353            } else {
354                EntryState::NotInBuild
355            }
356        }
357        InstallSpec::Sidecar { .. } => match installed.get(entry.id.as_str()) {
358            Some(&"unreachable") => EntryState::Unreachable,
359            Some(_) => EntryState::Installed,
360            None => EntryState::Available,
361        },
362    };
363    RegistryEntryView {
364        entry,
365        shelf,
366        state,
367        verified,
368        source: source.to_string(),
369        mount: None,
370    }
371}
372
373/// SSRF guard for an admin-configured registry URL: scheme allowlist + literal-IP rejection of
374/// loopback/private/link-local (unless `allow_private`). Hostname→IP resolution rebinding is out of
375/// scope for v1 (admin-trusted URLs, redirects disabled); documented in the registry guide.
376fn validate_registry_url(url: &str, allow_private: bool) -> Result<(), String> {
377    let parsed = reqwest::Url::parse(url).map_err(|e| format!("bad registry url: {e}"))?;
378    match parsed.scheme() {
379        "https" => {}
380        "http" if allow_private => {}
381        "http" => {
382            return Err(
383                "registry url must be https (set HELDAR_REGISTRY_ALLOW_PRIVATE for http)".into(),
384            )
385        }
386        s => return Err(format!("unsupported registry url scheme `{s}`")),
387    }
388    if allow_private {
389        return Ok(());
390    }
391    let Some(host) = parsed.host_str() else {
392        return Err("registry url has no host".into());
393    };
394    // `url` returns IPv6 literals bracketed (e.g. `[::1]`); strip the brackets before parsing so the
395    // literal-IP guard actually fires for the v6 family. A hostname that resolves to a private IP (DNS
396    // rebinding) is out of scope for v1 (admin-trusted URLs, redirects disabled); see the registry doc.
397    let host_ip = host
398        .strip_prefix('[')
399        .and_then(|h| h.strip_suffix(']'))
400        .unwrap_or(host);
401    if let Ok(ip) = host_ip.parse::<IpAddr>() {
402        return reject_private(ip);
403    }
404    Ok(())
405}
406
/// Reject literal addresses that are not publicly routable.
///
/// IPv6 addresses that merely embed an IPv4 address — v4-mapped (`::ffff:127.0.0.1`) and the
/// deprecated v4-compatible form (`::127.0.0.1`) — are canonicalized to IPv4 first so they cannot
/// smuggle a private v4 past the v6 arm. `::` and `::1` are kept as IPv6 so they are judged (and
/// reported) as the v6 unspecified/loopback addresses.
///
/// Rejected IPv4: loopback, RFC 1918 private, link-local, unspecified, broadcast, and the RFC 6598
/// shared address space (`100.64.0.0/10`, carrier-grade NAT — also used for internal endpoints on
/// some networks). Rejected IPv6: loopback, unspecified, unique-local (`fc00::/7`) and link-local
/// (`fe80::/10`).
///
/// Returns `Ok(())` for any other address, or an error naming the canonicalized address.
fn reject_private(ip: IpAddr) -> Result<(), String> {
    let ip = match ip {
        IpAddr::V6(v6) if !(v6.is_loopback() || v6.is_unspecified()) => {
            // `to_ipv4` recognizes both the mapped and the compatible embedding.
            v6.to_ipv4().map_or(IpAddr::V6(v6), IpAddr::V4)
        }
        other => other,
    };
    let bad = match ip {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            // 100.64.0.0/10: first octet 100, top two bits of the second octet are `01`.
            let shared_space = first == 100 && (second & 0xC0) == 0x40;
            v4.is_loopback()
                || v4.is_private()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_broadcast()
                || shared_space
        }
        IpAddr::V6(v6) => {
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_unicast_link_local()
        }
    };
    if bad {
        Err(format!("registry url resolves to a non-public address ({ip}); set HELDAR_REGISTRY_ALLOW_PRIVATE to allow"))
    } else {
        Ok(())
    }
}
439
440/// Background loop: refresh remote registries on the configured cadence. No-op (parks) when the
441/// registry is disabled or no URLs are configured, so it never tight-loops.
442pub async fn run(svc: std::sync::Arc<CatalogService>) {
443    if !svc.enabled || svc.urls.is_empty() {
444        std::future::pending::<()>().await;
445    }
446    // `interval`'s first tick fires immediately, so the loop's first iteration is the initial fetch —
447    // no separate pre-loop refresh (which would double-fetch on startup).
448    let mut tick = tokio::time::interval(Duration::from_secs(svc.refresh_s));
449    loop {
450        tick.tick().await;
451        svc.refresh().await;
452    }
453}
454
#[cfg(test)]
mod tests {
    use super::validate_registry_url;

    /// True when the guard refuses `url` with private targets disallowed.
    fn rejected(url: &str) -> bool {
        validate_registry_url(url, false).is_err()
    }

    #[test]
    fn rejects_literal_private_and_loopback_ips() {
        const BLOCKED: &[&str] = &[
            // IPv4 literals (incl. obfuscated forms the url crate normalizes).
            "https://127.0.0.1/c.json",
            "https://10.0.0.5/c.json",
            "https://169.254.169.254/c.json", // cloud metadata
            "https://2130706433/c.json",      // decimal 127.0.0.1
            "https://0.0.0.0/c.json",
            // IPv6 literals — the family that used to bypass the guard entirely.
            "https://[::1]/c.json",                    // loopback
            "https://[fd00::1]/c.json",                // unique-local
            "https://[fe80::1]/c.json",                // link-local
            "https://[::ffff:127.0.0.1]/c.json",       // v4-mapped loopback
            "https://[::ffff:169.254.169.254]/c.json", // v4-mapped metadata
            "https://[::]/c.json",                     // unspecified
        ];
        for url in BLOCKED {
            assert!(rejected(url), "expected {url} to be rejected");
        }
    }

    #[test]
    fn allows_public_hosts_and_ips() {
        const ALLOWED: &[&str] = &[
            "https://registry.example.com/c.json",
            "https://8.8.8.8/c.json",
            "https://[2606:4700::1111]/c.json",
        ];
        for url in ALLOWED {
            assert!(
                validate_registry_url(url, false).is_ok(),
                "expected {url} to be allowed"
            );
        }
    }

    #[test]
    fn http_requires_allow_private_and_then_anything_passes() {
        // Plain http is blocked by default...
        assert!(validate_registry_url("http://registry.example.com/c.json", false).is_err());
        // ...but with private targets allowed even an http loopback URL passes (local dev).
        assert_eq!(
            validate_registry_url("http://127.0.0.1:9400/c.json", true),
            Ok(())
        );
    }
}