// greentic_runner_host/host.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7
8use crate::activity::Activity;
9use crate::boot;
10use crate::config::HostConfig;
11use crate::engine::host::{SessionHost, StateHost};
12use crate::engine::runtime::IngressEnvelope;
13use crate::http::health::HealthState;
14use crate::pack::PackRuntime;
15use crate::runner::adapt_timer;
16use crate::runner::engine::FlowEngine;
17use crate::runtime::{ActivePacks, TenantRuntime};
18use crate::secrets::{DynSecretsManager, default_manager};
19use crate::storage::{
20    DynSessionStore, DynStateStore, new_session_store, new_state_store, session_host_from,
21    state_host_from,
22};
23use crate::wasi::RunnerWasiPolicy;
24
/// Telemetry settings that the host forwards to `boot::init` on start.
///
/// This variant is compiled when the `telemetry` feature is enabled and bundles the
/// `greentic_telemetry` configuration together with its export configuration.
#[cfg(feature = "telemetry")]
#[derive(Debug, Clone)]
pub struct TelemetryCfg {
    /// Core telemetry configuration.
    pub config: greentic_telemetry::TelemetryConfig,
    /// Export configuration accompanying `config`.
    pub export: greentic_telemetry::export::ExportConfig,
}

/// Field-less stand-in compiled when the `telemetry` feature is disabled, so the
/// `TelemetryCfg` name exists in both build configurations.
#[cfg(not(feature = "telemetry"))]
#[derive(Debug, Clone)]
pub struct TelemetryCfg;
34
/// Builder for composing multi-tenant host instances.
///
/// Tenant configurations and optional overrides are collected through the `with_*`
/// methods; `build` then turns the collected state into a [`RunnerHost`].
pub struct HostBuilder {
    /// Tenant configurations keyed by the tenant name taken from `HostConfig::tenant`.
    configs: HashMap<String, HostConfig>,
    /// Telemetry settings passed on to the host; `None` until `with_telemetry` is called.
    #[cfg(feature = "telemetry")]
    telemetry: Option<TelemetryCfg>,
    /// WASI policy handed to the host; starts out as `RunnerWasiPolicy::default()`.
    wasi_policy: RunnerWasiPolicy,
    /// Explicit secrets manager; when `None`, `build` falls back to `default_manager()`.
    secrets: Option<DynSecretsManager>,
}
43
44impl HostBuilder {
45    pub fn new() -> Self {
46        Self {
47            configs: HashMap::new(),
48            #[cfg(feature = "telemetry")]
49            telemetry: None,
50            wasi_policy: RunnerWasiPolicy::default(),
51            secrets: None,
52        }
53    }
54
55    pub fn with_config(mut self, config: HostConfig) -> Self {
56        self.configs.insert(config.tenant.clone(), config);
57        self
58    }
59
60    #[cfg(feature = "telemetry")]
61    pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
62        self.telemetry = Some(telemetry);
63        self
64    }
65
66    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
67        self.wasi_policy = policy;
68        self
69    }
70
71    pub fn with_secrets_manager(mut self, manager: DynSecretsManager) -> Self {
72        self.secrets = Some(manager);
73        self
74    }
75
76    pub fn build(self) -> Result<RunnerHost> {
77        if self.configs.is_empty() {
78            bail!("at least one tenant configuration is required");
79        }
80        let wasi_policy = Arc::new(self.wasi_policy);
81        let configs = self
82            .configs
83            .into_iter()
84            .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
85            .collect();
86        let session_store = new_session_store();
87        let session_host = session_host_from(Arc::clone(&session_store));
88        let state_store = new_state_store();
89        let state_host = state_host_from(Arc::clone(&state_store));
90        let secrets = self.secrets.unwrap_or_else(default_manager);
91        Ok(RunnerHost {
92            configs,
93            active: Arc::new(ActivePacks::new()),
94            health: Arc::new(HealthState::new()),
95            session_store,
96            state_store,
97            session_host,
98            state_host,
99            wasi_policy,
100            secrets_manager: secrets,
101            #[cfg(feature = "telemetry")]
102            telemetry: self.telemetry,
103        })
104    }
105}
106
107impl Default for HostBuilder {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
/// Runtime host that manages tenant-bound packs and flow execution.
///
/// Instances are assembled by `HostBuilder::build`.
pub struct RunnerHost {
    /// Registered tenant configurations, keyed by tenant name.
    configs: HashMap<String, Arc<HostConfig>>,
    /// Currently loaded tenant runtimes; updated by `load_pack` and emptied by `stop`.
    active: Arc<ActivePacks>,
    /// Health state passed to `boot::init` on `start` and exposed through `health_state`.
    health: Arc<HealthState>,
    /// Session store shared with every tenant runtime loaded by this host.
    session_store: DynSessionStore,
    /// State store shared with every tenant runtime loaded by this host.
    state_store: DynStateStore,
    /// Session host derived from `session_store` when the host is built.
    session_host: Arc<dyn SessionHost>,
    /// State host derived from `state_store` when the host is built.
    state_host: Arc<dyn StateHost>,
    /// WASI policy handed to each tenant runtime on load.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Secrets manager handed to each tenant runtime on load.
    secrets_manager: DynSecretsManager,
    /// Telemetry settings forwarded to `boot::init`; only present with the `telemetry` feature.
    #[cfg(feature = "telemetry")]
    telemetry: Option<TelemetryCfg>,
}
127
/// Handle exposing tenant internals for embedding hosts (e.g. CLI server).
///
/// Cloning the handle clones the inner `Arc`, so all clones refer to the same runtime.
#[derive(Clone)]
pub struct TenantHandle {
    /// Shared runtime of the tenant this handle was obtained for.
    runtime: Arc<TenantRuntime>,
}
133
134impl RunnerHost {
135    pub async fn start(&self) -> Result<()> {
136        #[cfg(feature = "telemetry")]
137        {
138            boot::init(&self.health, self.telemetry.as_ref())?;
139        }
140        #[cfg(not(feature = "telemetry"))]
141        {
142            boot::init(&self.health, None)?;
143        }
144        Ok(())
145    }
146
147    pub async fn stop(&self) -> Result<()> {
148        self.active.replace(HashMap::new());
149        Ok(())
150    }
151
152    pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
153        let archive_source = if is_pack_archive(pack_path) {
154            Some(pack_path)
155        } else {
156            None
157        };
158        let runtime = self
159            .prepare_runtime(tenant, pack_path, archive_source)
160            .await
161            .with_context(|| format!("failed to load tenant {tenant}"))?;
162        let mut next = (*self.active.snapshot()).clone();
163        next.insert(tenant.to_string(), runtime);
164        self.active.replace(next);
165        tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
166        Ok(())
167    }
168
169    pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
170        let runtime = self
171            .active
172            .load(tenant)
173            .with_context(|| format!("tenant {tenant} not loaded"))?;
174        let flow_id = resolve_flow_id(&runtime, &activity)?;
175        let action = activity.action().map(|value| value.to_string());
176        let session = activity.session_id().map(|value| value.to_string());
177        let provider = activity.provider_id().map(|value| value.to_string());
178        let channel = activity.channel().map(|value| value.to_string());
179        let conversation = activity.conversation().map(|value| value.to_string());
180        let user = activity.user().map(|value| value.to_string());
181        let flow_type = activity
182            .flow_type()
183            .map(|value| value.to_string())
184            .or_else(|| {
185                runtime
186                    .engine()
187                    .flow_by_id(&flow_id)
188                    .map(|desc| desc.flow_type.clone())
189            });
190        let payload = activity.into_payload();
191
192        let envelope = IngressEnvelope {
193            tenant: tenant.to_string(),
194            env: std::env::var("GREENTIC_ENV").ok(),
195            flow_id: flow_id.clone(),
196            flow_type,
197            action,
198            session_hint: session,
199            provider,
200            channel,
201            conversation,
202            user,
203            activity_id: None,
204            timestamp: None,
205            payload,
206            metadata: None,
207        }
208        .canonicalize();
209
210        let result = runtime.state_machine().handle(envelope).await?;
211        Ok(normalize_replies(result, tenant))
212    }
213
214    pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
215        self.active
216            .load(tenant)
217            .map(|runtime| TenantHandle { runtime })
218    }
219
220    pub fn active_packs(&self) -> Arc<ActivePacks> {
221        Arc::clone(&self.active)
222    }
223
224    pub fn health_state(&self) -> Arc<HealthState> {
225        Arc::clone(&self.health)
226    }
227
228    pub fn wasi_policy(&self) -> Arc<RunnerWasiPolicy> {
229        Arc::clone(&self.wasi_policy)
230    }
231
232    pub fn session_store(&self) -> DynSessionStore {
233        Arc::clone(&self.session_store)
234    }
235
236    pub fn state_store(&self) -> DynStateStore {
237        Arc::clone(&self.state_store)
238    }
239
240    pub fn session_host(&self) -> Arc<dyn SessionHost> {
241        Arc::clone(&self.session_host)
242    }
243
244    pub fn state_host(&self) -> Arc<dyn StateHost> {
245        Arc::clone(&self.state_host)
246    }
247
248    pub fn secrets_manager(&self) -> DynSecretsManager {
249        Arc::clone(&self.secrets_manager)
250    }
251
252    pub fn tenant_configs(&self) -> HashMap<String, Arc<HostConfig>> {
253        self.configs.clone()
254    }
255
256    async fn prepare_runtime(
257        &self,
258        tenant: &str,
259        pack_path: &Path,
260        archive_source: Option<&Path>,
261    ) -> Result<Arc<TenantRuntime>> {
262        let config = self
263            .configs
264            .get(tenant)
265            .cloned()
266            .with_context(|| format!("tenant {tenant} not registered"))?;
267        if config.tenant != tenant {
268            bail!(
269                "tenant mismatch: config declares '{}' but '{tenant}' was requested",
270                config.tenant
271            );
272        }
273        let runtime = TenantRuntime::load(
274            pack_path,
275            Arc::clone(&config),
276            None,
277            archive_source,
278            None,
279            self.wasi_policy(),
280            self.session_host(),
281            self.session_store(),
282            self.state_store(),
283            self.state_host(),
284            self.secrets_manager(),
285        )
286        .await?;
287        let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
288        runtime.register_timers(timers);
289        Ok(runtime)
290    }
291}
292
293impl TenantHandle {
294    pub fn config(&self) -> Arc<HostConfig> {
295        Arc::clone(self.runtime.config())
296    }
297
298    pub fn pack(&self) -> Arc<PackRuntime> {
299        self.runtime.pack()
300    }
301
302    pub fn engine(&self) -> Arc<FlowEngine> {
303        Arc::clone(self.runtime.engine())
304    }
305
306    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
307        self.runtime.overlays()
308    }
309
310    pub fn overlay_digests(&self) -> Vec<Option<String>> {
311        self.runtime.overlay_digests()
312    }
313}
314
315fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<String> {
316    if let Some(flow_id) = activity.flow_id() {
317        return Ok(flow_id.to_string());
318    }
319
320    runtime
321        .pack()
322        .metadata()
323        .entry_flows
324        .first()
325        .cloned()
326        .ok_or_else(|| anyhow!("no entry flows registered for tenant {}", runtime.tenant()))
327}
328
329fn normalize_replies(result: Value, tenant: &str) -> Vec<Activity> {
330    result
331        .as_array()
332        .cloned()
333        .unwrap_or_else(|| vec![result])
334        .into_iter()
335        .map(|payload| Activity::from_output(payload, tenant))
336        .collect()
337}
338
/// Reports whether `path` names a pack archive, i.e. has the extension `gtpack`.
///
/// The comparison ignores ASCII case. Paths without an extension, or whose extension is
/// not valid UTF-8, are not considered archives.
fn is_pack_archive(path: &Path) -> bool {
    match path.extension().and_then(|raw| raw.to_str()) {
        Some(extension) => extension.eq_ignore_ascii_case("gtpack"),
        None => false,
    }
}