1#![deny(unsafe_code)]
2use std::fs;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use crate::secrets::SecretsBackend;
15use anyhow::{Context, Result, anyhow};
16use greentic_config::ResolvedConfig;
17#[cfg(feature = "telemetry")]
18use greentic_config_types::TelemetryExporterKind;
19use greentic_config_types::{
20 NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
21};
22#[cfg(feature = "telemetry")]
23use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
24use runner_core::env::PackConfig;
25use tokio::signal;
26
27pub mod boot;
28pub mod component_api;
29pub mod config;
30pub mod engine;
31pub mod http;
32pub mod ingress;
33pub mod pack;
34pub mod routing;
35pub mod runner;
36pub mod runtime;
37pub mod runtime_wasmtime;
38pub mod secrets;
39pub mod storage;
40pub mod telemetry;
41pub mod verify;
42pub mod wasi;
43pub mod watcher;
44
45mod activity;
46mod host;
47pub mod oauth;
48
49pub use activity::{Activity, ActivityKind};
50pub use config::HostConfig;
51pub use host::TelemetryCfg;
52pub use host::{HostBuilder, RunnerHost, TenantHandle};
53pub use wasi::{PreopenSpec, RunnerWasiPolicy};
54
55pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
56
57pub use http::auth::AdminAuth;
58pub use routing::RoutingConfig;
59use routing::TenantRouting;
60pub use runner::HostServer;
61
62#[derive(Clone)]
64pub struct RunnerConfig {
65 pub bindings: Vec<PathBuf>,
66 pub pack: PackConfig,
67 pub port: u16,
68 pub refresh_interval: Duration,
69 pub routing: RoutingConfig,
70 pub admin: AdminAuth,
71 pub telemetry: Option<TelemetryCfg>,
72 pub secrets_backend: SecretsBackend,
73 pub wasi_policy: RunnerWasiPolicy,
74 pub resolved_config: ResolvedConfig,
75}
76
77impl RunnerConfig {
78 pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
80 if bindings.is_empty() {
81 anyhow::bail!("at least one bindings file is required");
82 }
83 let pack = pack_config_from(
84 &resolved_config.config.packs,
85 &resolved_config.config.paths,
86 &resolved_config.config.network,
87 )?;
88 let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
89 let port = std::env::var("PORT")
90 .ok()
91 .and_then(|value| value.parse().ok())
92 .unwrap_or(8080);
93 let default_tenant = resolved_config
94 .config
95 .dev
96 .as_ref()
97 .map(|dev| dev.default_tenant.clone())
98 .unwrap_or_else(|| "demo".into());
99 let routing = RoutingConfig::from_env_with_default(default_tenant);
100 let paths = &resolved_config.config.paths;
101 ensure_paths_exist(paths)?;
102 let wasi_policy = default_wasi_policy(paths);
103
104 let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
105 s.events
106 .as_ref()
107 .and_then(|svc| svc.headers.as_ref())
108 .and_then(|headers| headers.get("x-admin-token").cloned())
109 }));
110 let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
111 Ok(Self {
112 bindings,
113 pack,
114 port,
115 refresh_interval: refresh,
116 routing,
117 admin,
118 telemetry: telemetry_from(&resolved_config.config.telemetry),
119 secrets_backend,
120 wasi_policy,
121 resolved_config,
122 })
123 }
124
125 pub fn with_port(mut self, port: u16) -> Self {
127 self.port = port;
128 self
129 }
130
131 pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
132 self.wasi_policy = policy;
133 self
134 }
135}
136
137fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
138 let raw = value.unwrap_or_else(|| "30s".into());
139 humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
140}
141
142fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
143 let mut policy = RunnerWasiPolicy::default()
144 .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
145 .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
146 .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
147 .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
148 policy = policy
149 .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
150 .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
151 .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
152 policy
153}
154
155fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
156 for dir in [
157 &paths.greentic_root,
158 &paths.state_dir,
159 &paths.cache_dir,
160 &paths.logs_dir,
161 ] {
162 fs::create_dir_all(dir)
163 .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
164 }
165 Ok(())
166}
167
168fn pack_config_from(
169 packs: &Option<PacksConfig>,
170 paths: &PathsConfig,
171 network: &NetworkConfig,
172) -> Result<PackConfig> {
173 if let Some(cfg) = packs {
174 let cache_dir = cfg.cache_dir.clone();
175 let index_location = match &cfg.source {
176 PackSourceConfig::LocalIndex { path } => {
177 runner_core::env::IndexLocation::File(path.clone())
178 }
179 PackSourceConfig::HttpIndex { url } => {
180 runner_core::env::IndexLocation::from_value(url)?
181 }
182 PackSourceConfig::OciRegistry { reference } => {
183 runner_core::env::IndexLocation::from_value(reference)?
184 }
185 };
186 let public_key = cfg
187 .trust
188 .as_ref()
189 .and_then(|trust| trust.public_keys.first().cloned());
190 return Ok(PackConfig {
191 source: runner_core::env::PackSource::Fs,
192 index_location,
193 cache_dir,
194 public_key,
195 network: Some(network.clone()),
196 });
197 }
198 let mut cfg = PackConfig::default_for_paths(paths)?;
199 cfg.network = Some(network.clone());
200 Ok(cfg)
201}
202
203#[cfg(feature = "telemetry")]
204fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
205 if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
206 return None;
207 }
208 let mut export = TelemetryExportConfig::json_default();
209 export.mode = match cfg.exporter {
210 TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
211 TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
212 TelemetryExporterKind::None => return None,
213 };
214 export.endpoint = cfg.endpoint.clone();
215 export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
216 Some(TelemetryCfg {
217 config: greentic_telemetry::TelemetryConfig {
218 service_name: "greentic-runner".into(),
219 },
220 export,
221 })
222}
223
224#[cfg(not(feature = "telemetry"))]
225fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
226 None
227}
228
229pub async fn run(cfg: RunnerConfig) -> Result<()> {
231 let RunnerConfig {
232 bindings,
233 pack,
234 port,
235 refresh_interval,
236 routing,
237 admin,
238 telemetry,
239 secrets_backend,
240 wasi_policy,
241 resolved_config: _resolved_config,
242 } = cfg;
243 #[cfg(not(feature = "telemetry"))]
244 let _ = telemetry;
245
246 let mut builder = HostBuilder::new();
247 for path in &bindings {
248 let host_config = HostConfig::load_from_path(path)
249 .with_context(|| format!("failed to load host bindings {}", path.display()))?;
250 builder = builder.with_config(host_config);
251 }
252 #[cfg(feature = "telemetry")]
253 if let Some(telemetry) = telemetry.clone() {
254 builder = builder.with_telemetry(telemetry);
255 }
256 builder = builder
257 .with_wasi_policy(wasi_policy.clone())
258 .with_secrets_manager(
259 secrets_backend
260 .build_manager()
261 .context("failed to initialise secrets backend")?,
262 );
263
264 let host = Arc::new(builder.build()?);
265 host.start().await?;
266
267 let (watcher, reload_handle) =
268 watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;
269
270 let routing = TenantRouting::new(routing.clone());
271 let server = HostServer::new(
272 port,
273 host.active_packs(),
274 routing,
275 host.health_state(),
276 Some(reload_handle),
277 admin.clone(),
278 )?;
279
280 tokio::select! {
281 result = server.serve() => {
282 result?;
283 }
284 _ = signal::ctrl_c() => {
285 tracing::info!("received shutdown signal");
286 }
287 }
288
289 drop(watcher);
290 host.stop().await?;
291 Ok(())
292}