
greentic_runner_host/lib.rs

#![deny(unsafe_code)]
//! Canonical Greentic host runtime.
//!
//! This crate owns tenant bindings, pack ingestion/watchers, ingress adapters,
//! Wasmtime glue, session/state storage, and the HTTP server used by the
//! `greentic-runner` CLI. Downstream crates embed it either through
//! [`RunnerConfig`] + [`run`] (HTTP host) or [`HostBuilder`] (direct API access).
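//!
//! A minimal embedding sketch via [`RunnerConfig`] + [`run`]; it assumes the caller
//! already has a resolved `greentic_config` configuration, and `bindings.yaml` is a
//! placeholder path:
//!
//! ```no_run
//! use std::path::PathBuf;
//!
//! use greentic_runner_host::{RunnerConfig, run};
//!
//! async fn start(resolved: greentic_config::ResolvedConfig) -> anyhow::Result<()> {
//!     // Placeholder bindings file; real deployments pass one or more binding paths.
//!     let cfg = RunnerConfig::from_config(resolved, vec![PathBuf::from("bindings.yaml")])?
//!         .with_port(9090);
//!     run(cfg).await
//! }
//! ```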

use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use crate::secrets::SecretsBackend;
use anyhow::{Context, Result, anyhow};
use greentic_config::ResolvedConfig;
#[cfg(feature = "telemetry")]
use greentic_config_types::TelemetryExporterKind;
use greentic_config_types::{
    NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
};
#[cfg(feature = "telemetry")]
use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
use runner_core::env::PackConfig;
use tokio::signal;

pub mod boot;
pub mod component_api;
pub mod config;
pub mod engine;
pub mod http;
pub mod ingress;
pub mod pack;
pub mod provider;
pub mod provider_core;
pub mod provider_core_only;
pub mod routing;
pub mod runner;
pub mod runtime;
pub mod runtime_wasmtime;
pub mod secrets;
pub mod storage;
pub mod telemetry;
pub mod verify;
pub mod wasi;
pub mod watcher;

mod activity;
mod host;
pub mod oauth;

pub use activity::{Activity, ActivityKind};
pub use config::HostConfig;
pub use host::TelemetryCfg;
pub use host::{HostBuilder, RunnerHost, TenantHandle};
pub use wasi::{PreopenSpec, RunnerWasiPolicy};

pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};

pub use http::auth::AdminAuth;
pub use routing::RoutingConfig;
use routing::TenantRouting;
pub use runner::HostServer;

/// User-facing configuration for running the unified host.
#[derive(Clone)]
pub struct RunnerConfig {
    pub bindings: Vec<PathBuf>,
    pub pack: PackConfig,
    pub port: u16,
    pub refresh_interval: Duration,
    pub routing: RoutingConfig,
    pub admin: AdminAuth,
    pub telemetry: Option<TelemetryCfg>,
    pub secrets_backend: SecretsBackend,
    pub wasi_policy: RunnerWasiPolicy,
    pub resolved_config: ResolvedConfig,
}

impl RunnerConfig {
    /// Build a [`RunnerConfig`] from a resolved greentic-config and the provided binding files.
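    ///
    /// Also reads the `PACK_REFRESH_INTERVAL` (default `30s`) and `PORT` (default `8080`)
    /// environment variables, and returns an error if `bindings` is empty.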
    pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
        if bindings.is_empty() {
            anyhow::bail!("at least one bindings file is required");
        }
        let pack = pack_config_from(
            &resolved_config.config.packs,
            &resolved_config.config.paths,
            &resolved_config.config.network,
        )?;
        let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
        let port = std::env::var("PORT")
            .ok()
            .and_then(|value| value.parse().ok())
            .unwrap_or(8080);
        let default_tenant = resolved_config
            .config
            .dev
            .as_ref()
            .map(|dev| dev.default_tenant.clone())
            .unwrap_or_else(|| "demo".into());
        let routing = RoutingConfig::from_env_with_default(default_tenant);
        let paths = &resolved_config.config.paths;
        ensure_paths_exist(paths)?;
        let wasi_policy = default_wasi_policy(paths);

        let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
            s.events
                .as_ref()
                .and_then(|svc| svc.headers.as_ref())
                .and_then(|headers| headers.get("x-admin-token").cloned())
        }));
        let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
        Ok(Self {
            bindings,
            pack,
            port,
            refresh_interval: refresh,
            routing,
            admin,
            telemetry: telemetry_from(&resolved_config.config.telemetry),
            secrets_backend,
            wasi_policy,
            resolved_config,
        })
    }

    /// Override the HTTP port used by the host server.
    pub fn with_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

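    /// Replace the default WASI policy (path environment variables plus `/state`,
    /// `/cache`, and `/logs` preopens derived from the configured paths) with a
    /// caller-supplied one.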
    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
        self.wasi_policy = policy;
        self
    }
}

fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
    let raw = value.unwrap_or_else(|| "30s".into());
    humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
}

fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
    let mut policy = RunnerWasiPolicy::default()
        .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
        .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
        .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
        .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
    policy = policy
        .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
        .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
        .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
    policy
}

fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
    for dir in [
        &paths.greentic_root,
        &paths.state_dir,
        &paths.cache_dir,
        &paths.logs_dir,
    ] {
        fs::create_dir_all(dir)
            .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
    }
    Ok(())
}

fn pack_config_from(
    packs: &Option<PacksConfig>,
    paths: &PathsConfig,
    network: &NetworkConfig,
) -> Result<PackConfig> {
    if let Some(cfg) = packs {
        let cache_dir = cfg.cache_dir.clone();
        let index_location = match &cfg.source {
            PackSourceConfig::LocalIndex { path } => {
                runner_core::env::IndexLocation::File(path.clone())
            }
            PackSourceConfig::HttpIndex { url } => {
                runner_core::env::IndexLocation::from_value(url)?
            }
            PackSourceConfig::OciRegistry { reference } => {
                runner_core::env::IndexLocation::from_value(reference)?
            }
        };
        let public_key = cfg
            .trust
            .as_ref()
            .and_then(|trust| trust.public_keys.first().cloned());
        return Ok(PackConfig {
            source: runner_core::env::PackSource::Fs,
            index_location,
            cache_dir,
            public_key,
            network: Some(network.clone()),
        });
    }
    let mut cfg = PackConfig::default_for_paths(paths)?;
    cfg.network = Some(network.clone());
    Ok(cfg)
}

#[cfg(feature = "telemetry")]
fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
        return None;
    }
    let mut export = TelemetryExportConfig::json_default();
    export.mode = match cfg.exporter {
        TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
        TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
        TelemetryExporterKind::None => return None,
    };
    export.endpoint = cfg.endpoint.clone();
    export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
    Some(TelemetryCfg {
        config: greentic_telemetry::TelemetryConfig {
            service_name: "greentic-runner".into(),
        },
        export,
    })
}

#[cfg(not(feature = "telemetry"))]
fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    None
}

/// Run the unified Greentic runner host until shutdown.
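///
/// Loads each bindings file into a [`HostBuilder`], starts the pack watcher, and
/// serves HTTP on the configured port until the server exits or Ctrl-C is received.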
pub async fn run(cfg: RunnerConfig) -> Result<()> {
    let RunnerConfig {
        bindings,
        pack,
        port,
        refresh_interval,
        routing,
        admin,
        telemetry,
        secrets_backend,
        wasi_policy,
        resolved_config: _resolved_config,
    } = cfg;
    #[cfg(not(feature = "telemetry"))]
    let _ = telemetry;

    let mut builder = HostBuilder::new();
    for path in &bindings {
        let host_config = HostConfig::load_from_path(path)
            .with_context(|| format!("failed to load host bindings {}", path.display()))?;
        builder = builder.with_config(host_config);
    }
    #[cfg(feature = "telemetry")]
    if let Some(telemetry) = telemetry.clone() {
        builder = builder.with_telemetry(telemetry);
    }
    builder = builder
        .with_wasi_policy(wasi_policy.clone())
        .with_secrets_manager(
            secrets_backend
                .build_manager()
                .context("failed to initialise secrets backend")?,
        );

    let host = Arc::new(builder.build()?);
    host.start().await?;

    let (watcher, reload_handle) =
        watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;

    let routing = TenantRouting::new(routing.clone());
    let server = HostServer::new(
        port,
        host.active_packs(),
        routing,
        host.health_state(),
        Some(reload_handle),
        admin.clone(),
    )?;

    tokio::select! {
        result = server.serve() => {
            result?;
        }
        _ = signal::ctrl_c() => {
            tracing::info!("received shutdown signal");
        }
    }

    drop(watcher);
    host.stop().await?;
    Ok(())
}
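
// A minimal sketch of unit coverage for `parse_refresh_interval` above. It assumes only
// the `humantime` duration syntax the function already relies on (e.g. `30s`, `500ms`);
// the module name is illustrative.
#[cfg(test)]
mod refresh_interval_tests {
    use super::*;

    #[test]
    fn defaults_to_thirty_seconds_when_unset() {
        assert_eq!(
            parse_refresh_interval(None).unwrap(),
            Duration::from_secs(30)
        );
    }

    #[test]
    fn accepts_humantime_durations() {
        assert_eq!(
            parse_refresh_interval(Some("500ms".into())).unwrap(),
            Duration::from_millis(500)
        );
    }

    #[test]
    fn rejects_non_durations() {
        assert!(parse_refresh_interval(Some("soon".into())).is_err());
    }
}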