// greentic_runner_host/lib.rs
1#![deny(unsafe_code)]
2//! Canonical Greentic host runtime.
3//!
4//! This crate owns tenant bindings, pack ingestion/watchers, ingress adapters,
5//! Wasmtime glue, session/state storage, and the HTTP server used by the
6//! `greentic-runner` CLI. Downstream crates embed it either through
7//! [`RunnerConfig`] + [`run`] (HTTP host) or [`HostBuilder`] (direct API access).
8
9use std::collections::{HashMap, HashSet};
10use std::fs;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use crate::secrets::SecretsBackend;
16use anyhow::{Context, Result, anyhow};
17use greentic_config::ResolvedConfig;
18#[cfg(feature = "telemetry")]
19use greentic_config_types::TelemetryExporterKind;
20use greentic_config_types::{
21    NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
22};
23#[cfg(feature = "telemetry")]
24use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
25use runner_core::env::PackConfig;
26use serde_json::json;
27use tokio::signal;
28
29pub mod boot;
30pub mod cache;
31pub mod component_api;
32pub mod config;
33pub mod engine;
34pub mod fault;
35pub mod gtbind;
36pub mod http;
37pub mod ingress;
38pub mod operator_metrics;
39pub mod operator_registry;
40pub mod pack;
41pub mod provider;
42pub mod provider_core;
43pub mod provider_core_only;
44pub mod routing;
45pub mod runner;
46pub mod runtime;
47pub mod runtime_wasmtime;
48pub mod secrets;
49pub mod storage;
50pub mod telemetry;
51#[cfg(feature = "fault-injection")]
52pub mod testing;
53pub mod trace;
54pub mod validate;
55pub mod verify;
56pub mod wasi;
57pub mod watcher;
58
59mod activity;
60mod host;
61pub mod oauth;
62
63pub use activity::{Activity, ActivityKind};
64pub use config::HostConfig;
65pub use gtbind::{PackBinding, TenantBindings};
66pub use host::TelemetryCfg;
67pub use host::{HostBuilder, RunnerHost, TenantHandle};
68pub use wasi::{PreopenSpec, RunnerWasiPolicy};
69
70pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
71
72pub use http::auth::AdminAuth;
73pub use routing::RoutingConfig;
74use routing::TenantRouting;
75pub use runner::HostServer;
76
/// User-facing configuration for running the unified host.
#[derive(Clone)]
pub struct RunnerConfig {
    /// Per-tenant bindings loaded from gtbind files.
    pub tenant_bindings: HashMap<String, TenantBindings>,
    /// Pack source, index location, cache directory, and network settings.
    pub pack: PackConfig,
    /// HTTP port the host server listens on.
    pub port: u16,
    /// Interval passed to the pack watcher for refresh cycles.
    pub refresh_interval: Duration,
    /// Routing configuration used by the HTTP server to pick tenants.
    pub routing: RoutingConfig,
    /// Authentication for admin endpoints.
    pub admin: AdminAuth,
    /// Telemetry export settings; `None` disables telemetry.
    pub telemetry: Option<TelemetryCfg>,
    /// Backend used to build the secrets manager.
    pub secrets_backend: SecretsBackend,
    /// WASI sandbox policy applied to guest components.
    pub wasi_policy: RunnerWasiPolicy,
    /// The full resolved greentic configuration this was built from.
    pub resolved_config: ResolvedConfig,
    /// Trace capture configuration (read from the environment).
    pub trace: trace::TraceConfig,
    /// Validation configuration (read from the environment).
    pub validation: validate::ValidationConfig,
}
93
impl RunnerConfig {
    /// Build a [`RunnerConfig`] from a resolved greentic-config and the provided binding files.
    ///
    /// Reads `PACK_REFRESH_INTERVAL` and `PORT` from the environment (defaults
    /// are 30s and 8080 respectively). Fails when `bindings` is empty, when no
    /// gtbind files load, or when any downstream configuration step errors.
    pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
        if bindings.is_empty() {
            anyhow::bail!("at least one gtbind file is required");
        }
        let tenant_bindings = gtbind::load_gtbinds(&bindings)?;
        if tenant_bindings.is_empty() {
            anyhow::bail!("no gtbind files loaded");
        }
        // Derive the pack source/cache configuration from the resolved config.
        let mut pack = pack_config_from(
            &resolved_config.config.packs,
            &resolved_config.config.paths,
            &resolved_config.config.network,
        )?;
        // If bindings carry explicit pack locators, materialise an index file
        // on disk and repoint `pack` at it.
        maybe_write_gtbind_index(&tenant_bindings, &resolved_config.config.paths, &mut pack)?;
        let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
        // An unparseable PORT value silently falls back to the default.
        let port = std::env::var("PORT")
            .ok()
            .and_then(|value| value.parse().ok())
            .unwrap_or(8080);
        let default_tenant = resolved_config
            .config
            .dev
            .as_ref()
            .map(|dev| dev.default_tenant.clone())
            .unwrap_or_else(|| "demo".into());
        let routing = RoutingConfig::from_env_with_default(default_tenant);
        let paths = &resolved_config.config.paths;
        ensure_paths_exist(paths)?;
        // Start from the default sandbox policy, then allow every env var any
        // tenant requested via `env_passthrough` (deduplicated across tenants).
        let mut wasi_policy = default_wasi_policy(paths);
        let mut env_allow = HashSet::new();
        for binding in tenant_bindings.values() {
            env_allow.extend(binding.env_passthrough.iter().cloned());
        }
        for key in env_allow {
            wasi_policy = wasi_policy.allow_env(key);
        }

