Skip to main content

greentic_runner_host/
lib.rs

1#![deny(unsafe_code)]
2//! Canonical Greentic host runtime.
3//!
4//! This crate owns tenant bindings, pack ingestion/watchers, ingress adapters,
5//! Wasmtime glue, session/state storage, and the HTTP server used by the
6//! `greentic-runner` CLI. Downstream crates embed it either through
7//! [`RunnerConfig`] + [`run`] (HTTP host) or [`HostBuilder`] (direct API access).
8
9use std::collections::{HashMap, HashSet};
10use std::fs;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use crate::secrets::SecretsBackend;
16use anyhow::{Context, Result, anyhow};
17use greentic_config::ResolvedConfig;
18use greentic_config_types::TelemetryExporterKind;
19use greentic_config_types::{
20    NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
21};
22use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
23use runner_core::env::PackConfig;
24use serde_json::json;
25use tokio::signal;
26
27pub mod boot;
28pub mod cache;
29pub mod component_api;
30pub mod config;
31pub mod engine;
32pub mod fault;
33#[cfg(feature = "greentic-x-provider")]
34pub mod greentic_x_provider;
35pub mod gtbind;
36pub mod http;
37pub mod identify_hint;
38pub mod metrics;
39pub mod operator_metrics;
40pub mod operator_registry;
41pub mod pack;
42pub mod provider;
43pub mod provider_core;
44pub mod provider_core_only;
45pub mod routing;
46pub mod runner;
47pub mod runtime;
48pub mod runtime_refs;
49pub mod runtime_wasmtime;
50pub mod secrets;
51pub mod storage;
52pub mod telemetry;
53pub mod telemetry_scan;
54#[cfg(feature = "fault-injection")]
55pub mod testing;
56pub mod trace;
57pub mod validate;
58pub mod verify;
59pub mod wasi;
60pub mod watcher;
61
62mod activity;
63mod host;
64pub mod oauth;
65
66pub use activity::{Activity, ActivityKind, WelcomeFlowHint};
67pub use config::HostConfig;
68pub use gtbind::{PackBinding, TenantBindings};
69pub use host::TelemetryCfg;
70pub use host::{HostBuilder, RunnerHost, TenantHandle};
71pub use wasi::{PreopenSpec, RunnerWasiPolicy};
72
73pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
74
75pub use http::auth::AdminAuth;
76pub use routing::RoutingConfig;
77use routing::TenantRouting;
78pub use runner::HostServer;
79
/// Fixtures shared by this crate's unit tests: the demo pack path and a
/// ready-to-use tenant runtime loaded from it.
#[cfg(test)]
pub(crate) mod test_support {
    use super::*;
    use crate::config::{OperatorPolicy, SecretsPolicy};
    use crate::runtime::TenantRuntime;
    use crate::secrets::default_manager;
    use crate::storage::{new_session_store, new_state_store, session_host_from, state_host_from};
    use crate::trace::TraceConfig;
    use crate::validate::ValidationConfig;
    use tempfile::TempDir;

    /// Canonicalized path of the demo pack, resolved relative to this crate's
    /// manifest directory.
    ///
    /// # Panics
    ///
    /// Panics when the path cannot be canonicalized.
    pub(crate) fn fixture_pack_path() -> PathBuf {
        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        manifest_dir
            .join("../../examples/packs/demo.gtpack")
            .canonicalize()
            .expect("fixture pack path")
    }

    /// Write a minimal `bindings.yaml` for tenant `demo` into `workspace` and
    /// load it as a [`HostConfig`] with allow-all secrets/operator policies and
    /// trace/validation settings taken from the environment.
    fn minimal_config(workspace: &std::path::Path) -> Result<Arc<HostConfig>> {
        let bindings_yaml = r#"
tenant: demo
flow_type_bindings: {}
rate_limits: {}
retry: {}
timers: []
"#;
        let bindings_path = workspace.join("bindings.yaml");
        std::fs::write(&bindings_path, bindings_yaml)?;

        let mut host_config =
            HostConfig::load_from_path(&bindings_path).context("load minimal host bindings")?;
        host_config.secrets_policy = SecretsPolicy::allow_all();
        host_config.operator_policy = OperatorPolicy::allow_all();
        host_config.trace = TraceConfig::from_env();
        host_config.validation = ValidationConfig::from_env();
        Ok(Arc::new(host_config))
    }

