1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7
8use crate::activity::Activity;
9use crate::boot;
10use crate::config::HostConfig;
11use crate::engine::host::{SessionHost, StateHost};
12use crate::engine::runtime::IngressEnvelope;
13use crate::http::health::HealthState;
14use crate::pack::PackRuntime;
15use crate::runner::adapt_timer;
16use crate::runner::engine::FlowEngine;
17use crate::runtime::{ActivePacks, TenantRuntime};
18use crate::secrets::{DynSecretsManager, default_manager};
19use crate::storage::{
20 DynSessionStore, DynStateStore, new_session_store, new_state_store, session_host_from,
21 state_host_from,
22};
23use crate::wasi::RunnerWasiPolicy;
24
25#[cfg(feature = "telemetry")]
26#[derive(Clone, Debug)]
27pub struct TelemetryCfg {
28 pub config: greentic_telemetry::TelemetryConfig,
29 pub export: greentic_telemetry::export::ExportConfig,
30}
31#[cfg(not(feature = "telemetry"))]
32#[derive(Clone, Debug)]
33pub struct TelemetryCfg;
34
35pub struct HostBuilder {
37 configs: HashMap<String, HostConfig>,
38 #[cfg(feature = "telemetry")]
39 telemetry: Option<TelemetryCfg>,
40 wasi_policy: RunnerWasiPolicy,
41 secrets: Option<DynSecretsManager>,
42}
43
44impl HostBuilder {
45 pub fn new() -> Self {
46 Self {
47 configs: HashMap::new(),
48 #[cfg(feature = "telemetry")]
49 telemetry: None,
50 wasi_policy: RunnerWasiPolicy::default(),
51 secrets: None,
52 }
53 }
54
55 pub fn with_config(mut self, config: HostConfig) -> Self {
56 self.configs.insert(config.tenant.clone(), config);
57 self
58 }
59
60 #[cfg(feature = "telemetry")]
61 pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
62 self.telemetry = Some(telemetry);
63 self
64 }
65
66 pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
67 self.wasi_policy = policy;
68 self
69 }
70
71 pub fn with_secrets_manager(mut self, manager: DynSecretsManager) -> Self {
72 self.secrets = Some(manager);
73 self
74 }
75
76 pub fn build(self) -> Result<RunnerHost> {
77 if self.configs.is_empty() {
78 bail!("at least one tenant configuration is required");
79 }
80 let wasi_policy = Arc::new(self.wasi_policy);
81 let configs = self
82 .configs
83 .into_iter()
84 .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
85 .collect();
86 let session_store = new_session_store();
87 let session_host = session_host_from(Arc::clone(&session_store));
88 let state_store = new_state_store();
89 let state_host = state_host_from(Arc::clone(&state_store));
90 let secrets = match self.secrets {
91 Some(manager) => manager,
92 None => default_manager().context("failed to initialise default secrets backend")?,
93 };
94 Ok(RunnerHost {
95 configs,
96 active: Arc::new(ActivePacks::new()),
97 health: Arc::new(HealthState::new()),
98 session_store,
99 state_store,
100 session_host,
101 state_host,
102 wasi_policy,
103 secrets_manager: secrets,
104 #[cfg(feature = "telemetry")]
105 telemetry: self.telemetry,
106 })
107 }
108}
109
110impl Default for HostBuilder {
111 fn default() -> Self {
112 Self::new()
113 }
114}
115
116pub struct RunnerHost {
118 configs: HashMap<String, Arc<HostConfig>>,
119 active: Arc<ActivePacks>,
120 health: Arc<HealthState>,
121 session_store: DynSessionStore,
122 state_store: DynStateStore,
123 session_host: Arc<dyn SessionHost>,
124 state_host: Arc<dyn StateHost>,
125 wasi_policy: Arc<RunnerWasiPolicy>,
126 secrets_manager: DynSecretsManager,
127 #[cfg(feature = "telemetry")]
128 telemetry: Option<TelemetryCfg>,
129}
130
131#[derive(Clone)]
133pub struct TenantHandle {
134 runtime: Arc<TenantRuntime>,
135}
136
137impl RunnerHost {
138 pub async fn start(&self) -> Result<()> {
139 #[cfg(feature = "telemetry")]
140 {
141 boot::init(&self.health, self.telemetry.as_ref())?;
142 }
143 #[cfg(not(feature = "telemetry"))]
144 {
145 boot::init(&self.health, None)?;
146 }
147 Ok(())
148 }
149
150 pub async fn stop(&self) -> Result<()> {
151 self.active.replace(HashMap::new());
152 Ok(())
153 }
154
155 pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
156 let archive_source = if is_pack_archive(pack_path) {
157 Some(pack_path)
158 } else {
159 None
160 };
161 let runtime = self
162 .prepare_runtime(tenant, pack_path, archive_source)
163 .await
164 .with_context(|| format!("failed to load tenant {tenant}"))?;
165 let mut next = (*self.active.snapshot()).clone();
166 next.insert(tenant.to_string(), runtime);
167 self.active.replace(next);
168 tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
169 Ok(())
170 }
171
172 pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
173 let runtime = self
174 .active
175 .load(tenant)
176 .with_context(|| format!("tenant {tenant} not loaded"))?;
177 let (pack_id, flow_id) = resolve_flow_id(&runtime, &activity)?;
178 let action = activity.action().map(|value| value.to_string());
179 let session = activity.session_id().map(|value| value.to_string());
180 let provider = activity.provider_id().map(|value| value.to_string());
181 let channel = activity.channel().map(|value| value.to_string());
182 let conversation = activity.conversation().map(|value| value.to_string());
183 let user = activity.user().map(|value| value.to_string());
184 let flow_type = activity
185 .flow_type()
186 .map(|value| value.to_string())
187 .or_else(|| {
188 runtime
189 .engine()
190 .flow_by_key(&pack_id, &flow_id)
191 .map(|desc| desc.flow_type.clone())
192 });
193 let payload = activity.into_payload();
194
195 let envelope = IngressEnvelope {
196 tenant: tenant.to_string(),
197 env: std::env::var("GREENTIC_ENV").ok(),
198 pack_id: Some(pack_id.clone()),
199 flow_id: flow_id.clone(),
200 flow_type,
201 action,
202 session_hint: session,
203 provider,
204 channel,
205 conversation,
206 user,
207 activity_id: None,
208 timestamp: None,
209 payload,
210 metadata: None,
211 reply_scope: None,
212 }
213 .canonicalize();
214
215 let result = runtime.state_machine().handle(envelope).await?;
216 Ok(normalize_replies(result, tenant))
217 }
218
219 pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
220 self.active
221 .load(tenant)
222 .map(|runtime| TenantHandle { runtime })
223 }
224
225 pub fn active_packs(&self) -> Arc<ActivePacks> {
226 Arc::clone(&self.active)
227 }
228
229 pub fn health_state(&self) -> Arc<HealthState> {
230 Arc::clone(&self.health)
231 }
232
233 pub fn wasi_policy(&self) -> Arc<RunnerWasiPolicy> {
234 Arc::clone(&self.wasi_policy)
235 }
236
237 pub fn session_store(&self) -> DynSessionStore {
238 Arc::clone(&self.session_store)
239 }
240
241 pub fn state_store(&self) -> DynStateStore {
242 Arc::clone(&self.state_store)
243 }
244
245 pub fn session_host(&self) -> Arc<dyn SessionHost> {
246 Arc::clone(&self.session_host)
247 }
248
249 pub fn state_host(&self) -> Arc<dyn StateHost> {
250 Arc::clone(&self.state_host)
251 }
252
253 pub fn secrets_manager(&self) -> DynSecretsManager {
254 Arc::clone(&self.secrets_manager)
255 }
256
257 pub fn tenant_configs(&self) -> HashMap<String, Arc<HostConfig>> {
258 self.configs.clone()
259 }
260
261 async fn prepare_runtime(
262 &self,
263 tenant: &str,
264 pack_path: &Path,
265 archive_source: Option<&Path>,
266 ) -> Result<Arc<TenantRuntime>> {
267 let config = self
268 .configs
269 .get(tenant)
270 .cloned()
271 .with_context(|| format!("tenant {tenant} not registered"))?;
272 if config.tenant != tenant {
273 bail!(
274 "tenant mismatch: config declares '{}' but '{tenant}' was requested",
275 config.tenant
276 );
277 }
278 let runtime = TenantRuntime::load(
279 pack_path,
280 Arc::clone(&config),
281 None,
282 archive_source,
283 None,
284 self.wasi_policy(),
285 self.session_host(),
286 self.session_store(),
287 self.state_store(),
288 self.state_host(),
289 self.secrets_manager(),
290 )
291 .await?;
292 let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
293 runtime.register_timers(timers);
294 Ok(runtime)
295 }
296}
297
298impl TenantHandle {
299 pub fn config(&self) -> Arc<HostConfig> {
300 Arc::clone(self.runtime.config())
301 }
302
303 pub fn pack(&self) -> Arc<PackRuntime> {
304 self.runtime.pack()
305 }
306
307 pub fn engine(&self) -> Arc<FlowEngine> {
308 Arc::clone(self.runtime.engine())
309 }
310
311 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
312 self.runtime.overlays()
313 }
314
315 pub fn overlay_digests(&self) -> Vec<Option<String>> {
316 self.runtime.overlay_digests()
317 }
318}
319
320fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<(String, String)> {
321 let engine = runtime.engine();
322 if let Some(flow_id) = activity.flow_id() {
323 if let Some(pack_id) = activity.pack_id() {
324 if engine.flow_by_key(pack_id, flow_id).is_none() {
325 bail!("flow {flow_id} not registered for pack {pack_id}");
326 }
327 return Ok((pack_id.to_string(), flow_id.to_string()));
328 }
329 if let Some(flow) = engine.flow_by_id(flow_id) {
330 return Ok((flow.pack_id.clone(), flow.id.clone()));
331 }
332 bail!("flow {flow_id} is ambiguous; pack_id is required");
333 }
334
335 if let Some(flow_type) = activity.flow_type() {
336 if let Some(pack_id) = activity.pack_id() {
337 if let Some(flow) = engine
338 .flows()
339 .iter()
340 .find(|flow| flow.pack_id == pack_id && flow.flow_type == flow_type)
341 {
342 return Ok((pack_id.to_string(), flow.id.clone()));
343 }
344 bail!("flow type {flow_type} not registered for pack {pack_id}");
345 }
346 if let Some(flow) = engine.flow_by_type(flow_type) {
347 return Ok((flow.pack_id.clone(), flow.id.clone()));
348 }
349 bail!("flow type {flow_type} is ambiguous; pack_id is required");
350 }
351
352 let pack = runtime.pack();
353 let flow_id = pack
354 .metadata()
355 .entry_flows
356 .first()
357 .cloned()
358 .ok_or_else(|| anyhow!("no entry flows registered for tenant {}", runtime.tenant()))?;
359 Ok((pack.metadata().pack_id.clone(), flow_id))
360}
361
362fn normalize_replies(result: Value, tenant: &str) -> Vec<Activity> {
363 result
364 .as_array()
365 .cloned()
366 .unwrap_or_else(|| vec![result])
367 .into_iter()
368 .map(|payload| Activity::from_output(payload, tenant))
369 .collect()
370}
371
372fn is_pack_archive(path: &Path) -> bool {
373 path.extension()
374 .and_then(|ext| ext.to_str())
375 .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
376 .unwrap_or(false)
377}