1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7
8use crate::activity::Activity;
9use crate::boot;
10use crate::config::HostConfig;
11use crate::engine::host::{SessionHost, StateHost};
12use crate::engine::runtime::IngressEnvelope;
13use crate::http::health::HealthState;
14use crate::pack::PackRuntime;
15use crate::runner::adapt_timer;
16use crate::runner::engine::FlowEngine;
17use crate::runtime::{ActivePacks, TenantRuntime};
18use crate::secrets::{DynSecretsManager, default_manager};
19use crate::storage::{
20 DynSessionStore, DynStateStore, new_session_store, new_state_store, session_host_from,
21 state_host_from,
22};
23use crate::wasi::RunnerWasiPolicy;
24
/// Telemetry settings handed to the host when the `telemetry` feature is on.
///
/// Bundles the `greentic_telemetry` base configuration with its export
/// configuration so both travel together through [`HostBuilder`].
#[cfg(feature = "telemetry")]
#[derive(Clone, Debug)]
pub struct TelemetryCfg {
    /// Base telemetry configuration.
    pub config: greentic_telemetry::TelemetryConfig,
    /// Export configuration for the telemetry pipeline.
    pub export: greentic_telemetry::export::ExportConfig,
}

/// Field-less stand-in used when the `telemetry` feature is disabled, so the
/// `TelemetryCfg` name exists in both build configurations.
#[cfg(not(feature = "telemetry"))]
#[derive(Clone, Debug)]
pub struct TelemetryCfg;
34
35pub struct HostBuilder {
37 configs: HashMap<String, HostConfig>,
38 #[cfg(feature = "telemetry")]
39 telemetry: Option<TelemetryCfg>,
40 wasi_policy: RunnerWasiPolicy,
41 secrets: Option<DynSecretsManager>,
42}
43
44impl HostBuilder {
45 pub fn new() -> Self {
46 Self {
47 configs: HashMap::new(),
48 #[cfg(feature = "telemetry")]
49 telemetry: None,
50 wasi_policy: RunnerWasiPolicy::default(),
51 secrets: None,
52 }
53 }
54
55 pub fn with_config(mut self, config: HostConfig) -> Self {
56 self.configs.insert(config.tenant.clone(), config);
57 self
58 }
59
60 #[cfg(feature = "telemetry")]
61 pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
62 self.telemetry = Some(telemetry);
63 self
64 }
65
66 pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
67 self.wasi_policy = policy;
68 self
69 }
70
71 pub fn with_secrets_manager(mut self, manager: DynSecretsManager) -> Self {
72 self.secrets = Some(manager);
73 self
74 }
75
76 pub fn build(self) -> Result<RunnerHost> {
77 if self.configs.is_empty() {
78 bail!("at least one tenant configuration is required");
79 }
80 let wasi_policy = Arc::new(self.wasi_policy);
81 let configs = self
82 .configs
83 .into_iter()
84 .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
85 .collect();
86 let session_store = new_session_store();
87 let session_host = session_host_from(Arc::clone(&session_store));
88 let state_store = new_state_store();
89 let state_host = state_host_from(Arc::clone(&state_store));
90 let secrets = self.secrets.unwrap_or_else(default_manager);
91 Ok(RunnerHost {
92 configs,
93 active: Arc::new(ActivePacks::new()),
94 health: Arc::new(HealthState::new()),
95 session_store,
96 state_store,
97 session_host,
98 state_host,
99 wasi_policy,
100 secrets_manager: secrets,
101 #[cfg(feature = "telemetry")]
102 telemetry: self.telemetry,
103 })
104 }
105}
106
107impl Default for HostBuilder {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112
113pub struct RunnerHost {
115 configs: HashMap<String, Arc<HostConfig>>,
116 active: Arc<ActivePacks>,
117 health: Arc<HealthState>,
118 session_store: DynSessionStore,
119 state_store: DynStateStore,
120 session_host: Arc<dyn SessionHost>,
121 state_host: Arc<dyn StateHost>,
122 wasi_policy: Arc<RunnerWasiPolicy>,
123 secrets_manager: DynSecretsManager,
124 #[cfg(feature = "telemetry")]
125 telemetry: Option<TelemetryCfg>,
126}
127
128#[derive(Clone)]
130pub struct TenantHandle {
131 runtime: Arc<TenantRuntime>,
132}
133
134impl RunnerHost {
135 pub async fn start(&self) -> Result<()> {
136 #[cfg(feature = "telemetry")]
137 {
138 boot::init(&self.health, self.telemetry.as_ref())?;
139 }
140 #[cfg(not(feature = "telemetry"))]
141 {
142 boot::init(&self.health, None)?;
143 }
144 Ok(())
145 }
146
147 pub async fn stop(&self) -> Result<()> {
148 self.active.replace(HashMap::new());
149 Ok(())
150 }
151
152 pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
153 let archive_source = if is_pack_archive(pack_path) {
154 Some(pack_path)
155 } else {
156 None
157 };
158 let runtime = self
159 .prepare_runtime(tenant, pack_path, archive_source)
160 .await
161 .with_context(|| format!("failed to load tenant {tenant}"))?;
162 let mut next = (*self.active.snapshot()).clone();
163 next.insert(tenant.to_string(), runtime);
164 self.active.replace(next);
165 tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
166 Ok(())
167 }
168
169 pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
170 let runtime = self
171 .active
172 .load(tenant)
173 .with_context(|| format!("tenant {tenant} not loaded"))?;
174 let flow_id = resolve_flow_id(&runtime, &activity)?;
175 let action = activity.action().map(|value| value.to_string());
176 let session = activity.session_id().map(|value| value.to_string());
177 let provider = activity.provider_id().map(|value| value.to_string());
178 let channel = activity.channel().map(|value| value.to_string());
179 let conversation = activity.conversation().map(|value| value.to_string());
180 let user = activity.user().map(|value| value.to_string());
181 let flow_type = activity
182 .flow_type()
183 .map(|value| value.to_string())
184 .or_else(|| {
185 runtime
186 .engine()
187 .flow_by_id(&flow_id)
188 .map(|desc| desc.flow_type.clone())
189 });
190 let payload = activity.into_payload();
191
192 let envelope = IngressEnvelope {
193 tenant: tenant.to_string(),
194 env: std::env::var("GREENTIC_ENV").ok(),
195 flow_id: flow_id.clone(),
196 flow_type,
197 action,
198 session_hint: session,
199 provider,
200 channel,
201 conversation,
202 user,
203 activity_id: None,
204 timestamp: None,
205 payload,
206 metadata: None,
207 }
208 .canonicalize();
209
210 let result = runtime.state_machine().handle(envelope).await?;
211 Ok(normalize_replies(result, tenant))
212 }
213
214 pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
215 self.active
216 .load(tenant)
217 .map(|runtime| TenantHandle { runtime })
218 }
219
220 pub fn active_packs(&self) -> Arc<ActivePacks> {
221 Arc::clone(&self.active)
222 }
223
224 pub fn health_state(&self) -> Arc<HealthState> {
225 Arc::clone(&self.health)
226 }
227
228 pub fn wasi_policy(&self) -> Arc<RunnerWasiPolicy> {
229 Arc::clone(&self.wasi_policy)
230 }
231
232 pub fn session_store(&self) -> DynSessionStore {
233 Arc::clone(&self.session_store)
234 }
235
236 pub fn state_store(&self) -> DynStateStore {
237 Arc::clone(&self.state_store)
238 }
239
240 pub fn session_host(&self) -> Arc<dyn SessionHost> {
241 Arc::clone(&self.session_host)
242 }
243
244 pub fn state_host(&self) -> Arc<dyn StateHost> {
245 Arc::clone(&self.state_host)
246 }
247
248 pub fn secrets_manager(&self) -> DynSecretsManager {
249 Arc::clone(&self.secrets_manager)
250 }
251
252 pub fn tenant_configs(&self) -> HashMap<String, Arc<HostConfig>> {
253 self.configs.clone()
254 }
255
256 async fn prepare_runtime(
257 &self,
258 tenant: &str,
259 pack_path: &Path,
260 archive_source: Option<&Path>,
261 ) -> Result<Arc<TenantRuntime>> {
262 let config = self
263 .configs
264 .get(tenant)
265 .cloned()
266 .with_context(|| format!("tenant {tenant} not registered"))?;
267 if config.tenant != tenant {
268 bail!(
269 "tenant mismatch: config declares '{}' but '{tenant}' was requested",
270 config.tenant
271 );
272 }
273 let runtime = TenantRuntime::load(
274 pack_path,
275 Arc::clone(&config),
276 None,
277 archive_source,
278 None,
279 self.wasi_policy(),
280 self.session_host(),
281 self.session_store(),
282 self.state_store(),
283 self.state_host(),
284 self.secrets_manager(),
285 )
286 .await?;
287 let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
288 runtime.register_timers(timers);
289 Ok(runtime)
290 }
291}
292
293impl TenantHandle {
294 pub fn config(&self) -> Arc<HostConfig> {
295 Arc::clone(self.runtime.config())
296 }
297
298 pub fn pack(&self) -> Arc<PackRuntime> {
299 self.runtime.pack()
300 }
301
302 pub fn engine(&self) -> Arc<FlowEngine> {
303 Arc::clone(self.runtime.engine())
304 }
305
306 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
307 self.runtime.overlays()
308 }
309
310 pub fn overlay_digests(&self) -> Vec<Option<String>> {
311 self.runtime.overlay_digests()
312 }
313}
314
315fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<String> {
316 if let Some(flow_id) = activity.flow_id() {
317 return Ok(flow_id.to_string());
318 }
319
320 runtime
321 .pack()
322 .metadata()
323 .entry_flows
324 .first()
325 .cloned()
326 .ok_or_else(|| anyhow!("no entry flows registered for tenant {}", runtime.tenant()))
327}
328
329fn normalize_replies(result: Value, tenant: &str) -> Vec<Activity> {
330 result
331 .as_array()
332 .cloned()
333 .unwrap_or_else(|| vec![result])
334 .into_iter()
335 .map(|payload| Activity::from_output(payload, tenant))
336 .collect()
337}
338
/// Reports whether `path` has a `gtpack` extension, compared ASCII
/// case-insensitively.
///
/// Paths without an extension, or whose extension is not valid UTF-8, are not
/// treated as archives.
fn is_pack_archive(path: &Path) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => ext.eq_ignore_ascii_case("gtpack"),
        None => false,
    }
}