1#![deny(unsafe_code)]
2use std::collections::{HashMap, HashSet};
10use std::fs;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use crate::secrets::SecretsBackend;
16use anyhow::{Context, Result, anyhow};
17use greentic_config::ResolvedConfig;
18#[cfg(feature = "telemetry")]
19use greentic_config_types::TelemetryExporterKind;
20use greentic_config_types::{
21 NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
22};
23#[cfg(feature = "telemetry")]
24use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
25use runner_core::env::PackConfig;
26use serde_json::json;
27use tokio::signal;
28
29pub mod boot;
30pub mod cache;
31pub mod component_api;
32pub mod config;
33pub mod engine;
34pub mod gtbind;
35pub mod http;
36pub mod ingress;
37pub mod pack;
38pub mod provider;
39pub mod provider_core;
40pub mod provider_core_only;
41pub mod routing;
42pub mod runner;
43pub mod runtime;
44pub mod runtime_wasmtime;
45pub mod secrets;
46pub mod storage;
47pub mod telemetry;
48pub mod verify;
49pub mod wasi;
50pub mod watcher;
51
52mod activity;
53mod host;
54pub mod oauth;
55
56pub use activity::{Activity, ActivityKind};
57pub use config::HostConfig;
58pub use gtbind::{PackBinding, TenantBindings};
59pub use host::TelemetryCfg;
60pub use host::{HostBuilder, RunnerHost, TenantHandle};
61pub use wasi::{PreopenSpec, RunnerWasiPolicy};
62
63pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
64
65pub use http::auth::AdminAuth;
66pub use routing::RoutingConfig;
67use routing::TenantRouting;
68pub use runner::HostServer;
69
70#[derive(Clone)]
72pub struct RunnerConfig {
73 pub tenant_bindings: HashMap<String, TenantBindings>,
74 pub pack: PackConfig,
75 pub port: u16,
76 pub refresh_interval: Duration,
77 pub routing: RoutingConfig,
78 pub admin: AdminAuth,
79 pub telemetry: Option<TelemetryCfg>,
80 pub secrets_backend: SecretsBackend,
81 pub wasi_policy: RunnerWasiPolicy,
82 pub resolved_config: ResolvedConfig,
83}
84
85impl RunnerConfig {
86 pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
88 if bindings.is_empty() {
89 anyhow::bail!("at least one gtbind file is required");
90 }
91 let tenant_bindings = gtbind::load_gtbinds(&bindings)?;
92 if tenant_bindings.is_empty() {
93 anyhow::bail!("no gtbind files loaded");
94 }
95 let mut pack = pack_config_from(
96 &resolved_config.config.packs,
97 &resolved_config.config.paths,
98 &resolved_config.config.network,
99 )?;
100 maybe_write_gtbind_index(&tenant_bindings, &resolved_config.config.paths, &mut pack)?;
101 let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
102 let port = std::env::var("PORT")
103 .ok()
104 .and_then(|value| value.parse().ok())
105 .unwrap_or(8080);
106 let default_tenant = resolved_config
107 .config
108 .dev
109 .as_ref()
110 .map(|dev| dev.default_tenant.clone())
111 .unwrap_or_else(|| "demo".into());
112 let routing = RoutingConfig::from_env_with_default(default_tenant);
113 let paths = &resolved_config.config.paths;
114 ensure_paths_exist(paths)?;
115 let mut wasi_policy = default_wasi_policy(paths);
116 let mut env_allow = HashSet::new();
117 for binding in tenant_bindings.values() {
118 env_allow.extend(binding.env_passthrough.iter().cloned());
119 }
120 for key in env_allow {
121 wasi_policy = wasi_policy.allow_env(key);
122 }
123
124 let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
125 s.events
126 .as_ref()
127 .and_then(|svc| svc.headers.as_ref())
128 .and_then(|headers| headers.get("x-admin-token").cloned())
129 }));
130 let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
131 Ok(Self {
132 tenant_bindings,
133 pack,
134 port,
135 refresh_interval: refresh,
136 routing,
137 admin,
138 telemetry: telemetry_from(&resolved_config.config.telemetry),
139 secrets_backend,
140 wasi_policy,
141 resolved_config,
142 })
143 }
144
145 pub fn with_port(mut self, port: u16) -> Self {
147 self.port = port;
148 self
149 }
150
151 pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
152 self.wasi_policy = policy;
153 self
154 }
155}
156
157fn maybe_write_gtbind_index(
158 tenant_bindings: &HashMap<String, TenantBindings>,
159 paths: &PathsConfig,
160 pack: &mut PackConfig,
161) -> Result<()> {
162 let mut uses_locators = false;
163 for binding in tenant_bindings.values() {
164 for pack_binding in &binding.packs {
165 if pack_binding.pack_locator.is_some() {
166 uses_locators = true;
167 }
168 }
169 }
170 if !uses_locators {
171 return Ok(());
172 }
173
174 let mut entries = serde_json::Map::new();
175 for binding in tenant_bindings.values() {
176 let mut packs = Vec::new();
177 for pack_binding in &binding.packs {
178 let locator = pack_binding.pack_locator.as_ref().ok_or_else(|| {
179 anyhow::anyhow!(
180 "gtbind {} missing pack_locator for pack {}",
181 binding.tenant,
182 pack_binding.pack_id
183 )
184 })?;
185 let (name, version_or_digest) =
186 pack_binding.pack_ref.split_once('@').ok_or_else(|| {
187 anyhow::anyhow!(
188 "gtbind {} invalid pack_ref {} (expected name@version)",
189 binding.tenant,
190 pack_binding.pack_ref
191 )
192 })?;
193 if name != pack_binding.pack_id {
194 anyhow::bail!(
195 "gtbind {} pack_ref {} does not match pack_id {}",
196 binding.tenant,
197 pack_binding.pack_ref,
198 pack_binding.pack_id
199 );
200 }
201 let mut entry = serde_json::Map::new();
202 entry.insert("name".to_string(), json!(name));
203 if version_or_digest.contains(':') {
204 entry.insert("digest".to_string(), json!(version_or_digest));
205 } else {
206 entry.insert("version".to_string(), json!(version_or_digest));
207 }
208 entry.insert("locator".to_string(), json!(locator));
209 packs.push(serde_json::Value::Object(entry));
210 }
211 let main_pack = packs
212 .first()
213 .cloned()
214 .ok_or_else(|| anyhow::anyhow!("gtbind {} has no packs", binding.tenant))?;
215 let overlays = packs.into_iter().skip(1).collect::<Vec<_>>();
216 entries.insert(
217 binding.tenant.clone(),
218 json!({
219 "main_pack": main_pack,
220 "overlays": overlays,
221 }),
222 );
223 }
224
225 let index_path = paths.greentic_root.join("packs").join("gtbind.index.json");
226 if let Some(parent) = index_path.parent() {
227 fs::create_dir_all(parent)
228 .with_context(|| format!("failed to create {}", parent.display()))?;
229 }
230 let serialized = serde_json::to_vec_pretty(&serde_json::Value::Object(entries))?;
231 fs::write(&index_path, serialized)
232 .with_context(|| format!("failed to write {}", index_path.display()))?;
233 pack.index_location = runner_core::env::IndexLocation::File(index_path);
234 Ok(())
235}
236
237fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
238 let raw = value.unwrap_or_else(|| "30s".into());
239 humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
240}
241
242fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
243 let mut policy = RunnerWasiPolicy::default()
244 .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
245 .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
246 .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
247 .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
248 policy = policy
249 .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
250 .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
251 .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
252 policy
253}
254
255fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
256 for dir in [
257 &paths.greentic_root,
258 &paths.state_dir,
259 &paths.cache_dir,
260 &paths.logs_dir,
261 ] {
262 fs::create_dir_all(dir)
263 .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
264 }
265 Ok(())
266}
267
268fn pack_config_from(
269 packs: &Option<PacksConfig>,
270 paths: &PathsConfig,
271 network: &NetworkConfig,
272) -> Result<PackConfig> {
273 if let Some(cfg) = packs {
274 let cache_dir = cfg.cache_dir.clone();
275 let index_location = match &cfg.source {
276 PackSourceConfig::LocalIndex { path } => {
277 runner_core::env::IndexLocation::File(path.clone())
278 }
279 PackSourceConfig::HttpIndex { url } => {
280 runner_core::env::IndexLocation::from_value(url)?
281 }
282 PackSourceConfig::OciRegistry { reference } => {
283 runner_core::env::IndexLocation::from_value(reference)?
284 }
285 };
286 let public_key = cfg
287 .trust
288 .as_ref()
289 .and_then(|trust| trust.public_keys.first().cloned());
290 return Ok(PackConfig {
291 source: runner_core::env::PackSource::Fs,
292 index_location,
293 cache_dir,
294 public_key,
295 network: Some(network.clone()),
296 });
297 }
298 let mut cfg = PackConfig::default_for_paths(paths)?;
299 cfg.network = Some(network.clone());
300 Ok(cfg)
301}
302
/// Translates the telemetry section of the configuration into a
/// [`TelemetryCfg`].
///
/// Returns `None` when telemetry is disabled or the exporter kind is `None`.
/// Otherwise the JSON default export configuration is adjusted with the
/// selected export mode, the configured endpoint and a trace-id-ratio
/// sampler; the service name is `greentic-runner`.
#[cfg(feature = "telemetry")]
fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
    if !cfg.enabled {
        return None;
    }
    let mode = match cfg.exporter {
        TelemetryExporterKind::None => return None,
        TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
        TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
    };

    let mut export = TelemetryExportConfig::json_default();
    export.mode = mode;
    export.endpoint = cfg.endpoint.clone();
    export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);

    let config = greentic_telemetry::TelemetryConfig {
        service_name: "greentic-runner".into(),
    };
    Some(TelemetryCfg { config, export })
}
323
324#[cfg(not(feature = "telemetry"))]
325fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
326 None
327}
328
329pub async fn run(cfg: RunnerConfig) -> Result<()> {
331 let RunnerConfig {
332 tenant_bindings,
333 pack,
334 port,
335 refresh_interval,
336 routing,
337 admin,
338 telemetry,
339 secrets_backend,
340 wasi_policy,
341 resolved_config: _resolved_config,
342 } = cfg;
343 #[cfg(not(feature = "telemetry"))]
344 let _ = telemetry;
345
346 let mut builder = HostBuilder::new();
347 for bindings in tenant_bindings.into_values() {
348 let host_config = HostConfig::from_gtbind(bindings);
349 builder = builder.with_config(host_config);
350 }
351 #[cfg(feature = "telemetry")]
352 if let Some(telemetry) = telemetry.clone() {
353 builder = builder.with_telemetry(telemetry);
354 }
355 builder = builder
356 .with_wasi_policy(wasi_policy.clone())
357 .with_secrets_manager(
358 secrets_backend
359 .build_manager()
360 .context("failed to initialise secrets backend")?,
361 );
362
363 let host = Arc::new(builder.build()?);
364 host.start().await?;
365
366 let (watcher, reload_handle) =
367 watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;
368
369 let routing = TenantRouting::new(routing.clone());
370 let server = HostServer::new(
371 port,
372 host.active_packs(),
373 routing,
374 host.health_state(),
375 Some(reload_handle),
376 admin.clone(),
377 )?;
378
379 tokio::select! {
380 result = server.serve() => {
381 result?;
382 }
383 _ = signal::ctrl_c() => {
384 tracing::info!("received shutdown signal");
385 }
386 }
387
388 drop(watcher);
389 host.stop().await?;
390 Ok(())
391}