greentic_runner_host/
host.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7
8use crate::activity::Activity;
9use crate::boot;
10use crate::config::HostConfig;
11use crate::engine::host::{SessionHost, StateHost};
12use crate::engine::runtime::IngressEnvelope;
13use crate::http::health::HealthState;
14use crate::pack::PackRuntime;
15use crate::runner::adapt_timer;
16use crate::runner::engine::FlowEngine;
17use crate::runtime::{ActivePacks, TenantRuntime};
18use crate::secrets::{DynSecretsManager, default_manager};
19use crate::storage::{
20    DynSessionStore, DynStateStore, new_session_store, new_state_store, session_host_from,
21    state_host_from,
22};
23use crate::wasi::RunnerWasiPolicy;
24
25#[cfg(feature = "telemetry")]
26pub use greentic_telemetry::OtlpConfig as TelemetryCfg;
27#[cfg(not(feature = "telemetry"))]
28#[derive(Clone, Debug)]
29pub struct TelemetryCfg;
30
31/// Builder for composing multi-tenant host instances.
32pub struct HostBuilder {
33    configs: HashMap<String, HostConfig>,
34    #[cfg(feature = "telemetry")]
35    telemetry: Option<TelemetryCfg>,
36    wasi_policy: RunnerWasiPolicy,
37    secrets: Option<DynSecretsManager>,
38}
39
40impl HostBuilder {
41    pub fn new() -> Self {
42        Self {
43            configs: HashMap::new(),
44            #[cfg(feature = "telemetry")]
45            telemetry: None,
46            wasi_policy: RunnerWasiPolicy::default(),
47            secrets: None,
48        }
49    }
50
51    pub fn with_config(mut self, config: HostConfig) -> Self {
52        self.configs.insert(config.tenant.clone(), config);
53        self
54    }
55
56    #[cfg(feature = "telemetry")]
57    pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
58        self.telemetry = Some(telemetry);
59        self
60    }
61
62    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
63        self.wasi_policy = policy;
64        self
65    }
66
67    pub fn with_secrets_manager(mut self, manager: DynSecretsManager) -> Self {
68        self.secrets = Some(manager);
69        self
70    }
71
72    pub fn build(self) -> Result<RunnerHost> {
73        if self.configs.is_empty() {
74            bail!("at least one tenant configuration is required");
75        }
76        let wasi_policy = Arc::new(self.wasi_policy);
77        let configs = self
78            .configs
79            .into_iter()
80            .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
81            .collect();
82        let session_store = new_session_store();
83        let session_host = session_host_from(Arc::clone(&session_store));
84        let state_store = new_state_store();
85        let state_host = state_host_from(Arc::clone(&state_store));
86        let secrets = self.secrets.unwrap_or_else(default_manager);
87        Ok(RunnerHost {
88            configs,
89            active: Arc::new(ActivePacks::new()),
90            health: Arc::new(HealthState::new()),
91            session_store,
92            state_store,
93            session_host,
94            state_host,
95            wasi_policy,
96            secrets_manager: secrets,
97            #[cfg(feature = "telemetry")]
98            telemetry: self.telemetry,
99        })
100    }
101}
102
103impl Default for HostBuilder {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109/// Runtime host that manages tenant-bound packs and flow execution.
110pub struct RunnerHost {
111    configs: HashMap<String, Arc<HostConfig>>,
112    active: Arc<ActivePacks>,
113    health: Arc<HealthState>,
114    session_store: DynSessionStore,
115    state_store: DynStateStore,
116    session_host: Arc<dyn SessionHost>,
117    state_host: Arc<dyn StateHost>,
118    wasi_policy: Arc<RunnerWasiPolicy>,
119    secrets_manager: DynSecretsManager,
120    #[cfg(feature = "telemetry")]
121    telemetry: Option<TelemetryCfg>,
122}
123
124/// Handle exposing tenant internals for embedding hosts (e.g. CLI server).
125#[derive(Clone)]
126pub struct TenantHandle {
127    runtime: Arc<TenantRuntime>,
128}
129
130impl RunnerHost {
131    pub async fn start(&self) -> Result<()> {
132        #[cfg(feature = "telemetry")]
133        {
134            boot::init(&self.health, self.telemetry.as_ref())?;
135        }
136        #[cfg(not(feature = "telemetry"))]
137        {
138            boot::init(&self.health, None)?;
139        }
140        Ok(())
141    }
142
143    pub async fn stop(&self) -> Result<()> {
144        self.active.replace(HashMap::new());
145        Ok(())
146    }
147
148    pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
149        let archive_source = if is_pack_archive(pack_path) {
150            Some(pack_path)
151        } else {
152            None
153        };
154        let runtime = self
155            .prepare_runtime(tenant, pack_path, archive_source)
156            .await
157            .with_context(|| format!("failed to load tenant {tenant}"))?;
158        let mut next = (*self.active.snapshot()).clone();
159        next.insert(tenant.to_string(), runtime);
160        self.active.replace(next);
161        tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
162        Ok(())
163    }
164
165    pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
166        let runtime = self
167            .active
168            .load(tenant)
169            .with_context(|| format!("tenant {tenant} not loaded"))?;
170        let flow_id = resolve_flow_id(&runtime, &activity)?;
171        let action = activity.action().map(|value| value.to_string());
172        let session = activity.session_id().map(|value| value.to_string());
173        let provider = activity.provider_id().map(|value| value.to_string());
174        let channel = activity.channel().map(|value| value.to_string());
175        let conversation = activity.conversation().map(|value| value.to_string());
176        let user = activity.user().map(|value| value.to_string());
177        let flow_type = activity
178            .flow_type()
179            .map(|value| value.to_string())
180            .or_else(|| {
181                runtime
182                    .engine()
183                    .flow_by_id(&flow_id)
184                    .map(|desc| desc.flow_type.clone())
185            });
186        let payload = activity.into_payload();
187
188        let envelope = IngressEnvelope {
189            tenant: tenant.to_string(),
190            env: std::env::var("GREENTIC_ENV").ok(),
191            flow_id: flow_id.clone(),
192            flow_type,
193            action,
194            session_hint: session,
195            provider,
196            channel,
197            conversation,
198            user,
199            activity_id: None,
200            timestamp: None,
201            payload,
202            metadata: None,
203        }
204        .canonicalize();
205
206        let result = runtime.state_machine().handle(envelope).await?;
207        Ok(normalize_replies(result, tenant))
208    }
209
210    pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
211        self.active
212            .load(tenant)
213            .map(|runtime| TenantHandle { runtime })
214    }
215
216    pub fn active_packs(&self) -> Arc<ActivePacks> {
217        Arc::clone(&self.active)
218    }
219
220    pub fn health_state(&self) -> Arc<HealthState> {
221        Arc::clone(&self.health)
222    }
223
224    pub fn wasi_policy(&self) -> Arc<RunnerWasiPolicy> {
225        Arc::clone(&self.wasi_policy)
226    }
227
228    pub fn session_store(&self) -> DynSessionStore {
229        Arc::clone(&self.session_store)
230    }
231
232    pub fn state_store(&self) -> DynStateStore {
233        Arc::clone(&self.state_store)
234    }
235
236    pub fn session_host(&self) -> Arc<dyn SessionHost> {
237        Arc::clone(&self.session_host)
238    }
239
240    pub fn state_host(&self) -> Arc<dyn StateHost> {
241        Arc::clone(&self.state_host)
242    }
243
244    pub fn secrets_manager(&self) -> DynSecretsManager {
245        Arc::clone(&self.secrets_manager)
246    }
247
248    pub fn tenant_configs(&self) -> HashMap<String, Arc<HostConfig>> {
249        self.configs.clone()
250    }
251
252    async fn prepare_runtime(
253        &self,
254        tenant: &str,
255        pack_path: &Path,
256        archive_source: Option<&Path>,
257    ) -> Result<Arc<TenantRuntime>> {
258        let config = self
259            .configs
260            .get(tenant)
261            .cloned()
262            .with_context(|| format!("tenant {tenant} not registered"))?;
263        if config.tenant != tenant {
264            bail!(
265                "tenant mismatch: config declares '{}' but '{tenant}' was requested",
266                config.tenant
267            );
268        }
269        let runtime = TenantRuntime::load(
270            pack_path,
271            Arc::clone(&config),
272            None,
273            archive_source,
274            None,
275            self.wasi_policy(),
276            self.session_host(),
277            self.session_store(),
278            self.state_store(),
279            self.state_host(),
280            self.secrets_manager(),
281        )
282        .await?;
283        let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
284        runtime.register_timers(timers);
285        Ok(runtime)
286    }
287}
288
289impl TenantHandle {
290    pub fn config(&self) -> Arc<HostConfig> {
291        Arc::clone(self.runtime.config())
292    }
293
294    pub fn pack(&self) -> Arc<PackRuntime> {
295        self.runtime.pack()
296    }
297
298    pub fn engine(&self) -> Arc<FlowEngine> {
299        Arc::clone(self.runtime.engine())
300    }
301
302    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
303        self.runtime.overlays()
304    }
305
306    pub fn overlay_digests(&self) -> Vec<Option<String>> {
307        self.runtime.overlay_digests()
308    }
309}
310
311fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<String> {
312    if let Some(flow_id) = activity.flow_id() {
313        return Ok(flow_id.to_string());
314    }
315
316    runtime
317        .pack()
318        .metadata()
319        .entry_flows
320        .first()
321        .cloned()
322        .ok_or_else(|| anyhow!("no entry flows registered for tenant {}", runtime.tenant()))
323}
324
325fn normalize_replies(result: Value, tenant: &str) -> Vec<Activity> {
326    result
327        .as_array()
328        .cloned()
329        .unwrap_or_else(|| vec![result])
330        .into_iter()
331        .map(|payload| Activity::from_output(payload, tenant))
332        .collect()
333}
334
335fn is_pack_archive(path: &Path) -> bool {
336    path.extension()
337        .and_then(|ext| ext.to_str())
338        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
339        .unwrap_or(false)
340}