greentic_runner_host/
lib.rs

1#![deny(unsafe_code)]
2//! Canonical Greentic host runtime.
3//!
4//! This crate owns tenant bindings, pack ingestion/watchers, ingress adapters,
5//! Wasmtime glue, session/state storage, and the HTTP server used by the
6//! `greentic-runner` CLI. Downstream crates embed it either through
7//! [`RunnerConfig`] + [`run`] (HTTP host) or [`HostBuilder`] (direct API access).
8
9use std::fs;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use crate::secrets::SecretsBackend;
15use anyhow::{Context, Result, anyhow};
16use greentic_config::ResolvedConfig;
17#[cfg(feature = "telemetry")]
18use greentic_config_types::TelemetryExporterKind;
19use greentic_config_types::{
20    NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
21};
22#[cfg(feature = "telemetry")]
23use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
24use runner_core::env::PackConfig;
25use tokio::signal;
26
27pub mod boot;
28pub mod cache;
29pub mod component_api;
30pub mod config;
31pub mod engine;
32pub mod http;
33pub mod ingress;
34pub mod pack;
35pub mod provider;
36pub mod provider_core;
37pub mod provider_core_only;
38pub mod routing;
39pub mod runner;
40pub mod runtime;
41pub mod runtime_wasmtime;
42pub mod secrets;
43pub mod storage;
44pub mod telemetry;
45pub mod verify;
46pub mod wasi;
47pub mod watcher;
48
49mod activity;
50mod host;
51pub mod oauth;
52
53pub use activity::{Activity, ActivityKind};
54pub use config::HostConfig;
55pub use host::TelemetryCfg;
56pub use host::{HostBuilder, RunnerHost, TenantHandle};
57pub use wasi::{PreopenSpec, RunnerWasiPolicy};
58
59pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
60
61pub use http::auth::AdminAuth;
62pub use routing::RoutingConfig;
63use routing::TenantRouting;
64pub use runner::HostServer;
65
/// User-facing configuration for running the unified host.
///
/// Usually built with [`RunnerConfig::from_config`] and then handed to [`run`].
#[derive(Clone)]
pub struct RunnerConfig {
    /// Host binding files; [`run`] loads each one through `HostConfig::load_from_path`.
    /// [`RunnerConfig::from_config`] rejects an empty list.
    pub bindings: Vec<PathBuf>,
    /// Pack settings handed to the pack watcher started by [`run`].
    pub pack: PackConfig,
    /// Port passed to the HTTP server. `from_config` reads it from the `PORT`
    /// environment variable and defaults to `8080`.
    pub port: u16,
    /// Interval handed to the pack watcher. `from_config` reads it from
    /// `PACK_REFRESH_INTERVAL` and defaults to `30s`.
    pub refresh_interval: Duration,
    /// Routing settings; [`run`] wraps them in a `TenantRouting` for the server.
    pub routing: RoutingConfig,
    /// Admin authentication handed to the HTTP server.
    pub admin: AdminAuth,
    /// Optional telemetry settings; only applied to the host builder when the
    /// `telemetry` feature is enabled.
    pub telemetry: Option<TelemetryCfg>,
    /// Backend used by [`run`] to build the secrets manager.
    pub secrets_backend: SecretsBackend,
    /// WASI policy handed to the host builder.
    pub wasi_policy: RunnerWasiPolicy,
    /// The resolved configuration this value was built from. [`run`] itself
    /// does not read it.
    pub resolved_config: ResolvedConfig,
}
80
81impl RunnerConfig {
82    /// Build a [`RunnerConfig`] from a resolved greentic-config and the provided binding files.
83    pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
84        if bindings.is_empty() {
85            anyhow::bail!("at least one bindings file is required");
86        }
87        let pack = pack_config_from(
88            &resolved_config.config.packs,
89            &resolved_config.config.paths,
90            &resolved_config.config.network,
91        )?;
92        let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
93        let port = std::env::var("PORT")
94            .ok()
95            .and_then(|value| value.parse().ok())
96            .unwrap_or(8080);
97        let default_tenant = resolved_config
98            .config
99            .dev
100            .as_ref()
101            .map(|dev| dev.default_tenant.clone())
102            .unwrap_or_else(|| "demo".into());
103        let routing = RoutingConfig::from_env_with_default(default_tenant);
104        let paths = &resolved_config.config.paths;
105        ensure_paths_exist(paths)?;
106        let wasi_policy = default_wasi_policy(paths);
107
108        let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
109            s.events
110                .as_ref()
111                .and_then(|svc| svc.headers.as_ref())
112                .and_then(|headers| headers.get("x-admin-token").cloned())
113        }));
114        let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
115        Ok(Self {
116            bindings,
117            pack,
118            port,
119            refresh_interval: refresh,
120            routing,
121            admin,
122            telemetry: telemetry_from(&resolved_config.config.telemetry),
123            secrets_backend,
124            wasi_policy,
125            resolved_config,
126        })
127    }
128
129    /// Override the HTTP port used by the host server.
130    pub fn with_port(mut self, port: u16) -> Self {
131        self.port = port;
132        self
133    }
134
135    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
136        self.wasi_policy = policy;
137        self
138    }
139}
140
141fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
142    let raw = value.unwrap_or_else(|| "30s".into());
143    humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
144}
145
146fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
147    let mut policy = RunnerWasiPolicy::default()
148        .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
149        .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
150        .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
151        .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
152    policy = policy
153        .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
154        .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
155        .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
156    policy
157}
158
159fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
160    for dir in [
161        &paths.greentic_root,
162        &paths.state_dir,
163        &paths.cache_dir,
164        &paths.logs_dir,
165    ] {
166        fs::create_dir_all(dir)
167            .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
168    }
169    Ok(())
170}
171
172fn pack_config_from(
173    packs: &Option<PacksConfig>,
174    paths: &PathsConfig,
175    network: &NetworkConfig,
176) -> Result<PackConfig> {
177    if let Some(cfg) = packs {
178        let cache_dir = cfg.cache_dir.clone();
179        let index_location = match &cfg.source {
180            PackSourceConfig::LocalIndex { path } => {
181                runner_core::env::IndexLocation::File(path.clone())
182            }
183            PackSourceConfig::HttpIndex { url } => {
184                runner_core::env::IndexLocation::from_value(url)?
185            }
186            PackSourceConfig::OciRegistry { reference } => {
187                runner_core::env::IndexLocation::from_value(reference)?
188            }
189        };
190        let public_key = cfg
191            .trust
192            .as_ref()
193            .and_then(|trust| trust.public_keys.first().cloned());
194        return Ok(PackConfig {
195            source: runner_core::env::PackSource::Fs,
196            index_location,
197            cache_dir,
198            public_key,
199            network: Some(network.clone()),
200        });
201    }
202    let mut cfg = PackConfig::default_for_paths(paths)?;
203    cfg.network = Some(network.clone());
204    Ok(cfg)
205}
206
/// Translate the telemetry section of the configuration into host telemetry
/// settings.
///
/// Returns `None` when telemetry is disabled or the exporter kind is `None`.
/// Otherwise the export settings start from
/// `TelemetryExportConfig::json_default()` with mode, endpoint and sampling
/// overridden from `cfg`; the service name is always `greentic-runner`.
#[cfg(feature = "telemetry")]
fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    if !cfg.enabled {
        return None;
    }
    let mode = match cfg.exporter {
        TelemetryExporterKind::None => return None,
        TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
        TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
    };

    let mut export = TelemetryExportConfig::json_default();
    export.mode = mode;
    export.endpoint = cfg.endpoint.clone();
    export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);

    let config = greentic_telemetry::TelemetryConfig {
        service_name: "greentic-runner".into(),
    };
    Some(TelemetryCfg { config, export })
}
227
228#[cfg(not(feature = "telemetry"))]
229fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
230    None
231}
232
233/// Run the unified Greentic runner host until shutdown.
234pub async fn run(cfg: RunnerConfig) -> Result<()> {
235    let RunnerConfig {
236        bindings,
237        pack,
238        port,
239        refresh_interval,
240        routing,
241        admin,
242        telemetry,
243        secrets_backend,
244        wasi_policy,
245        resolved_config: _resolved_config,
246    } = cfg;
247    #[cfg(not(feature = "telemetry"))]
248    let _ = telemetry;
249
250    let mut builder = HostBuilder::new();
251    for path in &bindings {
252        let host_config = HostConfig::load_from_path(path)
253            .with_context(|| format!("failed to load host bindings {}", path.display()))?;
254        builder = builder.with_config(host_config);
255    }
256    #[cfg(feature = "telemetry")]
257    if let Some(telemetry) = telemetry.clone() {
258        builder = builder.with_telemetry(telemetry);
259    }
260    builder = builder
261        .with_wasi_policy(wasi_policy.clone())
262        .with_secrets_manager(
263            secrets_backend
264                .build_manager()
265                .context("failed to initialise secrets backend")?,
266        );
267
268    let host = Arc::new(builder.build()?);
269    host.start().await?;
270
271    let (watcher, reload_handle) =
272        watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;
273
274    let routing = TenantRouting::new(routing.clone());
275    let server = HostServer::new(
276        port,
277        host.active_packs(),
278        routing,
279        host.health_state(),
280        Some(reload_handle),
281        admin.clone(),
282    )?;
283
284    tokio::select! {
285        result = server.serve() => {
286            result?;
287        }
288        _ = signal::ctrl_c() => {
289            tracing::info!("received shutdown signal");
290        }
291    }
292
293    drop(watcher);
294    host.stop().await?;
295    Ok(())
296}