greentic_runner_host/
host.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7
8use crate::activity::Activity;
9use crate::boot;
10use crate::config::HostConfig;
11use crate::engine::host::{SessionHost, StateHost};
12use crate::engine::runtime::IngressEnvelope;
13use crate::http::health::HealthState;
14use crate::pack::PackRuntime;
15use crate::runner::adapt_timer;
16use crate::runner::engine::FlowEngine;
17use crate::runtime::{ActivePacks, TenantRuntime};
18use crate::storage::{
19    DynSessionStore, DynStateStore, new_session_store, new_state_store, session_host_from,
20    state_host_from,
21};
22use crate::wasi::RunnerWasiPolicy;
23
/// Telemetry configuration type used by the host builder.
///
/// With the `telemetry` feature enabled this is a re-export of
/// `greentic_telemetry::OtlpConfig`.
#[cfg(feature = "telemetry")]
pub use greentic_telemetry::OtlpConfig as TelemetryCfg;
/// Unit placeholder that keeps the `TelemetryCfg` name defined when the
/// `telemetry` feature is disabled.
#[cfg(not(feature = "telemetry"))]
#[derive(Clone, Debug)]
pub struct TelemetryCfg;
29
/// Builder for composing multi-tenant host instances.
///
/// Collects per-tenant [`HostConfig`] values together with host-wide options
/// and turns them into a [`RunnerHost`] through [`HostBuilder::build`].
pub struct HostBuilder {
    /// Tenant configurations, keyed by each configuration's `tenant` field.
    configs: HashMap<String, HostConfig>,
    /// Optional telemetry settings forwarded to the built host
    /// (only present with the `telemetry` feature).
    #[cfg(feature = "telemetry")]
    telemetry: Option<TelemetryCfg>,
    /// WASI policy forwarded to the built host.
    wasi_policy: RunnerWasiPolicy,
}
37
38impl HostBuilder {
39    pub fn new() -> Self {
40        Self {
41            configs: HashMap::new(),
42            #[cfg(feature = "telemetry")]
43            telemetry: None,
44            wasi_policy: RunnerWasiPolicy::default(),
45        }
46    }
47
48    pub fn with_config(mut self, config: HostConfig) -> Self {
49        self.configs.insert(config.tenant.clone(), config);
50        self
51    }
52
53    #[cfg(feature = "telemetry")]
54    pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
55        self.telemetry = Some(telemetry);
56        self
57    }
58
59    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
60        self.wasi_policy = policy;
61        self
62    }
63
64    pub fn build(self) -> Result<RunnerHost> {
65        if self.configs.is_empty() {
66            bail!("at least one tenant configuration is required");
67        }
68        let wasi_policy = Arc::new(self.wasi_policy);
69        let configs = self
70            .configs
71            .into_iter()
72            .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
73            .collect();
74        let session_store = new_session_store();
75        let session_host = session_host_from(Arc::clone(&session_store));
76        let state_store = new_state_store();
77        let state_host = state_host_from(Arc::clone(&state_store));
78        Ok(RunnerHost {
79            configs,
80            active: Arc::new(ActivePacks::new()),
81            health: Arc::new(HealthState::new()),
82            session_store,
83            state_store,
84            session_host,
85            state_host,
86            wasi_policy,
87            #[cfg(feature = "telemetry")]
88            telemetry: self.telemetry,
89        })
90    }
91}
92
/// `Default` is equivalent to [`HostBuilder::new`].
impl Default for HostBuilder {
    /// Returns the same empty builder as [`HostBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
98
/// Runtime host that manages tenant-bound packs and flow execution.
pub struct RunnerHost {
    /// Registered tenant configurations, keyed by tenant name.
    configs: HashMap<String, Arc<HostConfig>>,
    /// Table of currently loaded tenant runtimes; updated by `load_pack` and
    /// emptied by `stop`.
    active: Arc<ActivePacks>,
    /// Health state passed to `boot::init` on start and exposed to embedders.
    health: Arc<HealthState>,
    /// Session store created at build time and shared with tenant runtimes.
    session_store: DynSessionStore,
    /// State store created at build time and shared with tenant runtimes.
    state_store: DynStateStore,
    /// Session host adapter derived from `session_store`.
    session_host: Arc<dyn SessionHost>,
    /// State host adapter derived from `state_store`.
    state_host: Arc<dyn StateHost>,
    /// WASI policy handed to every tenant runtime that is loaded.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Optional telemetry settings passed to `boot::init`
    /// (only present with the `telemetry` feature).
    #[cfg(feature = "telemetry")]
    telemetry: Option<TelemetryCfg>,
}
112
/// Handle exposing tenant internals for embedding hosts (e.g. CLI server).
///
/// Cloning the handle clones the inner `Arc`, so all clones refer to the same
/// tenant runtime.
#[derive(Clone)]
pub struct TenantHandle {
    /// Shared reference to the tenant runtime this handle exposes.
    runtime: Arc<TenantRuntime>,
}
118
119impl RunnerHost {
120    pub async fn start(&self) -> Result<()> {
121        #[cfg(feature = "telemetry")]
122        {
123            boot::init(&self.health, self.telemetry.as_ref())?;
124        }
125        #[cfg(not(feature = "telemetry"))]
126        {
127            boot::init(&self.health, None)?;
128        }
129        Ok(())
130    }
131
132    pub async fn stop(&self) -> Result<()> {
133        self.active.replace(HashMap::new());
134        Ok(())
135    }
136
137    pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
138        let archive_source = if is_pack_archive(pack_path) {
139            Some(pack_path)
140        } else {
141            None
142        };
143        let runtime = self
144            .prepare_runtime(tenant, pack_path, archive_source)
145            .await
146            .with_context(|| format!("failed to load tenant {tenant}"))?;
147        let mut next = (*self.active.snapshot()).clone();
148        next.insert(tenant.to_string(), runtime);
149        self.active.replace(next);
150        tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
151        Ok(())
152    }
153
154    pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
155        let runtime = self
156            .active
157            .load(tenant)
158            .with_context(|| format!("tenant {tenant} not loaded"))?;
159        let flow_id = resolve_flow_id(&runtime, &activity)?;
160        let action = activity.action().map(|value| value.to_string());
161        let session = activity.session_id().map(|value| value.to_string());
162        let provider = activity.provider_id().map(|value| value.to_string());
163        let channel = activity.channel().map(|value| value.to_string());
164        let conversation = activity.conversation().map(|value| value.to_string());
165        let user = activity.user().map(|value| value.to_string());
166        let flow_type = activity
167            .flow_type()
168            .map(|value| value.to_string())
169            .or_else(|| {
170                runtime
171                    .engine()
172                    .flow_by_id(&flow_id)
173                    .map(|desc| desc.flow_type.clone())
174            });
175        let payload = activity.into_payload();
176
177        let envelope = IngressEnvelope {
178            tenant: tenant.to_string(),
179            env: std::env::var("GREENTIC_ENV").ok(),
180            flow_id: flow_id.clone(),
181            flow_type,
182            action,
183            session_hint: session,
184            provider,
185            channel,
186            conversation,
187            user,
188            activity_id: None,
189            timestamp: None,
190            payload,
191            metadata: None,
192        }
193        .canonicalize();
194
195        let result = runtime.state_machine().handle(envelope).await?;
196        Ok(normalize_replies(result, tenant))
197    }
198
199    pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
200        self.active
201            .load(tenant)
202            .map(|runtime| TenantHandle { runtime })
203    }
204
205    pub fn active_packs(&self) -> Arc<ActivePacks> {
206        Arc::clone(&self.active)
207    }
208
209    pub fn health_state(&self) -> Arc<HealthState> {
210        Arc::clone(&self.health)
211    }
212
213    pub fn wasi_policy(&self) -> Arc<RunnerWasiPolicy> {
214        Arc::clone(&self.wasi_policy)
215    }
216
217    pub fn session_store(&self) -> DynSessionStore {
218        Arc::clone(&self.session_store)
219    }
220
221    pub fn state_store(&self) -> DynStateStore {
222        Arc::clone(&self.state_store)
223    }
224
225    pub fn session_host(&self) -> Arc<dyn SessionHost> {
226        Arc::clone(&self.session_host)
227    }
228
229    pub fn state_host(&self) -> Arc<dyn StateHost> {
230        Arc::clone(&self.state_host)
231    }
232
233    pub fn tenant_configs(&self) -> HashMap<String, Arc<HostConfig>> {
234        self.configs.clone()
235    }
236
237    async fn prepare_runtime(
238        &self,
239        tenant: &str,
240        pack_path: &Path,
241        archive_source: Option<&Path>,
242    ) -> Result<Arc<TenantRuntime>> {
243        let config = self
244            .configs
245            .get(tenant)
246            .cloned()
247            .with_context(|| format!("tenant {tenant} not registered"))?;
248        if config.tenant != tenant {
249            bail!(
250                "tenant mismatch: config declares '{}' but '{tenant}' was requested",
251                config.tenant
252            );
253        }
254        let runtime = TenantRuntime::load(
255            pack_path,
256            Arc::clone(&config),
257            None,
258            archive_source,
259            None,
260            self.wasi_policy(),
261            self.session_host(),
262            self.session_store(),
263            self.state_store(),
264            self.state_host(),
265        )
266        .await?;
267        let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
268        runtime.register_timers(timers);
269        Ok(runtime)
270    }
271}
272
impl TenantHandle {
    /// Returns a shared handle to the tenant's host configuration.
    pub fn config(&self) -> Arc<HostConfig> {
        Arc::clone(self.runtime.config())
    }

