greentic_runner_host/
lib.rs

#![deny(unsafe_code)]
//! Canonical Greentic host runtime.
//!
//! This crate owns tenant bindings, pack ingestion/watchers, ingress adapters,
//! Wasmtime glue, session/state storage, and the HTTP server used by the
//! `greentic-runner` CLI. Downstream crates embed it either through
//! [`RunnerConfig`] + [`run`] (HTTP host) or [`HostBuilder`] (direct API access).
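//!
//! A minimal embedding sketch for the HTTP-host path; the gtbind filename below
//! and the way the `greentic_config::ResolvedConfig` is obtained are illustrative
//! assumptions, not fixed API:
//!
//! ```no_run
//! use std::path::PathBuf;
//!
//! use greentic_runner_host::{run, RunnerConfig};
//!
//! async fn embed(resolved: greentic_config::ResolvedConfig) -> anyhow::Result<()> {
//!     // One or more gtbind files drive the tenant bindings; the filename is hypothetical.
//!     let cfg = RunnerConfig::from_config(resolved, vec![PathBuf::from("tenant.gtbind")])?
//!         .with_port(8080);
//!     // Serves HTTP and runs until Ctrl-C or server exit.
//!     run(cfg).await
//! }
//! ```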

use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use crate::secrets::SecretsBackend;
use anyhow::{Context, Result, anyhow};
use greentic_config::ResolvedConfig;
#[cfg(feature = "telemetry")]
use greentic_config_types::TelemetryExporterKind;
use greentic_config_types::{
    NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
};
#[cfg(feature = "telemetry")]
use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
use runner_core::env::PackConfig;
use serde_json::json;
use tokio::signal;

pub mod boot;
pub mod cache;
pub mod component_api;
pub mod config;
pub mod engine;
pub mod fault;
pub mod gtbind;
pub mod http;
pub mod ingress;
pub mod pack;
pub mod provider;
pub mod provider_core;
pub mod provider_core_only;
pub mod routing;
pub mod runner;
pub mod runtime;
pub mod runtime_wasmtime;
pub mod secrets;
pub mod storage;
pub mod telemetry;
pub mod trace;
pub mod validate;
pub mod verify;
pub mod wasi;
pub mod watcher;

mod activity;
mod host;
pub mod oauth;

pub use activity::{Activity, ActivityKind};
pub use config::HostConfig;
pub use gtbind::{PackBinding, TenantBindings};
pub use host::TelemetryCfg;
pub use host::{HostBuilder, RunnerHost, TenantHandle};
pub use wasi::{PreopenSpec, RunnerWasiPolicy};

pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};

pub use http::auth::AdminAuth;
pub use routing::RoutingConfig;
use routing::TenantRouting;
pub use runner::HostServer;

/// User-facing configuration for running the unified host.
#[derive(Clone)]
pub struct RunnerConfig {
    pub tenant_bindings: HashMap<String, TenantBindings>,
    pub pack: PackConfig,
    pub port: u16,
    pub refresh_interval: Duration,
    pub routing: RoutingConfig,
    pub admin: AdminAuth,
    pub telemetry: Option<TelemetryCfg>,
    pub secrets_backend: SecretsBackend,
    pub wasi_policy: RunnerWasiPolicy,
    pub resolved_config: ResolvedConfig,
    pub trace: trace::TraceConfig,
    pub validation: validate::ValidationConfig,
}

impl RunnerConfig {
    /// Build a [`RunnerConfig`] from a resolved greentic-config and the provided binding files.
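    ///
    /// Two optional environment variables are consulted: `PORT` (HTTP port,
    /// defaults to 8080) and `PACK_REFRESH_INTERVAL` (a humantime duration,
    /// defaults to `30s`). The routing default tenant falls back to `demo`
    /// when no dev configuration is present.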
    pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
        if bindings.is_empty() {
            anyhow::bail!("at least one gtbind file is required");
        }
        let tenant_bindings = gtbind::load_gtbinds(&bindings)?;
        if tenant_bindings.is_empty() {
            anyhow::bail!("no gtbind files loaded");
        }
        let mut pack = pack_config_from(
            &resolved_config.config.packs,
            &resolved_config.config.paths,
            &resolved_config.config.network,
        )?;
        maybe_write_gtbind_index(&tenant_bindings, &resolved_config.config.paths, &mut pack)?;
        let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
        let port = std::env::var("PORT")
            .ok()
            .and_then(|value| value.parse().ok())
            .unwrap_or(8080);
        let default_tenant = resolved_config
            .config
            .dev
            .as_ref()
            .map(|dev| dev.default_tenant.clone())
            .unwrap_or_else(|| "demo".into());
        let routing = RoutingConfig::from_env_with_default(default_tenant);
        let paths = &resolved_config.config.paths;
        ensure_paths_exist(paths)?;
        let mut wasi_policy = default_wasi_policy(paths);
        let mut env_allow = HashSet::new();
        for binding in tenant_bindings.values() {
            env_allow.extend(binding.env_passthrough.iter().cloned());
        }
        for key in env_allow {
            wasi_policy = wasi_policy.allow_env(key);
        }

        let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
            s.events
                .as_ref()
                .and_then(|svc| svc.headers.as_ref())
                .and_then(|headers| headers.get("x-admin-token").cloned())
        }));
        let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
        Ok(Self {
            tenant_bindings,
            pack,
            port,
            refresh_interval: refresh,
            routing,
            admin,
            telemetry: telemetry_from(&resolved_config.config.telemetry),
            secrets_backend,
            wasi_policy,
            resolved_config,
            trace: trace::TraceConfig::from_env(),
            validation: validate::ValidationConfig::from_env(),
        })
    }

    /// Override the HTTP port used by the host server.
    pub fn with_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

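    /// Replace the default WASI policy derived from the resolved paths.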
    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
        self.wasi_policy = policy;
        self
    }
}

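/// Materialise a `gtbind.index.json` under `<greentic_root>/packs/` when any
/// binding references packs by locator, and point `pack.index_location` at the
/// written file.
///
/// The index maps tenant ids to their packs: the first pack becomes
/// `main_pack`, the rest become `overlays`. An illustrative entry (all values
/// hypothetical) looks like:
///
/// ```json
/// {
///   "acme": {
///     "main_pack": { "name": "support", "version": "1.2.0", "locator": "oci://..." },
///     "overlays": []
///   }
/// }
/// ```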
fn maybe_write_gtbind_index(
    tenant_bindings: &HashMap<String, TenantBindings>,
    paths: &PathsConfig,
    pack: &mut PackConfig,
) -> Result<()> {
    let mut uses_locators = false;
    for binding in tenant_bindings.values() {
        for pack_binding in &binding.packs {
            if pack_binding.pack_locator.is_some() {
                uses_locators = true;
            }
        }
    }
    if !uses_locators {
        return Ok(());
    }

    let mut entries = serde_json::Map::new();
    for binding in tenant_bindings.values() {
        let mut packs = Vec::new();
        for pack_binding in &binding.packs {
            let locator = pack_binding.pack_locator.as_ref().ok_or_else(|| {
                anyhow::anyhow!(
                    "gtbind {} missing pack_locator for pack {}",
                    binding.tenant,
                    pack_binding.pack_id
                )
            })?;
            let (name, version_or_digest) =
                pack_binding.pack_ref.split_once('@').ok_or_else(|| {
                    anyhow::anyhow!(
                        "gtbind {} invalid pack_ref {} (expected name@version)",
                        binding.tenant,
                        pack_binding.pack_ref
                    )
                })?;
            if name != pack_binding.pack_id {
                anyhow::bail!(
                    "gtbind {} pack_ref {} does not match pack_id {}",
                    binding.tenant,
                    pack_binding.pack_ref,
                    pack_binding.pack_id
                );
            }
            let mut entry = serde_json::Map::new();
            entry.insert("name".to_string(), json!(name));
            if version_or_digest.contains(':') {
                entry.insert("digest".to_string(), json!(version_or_digest));
            } else {
                entry.insert("version".to_string(), json!(version_or_digest));
            }
            entry.insert("locator".to_string(), json!(locator));
            packs.push(serde_json::Value::Object(entry));
        }
        let main_pack = packs
            .first()
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("gtbind {} has no packs", binding.tenant))?;
        let overlays = packs.into_iter().skip(1).collect::<Vec<_>>();
        entries.insert(
            binding.tenant.clone(),
            json!({
                "main_pack": main_pack,
                "overlays": overlays,
            }),
        );
    }

    let index_path = paths.greentic_root.join("packs").join("gtbind.index.json");
    if let Some(parent) = index_path.parent() {
        fs::create_dir_all(parent)
            .with_context(|| format!("failed to create {}", parent.display()))?;
    }
    let serialized = serde_json::to_vec_pretty(&serde_json::Value::Object(entries))?;
    fs::write(&index_path, serialized)
        .with_context(|| format!("failed to write {}", index_path.display()))?;
    pack.index_location = runner_core::env::IndexLocation::File(index_path);
    Ok(())
}

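/// Parse the `PACK_REFRESH_INTERVAL` override as a humantime duration
/// (for example `30s` or `5m`), defaulting to 30 seconds when unset.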
fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
    let raw = value.unwrap_or_else(|| "30s".into());
    humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
}

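/// Baseline WASI policy derived from the resolved paths: guests see the
/// `GREENTIC_*` path variables and get the state, cache, and logs directories
/// preopened as `/state`, `/cache`, and `/logs`.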
fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
    RunnerWasiPolicy::default()
        .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
        .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
        .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
        .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string())
        .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
        .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
        .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"))
}

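/// Create the greentic root, state, cache, and logs directories if they are missing.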
fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
    for dir in [
        &paths.greentic_root,
        &paths.state_dir,
        &paths.cache_dir,
        &paths.logs_dir,
    ] {
        fs::create_dir_all(dir)
            .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
    }
    Ok(())
}

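/// Derive the runner-core [`PackConfig`] from the packs configuration when
/// present (local index file, HTTP index, or OCI registry), otherwise fall back
/// to the path-based defaults. The resolved network settings are attached in
/// both cases.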
fn pack_config_from(
    packs: &Option<PacksConfig>,
    paths: &PathsConfig,
    network: &NetworkConfig,
) -> Result<PackConfig> {
    if let Some(cfg) = packs {
        let cache_dir = cfg.cache_dir.clone();
        let index_location = match &cfg.source {
            PackSourceConfig::LocalIndex { path } => {
                runner_core::env::IndexLocation::File(path.clone())
            }
            PackSourceConfig::HttpIndex { url } => {
                runner_core::env::IndexLocation::from_value(url)?
            }
            PackSourceConfig::OciRegistry { reference } => {
                runner_core::env::IndexLocation::from_value(reference)?
            }
        };
        let public_key = cfg
            .trust
            .as_ref()
            .and_then(|trust| trust.public_keys.first().cloned());
        return Ok(PackConfig {
            source: runner_core::env::PackSource::Fs,
            index_location,
            cache_dir,
            public_key,
            network: Some(network.clone()),
        });
    }
    let mut cfg = PackConfig::default_for_paths(paths)?;
    cfg.network = Some(network.clone());
    Ok(cfg)
}

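/// Map the resolved telemetry settings onto the host's exporter config:
/// the OTLP exporter goes out over gRPC, the stdout exporter emits JSON, and
/// a disabled or `None` exporter yields no telemetry. Sampling uses a
/// trace-id ratio taken from the config.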
#[cfg(feature = "telemetry")]
fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
        return None;
    }
    let mut export = TelemetryExportConfig::json_default();
    export.mode = match cfg.exporter {
        TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
        TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
        TelemetryExporterKind::None => return None,
    };
    export.endpoint = cfg.endpoint.clone();
    export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
    Some(TelemetryCfg {
        config: greentic_telemetry::TelemetryConfig {
            service_name: "greentic-runner".into(),
        },
        export,
    })
}

#[cfg(not(feature = "telemetry"))]
fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    None
}

/// Run the unified Greentic runner host until shutdown.
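///
/// Builds one [`HostConfig`] per tenant binding, starts the host and the pack
/// watcher, then serves HTTP on the configured port. Returns once the server
/// exits or a Ctrl-C signal is received, after which the host is stopped.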
pub async fn run(cfg: RunnerConfig) -> Result<()> {
    let RunnerConfig {
        tenant_bindings,
        pack,
        port,
        refresh_interval,
        routing,
        admin,
        telemetry,
        secrets_backend,
        wasi_policy,
        resolved_config: _resolved_config,
        trace,
        validation,
    } = cfg;
    #[cfg(not(feature = "telemetry"))]
    let _ = telemetry;

    let mut builder = HostBuilder::new();
    for bindings in tenant_bindings.into_values() {
        let mut host_config = HostConfig::from_gtbind(bindings);
        host_config.trace = trace.clone();
        host_config.validation = validation.clone();
        builder = builder.with_config(host_config);
    }
    #[cfg(feature = "telemetry")]
    if let Some(telemetry) = telemetry.clone() {
        builder = builder.with_telemetry(telemetry);
    }
    builder = builder
        .with_wasi_policy(wasi_policy.clone())
        .with_secrets_manager(
            secrets_backend
                .build_manager()
                .context("failed to initialise secrets backend")?,
        );

    let host = Arc::new(builder.build()?);
    host.start().await?;

    let (watcher, reload_handle) =
        watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;

    let routing = TenantRouting::new(routing.clone());
    let server = HostServer::new(
        port,
        host.active_packs(),
        routing,
        host.health_state(),
        Some(reload_handle),
        admin.clone(),
    )?;

    tokio::select! {
        result = server.serve() => {
            result?;
        }
        _ = signal::ctrl_c() => {
            tracing::info!("received shutdown signal");
        }
    }

    drop(watcher);
    host.stop().await?;
    Ok(())
}