    /// Load the demo pack into a fresh [`TenantRuntime`] backed by newly
    /// created session and state stores.
    ///
    /// The returned [`TempDir`] is the workspace holding the generated
    /// bindings file; it is handed back so the caller decides how long the
    /// directory lives.
    pub(crate) async fn build_test_runtime() -> Result<(TempDir, Arc<TenantRuntime>)> {
        let workspace = TempDir::new().context("temp workspace")?;
        let host_config = minimal_config(workspace.path())?;

        let sessions = new_session_store();
        let session_host = session_host_from(Arc::clone(&sessions));
        let states = new_state_store();
        let state_host = state_host_from(Arc::clone(&states));

        let secrets = default_manager()?;
        let demo_pack = fixture_pack_path();
        let wasi_policy = Arc::new(RunnerWasiPolicy::new());

        let runtime = TenantRuntime::load(
            &demo_pack,
            host_config,
            None,
            Some(&demo_pack),
            None,
            wasi_policy,
            session_host,
            Arc::clone(&sessions),
            Arc::clone(&states),
            state_host,
            secrets,
        )
        .await?;
        Ok((workspace, runtime))
    }
}
145
/// User-facing configuration for running the unified host.
///
/// Usually produced by [`RunnerConfig::from_config`] and consumed by [`run`].
#[derive(Clone)]
pub struct RunnerConfig {
    /// Bindings loaded from the gtbind files; [`run`] registers one host
    /// config per value. (Keys are presumably tenant identifiers; confirm
    /// against `gtbind::load_gtbinds`.)
    pub tenant_bindings: HashMap<String, TenantBindings>,
    /// Pack source/index settings handed to the pack watcher.
    pub pack: PackConfig,
    /// HTTP port used by the host server.
    pub port: u16,
    /// Interval handed to the pack watcher together with `pack`.
    pub refresh_interval: Duration,
    /// Routing settings used to build the server's tenant routing.
    pub routing: RoutingConfig,
    /// Admin authentication settings passed to the HTTP server.
    pub admin: AdminAuth,
    /// Telemetry settings; when `None`, [`run`] does not configure telemetry
    /// on the host builder.
    pub telemetry: Option<TelemetryCfg>,
    /// Backend from which [`run`] builds the secrets manager.
    pub secrets_backend: SecretsBackend,
    /// WASI policy applied to the host builder.
    pub wasi_policy: RunnerWasiPolicy,
    /// The resolved configuration this value was built from; [`run`] itself
    /// does not read it.
    pub resolved_config: ResolvedConfig,
    /// Trace settings copied onto every tenant's host config by [`run`].
    pub trace: trace::TraceConfig,
    /// Validation settings copied onto every tenant's host config by [`run`].
    pub validation: validate::ValidationConfig,
}
162
163impl RunnerConfig {
164    /// Build a [`RunnerConfig`] from a resolved greentic-config and the provided binding files.
165    pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
166        if bindings.is_empty() {
167            anyhow::bail!("at least one gtbind file is required");
168        }
169        let tenant_bindings = gtbind::load_gtbinds(&bindings)?;
170        if tenant_bindings.is_empty() {
171            anyhow::bail!("no gtbind files loaded");
172        }
173        let mut pack = pack_config_from(
174            &resolved_config.config.packs,
175            &resolved_config.config.paths,
176            &resolved_config.config.network,
177        )?;
178        maybe_write_gtbind_index(&tenant_bindings, &resolved_config.config.paths, &mut pack)?;
179        let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
180        let port = std::env::var("PORT")
181            .ok()
182            .and_then(|value| value.parse().ok())
183            .unwrap_or(8080);
184        let default_tenant = resolved_config
185            .config
186            .dev
187            .as_ref()
188            .map(|dev| dev.default_tenant.clone())
189            .unwrap_or_else(|| "demo".into());
190        let routing = RoutingConfig::from_env_with_default(default_tenant);
191        let paths = &resolved_config.config.paths;
192        ensure_paths_exist(paths)?;
193        let mut wasi_policy = default_wasi_policy(paths);
194        let mut env_allow = HashSet::new();
195        for binding in tenant_bindings.values() {
196            env_allow.extend(binding.env_passthrough.iter().cloned());
197        }
198        for key in env_allow {
199            wasi_policy = wasi_policy.allow_env(key);
200        }
201
202        let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
203            s.events
204                .as_ref()
205                .and_then(|svc| svc.headers.as_ref())
206                .and_then(|headers| headers.get("x-admin-token").cloned())
207        }));
208        let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
209        Ok(Self {
210            tenant_bindings,
211            pack,
212            port,
213            refresh_interval: refresh,
214            routing,
215            admin,
216            telemetry: telemetry_from(&resolved_config.config.telemetry),
217            secrets_backend,
218            wasi_policy,
219            resolved_config,
220            trace: trace::TraceConfig::from_env(),
221            validation: validate::ValidationConfig::from_env(),
222        })
223    }
224
225    /// Override the HTTP port used by the host server.
226    pub fn with_port(mut self, port: u16) -> Self {
227        self.port = port;
228        self
229    }
230
231    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
232        self.wasi_policy = policy;
233        self
234    }
235}
236
237fn maybe_write_gtbind_index(
238    tenant_bindings: &HashMap<String, TenantBindings>,
239    paths: &PathsConfig,
240    pack: &mut PackConfig,
241) -> Result<()> {
242    let mut uses_locators = false;
243    for binding in tenant_bindings.values() {
244        for pack_binding in &binding.packs {
245            if pack_binding.pack_locator.is_some() {
246                uses_locators = true;
247            }
248        }
249    }
250    if !uses_locators {
251        return Ok(());
252    }
253
254    let mut entries = serde_json::Map::new();
255    for binding in tenant_bindings.values() {
256        let mut packs = Vec::new();
257        for pack_binding in &binding.packs {
258            let locator = pack_binding.pack_locator.as_ref().ok_or_else(|| {
259                anyhow::anyhow!(
260                    "gtbind {} missing pack_locator for pack {}",
261                    binding.tenant,
262                    pack_binding.pack_id
263                )
264            })?;
265            let (name, version_or_digest) =
266                pack_binding.pack_ref.split_once('@').ok_or_else(|| {
267                    anyhow::anyhow!(
268                        "gtbind {} invalid pack_ref {} (expected name@version)",
269                        binding.tenant,
270                        pack_binding.pack_ref
271                    )
272                })?;
273            if name != pack_binding.pack_id {
274                anyhow::bail!(
275                    "gtbind {} pack_ref {} does not match pack_id {}",
276                    binding.tenant,
277                    pack_binding.pack_ref,
278                    pack_binding.pack_id
279                );
280            }
281            let mut entry = serde_json::Map::new();
282            entry.insert("name".to_string(), json!(name));
283            if version_or_digest.contains(':') {
284                entry.insert("digest".to_string(), json!(version_or_digest));
285            } else {
286                entry.insert("version".to_string(), json!(version_or_digest));
287            }
288            entry.insert("locator".to_string(), json!(locator));
289            packs.push(serde_json::Value::Object(entry));
290        }
291        let main_pack = packs
292            .first()
293            .cloned()
294            .ok_or_else(|| anyhow::anyhow!("gtbind {} has no packs", binding.tenant))?;
295        let overlays = packs.into_iter().skip(1).collect::<Vec<_>>();
296        entries.insert(
297            binding.tenant.clone(),
298            json!({
299                "main_pack": main_pack,
300                "overlays": overlays,
301            }),
302        );
303    }
304
305    let index_path = paths.greentic_root.join("packs").join("gtbind.index.json");
306    if let Some(parent) = index_path.parent() {
307        fs::create_dir_all(parent)
308            .with_context(|| format!("failed to create {}", parent.display()))?;
309    }
310    let serialized = serde_json::to_vec_pretty(&serde_json::Value::Object(entries))?;
311    fs::write(&index_path, serialized)
312        .with_context(|| format!("failed to write {}", index_path.display()))?;
313    pack.index_location = runner_core::env::IndexLocation::File(index_path);
314    Ok(())
315}
316
317fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
318    let raw = value.unwrap_or_else(|| "30s".into());
319    humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
320}
321
322fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
323    let mut policy = RunnerWasiPolicy::default()
324        .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
325        .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
326        .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
327        .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
328    policy = policy
329        .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
330        .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
331        .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
332    policy
333}
334
335fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
336    for dir in [
337        &paths.greentic_root,
338        &paths.state_dir,
339        &paths.cache_dir,
340        &paths.logs_dir,
341    ] {
342        fs::create_dir_all(dir)
343            .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
344    }
345    Ok(())
346}
347
348fn pack_config_from(
349    packs: &Option<PacksConfig>,
350    paths: &PathsConfig,
351    network: &NetworkConfig,
352) -> Result<PackConfig> {
353    if let Some(cfg) = packs {
354        let cache_dir = cfg.cache_dir.clone();
355        let index_location = match &cfg.source {
356            PackSourceConfig::LocalIndex { path } => {
357                runner_core::env::IndexLocation::File(path.clone())
358            }
359            PackSourceConfig::HttpIndex { url } => {
360                runner_core::env::IndexLocation::from_value(url)?
361            }
362            PackSourceConfig::OciRegistry { reference } => {
363                runner_core::env::IndexLocation::from_value(reference)?
364            }
365        };
366        let public_key = cfg
367            .trust
368            .as_ref()
369            .and_then(|trust| trust.public_keys.first().cloned());
370        return Ok(PackConfig {
371            source: runner_core::env::PackSource::Fs,
372            index_location,
373            cache_dir,
374            public_key,
375            network: Some(network.clone()),
376        });
377    }
378    let mut cfg = PackConfig::default_for_paths(paths)?;
379    cfg.network = Some(network.clone());
380    Ok(cfg)
381}
382
383fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
384    if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
385        return None;
386    }
387    let mut export = TelemetryExportConfig::json_default();
388    export.mode = match cfg.exporter {
389        TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
390        TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
391        TelemetryExporterKind::Gcp => ExportMode::GcpCloudTrace,
392        TelemetryExporterKind::Azure => ExportMode::AzureAppInsights,
393        TelemetryExporterKind::Aws => ExportMode::AwsXRay,
394        TelemetryExporterKind::None => return None,
395    };
396    export.endpoint = cfg.endpoint.clone();
397    export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
398    Some(TelemetryCfg {
399        config: greentic_telemetry::TelemetryConfig {
400            service_name: "greentic-runner".into(),
401        },
402        export,
403    })
404}
405
406/// Run the unified Greentic runner host until shutdown.
407pub async fn run(cfg: RunnerConfig) -> Result<()> {
408    let RunnerConfig {
409        tenant_bindings,
410        pack,
411        port,
412        refresh_interval,
413        routing,
414        admin,
415        telemetry,
416        secrets_backend,
417        wasi_policy,
418        resolved_config: _resolved_config,
419        trace,
420        validation,
421    } = cfg;
422
423    let mut builder = HostBuilder::new();
424    for bindings in tenant_bindings.into_values() {
425        let mut host_config = HostConfig::from_gtbind(bindings);
426        host_config.trace = trace.clone();
427        host_config.validation = validation.clone();
428        builder = builder.with_config(host_config);
429    }
430    if let Some(telemetry) = telemetry.clone() {
431        builder = builder.with_telemetry(telemetry);
432    }
433    builder = builder
434        .with_wasi_policy(wasi_policy.clone())
435        .with_secrets_manager(
436            secrets_backend
437                .build_manager()
438                .context("failed to initialise secrets backend")?,
439        );
440
441    let host = Arc::new(builder.build()?);
442    host.start().await?;
443
444    let (watcher, reload_handle) =
445        watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;
446
447    let routing = TenantRouting::new(routing.clone());
448    let server = HostServer::new(
449        port,
450        host.active_packs(),
451        routing,
452        host.health_state(),
453        Some(reload_handle),
454        admin.clone(),
455    )?;
456
457    tokio::select! {
458        result = server.serve() => {
459            result?;
460        }
461        _ = signal::ctrl_c() => {
462            tracing::info!("received shutdown signal");
463        }
464    }
465
466    drop(watcher);
467    host.stop().await?;
468    Ok(())
469}
470
#[cfg(test)]
mod tests {
    use super::*;
    use crate::gtbind::{PackBinding, TenantBindings};
    use greentic_config_types::PackTrustConfig;
    use tempfile::TempDir;

