greentic_runner_host/
lib.rs

1#![deny(unsafe_code)]
2//! Canonical Greentic host runtime.
3//!
4//! This crate owns tenant bindings, pack ingestion/watchers, ingress adapters,
5//! Wasmtime glue, session/state storage, and the HTTP server used by the
6//! `greentic-runner` CLI. Downstream crates embed it either through
7//! [`RunnerConfig`] + [`run`] (HTTP host) or [`HostBuilder`] (direct API access).
8
9use std::collections::{HashMap, HashSet};
10use std::fs;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use crate::secrets::SecretsBackend;
16use anyhow::{Context, Result, anyhow};
17use greentic_config::ResolvedConfig;
18#[cfg(feature = "telemetry")]
19use greentic_config_types::TelemetryExporterKind;
20use greentic_config_types::{
21    NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
22};
23#[cfg(feature = "telemetry")]
24use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
25use runner_core::env::PackConfig;
26use serde_json::json;
27use tokio::signal;
28
29pub mod boot;
30pub mod cache;
31pub mod component_api;
32pub mod config;
33pub mod engine;
34pub mod gtbind;
35pub mod http;
36pub mod ingress;
37pub mod pack;
38pub mod provider;
39pub mod provider_core;
40pub mod provider_core_only;
41pub mod routing;
42pub mod runner;
43pub mod runtime;
44pub mod runtime_wasmtime;
45pub mod secrets;
46pub mod storage;
47pub mod telemetry;
48pub mod verify;
49pub mod wasi;
50pub mod watcher;
51
52mod activity;
53mod host;
54pub mod oauth;
55
56pub use activity::{Activity, ActivityKind};
57pub use config::HostConfig;
58pub use gtbind::{PackBinding, TenantBindings};
59pub use host::TelemetryCfg;
60pub use host::{HostBuilder, RunnerHost, TenantHandle};
61pub use wasi::{PreopenSpec, RunnerWasiPolicy};
62
63pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
64
65pub use http::auth::AdminAuth;
66pub use routing::RoutingConfig;
67use routing::TenantRouting;
68pub use runner::HostServer;
69
70/// User-facing configuration for running the unified host.
71#[derive(Clone)]
72pub struct RunnerConfig {
73    pub tenant_bindings: HashMap<String, TenantBindings>,
74    pub pack: PackConfig,
75    pub port: u16,
76    pub refresh_interval: Duration,
77    pub routing: RoutingConfig,
78    pub admin: AdminAuth,
79    pub telemetry: Option<TelemetryCfg>,
80    pub secrets_backend: SecretsBackend,
81    pub wasi_policy: RunnerWasiPolicy,
82    pub resolved_config: ResolvedConfig,
83}
84
85impl RunnerConfig {
86    /// Build a [`RunnerConfig`] from a resolved greentic-config and the provided binding files.
87    pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
88        if bindings.is_empty() {
89            anyhow::bail!("at least one gtbind file is required");
90        }
91        let tenant_bindings = gtbind::load_gtbinds(&bindings)?;
92        if tenant_bindings.is_empty() {
93            anyhow::bail!("no gtbind files loaded");
94        }
95        let mut pack = pack_config_from(
96            &resolved_config.config.packs,
97            &resolved_config.config.paths,
98            &resolved_config.config.network,
99        )?;
100        maybe_write_gtbind_index(&tenant_bindings, &resolved_config.config.paths, &mut pack)?;
101        let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
102        let port = std::env::var("PORT")
103            .ok()
104            .and_then(|value| value.parse().ok())
105            .unwrap_or(8080);
106        let default_tenant = resolved_config
107            .config
108            .dev
109            .as_ref()
110            .map(|dev| dev.default_tenant.clone())
111            .unwrap_or_else(|| "demo".into());
112        let routing = RoutingConfig::from_env_with_default(default_tenant);
113        let paths = &resolved_config.config.paths;
114        ensure_paths_exist(paths)?;
115        let mut wasi_policy = default_wasi_policy(paths);
116        let mut env_allow = HashSet::new();
117        for binding in tenant_bindings.values() {
118            env_allow.extend(binding.env_passthrough.iter().cloned());
119        }
120        for key in env_allow {
121            wasi_policy = wasi_policy.allow_env(key);
122        }
123
124        let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
125            s.events
126                .as_ref()
127                .and_then(|svc| svc.headers.as_ref())
128                .and_then(|headers| headers.get("x-admin-token").cloned())
129        }));
130        let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
131        Ok(Self {
132            tenant_bindings,
133            pack,
134            port,
135            refresh_interval: refresh,
136            routing,
137            admin,
138            telemetry: telemetry_from(&resolved_config.config.telemetry),
139            secrets_backend,
140            wasi_policy,
141            resolved_config,
142        })
143    }
144
145    /// Override the HTTP port used by the host server.
146    pub fn with_port(mut self, port: u16) -> Self {
147        self.port = port;
148        self
149    }
150
151    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
152        self.wasi_policy = policy;
153        self
154    }
155}
156
157fn maybe_write_gtbind_index(
158    tenant_bindings: &HashMap<String, TenantBindings>,
159    paths: &PathsConfig,
160    pack: &mut PackConfig,
161) -> Result<()> {
162    let mut uses_locators = false;
163    for binding in tenant_bindings.values() {
164        for pack_binding in &binding.packs {
165            if pack_binding.pack_locator.is_some() {
166                uses_locators = true;
167            }
168        }
169    }
170    if !uses_locators {
171        return Ok(());
172    }
173
174    let mut entries = serde_json::Map::new();
175    for binding in tenant_bindings.values() {
176        let mut packs = Vec::new();
177        for pack_binding in &binding.packs {
178            let locator = pack_binding.pack_locator.as_ref().ok_or_else(|| {
179                anyhow::anyhow!(
180                    "gtbind {} missing pack_locator for pack {}",
181                    binding.tenant,
182                    pack_binding.pack_id
183                )
184            })?;
185            let (name, version_or_digest) =
186                pack_binding.pack_ref.split_once('@').ok_or_else(|| {
187                    anyhow::anyhow!(
188                        "gtbind {} invalid pack_ref {} (expected name@version)",
189                        binding.tenant,
190                        pack_binding.pack_ref
191                    )
192                })?;
193            if name != pack_binding.pack_id {
194                anyhow::bail!(
195                    "gtbind {} pack_ref {} does not match pack_id {}",
196                    binding.tenant,
197                    pack_binding.pack_ref,
198                    pack_binding.pack_id
199                );
200            }
201            let mut entry = serde_json::Map::new();
202            entry.insert("name".to_string(), json!(name));
203            if version_or_digest.contains(':') {
204                entry.insert("digest".to_string(), json!(version_or_digest));
205            } else {
206                entry.insert("version".to_string(), json!(version_or_digest));
207            }
208            entry.insert("locator".to_string(), json!(locator));
209            packs.push(serde_json::Value::Object(entry));
210        }
211        let main_pack = packs
212            .first()
213            .cloned()
214            .ok_or_else(|| anyhow::anyhow!("gtbind {} has no packs", binding.tenant))?;
215        let overlays = packs.into_iter().skip(1).collect::<Vec<_>>();
216        entries.insert(
217            binding.tenant.clone(),
218            json!({
219                "main_pack": main_pack,
220                "overlays": overlays,
221            }),
222        );
223    }
224
225    let index_path = paths.greentic_root.join("packs").join("gtbind.index.json");
226    if let Some(parent) = index_path.parent() {
227        fs::create_dir_all(parent)
228            .with_context(|| format!("failed to create {}", parent.display()))?;
229    }
230    let serialized = serde_json::to_vec_pretty(&serde_json::Value::Object(entries))?;
231    fs::write(&index_path, serialized)
232        .with_context(|| format!("failed to write {}", index_path.display()))?;
233    pack.index_location = runner_core::env::IndexLocation::File(index_path);
234    Ok(())
235}
236
237fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
238    let raw = value.unwrap_or_else(|| "30s".into());
239    humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
240}
241
242fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
243    let mut policy = RunnerWasiPolicy::default()
244        .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
245        .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
246        .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
247        .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
248    policy = policy
249        .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
250        .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
251        .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
252    policy
253}
254
255fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
256    for dir in [
257        &paths.greentic_root,
258        &paths.state_dir,
259        &paths.cache_dir,
260        &paths.logs_dir,
261    ] {
262        fs::create_dir_all(dir)
263            .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
264    }
265    Ok(())
266}
267
268fn pack_config_from(
269    packs: &Option<PacksConfig>,
270    paths: &PathsConfig,
271    network: &NetworkConfig,
272) -> Result<PackConfig> {
273    if let Some(cfg) = packs {
274        let cache_dir = cfg.cache_dir.clone();
275        let index_location = match &cfg.source {
276            PackSourceConfig::LocalIndex { path } => {
277                runner_core::env::IndexLocation::File(path.clone())
278            }
279            PackSourceConfig::HttpIndex { url } => {
280                runner_core::env::IndexLocation::from_value(url)?
281            }
282            PackSourceConfig::OciRegistry { reference } => {
283                runner_core::env::IndexLocation::from_value(reference)?
284            }
285        };
286        let public_key = cfg
287            .trust
288            .as_ref()
289            .and_then(|trust| trust.public_keys.first().cloned());
290        return Ok(PackConfig {
291            source: runner_core::env::PackSource::Fs,
292            index_location,
293            cache_dir,
294            public_key,
295            network: Some(network.clone()),
296        });
297    }
298    let mut cfg = PackConfig::default_for_paths(paths)?;
299    cfg.network = Some(network.clone());
300    Ok(cfg)
301}
302
303#[cfg(feature = "telemetry")]
304fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
305    if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
306        return None;
307    }
308    let mut export = TelemetryExportConfig::json_default();
309    export.mode = match cfg.exporter {
310        TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
311        TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
312        TelemetryExporterKind::None => return None,
313    };
314    export.endpoint = cfg.endpoint.clone();
315    export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
316    Some(TelemetryCfg {
317        config: greentic_telemetry::TelemetryConfig {
318            service_name: "greentic-runner".into(),
319        },
320        export,
321    })
322}
323
324#[cfg(not(feature = "telemetry"))]
325fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
326    None
327}
328
329/// Run the unified Greentic runner host until shutdown.
330pub async fn run(cfg: RunnerConfig) -> Result<()> {
331    let RunnerConfig {
332        tenant_bindings,
333        pack,
334        port,
335        refresh_interval,
336        routing,
337        admin,
338        telemetry,
339        secrets_backend,
340        wasi_policy,
341        resolved_config: _resolved_config,
342    } = cfg;
343    #[cfg(not(feature = "telemetry"))]
344    let _ = telemetry;
345
346    let mut builder = HostBuilder::new();
347    for bindings in tenant_bindings.into_values() {
348        let host_config = HostConfig::from_gtbind(bindings);
349        builder = builder.with_config(host_config);
350    }
351    #[cfg(feature = "telemetry")]
352    if let Some(telemetry) = telemetry.clone() {
353        builder = builder.with_telemetry(telemetry);
354    }
355    builder = builder
356        .with_wasi_policy(wasi_policy.clone())
357        .with_secrets_manager(
358            secrets_backend
359                .build_manager()
360                .context("failed to initialise secrets backend")?,
361        );
362
363    let host = Arc::new(builder.build()?);
364    host.start().await?;
365
366    let (watcher, reload_handle) =
367        watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;
368
369    let routing = TenantRouting::new(routing.clone());
370    let server = HostServer::new(
371        port,
372        host.active_packs(),
373        routing,
374        host.health_state(),
375        Some(reload_handle),
376        admin.clone(),
377    )?;
378
379    tokio::select! {
380        result = server.serve() => {
381            result?;
382        }
383        _ = signal::ctrl_c() => {
384            tracing::info!("received shutdown signal");
385        }
386    }
387
388    drop(watcher);
389    host.stop().await?;
390    Ok(())
391}