1#![deny(unsafe_code)]
2use std::collections::{HashMap, HashSet};
10use std::fs;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use crate::secrets::SecretsBackend;
16use anyhow::{Context, Result, anyhow};
17use greentic_config::ResolvedConfig;
18use greentic_config_types::TelemetryExporterKind;
19use greentic_config_types::{
20 NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
21};
22use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
23use runner_core::env::PackConfig;
24use serde_json::json;
25use tokio::signal;
26
27pub mod boot;
28pub mod cache;
29pub mod component_api;
30pub mod config;
31pub mod engine;
32pub mod fault;
33#[cfg(feature = "greentic-x-provider")]
34pub mod greentic_x_provider;
35pub mod gtbind;
36pub mod http;
37pub mod identify_hint;
38pub mod metrics;
39pub mod operator_metrics;
40pub mod operator_registry;
41pub mod pack;
42pub mod provider;
43pub mod provider_core;
44pub mod provider_core_only;
45pub mod routing;
46pub mod runner;
47pub mod runtime;
48pub mod runtime_refs;
49pub mod runtime_wasmtime;
50pub mod secrets;
51pub mod storage;
52pub mod telemetry;
53pub mod telemetry_scan;
54#[cfg(feature = "fault-injection")]
55pub mod testing;
56pub mod trace;
57pub mod validate;
58pub mod verify;
59pub mod wasi;
60pub mod watcher;
61
62mod activity;
63mod host;
64pub mod oauth;
65
66pub use activity::{Activity, ActivityKind, WelcomeFlowHint};
67pub use config::HostConfig;
68pub use gtbind::{PackBinding, TenantBindings};
69pub use host::TelemetryCfg;
70pub use host::{HostBuilder, RunnerHost, TenantHandle};
71pub use wasi::{PreopenSpec, RunnerWasiPolicy};
72
73pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
74
75pub use http::auth::AdminAuth;
76pub use routing::RoutingConfig;
77use routing::TenantRouting;
78pub use runner::HostServer;
79
80#[cfg(test)]
81pub(crate) mod test_support {
82 use super::*;
83 use crate::config::{OperatorPolicy, SecretsPolicy};
84 use crate::runtime::TenantRuntime;
85 use crate::secrets::default_manager;
86 use crate::storage::{new_session_store, new_state_store, session_host_from, state_host_from};
87 use crate::trace::TraceConfig;
88 use crate::validate::ValidationConfig;
89 use tempfile::TempDir;
90
91 pub(crate) fn fixture_pack_path() -> PathBuf {
92 PathBuf::from(env!("CARGO_MANIFEST_DIR"))
93 .join("../../examples/packs/demo.gtpack")
94 .canonicalize()
95 .expect("fixture pack path")
96 }
97
98 fn minimal_config(workspace: &std::path::Path) -> Result<Arc<HostConfig>> {
99 let bindings_path = workspace.join("bindings.yaml");
100 std::fs::write(
101 &bindings_path,
102 r#"
103tenant: demo
104flow_type_bindings: {}
105rate_limits: {}
106retry: {}
107timers: []
108"#,
109 )?;
110 let mut config =
111 HostConfig::load_from_path(&bindings_path).context("load minimal host bindings")?;
112 config.secrets_policy = SecretsPolicy::allow_all();
113 config.operator_policy = OperatorPolicy::allow_all();
114 config.trace = TraceConfig::from_env();
115 config.validation = ValidationConfig::from_env();
116 Ok(Arc::new(config))
117 }
118
119 pub(crate) async fn build_test_runtime() -> Result<(TempDir, Arc<TenantRuntime>)> {
120 let workspace = TempDir::new().context("temp workspace")?;
121 let config = minimal_config(workspace.path())?;
122 let session_store = new_session_store();
123 let session_host = session_host_from(Arc::clone(&session_store));
124 let state_store = new_state_store();
125 let state_host = state_host_from(Arc::clone(&state_store));
126 let secrets = default_manager()?;
127 let pack_path = fixture_pack_path();
128 let runtime = TenantRuntime::load(
129 &pack_path,
130 config,
131 None,
132 Some(&pack_path),
133 None,
134 Arc::new(RunnerWasiPolicy::new()),
135 session_host,
136 Arc::clone(&session_store),
137 Arc::clone(&state_store),
138 state_host,
139 secrets,
140 )
141 .await?;
142 Ok((workspace, runtime))
143 }
144}
145
146#[derive(Clone)]
148pub struct RunnerConfig {
149 pub tenant_bindings: HashMap<String, TenantBindings>,
150 pub pack: PackConfig,
151 pub port: u16,
152 pub refresh_interval: Duration,
153 pub routing: RoutingConfig,
154 pub admin: AdminAuth,
155 pub telemetry: Option<TelemetryCfg>,
156 pub secrets_backend: SecretsBackend,
157 pub wasi_policy: RunnerWasiPolicy,
158 pub resolved_config: ResolvedConfig,
159 pub trace: trace::TraceConfig,
160 pub validation: validate::ValidationConfig,
161}
162
163impl RunnerConfig {
164 pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
166 if bindings.is_empty() {
167 anyhow::bail!("at least one gtbind file is required");
168 }
169 let tenant_bindings = gtbind::load_gtbinds(&bindings)?;
170 if tenant_bindings.is_empty() {
171 anyhow::bail!("no gtbind files loaded");
172 }
173 let mut pack = pack_config_from(
174 &resolved_config.config.packs,
175 &resolved_config.config.paths,
176 &resolved_config.config.network,
177 )?;
178 maybe_write_gtbind_index(&tenant_bindings, &resolved_config.config.paths, &mut pack)?;
179 let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
180 let port = std::env::var("PORT")
181 .ok()
182 .and_then(|value| value.parse().ok())
183 .unwrap_or(8080);
184 let default_tenant = resolved_config
185 .config
186 .dev
187 .as_ref()
188 .map(|dev| dev.default_tenant.clone())
189 .unwrap_or_else(|| "demo".into());
190 let routing = RoutingConfig::from_env_with_default(default_tenant);
191 let paths = &resolved_config.config.paths;
192 ensure_paths_exist(paths)?;
193 let mut wasi_policy = default_wasi_policy(paths);
194 let mut env_allow = HashSet::new();
195 for binding in tenant_bindings.values() {
196 env_allow.extend(binding.env_passthrough.iter().cloned());
197 }
198 for key in env_allow {
199 wasi_policy = wasi_policy.allow_env(key);
200 }
201
202 let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
203 s.events
204 .as_ref()
205 .and_then(|svc| svc.headers.as_ref())
206 .and_then(|headers| headers.get("x-admin-token").cloned())
207 }));
208 let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
209 Ok(Self {
210 tenant_bindings,
211 pack,
212 port,
213 refresh_interval: refresh,
214 routing,
215 admin,
216 telemetry: telemetry_from(&resolved_config.config.telemetry),
217 secrets_backend,
218 wasi_policy,
219 resolved_config,
220 trace: trace::TraceConfig::from_env(),
221 validation: validate::ValidationConfig::from_env(),
222 })
223 }
224
225 pub fn with_port(mut self, port: u16) -> Self {
227 self.port = port;
228 self
229 }
230
231 pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
232 self.wasi_policy = policy;
233 self
234 }
235}
236
237fn maybe_write_gtbind_index(
238 tenant_bindings: &HashMap<String, TenantBindings>,
239 paths: &PathsConfig,
240 pack: &mut PackConfig,
241) -> Result<()> {
242 let mut uses_locators = false;
243 for binding in tenant_bindings.values() {
244 for pack_binding in &binding.packs {
245 if pack_binding.pack_locator.is_some() {
246 uses_locators = true;
247 }
248 }
249 }
250 if !uses_locators {
251 return Ok(());
252 }
253
254 let mut entries = serde_json::Map::new();
255 for binding in tenant_bindings.values() {
256 let mut packs = Vec::new();
257 for pack_binding in &binding.packs {
258 let locator = pack_binding.pack_locator.as_ref().ok_or_else(|| {
259 anyhow::anyhow!(
260 "gtbind {} missing pack_locator for pack {}",
261 binding.tenant,
262 pack_binding.pack_id
263 )
264 })?;
265 let (name, version_or_digest) =
266 pack_binding.pack_ref.split_once('@').ok_or_else(|| {
267 anyhow::anyhow!(
268 "gtbind {} invalid pack_ref {} (expected name@version)",
269 binding.tenant,
270 pack_binding.pack_ref
271 )
272 })?;
273 if name != pack_binding.pack_id {
274 anyhow::bail!(
275 "gtbind {} pack_ref {} does not match pack_id {}",
276 binding.tenant,
277 pack_binding.pack_ref,
278 pack_binding.pack_id
279 );
280 }
281 let mut entry = serde_json::Map::new();
282 entry.insert("name".to_string(), json!(name));
283 if version_or_digest.contains(':') {
284 entry.insert("digest".to_string(), json!(version_or_digest));
285 } else {
286 entry.insert("version".to_string(), json!(version_or_digest));
287 }
288 entry.insert("locator".to_string(), json!(locator));
289 packs.push(serde_json::Value::Object(entry));
290 }
291 let main_pack = packs
292 .first()
293 .cloned()
294 .ok_or_else(|| anyhow::anyhow!("gtbind {} has no packs", binding.tenant))?;
295 let overlays = packs.into_iter().skip(1).collect::<Vec<_>>();
296 entries.insert(
297 binding.tenant.clone(),
298 json!({
299 "main_pack": main_pack,
300 "overlays": overlays,
301 }),
302 );
303 }
304
305 let index_path = paths.greentic_root.join("packs").join("gtbind.index.json");
306 if let Some(parent) = index_path.parent() {
307 fs::create_dir_all(parent)
308 .with_context(|| format!("failed to create {}", parent.display()))?;
309 }
310 let serialized = serde_json::to_vec_pretty(&serde_json::Value::Object(entries))?;
311 fs::write(&index_path, serialized)
312 .with_context(|| format!("failed to write {}", index_path.display()))?;
313 pack.index_location = runner_core::env::IndexLocation::File(index_path);
314 Ok(())
315}
316
317fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
318 let raw = value.unwrap_or_else(|| "30s".into());
319 humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
320}
321
322fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
323 let mut policy = RunnerWasiPolicy::default()
324 .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
325 .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
326 .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
327 .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
328 policy = policy
329 .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
330 .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
331 .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
332 policy
333}
334
335fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
336 for dir in [
337 &paths.greentic_root,
338 &paths.state_dir,
339 &paths.cache_dir,
340 &paths.logs_dir,
341 ] {
342 fs::create_dir_all(dir)
343 .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
344 }
345 Ok(())
346}
347
348fn pack_config_from(
349 packs: &Option<PacksConfig>,
350 paths: &PathsConfig,
351 network: &NetworkConfig,
352) -> Result<PackConfig> {
353 if let Some(cfg) = packs {
354 let cache_dir = cfg.cache_dir.clone();
355 let index_location = match &cfg.source {
356 PackSourceConfig::LocalIndex { path } => {
357 runner_core::env::IndexLocation::File(path.clone())
358 }
359 PackSourceConfig::HttpIndex { url } => {
360 runner_core::env::IndexLocation::from_value(url)?
361 }
362 PackSourceConfig::OciRegistry { reference } => {
363 runner_core::env::IndexLocation::from_value(reference)?
364 }
365 };
366 let public_key = cfg
367 .trust
368 .as_ref()
369 .and_then(|trust| trust.public_keys.first().cloned());
370 return Ok(PackConfig {
371 source: runner_core::env::PackSource::Fs,
372 index_location,
373 cache_dir,
374 public_key,
375 network: Some(network.clone()),
376 });
377 }
378 let mut cfg = PackConfig::default_for_paths(paths)?;
379 cfg.network = Some(network.clone());
380 Ok(cfg)
381}
382
383fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
384 if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
385 return None;
386 }
387 let mut export = TelemetryExportConfig::json_default();
388 export.mode = match cfg.exporter {
389 TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
390 TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
391 TelemetryExporterKind::Gcp => ExportMode::GcpCloudTrace,
392 TelemetryExporterKind::Azure => ExportMode::AzureAppInsights,
393 TelemetryExporterKind::Aws => ExportMode::AwsXRay,
394 TelemetryExporterKind::None => return None,
395 };
396 export.endpoint = cfg.endpoint.clone();
397 export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
398 Some(TelemetryCfg {
399 config: greentic_telemetry::TelemetryConfig {
400 service_name: "greentic-runner".into(),
401 },
402 export,
403 })
404}
405
406pub async fn run(cfg: RunnerConfig) -> Result<()> {
408 let RunnerConfig {
409 tenant_bindings,
410 pack,
411 port,
412 refresh_interval,
413 routing,
414 admin,
415 telemetry,
416 secrets_backend,
417 wasi_policy,
418 resolved_config: _resolved_config,
419 trace,
420 validation,
421 } = cfg;
422
423 let mut builder = HostBuilder::new();
424 for bindings in tenant_bindings.into_values() {
425 let mut host_config = HostConfig::from_gtbind(bindings);
426 host_config.trace = trace.clone();
427 host_config.validation = validation.clone();
428 builder = builder.with_config(host_config);
429 }
430 if let Some(telemetry) = telemetry.clone() {
431 builder = builder.with_telemetry(telemetry);
432 }
433 builder = builder
434 .with_wasi_policy(wasi_policy.clone())
435 .with_secrets_manager(
436 secrets_backend
437 .build_manager()
438 .context("failed to initialise secrets backend")?,
439 );
440
441 let host = Arc::new(builder.build()?);
442 host.start().await?;
443
444 let (watcher, reload_handle) =
445 watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;
446
447 let routing = TenantRouting::new(routing.clone());
448 let server = HostServer::new(
449 port,
450 host.active_packs(),
451 routing,
452 host.health_state(),
453 Some(reload_handle),
454 admin.clone(),
455 )?;
456
457 tokio::select! {
458 result = server.serve() => {
459 result?;
460 }
461 _ = signal::ctrl_c() => {
462 tracing::info!("received shutdown signal");
463 }
464 }
465
466 drop(watcher);
467 host.stop().await?;
468 Ok(())
469}
470
471#[cfg(test)]
472mod tests {
473 use super::*;
474 use crate::gtbind::{PackBinding, TenantBindings};
475 use greentic_config_types::PackTrustConfig;
476 use tempfile::TempDir;
477
478 fn paths(temp: &TempDir) -> PathsConfig {
479 PathsConfig {
480 greentic_root: temp.path().join("greentic"),
481 state_dir: temp.path().join("state"),
482 cache_dir: temp.path().join("cache"),
483 logs_dir: temp.path().join("logs"),
484 }
485 }
486
487 #[test]
488 fn parse_refresh_interval_uses_default_and_rejects_invalid_values() {
489 assert_eq!(
490 parse_refresh_interval(None).expect("default interval"),
491 Duration::from_secs(30)
492 );
493 assert!(parse_refresh_interval(Some("not-a-duration".into())).is_err());
494 }
495
496 #[test]
497 fn ensure_paths_exist_creates_expected_directories() {
498 let temp = TempDir::new().expect("tempdir");
499 let paths = paths(&temp);
500 ensure_paths_exist(&paths).expect("create directories");
501
502 assert!(paths.greentic_root.is_dir());
503 assert!(paths.state_dir.is_dir());
504 assert!(paths.cache_dir.is_dir());
505 assert!(paths.logs_dir.is_dir());
506 }
507
508 #[test]
509 fn maybe_write_gtbind_index_writes_locator_index() {
510 let temp = TempDir::new().expect("tempdir");
511 let paths = paths(&temp);
512 fs::create_dir_all(&paths.greentic_root).expect("greentic root");
513 let mut pack = PackConfig::default_for_paths(&paths).expect("pack config");
514 let tenant_bindings = HashMap::from([(
515 "demo".to_string(),
516 TenantBindings {
517 tenant: "demo".into(),
518 packs: vec![
519 PackBinding {
520 pack_id: "pack.main".into(),
521 pack_ref: "pack.main@1.0.0".into(),
522 pack_locator: Some("fs:///packs/main.gtpack".into()),
523 flows: vec!["main".into()],
524 },
525 PackBinding {
526 pack_id: "pack.overlay".into(),
527 pack_ref: "pack.overlay@sha256:abcd".into(),
528 pack_locator: Some("fs:///packs/overlay.gtpack".into()),
529 flows: vec![],
530 },
531 ],
532 env_passthrough: vec![],
533 },
534 )]);
535
536 maybe_write_gtbind_index(&tenant_bindings, &paths, &mut pack).expect("write gtbind index");
537
538 let index_path = paths.greentic_root.join("packs").join("gtbind.index.json");
539 let json: serde_json::Value =
540 serde_json::from_slice(&fs::read(&index_path).expect("read index")).expect("json");
541 assert_eq!(
542 json["demo"]["main_pack"]["locator"],
543 "fs:///packs/main.gtpack"
544 );
545 assert_eq!(json["demo"]["overlays"][0]["digest"], "sha256:abcd");
546 match pack.index_location {
547 runner_core::env::IndexLocation::File(path) => assert_eq!(path, index_path),
548 runner_core::env::IndexLocation::Remote(_) => panic!("expected generated file index"),
549 }
550 }
551
552 #[test]
553 fn pack_config_from_prefers_configured_pack_source() {
554 let temp = TempDir::new().expect("tempdir");
555 let paths = paths(&temp);
556 let packs = Some(PacksConfig {
557 source: PackSourceConfig::HttpIndex {
558 url: "https://example.com/index.json".into(),
559 },
560 cache_dir: temp.path().join("packs-cache"),
561 index_cache_ttl_secs: None,
562 trust: Some(PackTrustConfig {
563 public_keys: vec!["ed25519:test".into()],
564 require_signatures: true,
565 }),
566 });
567 let config = pack_config_from(&packs, &paths, &NetworkConfig::default())
568 .expect("pack config from packs");
569
570 match config.index_location {
571 runner_core::env::IndexLocation::Remote(url) => {
572 assert_eq!(url.as_str(), "https://example.com/index.json");
573 }
574 runner_core::env::IndexLocation::File(_) => panic!("expected remote index"),
575 }
576 assert_eq!(config.public_key.as_deref(), Some("ed25519:test"));
577 assert!(config.network.is_some());
578 }
579
580 #[test]
581 fn default_wasi_policy_sets_expected_env_and_preopens() {
582 let temp = TempDir::new().expect("tempdir");
583 let paths = paths(&temp);
584 let policy = default_wasi_policy(&paths);
585
586 assert_eq!(
587 policy.env_set.get("GREENTIC_ROOT"),
588 Some(&paths.greentic_root.display().to_string())
589 );
590 assert_eq!(policy.preopens.len(), 3);
591 assert_eq!(policy.preopens[0].guest_path, "/state");
592 assert_eq!(policy.preopens[1].guest_path, "/cache");
593 assert_eq!(policy.preopens[2].guest_path, "/logs");
594 }
595
596 #[test]
597 fn telemetry_from_is_disabled_without_exporter() {
598 assert!(telemetry_from(&TelemetryConfig::default()).is_none());
599 }
600}