    /// Directory layout rooted in `temp`; nothing is created on disk here.
    fn paths(temp: &TempDir) -> PathsConfig {
        let base = temp.path();
        PathsConfig {
            greentic_root: base.join("greentic"),
            state_dir: base.join("state"),
            cache_dir: base.join("cache"),
            logs_dir: base.join("logs"),
        }
    }

    #[test]
    fn parse_refresh_interval_uses_default_and_rejects_invalid_values() {
        let default_interval = parse_refresh_interval(None).expect("default interval");
        assert_eq!(default_interval, Duration::from_secs(30));

        let invalid = parse_refresh_interval(Some("not-a-duration".into()));
        assert!(invalid.is_err());
    }

    #[test]
    fn ensure_paths_exist_creates_expected_directories() {
        let temp = TempDir::new().expect("tempdir");
        let dirs = paths(&temp);
        ensure_paths_exist(&dirs).expect("create directories");

        assert!(dirs.greentic_root.is_dir());
        assert!(dirs.state_dir.is_dir());
        assert!(dirs.cache_dir.is_dir());
        assert!(dirs.logs_dir.is_dir());
    }

    #[test]
    fn maybe_write_gtbind_index_writes_locator_index() {
        let temp = TempDir::new().expect("tempdir");
        let dirs = paths(&temp);
        fs::create_dir_all(&dirs.greentic_root).expect("greentic root");
        let mut pack = PackConfig::default_for_paths(&dirs).expect("pack config");

        // One tenant with a versioned main pack and a digest-pinned overlay.
        let main = PackBinding {
            pack_id: "pack.main".into(),
            pack_ref: "pack.main@1.0.0".into(),
            pack_locator: Some("fs:///packs/main.gtpack".into()),
            flows: vec!["main".into()],
        };
        let overlay = PackBinding {
            pack_id: "pack.overlay".into(),
            pack_ref: "pack.overlay@sha256:abcd".into(),
            pack_locator: Some("fs:///packs/overlay.gtpack".into()),
            flows: vec![],
        };
        let demo = TenantBindings {
            tenant: "demo".into(),
            packs: vec![main, overlay],
            env_passthrough: vec![],
        };
        let tenant_bindings = HashMap::from([("demo".to_string(), demo)]);

        maybe_write_gtbind_index(&tenant_bindings, &dirs, &mut pack).expect("write gtbind index");

        let index_path = dirs.greentic_root.join("packs").join("gtbind.index.json");
        let raw = fs::read(&index_path).expect("read index");
        let json: serde_json::Value = serde_json::from_slice(&raw).expect("json");
        assert_eq!(
            json["demo"]["main_pack"]["locator"],
            "fs:///packs/main.gtpack"
        );
        assert_eq!(json["demo"]["overlays"][0]["digest"], "sha256:abcd");

        let runner_core::env::IndexLocation::File(written_to) = pack.index_location else {
            panic!("expected generated file index");
        };
        assert_eq!(written_to, index_path);
    }

