1#![deny(unsafe_code)]
2use std::collections::{HashMap, HashSet};
10use std::fs;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use crate::secrets::SecretsBackend;
16use anyhow::{Context, Result, anyhow};
17use greentic_config::ResolvedConfig;
18#[cfg(feature = "telemetry")]
19use greentic_config_types::TelemetryExporterKind;
20use greentic_config_types::{
21 NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
22};
23#[cfg(feature = "telemetry")]
24use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
25use runner_core::env::PackConfig;
26use serde_json::json;
27use tokio::signal;
28
29pub mod boot;
30pub mod cache;
31pub mod component_api;
32pub mod config;
33pub mod engine;
34pub mod fault;
35pub mod gtbind;
36pub mod http;
37pub mod ingress;
38pub mod operator_metrics;
39pub mod operator_registry;
40pub mod pack;
41pub mod provider;
42pub mod provider_core;
43pub mod provider_core_only;
44pub mod routing;
45pub mod runner;
46pub mod runtime;
47pub mod runtime_wasmtime;
48pub mod secrets;
49pub mod storage;
50pub mod telemetry;
51#[cfg(feature = "fault-injection")]
52pub mod testing;
53pub mod trace;
54pub mod validate;
55pub mod verify;
56pub mod wasi;
57pub mod watcher;
58
59mod activity;
60mod host;
61pub mod oauth;
62
63pub use activity::{Activity, ActivityKind};
64pub use config::HostConfig;
65pub use gtbind::{PackBinding, TenantBindings};
66pub use host::TelemetryCfg;
67pub use host::{HostBuilder, RunnerHost, TenantHandle};
68pub use wasi::{PreopenSpec, RunnerWasiPolicy};
69
70pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
71
72pub use http::auth::AdminAuth;
73pub use routing::RoutingConfig;
74use routing::TenantRouting;
75pub use runner::HostServer;
76
77#[derive(Clone)]
79pub struct RunnerConfig {
80 pub tenant_bindings: HashMap<String, TenantBindings>,
81 pub pack: PackConfig,
82 pub port: u16,
83 pub refresh_interval: Duration,
84 pub routing: RoutingConfig,
85 pub admin: AdminAuth,
86 pub telemetry: Option<TelemetryCfg>,
87 pub secrets_backend: SecretsBackend,
88 pub wasi_policy: RunnerWasiPolicy,
89 pub resolved_config: ResolvedConfig,
90 pub trace: trace::TraceConfig,
91 pub validation: validate::ValidationConfig,
92}
93
94impl RunnerConfig {
95 pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
97 if bindings.is_empty() {
98 anyhow::bail!("at least one gtbind file is required");
99 }
100 let tenant_bindings = gtbind::load_gtbinds(&bindings)?;
101 if tenant_bindings.is_empty() {
102 anyhow::bail!("no gtbind files loaded");
103 }
104 let mut pack = pack_config_from(
105 &resolved_config.config.packs,
106 &resolved_config.config.paths,
107 &resolved_config.config.network,
108 )?;
109 maybe_write_gtbind_index(&tenant_bindings, &resolved_config.config.paths, &mut pack)?;
110 let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
111 let port = std::env::var("PORT")
112 .ok()
113 .and_then(|value| value.parse().ok())
114 .unwrap_or(8080);
115 let default_tenant = resolved_config
116 .config
117 .dev
118 .as_ref()
119 .map(|dev| dev.default_tenant.clone())
120 .unwrap_or_else(|| "demo".into());
121 let routing = RoutingConfig::from_env_with_default(default_tenant);
122 let paths = &resolved_config.config.paths;
123 ensure_paths_exist(paths)?;
124 let mut wasi_policy = default_wasi_policy(paths);
125 let mut env_allow = HashSet::new();
126 for binding in tenant_bindings.values() {
127 env_allow.extend(binding.env_passthrough.iter().cloned());
128 }
129 for key in env_allow {
130 wasi_policy = wasi_policy.allow_env(key);
131 }
132
133 let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
134 s.events
135 .as_ref()
136 .and_then(|svc| svc.headers.as_ref())
137 .and_then(|headers| headers.get("x-admin-token").cloned())
138 }));
139 let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
140 Ok(Self {
141 tenant_bindings,
142 pack,
143 port,
144 refresh_interval: refresh,
145 routing,
146 admin,
147 telemetry: telemetry_from(&resolved_config.config.telemetry),
148 secrets_backend,
149 wasi_policy,
150 resolved_config,
151 trace: trace::TraceConfig::from_env(),
152 validation: validate::ValidationConfig::from_env(),
153 })
154 }
155
156 pub fn with_port(mut self, port: u16) -> Self {
158 self.port = port;
159 self
160 }
161
162 pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
163 self.wasi_policy = policy;
164 self
165 }
166}
167
168fn maybe_write_gtbind_index(
169 tenant_bindings: &HashMap<String, TenantBindings>,
170 paths: &PathsConfig,
171 pack: &mut PackConfig,
172) -> Result<()> {
173 let mut uses_locators = false;
174 for binding in tenant_bindings.values() {
175 for pack_binding in &binding.packs {
176 if pack_binding.pack_locator.is_some() {
177 uses_locators = true;
178 }
179 }
180 }
181 if !uses_locators {
182 return Ok(());
183 }
184
185 let mut entries = serde_json::Map::new();
186 for binding in tenant_bindings.values() {
187 let mut packs = Vec::new();
188 for pack_binding in &binding.packs {
189 let locator = pack_binding.pack_locator.as_ref().ok_or_else(|| {
190 anyhow::anyhow!(
191 "gtbind {} missing pack_locator for pack {}",
192 binding.tenant,
193 pack_binding.pack_id
194 )
195 })?;
196 let (name, version_or_digest) =
197 pack_binding.pack_ref.split_once('@').ok_or_else(|| {
198 anyhow::anyhow!(
199 "gtbind {} invalid pack_ref {} (expected name@version)",
200 binding.tenant,
201 pack_binding.pack_ref
202 )
203 })?;
204 if name != pack_binding.pack_id {
205 anyhow::bail!(
206 "gtbind {} pack_ref {} does not match pack_id {}",
207 binding.tenant,
208 pack_binding.pack_ref,
209 pack_binding.pack_id
210 );
211 }
212 let mut entry = serde_json::Map::new();
213 entry.insert("name".to_string(), json!(name));
214 if version_or_digest.contains(':') {
215 entry.insert("digest".to_string(), json!(version_or_digest));
216 } else {
217 entry.insert("version".to_string(), json!(version_or_digest));
218 }
219 entry.insert("locator".to_string(), json!(locator));
220 packs.push(serde_json::Value::Object(entry));
221 }
222 let main_pack = packs
223 .first()
224 .cloned()
225 .ok_or_else(|| anyhow::anyhow!("gtbind {} has no packs", binding.tenant))?;
226 let overlays = packs.into_iter().skip(1).collect::<Vec<_>>();
227 entries.insert(
228 binding.tenant.clone(),
229 json!({
230 "main_pack": main_pack,
231 "overlays": overlays,
232 }),
233 );
234 }
235
236 let index_path = paths.greentic_root.join("packs").join("gtbind.index.json");
237 if let Some(parent) = index_path.parent() {
238 fs::create_dir_all(parent)
239 .with_context(|| format!("failed to create {}", parent.display()))?;
240 }
241 let serialized = serde_json::to_vec_pretty(&serde_json::Value::Object(entries))?;
242 fs::write(&index_path, serialized)
243 .with_context(|| format!("failed to write {}", index_path.display()))?;
244 pack.index_location = runner_core::env::IndexLocation::File(index_path);
245 Ok(())
246}
247
248fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
249 let raw = value.unwrap_or_else(|| "30s".into());
250 humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
251}
252
253fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
254 let mut policy = RunnerWasiPolicy::default()
255 .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
256 .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
257 .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
258 .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
259 policy = policy
260 .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
261 .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
262 .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
263 policy
264}
265
266fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
267 for dir in [
268 &paths.greentic_root,
269 &paths.state_dir,
270 &paths.cache_dir,
271 &paths.logs_dir,
272 ] {
273 fs::create_dir_all(dir)
274 .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
275 }
276 Ok(())
277}
278
279fn pack_config_from(
280 packs: &Option<PacksConfig>,
281 paths: &PathsConfig,
282 network: &NetworkConfig,
283) -> Result<PackConfig> {
284 if let Some(cfg) = packs {
285 let cache_dir = cfg.cache_dir.clone();
286 let index_location = match &cfg.source {
287 PackSourceConfig::LocalIndex { path } => {
288 runner_core::env::IndexLocation::File(path.clone())
289 }
290 PackSourceConfig::HttpIndex { url } => {
291 runner_core::env::IndexLocation::from_value(url)?
292 }
293 PackSourceConfig::OciRegistry { reference } => {
294 runner_core::env::IndexLocation::from_value(reference)?
295 }
296 };
297 let public_key = cfg
298 .trust
299 .as_ref()
300 .and_then(|trust| trust.public_keys.first().cloned());
301 return Ok(PackConfig {
302 source: runner_core::env::PackSource::Fs,
303 index_location,
304 cache_dir,
305 public_key,
306 network: Some(network.clone()),
307 });
308 }
309 let mut cfg = PackConfig::default_for_paths(paths)?;
310 cfg.network = Some(network.clone());
311 Ok(cfg)
312}
313
314#[cfg(feature = "telemetry")]
315fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
316 if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
317 return None;
318 }
319 let mut export = TelemetryExportConfig::json_default();
320 export.mode = match cfg.exporter {
321 TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
322 TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
323 TelemetryExporterKind::None => return None,
324 };
325 export.endpoint = cfg.endpoint.clone();
326 export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
327 Some(TelemetryCfg {
328 config: greentic_telemetry::TelemetryConfig {
329 service_name: "greentic-runner".into(),
330 },
331 export,
332 })
333}
334
335#[cfg(not(feature = "telemetry"))]
336fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
337 None
338}
339
340pub async fn run(cfg: RunnerConfig) -> Result<()> {
342 let RunnerConfig {
343 tenant_bindings,
344 pack,
345 port,
346 refresh_interval,
347 routing,
348 admin,
349 telemetry,
350 secrets_backend,
351 wasi_policy,
352 resolved_config: _resolved_config,
353 trace,
354 validation,
355 } = cfg;
356 #[cfg(not(feature = "telemetry"))]
357 let _ = telemetry;
358
359 let mut builder = HostBuilder::new();
360 for bindings in tenant_bindings.into_values() {
361 let mut host_config = HostConfig::from_gtbind(bindings);
362 host_config.trace = trace.clone();
363 host_config.validation = validation.clone();
364 builder = builder.with_config(host_config);
365 }
366 #[cfg(feature = "telemetry")]
367 if let Some(telemetry) = telemetry.clone() {
368 builder = builder.with_telemetry(telemetry);
369 }
370 builder = builder
371 .with_wasi_policy(wasi_policy.clone())
372 .with_secrets_manager(
373 secrets_backend
374 .build_manager()
375 .context("failed to initialise secrets backend")?,
376 );
377
378 let host = Arc::new(builder.build()?);
379 host.start().await?;
380
381 let (watcher, reload_handle) =
382 watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;
383
384 let routing = TenantRouting::new(routing.clone());
385 let server = HostServer::new(
386 port,
387 host.active_packs(),
388 routing,
389 host.health_state(),
390 Some(reload_handle),
391 admin.clone(),
392 )?;
393
394 tokio::select! {
395 result = server.serve() => {
396 result?;
397 }
398 _ = signal::ctrl_c() => {
399 tracing::info!("received shutdown signal");
400 }
401 }
402
403 drop(watcher);
404 host.stop().await?;
405 Ok(())
406}