    /// Returns the pack runtime reported by the tenant runtime.
    pub fn pack(&self) -> Arc<PackRuntime> {
        self.runtime.pack()
    }

    /// Returns a shared handle to the tenant's flow engine.
    pub fn engine(&self) -> Arc<FlowEngine> {
        Arc::clone(self.runtime.engine())
    }

    /// Returns the overlay pack runtimes reported by the tenant runtime.
    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
        self.runtime.overlays()
    }

    /// Returns the overlay digests reported by the tenant runtime; an entry
    /// may be `None`.
    pub fn overlay_digests(&self) -> Vec<Option<String>> {
        self.runtime.overlay_digests()
    }
}
294
295fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<String> {
296    if let Some(flow_id) = activity.flow_id() {
297        return Ok(flow_id.to_string());
298    }
299
300    runtime
301        .pack()
302        .metadata()
303        .entry_flows
304        .first()
305        .cloned()
306        .ok_or_else(|| anyhow!("no entry flows registered for tenant {}", runtime.tenant()))
307}
308
309fn normalize_replies(result: Value, tenant: &str) -> Vec<Activity> {
310    result
311        .as_array()
312        .cloned()
313        .unwrap_or_else(|| vec![result])
314        .into_iter()
315        .map(|payload| Activity::from_output(payload, tenant))
316        .collect()
317}
318
/// Reports whether `path` ends in a `gtpack` extension, compared
/// ASCII case-insensitively.
///
/// Paths without an extension, or whose extension is not valid UTF-8, are
/// not treated as archives.
fn is_pack_archive(path: &Path) -> bool {
    let extension = path.extension().and_then(|raw| raw.to_str());
    matches!(extension, Some(ext) if ext.eq_ignore_ascii_case("gtpack"))
}