
greentic_runner_host/lib.rs

#![deny(unsafe_code)]
//! Canonical Greentic host runtime.
//!
//! This crate owns tenant bindings, pack ingestion/watchers, ingress adapters,
//! Wasmtime glue, session/state storage, and the HTTP server used by the
//! `greentic-runner` CLI. Downstream crates embed it either through
//! [`RunnerConfig`] + [`run`] (HTTP host) or [`HostBuilder`] (direct API access).

use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use crate::secrets::SecretsBackend;
use anyhow::{Context, Result, anyhow};
use greentic_config::ResolvedConfig;
#[cfg(feature = "telemetry")]
use greentic_config_types::TelemetryExporterKind;
use greentic_config_types::{
    NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
};
#[cfg(feature = "telemetry")]
use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
use runner_core::env::PackConfig;
use serde_json::json;
use tokio::signal;

pub mod boot;
pub mod cache;
pub mod component_api;
pub mod config;
pub mod engine;
pub mod fault;
pub mod gtbind;
pub mod http;
pub mod ingress;
pub mod pack;
pub mod provider;
pub mod provider_core;
pub mod provider_core_only;
pub mod routing;
pub mod runner;
pub mod runtime;
pub mod runtime_wasmtime;
pub mod secrets;
pub mod storage;
pub mod telemetry;
#[cfg(feature = "fault-injection")]
pub mod testing;
pub mod trace;
pub mod validate;
pub mod verify;
pub mod wasi;
pub mod watcher;

mod activity;
mod host;
pub mod oauth;

pub use activity::{Activity, ActivityKind};
pub use config::HostConfig;
pub use gtbind::{PackBinding, TenantBindings};
pub use host::TelemetryCfg;
pub use host::{HostBuilder, RunnerHost, TenantHandle};
pub use wasi::{PreopenSpec, RunnerWasiPolicy};

pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};

pub use http::auth::AdminAuth;
pub use routing::RoutingConfig;
use routing::TenantRouting;
pub use runner::HostServer;

/// User-facing configuration for running the unified host.
#[derive(Clone)]
pub struct RunnerConfig {
    pub tenant_bindings: HashMap<String, TenantBindings>,
    pub pack: PackConfig,
    pub port: u16,
    pub refresh_interval: Duration,
    pub routing: RoutingConfig,
    pub admin: AdminAuth,
    pub telemetry: Option<TelemetryCfg>,
    pub secrets_backend: SecretsBackend,
    pub wasi_policy: RunnerWasiPolicy,
    pub resolved_config: ResolvedConfig,
    pub trace: trace::TraceConfig,
    pub validation: validate::ValidationConfig,
}

impl RunnerConfig {
    /// Build a [`RunnerConfig`] from a resolved greentic-config and the provided binding files.
    pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
        if bindings.is_empty() {
            anyhow::bail!("at least one gtbind file is required");
        }
        let tenant_bindings = gtbind::load_gtbinds(&bindings)?;
        if tenant_bindings.is_empty() {
            anyhow::bail!("no gtbind files loaded");
        }
        let mut pack = pack_config_from(
            &resolved_config.config.packs,
            &resolved_config.config.paths,
            &resolved_config.config.network,
        )?;
        maybe_write_gtbind_index(&tenant_bindings, &resolved_config.config.paths, &mut pack)?;
        let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
        let port = std::env::var("PORT")
            .ok()
            .and_then(|value| value.parse().ok())
            .unwrap_or(8080);
        let default_tenant = resolved_config
            .config
            .dev
            .as_ref()
            .map(|dev| dev.default_tenant.clone())
            .unwrap_or_else(|| "demo".into());
        let routing = RoutingConfig::from_env_with_default(default_tenant);
        let paths = &resolved_config.config.paths;
        ensure_paths_exist(paths)?;
        let mut wasi_policy = default_wasi_policy(paths);
        let mut env_allow = HashSet::new();
        for binding in tenant_bindings.values() {
            env_allow.extend(binding.env_passthrough.iter().cloned());
        }
        for key in env_allow {
            wasi_policy = wasi_policy.allow_env(key);
        }

        let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
            s.events
                .as_ref()
                .and_then(|svc| svc.headers.as_ref())
                .and_then(|headers| headers.get("x-admin-token").cloned())
        }));
        let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
        Ok(Self {
            tenant_bindings,
            pack,
            port,
            refresh_interval: refresh,
            routing,
            admin,
            telemetry: telemetry_from(&resolved_config.config.telemetry),
            secrets_backend,
            wasi_policy,
            resolved_config,
            trace: trace::TraceConfig::from_env(),
            validation: validate::ValidationConfig::from_env(),
        })
    }

    /// Override the HTTP port used by the host server.
    pub fn with_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

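    /// Replace the WASI policy derived from the resolved config.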
    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
        self.wasi_policy = policy;
        self
    }
}

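/// Materialise a gtbind-derived pack index when any tenant binding carries a
/// `pack_locator`: each binding's packs are written to
/// `<greentic_root>/packs/gtbind.index.json` (first pack as `main_pack`, the
/// rest as `overlays`, with `digest` replacing `version` when the ref part
/// contains `:`), and `pack.index_location` is pointed at that file.
/// Illustrative shape (names and values are made up):
///
/// ```json
/// {
///   "acme": {
///     "main_pack": { "name": "support", "version": "1.2.0", "locator": "..." },
///     "overlays": []
///   }
/// }
/// ```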
fn maybe_write_gtbind_index(
    tenant_bindings: &HashMap<String, TenantBindings>,
    paths: &PathsConfig,
    pack: &mut PackConfig,
) -> Result<()> {
    let mut uses_locators = false;
    for binding in tenant_bindings.values() {
        for pack_binding in &binding.packs {
            if pack_binding.pack_locator.is_some() {
                uses_locators = true;
            }
        }
    }
    if !uses_locators {
        return Ok(());
    }

    let mut entries = serde_json::Map::new();
    for binding in tenant_bindings.values() {
        let mut packs = Vec::new();
        for pack_binding in &binding.packs {
            let locator = pack_binding.pack_locator.as_ref().ok_or_else(|| {
                anyhow::anyhow!(
                    "gtbind {} missing pack_locator for pack {}",
                    binding.tenant,
                    pack_binding.pack_id
                )
            })?;
            let (name, version_or_digest) =
                pack_binding.pack_ref.split_once('@').ok_or_else(|| {
                    anyhow::anyhow!(
                        "gtbind {} invalid pack_ref {} (expected name@version or name@digest)",
                        binding.tenant,
                        pack_binding.pack_ref
                    )
                })?;
            if name != pack_binding.pack_id {
                anyhow::bail!(
                    "gtbind {} pack_ref {} does not match pack_id {}",
                    binding.tenant,
                    pack_binding.pack_ref,
                    pack_binding.pack_id
                );
            }
            let mut entry = serde_json::Map::new();
            entry.insert("name".to_string(), json!(name));
            if version_or_digest.contains(':') {
                entry.insert("digest".to_string(), json!(version_or_digest));
            } else {
                entry.insert("version".to_string(), json!(version_or_digest));
            }
            entry.insert("locator".to_string(), json!(locator));
            packs.push(serde_json::Value::Object(entry));
        }
        let main_pack = packs
            .first()
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("gtbind {} has no packs", binding.tenant))?;
        let overlays = packs.into_iter().skip(1).collect::<Vec<_>>();
        entries.insert(
            binding.tenant.clone(),
            json!({
                "main_pack": main_pack,
                "overlays": overlays,
            }),
        );
    }

    let index_path = paths.greentic_root.join("packs").join("gtbind.index.json");
    if let Some(parent) = index_path.parent() {
        fs::create_dir_all(parent)
            .with_context(|| format!("failed to create {}", parent.display()))?;
    }
    let serialized = serde_json::to_vec_pretty(&serde_json::Value::Object(entries))?;
    fs::write(&index_path, serialized)
        .with_context(|| format!("failed to write {}", index_path.display()))?;
    pack.index_location = runner_core::env::IndexLocation::File(index_path);
    Ok(())
}

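/// Parse `PACK_REFRESH_INTERVAL` as a humantime duration (e.g. `30s`, `5m`),
/// defaulting to 30 seconds when the variable is unset.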
fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
    let raw = value.unwrap_or_else(|| "30s".into());
    humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
}

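/// Base WASI policy: expose the Greentic path environment variables and
/// preopen the state, cache, and logs directories as `/state`, `/cache`, and
/// `/logs` inside the guest.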
fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
    let mut policy = RunnerWasiPolicy::default()
        .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
        .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
        .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
        .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
    policy = policy
        .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
        .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
        .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
    policy
}

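/// Create the Greentic root, state, cache, and logs directories if missing.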
fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
    for dir in [
        &paths.greentic_root,
        &paths.state_dir,
        &paths.cache_dir,
        &paths.logs_dir,
    ] {
        fs::create_dir_all(dir)
            .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
    }
    Ok(())
}

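/// Build the runner [`PackConfig`]: when a packs config is present, map its
/// source (local index file, HTTP index, or OCI reference) to an index
/// location and take the first trusted public key; otherwise derive defaults
/// from the configured paths. The network config is attached in both cases.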
fn pack_config_from(
    packs: &Option<PacksConfig>,
    paths: &PathsConfig,
    network: &NetworkConfig,
) -> Result<PackConfig> {
    if let Some(cfg) = packs {
        let cache_dir = cfg.cache_dir.clone();
        let index_location = match &cfg.source {
            PackSourceConfig::LocalIndex { path } => {
                runner_core::env::IndexLocation::File(path.clone())
            }
            PackSourceConfig::HttpIndex { url } => {
                runner_core::env::IndexLocation::from_value(url)?
            }
            PackSourceConfig::OciRegistry { reference } => {
                runner_core::env::IndexLocation::from_value(reference)?
            }
        };
        let public_key = cfg
            .trust
            .as_ref()
            .and_then(|trust| trust.public_keys.first().cloned());
        return Ok(PackConfig {
            source: runner_core::env::PackSource::Fs,
            index_location,
            cache_dir,
            public_key,
            network: Some(network.clone()),
        });
    }
    let mut cfg = PackConfig::default_for_paths(paths)?;
    cfg.network = Some(network.clone());
    Ok(cfg)
}

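/// Translate the resolved telemetry config into a host [`TelemetryCfg`],
/// returning `None` when telemetry is disabled or the exporter is `None`.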
#[cfg(feature = "telemetry")]
fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
        return None;
    }
    let mut export = TelemetryExportConfig::json_default();
    export.mode = match cfg.exporter {
        TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
        TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
        TelemetryExporterKind::None => return None,
    };
    export.endpoint = cfg.endpoint.clone();
    export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
    Some(TelemetryCfg {
        config: greentic_telemetry::TelemetryConfig {
            service_name: "greentic-runner".into(),
        },
        export,
    })
}

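/// Telemetry support is compiled out; always returns `None`.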
#[cfg(not(feature = "telemetry"))]
fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    None
}

/// Run the unified Greentic runner host until shutdown.
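///
/// Builds a [`RunnerHost`] from the per-tenant bindings, starts it alongside
/// the pack watcher, and serves HTTP on the configured port until the server
/// exits or Ctrl-C is received, after which the watcher is dropped and the
/// host is stopped.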
pub async fn run(cfg: RunnerConfig) -> Result<()> {
    let RunnerConfig {
        tenant_bindings,
        pack,
        port,
        refresh_interval,
        routing,
        admin,
        telemetry,
        secrets_backend,
        wasi_policy,
        resolved_config: _resolved_config,
        trace,
        validation,
    } = cfg;
    #[cfg(not(feature = "telemetry"))]
    let _ = telemetry;

    let mut builder = HostBuilder::new();
    for bindings in tenant_bindings.into_values() {
        let mut host_config = HostConfig::from_gtbind(bindings);
        host_config.trace = trace.clone();
        host_config.validation = validation.clone();
        builder = builder.with_config(host_config);
    }
    #[cfg(feature = "telemetry")]
    if let Some(telemetry) = telemetry.clone() {
        builder = builder.with_telemetry(telemetry);
    }
    builder = builder
        .with_wasi_policy(wasi_policy.clone())
        .with_secrets_manager(
            secrets_backend
                .build_manager()
                .context("failed to initialise secrets backend")?,
        );

    let host = Arc::new(builder.build()?);
    host.start().await?;

    let (watcher, reload_handle) =
        watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;

    let routing = TenantRouting::new(routing.clone());
    let server = HostServer::new(
        port,
        host.active_packs(),
        routing,
        host.health_state(),
        Some(reload_handle),
        admin.clone(),
    )?;

    tokio::select! {
        result = server.serve() => {
            result?;
        }
        _ = signal::ctrl_c() => {
            tracing::info!("received shutdown signal");
        }
    }

    drop(watcher);
    host.stop().await?;
    Ok(())
}