1use std::collections::HashSet;
12use std::net::IpAddr;
13use std::sync::RwLock;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17
18use crate::config::Config;
19use crate::modules::{ModuleManifest, ModuleRegistration, MountKind};
20use crate::registry::{
21 bundled_catalog, verify_detached, CatalogDoc, CatalogEntry, EntryState, InstallSpec, Keyset,
22 RegistryEntryView, RegistrySourceView, RegistryView, Shelf, SignatureDoc,
23};
24
/// Largest response body, in bytes, that `CatalogService::get_capped` will
/// buffer before giving up on a fetch (2 MiB).
const MAX_BODY_BYTES: usize = 2 << 20;
27
28struct CachedSource {
30 url: String,
31 name: String,
32 verified: bool,
33 first_party: bool,
34 key_id: Option<String>,
35 error: Option<String>,
36 fetched_at: Option<DateTime<Utc>>,
37 entries: Vec<CatalogEntry>,
38}
39
40pub struct CatalogService {
42 enabled: bool,
43 urls: Vec<String>,
44 refresh_s: u64,
45 allow_unverified: bool,
46 allow_private: bool,
47 keyset: Keyset,
48 client: reqwest::Client,
49 remote: RwLock<Vec<CachedSource>>,
50}
51
52impl CatalogService {
53 pub fn new(cfg: &Config) -> Self {
54 let client = reqwest::Client::builder()
55 .timeout(Duration::from_secs(cfg.registry_fetch_timeout_s.max(1)))
56 .redirect(reqwest::redirect::Policy::none())
57 .build()
58 .unwrap_or_default();
59 let remote = cfg
60 .registry_urls
61 .iter()
62 .map(|u| CachedSource {
63 url: u.clone(),
64 name: u.clone(),
65 verified: false,
66 first_party: false,
67 key_id: None,
68 error: Some("not yet fetched".into()),
69 fetched_at: None,
70 entries: Vec::new(),
71 })
72 .collect();
73 Self {
74 enabled: cfg.registry_enabled,
75 urls: cfg.registry_urls.clone(),
76 refresh_s: cfg.registry_refresh_s.max(30),
77 allow_unverified: cfg.registry_allow_unverified,
78 allow_private: cfg.registry_allow_private,
79 keyset: Keyset::load(&cfg.registry_trusted_keys),
80 client,
81 remote: RwLock::new(remote),
82 }
83 }
84
85 pub async fn refresh(&self) {
88 if !self.enabled {
89 return;
90 }
91 for url in &self.urls {
92 let result = self.fetch_one(url).await;
93 let mut guard = self.remote.write().unwrap();
94 if let Some(slot) = guard.iter_mut().find(|s| &s.url == url) {
95 match result {
96 Ok(fresh) => *slot = fresh,
97 Err(e) => {
98 slot.error = Some(e);
99 slot.fetched_at = Some(Utc::now());
100 }
102 }
103 }
104 }
105 }
106
107 async fn fetch_one(&self, url: &str) -> Result<CachedSource, String> {
108 validate_registry_url(url, self.allow_private)?;
109 let raw = self.get_capped(url).await?;
110 let sig_raw = self.get_capped(&format!("{url}.sig")).await.ok();
111
112 let doc: CatalogDoc =
113 serde_json::from_slice(&raw).map_err(|e| format!("catalog parse: {e}"))?;
114 if doc.format != "heldar-catalog/v1" {
115 return Err(format!("unsupported catalog format `{}`", doc.format));
116 }
117
118 let verification = match sig_raw
120 .as_deref()
121 .and_then(|b| serde_json::from_slice::<SignatureDoc>(b).ok())
122 {
123 Some(sig) => verify_detached(&raw, &sig, &self.keyset, doc.expires_at, Utc::now()),
124 None => crate::registry::Verification {
125 verified: false,
126 key_id: None,
127 publisher: None,
128 first_party: false,
129 reason: Some("no_signature".into()),
130 },
131 };
132
133 let entries = if verification.verified || self.allow_unverified {
135 doc.entries
136 } else {
137 Vec::new()
138 };
139 let error = if verification.verified {
140 None
141 } else {
142 Some(format!(
143 "unverified ({})",
144 verification.reason.as_deref().unwrap_or("unknown")
145 ))
146 };
147 Ok(CachedSource {
148 url: url.to_string(),
149 name: if doc.name.is_empty() {
150 url.to_string()
151 } else {
152 doc.name
153 },
154 verified: verification.verified,
155 first_party: verification.first_party,
156 key_id: verification.key_id,
157 error,
158 fetched_at: Some(Utc::now()),
159 entries,
160 })
161 }
162
163 async fn get_capped(&self, url: &str) -> Result<Vec<u8>, String> {
167 let mut resp = self
168 .client
169 .get(url)
170 .send()
171 .await
172 .map_err(|e| format!("fetch {url}: {e}"))?;
173 if !resp.status().is_success() {
174 return Err(format!("fetch {url}: HTTP {}", resp.status()));
175 }
176 if let Some(len) = resp.content_length() {
177 if len as usize > MAX_BODY_BYTES {
178 return Err(format!("fetch {url}: body too large ({len} bytes)"));
179 }
180 }
181 let mut buf: Vec<u8> = Vec::new();
182 while let Some(chunk) = resp.chunk().await.map_err(|e| format!("read {url}: {e}"))? {
183 if buf.len() + chunk.len() > MAX_BODY_BYTES {
184 return Err(format!("fetch {url}: body exceeds {MAX_BODY_BYTES} bytes"));
185 }
186 buf.extend_from_slice(&chunk);
187 }
188 Ok(buf)
189 }
190
191 pub fn view(
194 &self,
195 modules: &[ModuleManifest],
196 registrations: &[ModuleRegistration],
197 ) -> RegistryView {
198 let compiled_ids: HashSet<&str> = modules
199 .iter()
200 .filter(|m| m.mount == MountKind::Bundled)
201 .map(|m| m.id.as_str())
202 .collect();
203 let installed: std::collections::HashMap<&str, &str> = registrations
204 .iter()
205 .map(|r| (r.id.as_str(), r.health.as_str()))
206 .collect();
207
208 let mut seen: HashSet<String> = HashSet::new();
209 let mut entries: Vec<RegistryEntryView> = Vec::new();
210 let mut sources: Vec<RegistrySourceView> = Vec::new();
211
212 let bundled = bundled_catalog();
214 sources.push(RegistrySourceView {
215 source: "bundled".into(),
216 name: if bundled.name.is_empty() {
217 "bundled".into()
218 } else {
219 bundled.name.clone()
220 },
221 verified: true,
222 first_party: true,
223 key_id: None,
224 error: None,
225 fetched_at: None,
226 entry_count: bundled.entries.len(),
227 });
228 for e in &bundled.entries {
229 if seen.insert(e.id.clone()) {
230 entries.push(make_view(
231 e.clone(),
232 "bundled",
233 true,
234 &compiled_ids,
235 &installed,
236 ));
237 }
238 }
239
240 if self.enabled {
242 let guard = self.remote.read().unwrap();
243 for cs in guard.iter() {
244 sources.push(RegistrySourceView {
245 source: cs.url.clone(),
246 name: cs.name.clone(),
247 verified: cs.verified,
248 first_party: cs.first_party,
249 key_id: cs.key_id.clone(),
250 error: cs.error.clone(),
251 fetched_at: cs.fetched_at,
252 entry_count: cs.entries.len(),
253 });
254 for e in &cs.entries {
255 if seen.insert(e.id.clone()) {
256 entries.push(make_view(
257 e.clone(),
258 &cs.url,
259 cs.verified,
260 &compiled_ids,
261 &installed,
262 ));
263 }
264 }
265 }
266 }
267
268 for reg in registrations {
270 if seen.insert(reg.id.clone()) {
271 let e = CatalogEntry {
272 id: reg.id.clone(),
273 name: reg.name.clone(),
274 publisher: reg.publisher.clone(),
275 kind: crate::modules::ModuleKind::Imported,
276 summary: if reg.description.is_empty() {
277 "Self-installed sidecar plugin.".into()
278 } else {
279 reg.description.clone()
280 },
281 description: None,
282 version: Some(reg.version.clone()),
283 icon: reg.nav.0.first().map(|n| n.icon.clone()),
284 homepage: None,
285 categories: Vec::new(),
286 install: InstallSpec::Sidecar {
287 image: None,
288 default_base_url: reg.base_url.clone(),
289 subscribes: reg.subscribes.0.clone(),
290 role: Some(reg.role.clone()),
291 nav: reg.nav.0.clone(),
292 docs: None,
293 },
294 };
295 entries.push(make_view(e, "local", false, &compiled_ids, &installed));
297 }
298 }
299
300 for m in modules.iter().filter(|m| m.mount == MountKind::Headless) {
304 if seen.insert(m.id.clone()) {
305 let e = CatalogEntry {
306 id: m.id.clone(),
307 name: m.name.clone(),
308 publisher: m.publisher.clone(),
309 kind: m.kind,
310 summary: m.description.clone(),
311 description: None,
312 version: Some(m.version.clone()).filter(|v| !v.is_empty()),
313 icon: m.nav.first().map(|n| n.icon.clone()),
314 homepage: None,
315 categories: vec!["headless".into()],
316 install: InstallSpec::Builtin {
317 availability: Some("loaded".into()),
318 contact: None,
319 },
320 };
321 entries.push(RegistryEntryView {
322 shelf: Shelf::from(e.kind),
323 state: EntryState::Loaded,
324 verified: false,
325 source: "local".into(),
326 mount: Some(MountKind::Headless),
327 entry: e,
328 });
329 }
330 }
331
332 RegistryView {
333 enabled: self.enabled,
334 sources,
335 entries,
336 }
337 }
338}
339
340fn make_view(
342 entry: CatalogEntry,
343 source: &str,
344 verified: bool,
345 compiled_ids: &HashSet<&str>,
346 installed: &std::collections::HashMap<&str, &str>,
347) -> RegistryEntryView {
348 let shelf = Shelf::from(entry.kind);
349 let state = match &entry.install {
350 InstallSpec::Builtin { .. } => {
351 if compiled_ids.contains(entry.id.as_str()) {
352 EntryState::Included
353 } else {
354 EntryState::NotInBuild
355 }
356 }
357 InstallSpec::Sidecar { .. } => match installed.get(entry.id.as_str()) {
358 Some(&"unreachable") => EntryState::Unreachable,
359 Some(_) => EntryState::Installed,
360 None => EntryState::Available,
361 },
362 };
363 RegistryEntryView {
364 entry,
365 shelf,
366 state,
367 verified,
368 source: source.to_string(),
369 mount: None,
370 }
371}
372
373fn validate_registry_url(url: &str, allow_private: bool) -> Result<(), String> {
377 let parsed = reqwest::Url::parse(url).map_err(|e| format!("bad registry url: {e}"))?;
378 match parsed.scheme() {
379 "https" => {}
380 "http" if allow_private => {}
381 "http" => {
382 return Err(
383 "registry url must be https (set HELDAR_REGISTRY_ALLOW_PRIVATE for http)".into(),
384 )
385 }
386 s => return Err(format!("unsupported registry url scheme `{s}`")),
387 }
388 if allow_private {
389 return Ok(());
390 }
391 let Some(host) = parsed.host_str() else {
392 return Err("registry url has no host".into());
393 };
394 let host_ip = host
398 .strip_prefix('[')
399 .and_then(|h| h.strip_suffix(']'))
400 .unwrap_or(host);
401 if let Ok(ip) = host_ip.parse::<IpAddr>() {
402 return reject_private(ip);
403 }
404 Ok(())
405}
406
/// Refuses IP addresses that are not publicly routable.
///
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) are unwrapped first so the
/// IPv4 rules apply to them. Rejected ranges:
///
/// * IPv4: loopback, RFC 1918 private, link-local, unspecified, broadcast,
///   and the shared address space `100.64.0.0/10` (RFC 6598).
/// * IPv6: loopback, unspecified, unique-local and unicast link-local.
///
/// # Errors
///
/// Returns a message naming the offending address (in its unwrapped form).
fn reject_private(ip: IpAddr) -> Result<(), String> {
    let ip = match ip {
        IpAddr::V6(v6) => v6.to_ipv4_mapped().map_or(IpAddr::V6(v6), IpAddr::V4),
        v4 @ IpAddr::V4(_) => v4,
    };

    let non_public = match ip {
        IpAddr::V4(v4) => {
            // 100.64.0.0/10 is carrier-grade NAT space: not globally routable,
            // and not covered by `is_private`.
            let [first, second, ..] = v4.octets();
            let shared = first == 100 && (64..=127).contains(&second);

            v4.is_loopback()
                || v4.is_private()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_broadcast()
                || shared
        }
        IpAddr::V6(v6) => {
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_unicast_link_local()
        }
    };

    if non_public {
        Err(format!("registry url resolves to a non-public address ({ip}); set HELDAR_REGISTRY_ALLOW_PRIVATE to allow"))
    } else {
        Ok(())
    }
}
439
440pub async fn run(svc: std::sync::Arc<CatalogService>) {
443 if !svc.enabled || svc.urls.is_empty() {
444 std::future::pending::<()>().await;
445 }
446 let mut tick = tokio::time::interval(Duration::from_secs(svc.refresh_s));
449 loop {
450 tick.tick().await;
451 svc.refresh().await;
452 }
453}
454
#[cfg(test)]
mod tests {
    use super::validate_registry_url;

