1#![deny(unsafe_code)]
2use std::fs;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use crate::secrets::SecretsBackend;
15use anyhow::{Context, Result, anyhow};
16use greentic_config::ResolvedConfig;
17use greentic_config_types::{
18 NetworkConfig, PackSourceConfig, PacksConfig, PathsConfig, TelemetryConfig,
19 TelemetryExporterKind,
20};
21#[cfg(feature = "telemetry")]
22use greentic_telemetry::export::{ExportConfig as TelemetryExportConfig, ExportMode, Sampling};
23use runner_core::env::PackConfig;
24use tokio::signal;
25
26pub mod boot;
27pub mod component_api;
28pub mod config;
29pub mod engine;
30pub mod http;
31pub mod ingress;
32pub mod pack;
33pub mod routing;
34pub mod runner;
35pub mod runtime;
36pub mod runtime_wasmtime;
37pub mod secrets;
38pub mod storage;
39pub mod telemetry;
40pub mod verify;
41pub mod wasi;
42pub mod watcher;
43
44mod activity;
45mod host;
46pub mod oauth;
47
48pub use activity::{Activity, ActivityKind};
49pub use config::HostConfig;
50pub use host::TelemetryCfg;
51pub use host::{HostBuilder, RunnerHost, TenantHandle};
52pub use wasi::{PreopenSpec, RunnerWasiPolicy};
53
54pub use greentic_types::{EnvId, FlowId, PackId, TenantCtx, TenantId};
55
56pub use http::auth::AdminAuth;
57pub use routing::RoutingConfig;
58use routing::TenantRouting;
59pub use runner::HostServer;
60
61#[derive(Clone)]
63pub struct RunnerConfig {
64 pub bindings: Vec<PathBuf>,
65 pub pack: PackConfig,
66 pub port: u16,
67 pub refresh_interval: Duration,
68 pub routing: RoutingConfig,
69 pub admin: AdminAuth,
70 pub telemetry: Option<TelemetryCfg>,
71 pub secrets_backend: SecretsBackend,
72 pub wasi_policy: RunnerWasiPolicy,
73 pub resolved_config: ResolvedConfig,
74}
75
76impl RunnerConfig {
77 pub fn from_config(resolved_config: ResolvedConfig, bindings: Vec<PathBuf>) -> Result<Self> {
79 if bindings.is_empty() {
80 anyhow::bail!("at least one bindings file is required");
81 }
82 let pack = pack_config_from(
83 &resolved_config.config.packs,
84 &resolved_config.config.paths,
85 &resolved_config.config.network,
86 )?;
87 let refresh = parse_refresh_interval(std::env::var("PACK_REFRESH_INTERVAL").ok())?;
88 let port = std::env::var("PORT")
89 .ok()
90 .and_then(|value| value.parse().ok())
91 .unwrap_or(8080);
92 let default_tenant = resolved_config
93 .config
94 .dev
95 .as_ref()
96 .map(|dev| dev.default_tenant.clone())
97 .unwrap_or_else(|| "demo".into());
98 let routing = RoutingConfig::from_env_with_default(default_tenant);
99 let paths = &resolved_config.config.paths;
100 ensure_paths_exist(paths)?;
101 let wasi_policy = default_wasi_policy(paths);
102
103 let admin = AdminAuth::new(resolved_config.config.services.as_ref().and_then(|s| {
104 s.events
105 .as_ref()
106 .and_then(|svc| svc.headers.as_ref())
107 .and_then(|headers| headers.get("x-admin-token").cloned())
108 }));
109 let secrets_backend = SecretsBackend::from_config(&resolved_config.config.secrets)?;
110 Ok(Self {
111 bindings,
112 pack,
113 port,
114 refresh_interval: refresh,
115 routing,
116 admin,
117 telemetry: telemetry_from(&resolved_config.config.telemetry),
118 secrets_backend,
119 wasi_policy,
120 resolved_config,
121 })
122 }
123
124 pub fn with_port(mut self, port: u16) -> Self {
126 self.port = port;
127 self
128 }
129
130 pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
131 self.wasi_policy = policy;
132 self
133 }
134}
135
136fn parse_refresh_interval(value: Option<String>) -> Result<Duration> {
137 let raw = value.unwrap_or_else(|| "30s".into());
138 humantime::parse_duration(&raw).map_err(|err| anyhow!("invalid PACK_REFRESH_INTERVAL: {err}"))
139}
140
141fn default_wasi_policy(paths: &PathsConfig) -> RunnerWasiPolicy {
142 let mut policy = RunnerWasiPolicy::default()
143 .with_env("GREENTIC_ROOT", paths.greentic_root.display().to_string())
144 .with_env("GREENTIC_STATE_DIR", paths.state_dir.display().to_string())
145 .with_env("GREENTIC_CACHE_DIR", paths.cache_dir.display().to_string())
146 .with_env("GREENTIC_LOGS_DIR", paths.logs_dir.display().to_string());
147 policy = policy
148 .with_preopen(PreopenSpec::new(&paths.state_dir, "/state"))
149 .with_preopen(PreopenSpec::new(&paths.cache_dir, "/cache"))
150 .with_preopen(PreopenSpec::new(&paths.logs_dir, "/logs"));
151 policy
152}
153
154fn ensure_paths_exist(paths: &PathsConfig) -> Result<()> {
155 for dir in [
156 &paths.greentic_root,
157 &paths.state_dir,
158 &paths.cache_dir,
159 &paths.logs_dir,
160 ] {
161 fs::create_dir_all(dir)
162 .with_context(|| format!("failed to ensure directory {}", dir.display()))?;
163 }
164 Ok(())
165}
166
167fn pack_config_from(
168 packs: &Option<PacksConfig>,
169 paths: &PathsConfig,
170 network: &NetworkConfig,
171) -> Result<PackConfig> {
172 if let Some(cfg) = packs {
173 let cache_dir = cfg.cache_dir.clone();
174 let index_location = match &cfg.source {
175 PackSourceConfig::LocalIndex { path } => {
176 runner_core::env::IndexLocation::File(path.clone())
177 }
178 PackSourceConfig::HttpIndex { url } => {
179 runner_core::env::IndexLocation::from_value(url)?
180 }
181 PackSourceConfig::OciRegistry { reference } => {
182 runner_core::env::IndexLocation::from_value(reference)?
183 }
184 };
185 let public_key = cfg
186 .trust
187 .as_ref()
188 .and_then(|trust| trust.public_keys.first().cloned());
189 return Ok(PackConfig {
190 source: runner_core::env::PackSource::Fs,
191 index_location,
192 cache_dir,
193 public_key,
194 network: Some(network.clone()),
195 });
196 }
197 let mut cfg = PackConfig::default_for_paths(paths)?;
198 cfg.network = Some(network.clone());
199 Ok(cfg)
200}
201
202#[cfg(feature = "telemetry")]
203fn telemetry_from(cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
204 if !cfg.enabled || matches!(cfg.exporter, TelemetryExporterKind::None) {
205 return None;
206 }
207 let mut export = TelemetryExportConfig::json_default();
208 export.mode = match cfg.exporter {
209 TelemetryExporterKind::Otlp => ExportMode::OtlpGrpc,
210 TelemetryExporterKind::Stdout => ExportMode::JsonStdout,
211 TelemetryExporterKind::None => return None,
212 };
213 export.endpoint = cfg.endpoint.clone();
214 export.sampling = Sampling::TraceIdRatio(cfg.sampling as f64);
215 Some(TelemetryCfg {
216 config: greentic_telemetry::TelemetryConfig {
217 service_name: "greentic-runner".into(),
218 },
219 export,
220 })
221}
222
223#[cfg(not(feature = "telemetry"))]
224fn telemetry_from(_cfg: &TelemetryConfig) -> Option<TelemetryCfg> {
225 None
226}
227
228pub async fn run(cfg: RunnerConfig) -> Result<()> {
230 let RunnerConfig {
231 bindings,
232 pack,
233 port,
234 refresh_interval,
235 routing,
236 admin,
237 telemetry,
238 secrets_backend,
239 wasi_policy,
240 resolved_config: _resolved_config,
241 } = cfg;
242 #[cfg(not(feature = "telemetry"))]
243 let _ = telemetry;
244
245 let mut builder = HostBuilder::new();
246 for path in &bindings {
247 let host_config = HostConfig::load_from_path(path)
248 .with_context(|| format!("failed to load host bindings {}", path.display()))?;
249 builder = builder.with_config(host_config);
250 }
251 #[cfg(feature = "telemetry")]
252 if let Some(telemetry) = telemetry.clone() {
253 builder = builder.with_telemetry(telemetry);
254 }
255 builder = builder
256 .with_wasi_policy(wasi_policy.clone())
257 .with_secrets_manager(
258 secrets_backend
259 .build_manager()
260 .context("failed to initialise secrets backend")?,
261 );
262
263 let host = Arc::new(builder.build()?);
264 host.start().await?;
265
266 let (watcher, reload_handle) =
267 watcher::start_pack_watcher(Arc::clone(&host), pack.clone(), refresh_interval).await?;
268
269 let routing = TenantRouting::new(routing.clone());
270 let server = HostServer::new(
271 port,
272 host.active_packs(),
273 routing,
274 host.health_state(),
275 Some(reload_handle),
276 admin.clone(),
277 )?;
278
279 tokio::select! {
280 result = server.serve() => {
281 result?;
282 }
283 _ = signal::ctrl_c() => {
284 tracing::info!("received shutdown signal");
285 }
286 }
287
288 drop(watcher);
289 host.stop().await?;
290 Ok(())
291}