1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7
8use crate::activity::Activity;
9use crate::boot;
10use crate::config::HostConfig;
11use crate::engine::host::{SessionHost, StateHost};
12use crate::engine::runtime::IngressEnvelope;
13use crate::http::health::HealthState;
14use crate::pack::PackRuntime;
15use crate::runner::adapt_timer;
16use crate::runner::engine::FlowEngine;
17use crate::runtime::{ActivePacks, TenantRuntime};
18use crate::storage::{
19 DynSessionStore, DynStateStore, new_session_store, new_state_store, session_host_from,
20 state_host_from,
21};
22use crate::wasi::RunnerWasiPolicy;
23
/// Telemetry configuration type used by this module.
///
/// With the `telemetry` feature enabled this is a re-export of
/// `greentic_telemetry::OtlpConfig`.
#[cfg(feature = "telemetry")]
pub use greentic_telemetry::OtlpConfig as TelemetryCfg;
/// Placeholder unit type standing in for the telemetry configuration when the
/// `telemetry` feature is disabled, so the `TelemetryCfg` name exists in both
/// build configurations.
#[cfg(not(feature = "telemetry"))]
#[derive(Clone, Debug)]
pub struct TelemetryCfg;
29
/// Builder that collects tenant configurations and host-wide settings and
/// produces a [`RunnerHost`] via [`HostBuilder::build`].
pub struct HostBuilder {
    /// Tenant configurations keyed by the tenant name declared in each config.
    configs: HashMap<String, HostConfig>,
    /// Optional telemetry configuration; only present with the `telemetry` feature.
    #[cfg(feature = "telemetry")]
    telemetry: Option<TelemetryCfg>,
    /// WASI policy handed to the built host.
    wasi_policy: RunnerWasiPolicy,
}
37
38impl HostBuilder {
39 pub fn new() -> Self {
40 Self {
41 configs: HashMap::new(),
42 #[cfg(feature = "telemetry")]
43 telemetry: None,
44 wasi_policy: RunnerWasiPolicy::default(),
45 }
46 }
47
48 pub fn with_config(mut self, config: HostConfig) -> Self {
49 self.configs.insert(config.tenant.clone(), config);
50 self
51 }
52
53 #[cfg(feature = "telemetry")]
54 pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
55 self.telemetry = Some(telemetry);
56 self
57 }
58
59 pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
60 self.wasi_policy = policy;
61 self
62 }
63
64 pub fn build(self) -> Result<RunnerHost> {
65 if self.configs.is_empty() {
66 bail!("at least one tenant configuration is required");
67 }
68 let wasi_policy = Arc::new(self.wasi_policy);
69 let configs = self
70 .configs
71 .into_iter()
72 .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
73 .collect();
74 let session_store = new_session_store();
75 let session_host = session_host_from(Arc::clone(&session_store));
76 let state_store = new_state_store();
77 let state_host = state_host_from(Arc::clone(&state_store));
78 Ok(RunnerHost {
79 configs,
80 active: Arc::new(ActivePacks::new()),
81 health: Arc::new(HealthState::new()),
82 session_store,
83 state_store,
84 session_host,
85 state_host,
86 wasi_policy,
87 #[cfg(feature = "telemetry")]
88 telemetry: self.telemetry,
89 })
90 }
91}
92
impl Default for HostBuilder {
    /// Equivalent to [`HostBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
98
/// Host that owns the registered tenant configurations, the currently loaded
/// tenant runtimes, and the stores/hosts shared with every runtime it loads.
pub struct RunnerHost {
    /// Registered tenant configurations keyed by tenant name.
    configs: HashMap<String, Arc<HostConfig>>,
    /// Table of currently loaded tenant runtimes.
    active: Arc<ActivePacks>,
    /// Health state passed to `boot::init` and exposed via `health_state`.
    health: Arc<HealthState>,
    /// Session store created when the host was built.
    session_store: DynSessionStore,
    /// State store created when the host was built.
    state_store: DynStateStore,
    /// Session host derived from `session_store`.
    session_host: Arc<dyn SessionHost>,
    /// State host derived from `state_store`.
    state_host: Arc<dyn StateHost>,
    /// WASI policy passed to every tenant runtime that is loaded.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Optional telemetry configuration; only present with the `telemetry` feature.
    #[cfg(feature = "telemetry")]
    telemetry: Option<TelemetryCfg>,
}
112
/// Cloneable handle to a loaded tenant runtime, obtained from
/// `RunnerHost::tenant`. Cloning the handle clones the inner `Arc`.
#[derive(Clone)]
pub struct TenantHandle {
    /// The shared tenant runtime this handle exposes accessors for.
    runtime: Arc<TenantRuntime>,
}
118
119impl RunnerHost {
120 pub async fn start(&self) -> Result<()> {
121 #[cfg(feature = "telemetry")]
122 {
123 boot::init(&self.health, self.telemetry.as_ref())?;
124 }
125 #[cfg(not(feature = "telemetry"))]
126 {
127 boot::init(&self.health, None)?;
128 }
129 Ok(())
130 }
131
132 pub async fn stop(&self) -> Result<()> {
133 self.active.replace(HashMap::new());
134 Ok(())
135 }
136
137 pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
138 let archive_source = if is_pack_archive(pack_path) {
139 Some(pack_path)
140 } else {
141 None
142 };
143 let runtime = self
144 .prepare_runtime(tenant, pack_path, archive_source)
145 .await
146 .with_context(|| format!("failed to load tenant {tenant}"))?;
147 let mut next = (*self.active.snapshot()).clone();
148 next.insert(tenant.to_string(), runtime);
149 self.active.replace(next);
150 tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
151 Ok(())
152 }
153
154 pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
155 let runtime = self
156 .active
157 .load(tenant)
158 .with_context(|| format!("tenant {tenant} not loaded"))?;
159 let flow_id = resolve_flow_id(&runtime, &activity)?;
160 let action = activity.action().map(|value| value.to_string());
161 let session = activity.session_id().map(|value| value.to_string());
162 let provider = activity.provider_id().map(|value| value.to_string());
163 let channel = activity.channel().map(|value| value.to_string());
164 let conversation = activity.conversation().map(|value| value.to_string());
165 let user = activity.user().map(|value| value.to_string());
166 let flow_type = activity
167 .flow_type()
168 .map(|value| value.to_string())
169 .or_else(|| {
170 runtime
171 .engine()
172 .flow_by_id(&flow_id)
173 .map(|desc| desc.flow_type.clone())
174 });
175 let payload = activity.into_payload();
176
177 let envelope = IngressEnvelope {
178 tenant: tenant.to_string(),
179 env: std::env::var("GREENTIC_ENV").ok(),
180 flow_id: flow_id.clone(),
181 flow_type,
182 action,
183 session_hint: session,
184 provider,
185 channel,
186 conversation,
187 user,
188 activity_id: None,
189 timestamp: None,
190 payload,
191 metadata: None,
192 }
193 .canonicalize();
194
195 let result = runtime.state_machine().handle(envelope).await?;
196 Ok(normalize_replies(result, tenant))
197 }
198
199 pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
200 self.active
201 .load(tenant)
202 .map(|runtime| TenantHandle { runtime })
203 }
204
205 pub fn active_packs(&self) -> Arc<ActivePacks> {
206 Arc::clone(&self.active)
207 }
208
209 pub fn health_state(&self) -> Arc<HealthState> {
210 Arc::clone(&self.health)
211 }
212
213 pub fn wasi_policy(&self) -> Arc<RunnerWasiPolicy> {
214 Arc::clone(&self.wasi_policy)
215 }
216
217 pub fn session_store(&self) -> DynSessionStore {
218 Arc::clone(&self.session_store)
219 }
220
221 pub fn state_store(&self) -> DynStateStore {
222 Arc::clone(&self.state_store)
223 }
224
225 pub fn session_host(&self) -> Arc<dyn SessionHost> {
226 Arc::clone(&self.session_host)
227 }
228
229 pub fn state_host(&self) -> Arc<dyn StateHost> {
230 Arc::clone(&self.state_host)
231 }
232
233 pub fn tenant_configs(&self) -> HashMap<String, Arc<HostConfig>> {
234 self.configs.clone()
235 }
236
237 async fn prepare_runtime(
238 &self,
239 tenant: &str,
240 pack_path: &Path,
241 archive_source: Option<&Path>,
242 ) -> Result<Arc<TenantRuntime>> {
243 let config = self
244 .configs
245 .get(tenant)
246 .cloned()
247 .with_context(|| format!("tenant {tenant} not registered"))?;
248 if config.tenant != tenant {
249 bail!(
250 "tenant mismatch: config declares '{}' but '{tenant}' was requested",
251 config.tenant
252 );
253 }
254 let runtime = TenantRuntime::load(
255 pack_path,
256 Arc::clone(&config),
257 None,
258 archive_source,
259 None,
260 self.wasi_policy(),
261 self.session_host(),
262 self.session_store(),
263 self.state_store(),
264 self.state_host(),
265 )
266 .await?;
267 let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
268 runtime.register_timers(timers);
269 Ok(runtime)
270 }
271}
272
impl TenantHandle {
    /// Returns a shared reference to the configuration held by the tenant runtime.
    pub fn config(&self) -> Arc<HostConfig> {
        Arc::clone(self.runtime.config())
    }

