greentic_runner_host/
lib.rs

1#![deny(unsafe_code)]
2//! Canonical Greentic host runtime.
3//!
4//! This crate owns tenant bindings, pack ingestion/watchers, ingress adapters,
5//! Wasmtime glue, session/state storage, and the HTTP server used by the
6//! `greentic-runner` CLI. Downstream crates embed it either through
7//! [`RunnerConfig`] + [`run`] (HTTP host) or [`HostBuilder`] (direct API access).
8
9use std::fs;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use crate::secrets::SecretsBackend;
15use anyhow::{Context, Result, anyhow};
16use greentic_config::ResolvedConfig;
17#[cfg(feature = "telemetry")]
18use greentic_config_types::TelemetryExporterKind;
19use greentic_config_types::{
20    NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
21};
22#[cfg(feature = "telemetry")]
23use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
24use runner_core::env::PackConfig;
25use tokio::signal;
26
27pub mod boot;
28pub mod component_api;
29pub mod config;
30pub mod engine;
31pub mod http;
32pub mod ingress;
33pub mod pack;
34pub mod routing;
35pub mod runner;
36pub mod runtime;
37pub mod runtime_wasmtime;
38pub mod secrets;
39pub mod storage;
40pub mod telemetry;
41pub mod verify;
42pub mod wasi;
43pub mod watcher;
44
45mod activity;
46mod host;
47pub mod oauth;
48
49pub use activity::{Activity, ActivityKind};
50pub use config::HostConfig;
51pub use host::TelemetryCfg;
52pub use host::{HostBuilder, RunnerHost, TenantHandle};
53pub use wasi::{PreopenSpec, RunnerWasiPolicy};
54
55pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
56
57pub use http::auth::AdminAuth;
58pub use routing::RoutingConfig;
59use routing::TenantRouting;
60pub use runner::HostServer;
61
/// User-facing configuration for running the unified host.
///
/// Usually produced by [`RunnerConfig::from_config`] and consumed by [`run`].
#[derive(Clone)]
pub struct RunnerConfig {
    /// Host binding files; [`run`] loads each one with `HostConfig::load_from_path`.
    pub bindings: Vec<PathBuf>,
    /// Pack configuration handed to the pack watcher started by [`run`].
    pub pack: PackConfig,
    /// Port passed to the HTTP [`HostServer`].
    pub port: u16,
    /// Refresh interval passed to the pack watcher.
    pub refresh_interval: Duration,
    /// Routing configuration used to build the tenant routing for the HTTP server.
    pub routing: RoutingConfig,
    /// Admin authentication settings passed to the HTTP server.
    pub admin: AdminAuth,
    /// Optional telemetry settings; [`run`] only applies them when the `telemetry`
    /// feature is enabled.
    pub telemetry: Option<TelemetryCfg>,
    /// Backend from which [`run`] builds the secrets manager.
    pub secrets_backend: SecretsBackend,
    /// WASI policy installed on the host builder.
    pub wasi_policy: RunnerWasiPolicy,
    /// The resolved configuration this value was derived from. [`run`] does not
    /// read it; it is carried along with the rest of the configuration.
    pub resolved_config: ResolvedConfig,
}
76
77impl RunnerConfig {
78    /// Build a [`RunnerConfig`] from a resolved greentic-config and the provided binding files.
79    pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
80        if bindings.is_empty() {
81            anyhow::bail!("at least one bindings file is required");
82        }
83        let pack = pack_config_from(
84            &resolved_config.config.packs,
85            &resolved_config.config.paths,
86            &resolved_config.config.network,
87        )?;
88        let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
89        let port = std::env::var("PORT")
90            .ok()
91            .and_then(|value| value.parse().ok())
92            .unwrap_or(8080);
93        let default_tenant = resolved_config
94            .config
95            .dev
96            .as_ref()
97            .map(|dev| dev.default_tenant.clone())
98            .unwrap_or_else(|| "demo".into());
99        let routing = RoutingConfig::from_env_with_default(default_tenant);
100        let paths = &resolved_config.config.paths;
101        ensure_paths_exist(paths)?;
102        let wasi_policy = default_wasi_policy(paths);
103
104        let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
105            s.events
106                .as_ref()
107                .and_then(|svc| svc.headers.as_ref())
108                .and_then(|headers| headers.get("x-admin-token").cloned())
109        }));
110        let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
111        Ok(Self {
112            bindings,
113            pack,
114            port,
115            refresh_interval: refresh,
116            routing,
117            admin,
118            telemetry: telemetry_from(&resolved_config.config.telemetry),
119            secrets_backend,
120            wasi_policy,
121            resolved_config,
122        })
123    }
124
125    /// Override the HTTP port used by the host server.
126    pub fn with_port(mut self, port: u16) -> Self {
127        self.port = port;
128        self
129    }
130
131    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
132        self.wasi_policy = policy;
133        self
134    }
135}
136
137fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
138    let raw = value.unwrap_or_else(|| "30s".into());
139    humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
140}
141
142fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
143    let mut policy = RunnerWasiPolicy::default()
144        .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
145        .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
146        .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
147        .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
148    policy = policy
149        .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
150        .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
151        .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
152    policy
153}
154
155fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
156    for dir in [
157        &paths.greentic_root,
158        &paths.state_dir,
159        &paths.cache_dir,
160        &paths.logs_dir,
161    ] {
162        fs::create_dir_all(dir)
163            .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
164    }
165    Ok(())
166}
167
168fn pack_config_from(
169    packs: &Option<PacksConfig>,
170    paths: &PathsConfig,
171    network: &NetworkConfig,
172) -> Result<PackConfig> {
173    if let Some(cfg) = packs {
174        let cache_dir = cfg.cache_dir.clone();
175        let index_location = match &cfg.source {
176            PackSourceConfig::LocalIndex { path } => {
177                runner_core::env::IndexLocation::File(path.clone())
178            }
179            PackSourceConfig::HttpIndex { url } => {
180                runner_core::env::IndexLocation::from_value(url)?
181            }
182            PackSourceConfig::OciRegistry { reference } => {
183                runner_core::env::IndexLocation::from_value(reference)?
184            }
185        };
186        let public_key = cfg
187            .trust
188            .as_ref()
189            .and_then(|trust| trust.public_keys.first().cloned());
190        return Ok(PackConfig {
191            source: runner_core::env::PackSource::Fs,
192            index_location,
193            cache_dir,
194            public_key,
195            network: Some(network.clone()),
196        });
197    }
198    let mut cfg = PackConfig::default_for_paths(paths)?;
199    cfg.network = Some(network.clone());
200    Ok(cfg)
201}
202
/// Map the resolved telemetry configuration onto the host's telemetry settings.
///
/// Returns `None` when telemetry is disabled or the exporter kind is `None`.
/// Otherwise the JSON export defaults are adjusted with the configured
/// exporter mode, endpoint and trace-id-ratio sampling, and the service name
/// is set to `greentic-runner`.
#[cfg(feature = "telemetry")]
fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    if !cfg.enabled {
        return None;
    }
    let mode = match cfg.exporter {
        TelemetryExporterKind::None => return None,
        TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
        TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
    };

    let mut export = TelemetryExportConfig::json_default();
    export.mode = mode;
    export.endpoint = cfg.endpoint.clone();
    export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);

    let config = greentic_telemetry::TelemetryConfig {
        service_name: "greentic-runner".into(),
    };
    Some(TelemetryCfg { config, export })
}
223
/// Stand-in used when the `telemetry` feature is disabled: the configuration
/// is ignored and no telemetry settings are produced.
#[cfg(not(feature = "telemetry"))]
fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    None
}
228
229/// Run the unified Greentic runner host until shutdown.
230pub async fn run(cfg: RunnerConfig) -> Result<()> {
231    let RunnerConfig {
232        bindings,
233        pack,
234        port,
235        refresh_interval,
236        routing,
237        admin,
238        telemetry,
239        secrets_backend,
240        wasi_policy,
241        resolved_config: _resolved_config,
242    } = cfg;
243    #[cfg(not(feature = "telemetry"))]
244    let _ = telemetry;
245
246    let mut builder = HostBuilder::new();
247    for path in &bindings {
248        let host_config = HostConfig::load_from_path(path)
249            .with_context(|| format!("failed to load host bindings {}", path.display()))?;
250        builder = builder.with_config(host_config);
251    }
252    #[cfg(feature = "telemetry")]
253    if let Some(telemetry) = telemetry.clone() {
254        builder = builder.with_telemetry(telemetry);
255    }
256    builder = builder
257        .with_wasi_policy(wasi_policy.clone())
258        .with_secrets_manager(
259            secrets_backend
260                .build_manager()
261                .context("failed to initialise secrets backend")?,
262        );
263
264    let host = Arc::new(builder.build()?);
265    host.start().await?;
266
267    let (watcher, reload_handle) =
268        watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;
269
270    let routing = TenantRouting::new(routing.clone());
271    let server = HostServer::new(
272        port,
273        host.active_packs(),
274        routing,
275        host.health_state(),
276        Some(reload_handle),
277        admin.clone(),
278    )?;
279
280    tokio::select! {
281        result = server.serve() => {
282            result?;
283        }
284        _ = signal::ctrl_c() => {
285            tracing::info!("received shutdown signal");
286        }
287    }
288
289    drop(watcher);
290    host.stop().await?;
291    Ok(())
292}