1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7
8use crate::activity::Activity;
9use crate::boot;
10use crate::config::HostConfig;
11use crate::engine::host::{SessionHost, StateHost};
12use crate::engine::runtime::IngressEnvelope;
13use crate::http::health::HealthState;
14use crate::pack::PackRuntime;
15use crate::runner::adapt_timer;
16use crate::runner::engine::FlowEngine;
17use crate::runtime::{ActivePacks, TenantRuntime};
18use crate::secrets::{DynSecretsManager, default_manager};
19use crate::storage::{
20 DynSessionStore, DynStateStore, new_session_store, new_state_store, session_host_from,
21 state_host_from,
22};
23use crate::wasi::RunnerWasiPolicy;
24
25#[cfg(feature = "telemetry")]
26pub use greentic_telemetry::OtlpConfig as TelemetryCfg;
/// Stand-in telemetry configuration for builds without the `telemetry` feature.
///
/// When the feature is enabled, `TelemetryCfg` is a re-export of
/// `greentic_telemetry::OtlpConfig` instead; this unit struct keeps the name
/// available otherwise and carries no data.
#[cfg(not(feature = "telemetry"))]
#[derive(Clone, Debug)]
pub struct TelemetryCfg;
30
31pub struct HostBuilder {
33 configs: HashMap<String, HostConfig>,
34 #[cfg(feature = "telemetry")]
35 telemetry: Option<TelemetryCfg>,
36 wasi_policy: RunnerWasiPolicy,
37 secrets: Option<DynSecretsManager>,
38}
39
40impl HostBuilder {
41 pub fn new() -> Self {
42 Self {
43 configs: HashMap::new(),
44 #[cfg(feature = "telemetry")]
45 telemetry: None,
46 wasi_policy: RunnerWasiPolicy::default(),
47 secrets: None,
48 }
49 }
50
51 pub fn with_config(mut self, config: HostConfig) -> Self {
52 self.configs.insert(config.tenant.clone(), config);
53 self
54 }
55
56 #[cfg(feature = "telemetry")]
57 pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
58 self.telemetry = Some(telemetry);
59 self
60 }
61
62 pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
63 self.wasi_policy = policy;
64 self
65 }
66
67 pub fn with_secrets_manager(mut self, manager: DynSecretsManager) -> Self {
68 self.secrets = Some(manager);
69 self
70 }
71
72 pub fn build(self) -> Result<RunnerHost> {
73 if self.configs.is_empty() {
74 bail!("at least one tenant configuration is required");
75 }
76 let wasi_policy = Arc::new(self.wasi_policy);
77 let configs = self
78 .configs
79 .into_iter()
80 .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
81 .collect();
82 let session_store = new_session_store();
83 let session_host = session_host_from(Arc::clone(&session_store));
84 let state_store = new_state_store();
85 let state_host = state_host_from(Arc::clone(&state_store));
86 let secrets = self.secrets.unwrap_or_else(default_manager);
87 Ok(RunnerHost {
88 configs,
89 active: Arc::new(ActivePacks::new()),
90 health: Arc::new(HealthState::new()),
91 session_store,
92 state_store,
93 session_host,
94 state_host,
95 wasi_policy,
96 secrets_manager: secrets,
97 #[cfg(feature = "telemetry")]
98 telemetry: self.telemetry,
99 })
100 }
101}
102
103impl Default for HostBuilder {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
109pub struct RunnerHost {
111 configs: HashMap<String, Arc<HostConfig>>,
112 active: Arc<ActivePacks>,
113 health: Arc<HealthState>,
114 session_store: DynSessionStore,
115 state_store: DynStateStore,
116 session_host: Arc<dyn SessionHost>,
117 state_host: Arc<dyn StateHost>,
118 wasi_policy: Arc<RunnerWasiPolicy>,
119 secrets_manager: DynSecretsManager,
120 #[cfg(feature = "telemetry")]
121 telemetry: Option<TelemetryCfg>,
122}
123
124#[derive(Clone)]
126pub struct TenantHandle {
127 runtime: Arc<TenantRuntime>,
128}
129
130impl RunnerHost {
131 pub async fn start(&self) -> Result<()> {
132 #[cfg(feature = "telemetry")]
133 {
134 boot::init(&self.health, self.telemetry.as_ref())?;
135 }
136 #[cfg(not(feature = "telemetry"))]
137 {
138 boot::init(&self.health, None)?;
139 }
140 Ok(())
141 }
142
143 pub async fn stop(&self) -> Result<()> {
144 self.active.replace(HashMap::new());
145 Ok(())
146 }
147
148 pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
149 let archive_source = if is_pack_archive(pack_path) {
150 Some(pack_path)
151 } else {
152 None
153 };
154 let runtime = self
155 .prepare_runtime(tenant, pack_path, archive_source)
156 .await
157 .with_context(|| format!("failed to load tenant {tenant}"))?;
158 let mut next = (*self.active.snapshot()).clone();
159 next.insert(tenant.to_string(), runtime);
160 self.active.replace(next);
161 tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
162 Ok(())
163 }
164
165 pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
166 let runtime = self
167 .active
168 .load(tenant)
169 .with_context(|| format!("tenant {tenant} not loaded"))?;
170 let flow_id = resolve_flow_id(&runtime, &activity)?;
171 let action = activity.action().map(|value| value.to_string());
172 let session = activity.session_id().map(|value| value.to_string());
173 let provider = activity.provider_id().map(|value| value.to_string());
174 let channel = activity.channel().map(|value| value.to_string());
175 let conversation = activity.conversation().map(|value| value.to_string());
176 let user = activity.user().map(|value| value.to_string());
177 let flow_type = activity
178 .flow_type()
179 .map(|value| value.to_string())
180 .or_else(|| {
181 runtime
182 .engine()
183 .flow_by_id(&flow_id)
184 .map(|desc| desc.flow_type.clone())
185 });
186 let payload = activity.into_payload();
187
188 let envelope = IngressEnvelope {
189 tenant: tenant.to_string(),
190 env: std::env::var("GREENTIC_ENV").ok(),
191 flow_id: flow_id.clone(),
192 flow_type,
193 action,
194 session_hint: session,
195 provider,
196 channel,
197 conversation,
198 user,
199 activity_id: None,
200 timestamp: None,
201 payload,
202 metadata: None,
203 }
204 .canonicalize();
205
206 let result = runtime.state_machine().handle(envelope).await?;
207 Ok(normalize_replies(result, tenant))
208 }
209
210 pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
211 self.active
212 .load(tenant)
213 .map(|runtime| TenantHandle { runtime })
214 }
215
216 pub fn active_packs(&self) -> Arc<ActivePacks> {
217 Arc::clone(&self.active)
218 }
219
220 pub fn health_state(&self) -> Arc<HealthState> {
221 Arc::clone(&self.health)
222 }
223
224 pub fn wasi_policy(&self) -> Arc<RunnerWasiPolicy> {
225 Arc::clone(&self.wasi_policy)
226 }
227
228 pub fn session_store(&self) -> DynSessionStore {
229 Arc::clone(&self.session_store)
230 }
231
232 pub fn state_store(&self) -> DynStateStore {
233 Arc::clone(&self.state_store)
234 }
235
236 pub fn session_host(&self) -> Arc<dyn SessionHost> {
237 Arc::clone(&self.session_host)
238 }
239
240 pub fn state_host(&self) -> Arc<dyn StateHost> {
241 Arc::clone(&self.state_host)
242 }
243
244 pub fn secrets_manager(&self) -> DynSecretsManager {
245 Arc::clone(&self.secrets_manager)
246 }
247
248 pub fn tenant_configs(&self) -> HashMap<String, Arc<HostConfig>> {
249 self.configs.clone()
250 }
251
252 async fn prepare_runtime(
253 &self,
254 tenant: &str,
255 pack_path: &Path,
256 archive_source: Option<&Path>,
257 ) -> Result<Arc<TenantRuntime>> {
258 let config = self
259 .configs
260 .get(tenant)
261 .cloned()
262 .with_context(|| format!("tenant {tenant} not registered"))?;
263 if config.tenant != tenant {
264 bail!(
265 "tenant mismatch: config declares '{}' but '{tenant}' was requested",
266 config.tenant
267 );
268 }
269 let runtime = TenantRuntime::load(
270 pack_path,
271 Arc::clone(&config),
272 None,
273 archive_source,
274 None,
275 self.wasi_policy(),
276 self.session_host(),
277 self.session_store(),
278 self.state_store(),
279 self.state_host(),
280 self.secrets_manager(),
281 )
282 .await?;
283 let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
284 runtime.register_timers(timers);
285 Ok(runtime)
286 }
287}
288
289impl TenantHandle {
290 pub fn config(&self) -> Arc<HostConfig> {
291 Arc::clone(self.runtime.config())
292 }
293
294 pub fn pack(&self) -> Arc<PackRuntime> {
295 self.runtime.pack()
296 }
297
298 pub fn engine(&self) -> Arc<FlowEngine> {
299 Arc::clone(self.runtime.engine())
300 }
301
302 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
303 self.runtime.overlays()
304 }
305
306 pub fn overlay_digests(&self) -> Vec<Option<String>> {
307 self.runtime.overlay_digests()
308 }
309}
310
311fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<String> {
312 if let Some(flow_id) = activity.flow_id() {
313 return Ok(flow_id.to_string());
314 }
315
316 runtime
317 .pack()
318 .metadata()
319 .entry_flows
320 .first()
321 .cloned()
322 .ok_or_else(|| anyhow!("no entry flows registered for tenant {}", runtime.tenant()))
323}
324
325fn normalize_replies(result: Value, tenant: &str) -> Vec<Activity> {
326 result
327 .as_array()
328 .cloned()
329 .unwrap_or_else(|| vec![result])
330 .into_iter()
331 .map(|payload| Activity::from_output(payload, tenant))
332 .collect()
333}
334
/// Reports whether `path` has the `gtpack` extension, compared without regard
/// to ASCII case.
///
/// Paths with no extension, or with an extension that is not valid UTF-8, are
/// not treated as archives.
fn is_pack_archive(path: &Path) -> bool {
    let extension = path.extension().and_then(|raw| raw.to_str());
    matches!(extension, Some(ext) if ext.eq_ignore_ascii_case("gtpack"))
}