    /// Returns the pack runtime reported by the tenant runtime.
    pub fn pack(&self) -> Arc<PackRuntime> {
        self.runtime.pack()
    }

    /// Returns a shared reference to the tenant runtime's flow engine.
    pub fn engine(&self) -> Arc<FlowEngine> {
        Arc::clone(self.runtime.engine())
    }

    /// Returns the overlay pack runtimes reported by the tenant runtime.
    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
        self.runtime.overlays()
    }

    /// Returns the overlay digests reported by the tenant runtime; an entry
    /// may be `None`.
    pub fn overlay_digests(&self) -> Vec<Option<String>> {
        self.runtime.overlay_digests()
    }
}
294
295fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<String> {
296 if let Some(flow_id) = activity.flow_id() {
297 return Ok(flow_id.to_string());
298 }
299
300 runtime
301 .pack()
302 .metadata()
303 .entry_flows
304 .first()
305 .cloned()
306 .ok_or_else(|| anyhow!("no entry flows registered for tenant {}", runtime.tenant()))
307}
308
309fn normalize_replies(result: Value, tenant: &str) -> Vec<Activity> {
310 result
311 .as_array()
312 .cloned()
313 .unwrap_or_else(|| vec![result])
314 .into_iter()
315 .map(|payload| Activity::from_output(payload, tenant))
316 .collect()
317}
318
/// Reports whether `path` names a pack archive, i.e. whether its extension is
/// `gtpack`, compared ASCII case-insensitively.
///
/// Paths without an extension (including dot-files such as `.gtpack`) are not
/// treated as archives. Only the path text is inspected; the filesystem is
/// not touched.
fn is_pack_archive(path: &Path) -> bool {
    match path.extension() {
        Some(ext) => ext.eq_ignore_ascii_case("gtpack"),
        None => false,
    }
}