greentic_runner_host/
lib.rs

1#![deny(unsafe_code)]
2//! Canonical Greentic host runtime.
3//!
4//! This crate owns tenant bindings, pack ingestion/watchers, ingress adapters,
5//! Wasmtime glue, session/state storage, and the HTTP server used by the
6//! `greentic-runner` CLI. Downstream crates embed it either through
7//! [`RunnerConfig`] + [`run`] (HTTP host) or [`HostBuilder`] (direct API access).
8
9use std::fs;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use crate::secrets::SecretsBackend;
15use anyhow::{Context, Result, anyhow};
16use greentic_config::ResolvedConfig;
17use greentic_config_types::{
18    NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
19    TelemetryExporterKind,
20};
21#[cfg(feature = "telemetry")]
22use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
23use runner_core::env::PackConfig;
24use tokio::signal;
25
26pub mod boot;
27pub mod component_api;
28pub mod config;
29pub mod engine;
30pub mod http;
31pub mod ingress;
32pub mod pack;
33pub mod routing;
34pub mod runner;
35pub mod runtime;
36pub mod runtime_wasmtime;
37pub mod secrets;
38pub mod storage;
39pub mod telemetry;
40pub mod verify;
41pub mod wasi;
42pub mod watcher;
43
44mod activity;
45mod host;
46pub mod oauth;
47
48pub use activity::{Activity, ActivityKind};
49pub use config::HostConfig;
50pub use host::TelemetryCfg;
51pub use host::{HostBuilder, RunnerHost, TenantHandle};
52pub use wasi::{PreopenSpec, RunnerWasiPolicy};
53
54pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
55
56pub use http::auth::AdminAuth;
57pub use routing::RoutingConfig;
58use routing::TenantRouting;
59pub use runner::HostServer;
60
61/// User-facing configuration for running the unified host.
62#[derive(Clone)]
63pub struct RunnerConfig {
64    pub bindings: Vec<PathBuf>,
65    pub pack: PackConfig,
66    pub port: u16,
67    pub refresh_interval: Duration,
68    pub routing: RoutingConfig,
69    pub admin: AdminAuth,
70    pub telemetry: Option<TelemetryCfg>,
71    pub secrets_backend: SecretsBackend,
72    pub wasi_policy: RunnerWasiPolicy,
73    pub resolved_config: ResolvedConfig,
74}
75
76impl RunnerConfig {
77    /// Build a [`RunnerConfig`] from a resolved greentic-config and the provided binding files.
78    pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
79        if bindings.is_empty() {
80            anyhow::bail!("at least one bindings file is required");
81        }
82        let pack = pack_config_from(
83            &resolved_config.config.packs,
84            &resolved_config.config.paths,
85            &resolved_config.config.network,
86        )?;
87        let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
88        let port = std::env::var("PORT")
89            .ok()
90            .and_then(|value| value.parse().ok())
91            .unwrap_or(8080);
92        let default_tenant = resolved_config
93            .config
94            .dev
95            .as_ref()
96            .map(|dev| dev.default_tenant.clone())
97            .unwrap_or_else(|| "demo".into());
98        let routing = RoutingConfig::from_env_with_default(default_tenant);
99        let paths = &resolved_config.config.paths;
100        ensure_paths_exist(paths)?;
101        let wasi_policy = default_wasi_policy(paths);
102
103        let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
104            s.events
105                .as_ref()
106                .and_then(|svc| svc.headers.as_ref())
107                .and_then(|headers| headers.get("x-admin-token").cloned())
108        }));
109        let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
110        Ok(Self {
111            bindings,
112            pack,
113            port,
114            refresh_interval: refresh,
115            routing,
116            admin,
117            telemetry: telemetry_from(&resolved_config.config.telemetry),
118            secrets_backend,
119            wasi_policy,
120            resolved_config,
121        })
122    }
123
124    /// Override the HTTP port used by the host server.
125    pub fn with_port(mut self, port: u16) -> Self {
126        self.port = port;
127        self
128    }
129
130    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
131        self.wasi_policy = policy;
132        self
133    }
134}
135
136fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
137    let raw = value.unwrap_or_else(|| "30s".into());
138    humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
139}
140
141fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
142    let mut policy = RunnerWasiPolicy::default()
143        .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
144        .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
145        .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
146        .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
147    policy = policy
148        .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
149        .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
150        .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
151    policy
152}
153
154fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
155    for dir in [
156        &paths.greentic_root,
157        &paths.state_dir,
158        &paths.cache_dir,
159        &paths.logs_dir,
160    ] {
161        fs::create_dir_all(dir)
162            .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
163    }
164    Ok(())
165}
166
167fn pack_config_from(
168    packs: &Option<PacksConfig>,
169    paths: &PathsConfig,
170    network: &NetworkConfig,
171) -> Result<PackConfig> {
172    if let Some(cfg) = packs {
173        let cache_dir = cfg.cache_dir.clone();
174        let index_location = match &cfg.source {
175            PackSourceConfig::LocalIndex { path } => {
176                runner_core::env::IndexLocation::File(path.clone())
177            }
178            PackSourceConfig::HttpIndex { url } => {
179                runner_core::env::IndexLocation::from_value(url)?
180            }
181            PackSourceConfig::OciRegistry { reference } => {
182                runner_core::env::IndexLocation::from_value(reference)?
183            }
184        };
185        let public_key = cfg
186            .trust
187            .as_ref()
188            .and_then(|trust| trust.public_keys.first().cloned());
189        return Ok(PackConfig {
190            source: runner_core::env::PackSource::Fs,
191            index_location,
192            cache_dir,
193            public_key,
194            network: Some(network.clone()),
195        });
196    }
197    let mut cfg = PackConfig::default_for_paths(paths)?;
198    cfg.network = Some(network.clone());
199    Ok(cfg)
200}
201
202#[cfg(feature = "telemetry")]
203fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
204    if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
205        return None;
206    }
207    let mut export = TelemetryExportConfig::json_default();
208    export.mode = match cfg.exporter {
209        TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
210        TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
211        TelemetryExporterKind::None => return None,
212    };
213    export.endpoint = cfg.endpoint.clone();
214    export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
215    Some(TelemetryCfg {
216        config: greentic_telemetry::TelemetryConfig {
217            service_name: "greentic-runner".into(),
218        },
219        export,
220    })
221}
222
223#[cfg(not(feature = "telemetry"))]
224fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
225    None
226}
227
228/// Run the unified Greentic runner host until shutdown.
229pub async fn run(cfg: RunnerConfig) -> Result<()> {
230    let RunnerConfig {
231        bindings,
232        pack,
233        port,
234        refresh_interval,
235        routing,
236        admin,
237        telemetry,
238        secrets_backend,
239        wasi_policy,
240        resolved_config: _resolved_config,
241    } = cfg;
242    #[cfg(not(feature = "telemetry"))]
243    let _ = telemetry;
244
245    let mut builder = HostBuilder::new();
246    for path in &bindings {
247        let host_config = HostConfig::load_from_path(path)
248            .with_context(|| format!("failed to load host bindings {}", path.display()))?;
249        builder = builder.with_config(host_config);
250    }
251    #[cfg(feature = "telemetry")]
252    if let Some(telemetry) = telemetry.clone() {
253        builder = builder.with_telemetry(telemetry);
254    }
255    builder = builder
256        .with_wasi_policy(wasi_policy.clone())
257        .with_secrets_manager(
258            secrets_backend
259                .build_manager()
260                .context("failed to initialise secrets backend")?,
261        );
262
263    let host = Arc::new(builder.build()?);
264    host.start().await?;
265
266    let (watcher, reload_handle) =
267        watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;
268
269    let routing = TenantRouting::new(routing.clone());
270    let server = HostServer::new(
271        port,
272        host.active_packs(),
273        routing,
274        host.health_state(),
275        Some(reload_handle),
276        admin.clone(),
277    )?;
278
279    tokio::select! {
280        result = server.serve() => {
281            result?;
282        }
283        _ = signal::ctrl_c() => {
284            tracing::info!("received shutdown signal");
285        }
286    }
287
288    drop(watcher);
289    host.stop().await?;
290    Ok(())
291}