    /// True when `url` is refused with private targets disallowed.
    fn rejected(url: &str) -> bool {
        validate_registry_url(url, false).is_err()
    }

    #[test]
    fn rejects_literal_private_and_loopback_ips() {
        let blocked = [
            // IPv4: loopback, private, link-local, decimal-encoded 127.0.0.1,
            // unspecified.
            "https://127.0.0.1/c.json",
            "https://10.0.0.5/c.json",
            "https://169.254.169.254/c.json",
            "https://2130706433/c.json",
            "https://0.0.0.0/c.json",
            // IPv6: loopback, unique-local, link-local, IPv4-mapped,
            // unspecified.
            "https://[::1]/c.json",
            "https://[fd00::1]/c.json",
            "https://[fe80::1]/c.json",
            "https://[::ffff:127.0.0.1]/c.json",
            "https://[::ffff:169.254.169.254]/c.json",
            "https://[::]/c.json",
        ];
        for url in blocked {
            assert!(rejected(url), "{url} should be rejected");
        }
    }

    #[test]
    fn allows_public_hosts_and_ips() {
        let allowed = [
            "https://registry.example.com/c.json",
            "https://8.8.8.8/c.json",
            "https://[2606:4700::1111]/c.json",
        ];
        for url in allowed {
            assert!(
                validate_registry_url(url, false).is_ok(),
                "{url} should be allowed"
            );
        }
    }

    #[test]
    fn http_requires_allow_private_and_then_anything_passes() {
        assert!(rejected("http://registry.example.com/c.json"));
        assert!(validate_registry_url("http://127.0.0.1:9400/c.json", true).is_ok());
    }
}