        // The admin token, when present, comes from the `x-admin-token` header
        // configured on the events service.
        let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
            s.events
                .as_ref()
                .and_then(|svc| svc.headers.as_ref())
                .and_then(|headers| headers.get("x-admin-token").cloned())
        }));
        let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
        Ok(Self {
            tenant_bindings,
            pack,
            port,
            refresh_interval: refresh,
            routing,
            admin,
            telemetry: telemetry_from(&resolved_config.config.telemetry),
            secrets_backend,
            wasi_policy,
            resolved_config,
            trace: trace::TraceConfig::from_env(),
            validation: validate::ValidationConfig::from_env(),
        })
    }

    /// Override the HTTP port used by the host server.
    pub fn with_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

    /// Override the WASI sandbox policy applied to guest components.
    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
        self.wasi_policy = policy;
        self
    }
}
167
/// When any tenant binding carries explicit pack locators, materialise a
/// consolidated `gtbind.index.json` under `<greentic_root>/packs` and point
/// `pack.index_location` at it.
///
/// Returns `Ok(())` with no side effects when no binding uses locators. Once
/// any binding does, every pack of every binding must supply a `pack_locator`
/// and a `pack_ref` of the form `name@version` (or `name@algo:digest`) whose
/// `name` matches the binding's `pack_id`; any violation is an error.
fn maybe_write_gtbind_index(
    tenant_bindings: &HashMap<String, TenantBindings>,
    paths: &PathsConfig,
    pack: &mut PackConfig,
) -> Result<()> {
    // Only build the index when at least one binding opted into locators.
    let mut uses_locators = false;
    for binding in tenant_bindings.values() {
        for pack_binding in &binding.packs {
            if pack_binding.pack_locator.is_some() {
                uses_locators = true;
            }
        }
    }
    if !uses_locators {
        return Ok(());
    }

    let mut entries = serde_json::Map::new();
    for binding in tenant_bindings.values() {
        let mut packs = Vec::new();
        for pack_binding in &binding.packs {
            // Mixed bindings are rejected: once locators are in use, every
            // pack must carry one.
            let locator = pack_binding.pack_locator.as_ref().ok_or_else(|| {
                anyhow::anyhow!(
                    "gtbind {} missing pack_locator for pack {}",
                    binding.tenant,
                    pack_binding.pack_id
                )
            })?;
            // pack_ref is `name@version` or `name@algo:digest`.
            let (name, version_or_digest) =
                pack_binding.pack_ref.split_once('@').ok_or_else(|| {
                    anyhow::anyhow!(
                        "gtbind {} invalid pack_ref {} (expected name@version)",
                        binding.tenant,
                        pack_binding.pack_ref
                    )
                })?;
            if name != pack_binding.pack_id {
                anyhow::bail!(
                    "gtbind {} pack_ref {} does not match pack_id {}",
                    binding.tenant,
                    pack_binding.pack_ref,
                    pack_binding.pack_id
                );
            }
            let mut entry = serde_json::Map::new();
            entry.insert("name".to_string(), json!(name));
            // A ':' marks a content digest (e.g. `sha256:...`); otherwise the
            // suffix is treated as a plain version string.
            if version_or_digest.contains(':') {
                entry.insert("digest".to_string(), json!(version_or_digest));
            } else {
                entry.insert("version".to_string(), json!(version_or_digest));
            }
            entry.insert("locator".to_string(), json!(locator));
            packs.push(serde_json::Value::Object(entry));
        }
        // The first pack is the tenant's main pack; any remaining packs become
        // overlays, preserving their order.
        let main_pack = packs
            .first()
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("gtbind {} has no packs", binding.tenant))?;
        let overlays = packs.into_iter().skip(1).collect::<Vec<_>>();
        entries.insert(
            binding.tenant.clone(),
            json!({
                "main_pack": main_pack,
                "overlays": overlays,
            }),
        );
    }

    let index_path = paths.greentic_root.join("packs").join("gtbind.index.json");
    if let Some(parent) = index_path.parent() {
        fs::create_dir_all(parent)
            .with_context(|| format!("failed to create {}", parent.display()))?;
    }
    let serialized = serde_json::to_vec_pretty(&serde_json::Value::Object(entries))?;
    fs::write(&index_path, serialized)
        .with_context(|| format!("failed to write {}", index_path.display()))?;
    // Subsequent pack ingestion reads the index we just wrote.
    pack.index_location = runner_core::env::IndexLocation::File(index_path);
    Ok(())
}
247
248fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
249    let raw = value.unwrap_or_else(|| "30s".into());
250    humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
251}
252
253fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
254    let mut policy = RunnerWasiPolicy::default()
255        .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
256        .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
257        .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
258        .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
259    policy = policy
260        .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
261        .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
262        .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
263    policy
264}
265
266fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
267    for dir in [
268        &paths.greentic_root,
269        &paths.state_dir,
270        &paths.cache_dir,
271        &paths.logs_dir,
272    ] {
273        fs::create_dir_all(dir)
274            .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
275    }
276    Ok(())
277}
278
/// Build the runner-core [`PackConfig`] from the resolved configuration.
///
/// With an explicit packs section, the index location follows the configured
/// source (local index file, HTTP index URL, or OCI registry reference) and
/// the first configured trust key, if any, becomes the verification public
/// key. Without one, falls back to `PackConfig::default_for_paths`. The
/// network configuration is attached in both cases.
fn pack_config_from(
    packs: &Option<PacksConfig>,
    paths: &PathsConfig,
    network: &NetworkConfig,
) -> Result<PackConfig> {
    if let Some(cfg) = packs {
        let cache_dir = cfg.cache_dir.clone();
        let index_location = match &cfg.source {
            PackSourceConfig::LocalIndex { path } => {
                runner_core::env::IndexLocation::File(path.clone())
            }
            PackSourceConfig::HttpIndex { url } => {
                runner_core::env::IndexLocation::from_value(url)?
            }
            PackSourceConfig::OciRegistry { reference } => {
                runner_core::env::IndexLocation::from_value(reference)?
            }
        };
        // Only the first trust key is honoured; additional keys are ignored.
        let public_key = cfg
            .trust
            .as_ref()
            .and_then(|trust| trust.public_keys.first().cloned());
        return Ok(PackConfig {
            source: runner_core::env::PackSource::Fs,
            index_location,
            cache_dir,
            public_key,
            network: Some(network.clone()),
        });
    }
    let mut cfg = PackConfig::default_for_paths(paths)?;
    cfg.network = Some(network.clone());
    Ok(cfg)
}
313
/// Map the greentic-config telemetry section onto the host [`TelemetryCfg`].
///
/// Returns `None` when telemetry is disabled or the exporter kind is `None`.
/// Otherwise the exporter kind selects the export mode (OTLP gRPC or JSON on
/// stdout), the endpoint is copied through, and trace-id-ratio sampling is
/// applied with the configured ratio.
#[cfg(feature = "telemetry")]
fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
        return None;
    }
    let mut export = TelemetryExportConfig::json_default();
    export.mode = match cfg.exporter {
        TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
        TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
        // Unreachable in practice (filtered above), kept for exhaustiveness.
        TelemetryExporterKind::None => return None,
    };
    export.endpoint = cfg.endpoint.clone();
    export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
    Some(TelemetryCfg {
        config: greentic_telemetry::TelemetryConfig {
            service_name: "greentic-runner".into(),
        },
        export,
    })
}
334
/// Telemetry support is compiled out: always `None`, regardless of the
/// configured telemetry section.
#[cfg(not(feature = "telemetry"))]
fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    None
}
339
/// Run the unified Greentic runner host until shutdown.
///
/// Builds the host from every tenant's bindings, starts it, launches the pack
/// watcher, then serves HTTP until either the server task finishes or a
/// Ctrl-C signal arrives. On exit the watcher is dropped first, then the host
/// is stopped cleanly.
pub async fn run(cfg: RunnerConfig) -> Result<()> {
    let RunnerConfig {
        tenant_bindings,
        pack,
        port,
        refresh_interval,
        routing,
        admin,
        telemetry,
        secrets_backend,
        wasi_policy,
        resolved_config: _resolved_config,
        trace,
        validation,
    } = cfg;
    // Without the telemetry feature the destructured field is otherwise unused.
    #[cfg(not(feature = "telemetry"))]
    let _ = telemetry;

    // One HostConfig per tenant, all sharing the same trace/validation settings.
    let mut builder = HostBuilder::new();
    for bindings in tenant_bindings.into_values() {
        let mut host_config = HostConfig::from_gtbind(bindings);
        host_config.trace = trace.clone();
        host_config.validation = validation.clone();
        builder = builder.with_config(host_config);
    }
    #[cfg(feature = "telemetry")]
    if let Some(telemetry) = telemetry.clone() {
        builder = builder.with_telemetry(telemetry);
    }
    builder = builder
        .with_wasi_policy(wasi_policy.clone())
        .with_secrets_manager(
            secrets_backend
                .build_manager()
                .context("failed to initialise secrets backend")?,
        );

    let host = Arc::new(builder.build()?);
    host.start().await?;

    // The watcher handle keeps pack refreshing alive; dropping it later stops it.
    let (watcher, reload_handle) =
        watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;

    let routing = TenantRouting::new(routing.clone());
    let server = HostServer::new(
        port,
        host.active_packs(),
        routing,
        host.health_state(),
        Some(reload_handle),
        admin.clone(),
    )?;

    // Serve until the server exits on its own or we receive a shutdown signal.
    tokio::select! {
        result = server.serve() => {
            result?;
        }
        _ = signal::ctrl_c() => {
            tracing::info!("received shutdown signal");
        }
    }

    drop(watcher);
    host.stop().await?;
    Ok(())
}