    #[test]
    fn pack_config_from_prefers_configured_pack_source() {
        let temp = TempDir::new().expect("tempdir");
        let dirs = paths(&temp);
        let trust = PackTrustConfig {
            public_keys: vec!["ed25519:test".into()],
            require_signatures: true,
        };
        let packs = Some(PacksConfig {
            source: PackSourceConfig::HttpIndex {
                url: "https://example.com/index.json".into(),
            },
            cache_dir: temp.path().join("packs-cache"),
            index_cache_ttl_secs: None,
            trust: Some(trust),
        });

        let config = pack_config_from(&packs, &dirs, &NetworkConfig::default())
            .expect("pack config from packs");

        let runner_core::env::IndexLocation::Remote(url) = config.index_location else {
            panic!("expected remote index");
        };
        assert_eq!(url.as_str(), "https://example.com/index.json");
        assert_eq!(config.public_key.as_deref(), Some("ed25519:test"));
        assert!(config.network.is_some());
    }

    #[test]
    fn default_wasi_policy_sets_expected_env_and_preopens() {
        let temp = TempDir::new().expect("tempdir");
        let dirs = paths(&temp);
        let policy = default_wasi_policy(&dirs);

        let expected_root = dirs.greentic_root.display().to_string();
        assert_eq!(policy.env_set.get("GREENTIC_ROOT"), Some(&expected_root));

        assert_eq!(policy.preopens.len(), 3);
        assert_eq!(policy.preopens[0].guest_path, "/state");
        assert_eq!(policy.preopens[1].guest_path, "/cache");
        assert_eq!(policy.preopens[2].guest_path, "/logs");
    }

    #[test]
    fn telemetry_from_is_disabled_without_exporter() {
        let disabled = TelemetryConfig::default();
        assert!(telemetry_from(&disabled).is_none());
    }
}