Skip to main content

greentic_runner_host/
host.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7
8use crate::activity::Activity;
9use crate::boot;
10use crate::config::HostConfig;
11use crate::engine::host::{SessionHost, StateHost};
12use crate::engine::runtime::IngressEnvelope;
13use crate::http::health::HealthState;
14use crate::pack::PackRuntime;
15use crate::runner::adapt_timer;
16use crate::runner::engine::FlowEngine;
17use crate::runtime::{ActivePacks, TenantRuntime};
18use crate::secrets::{DynSecretsManager, default_manager};
19use crate::storage::{
20    DynSessionStore, DynStateStore, new_session_store, new_state_store, session_host_from,
21    state_host_from,
22};
23use crate::wasi::RunnerWasiPolicy;
24
/// Telemetry settings carried by the host when the `telemetry` feature is enabled.
#[cfg(feature = "telemetry")]
#[derive(Debug, Clone)]
pub struct TelemetryCfg {
    /// Telemetry configuration (`greentic_telemetry::TelemetryConfig`).
    pub config: greentic_telemetry::TelemetryConfig,
    /// Export configuration (`greentic_telemetry::export::ExportConfig`).
    pub export: greentic_telemetry::export::ExportConfig,
}

/// Field-less stand-in compiled when the `telemetry` feature is disabled, so the
/// `TelemetryCfg` name exists under either feature setting.
#[cfg(not(feature = "telemetry"))]
#[derive(Debug, Clone)]
pub struct TelemetryCfg;
34
35/// Builder for composing multi-tenant host instances.
36pub struct HostBuilder {
37    configs: HashMap<String, HostConfig>,
38    #[cfg(feature = "telemetry")]
39    telemetry: Option<TelemetryCfg>,
40    wasi_policy: RunnerWasiPolicy,
41    secrets: Option<DynSecretsManager>,
42}
43
44impl HostBuilder {
45    pub fn new() -> Self {
46        Self {
47            configs: HashMap::new(),
48            #[cfg(feature = "telemetry")]
49            telemetry: None,
50            wasi_policy: RunnerWasiPolicy::default(),
51            secrets: None,
52        }
53    }
54
55    pub fn with_config(mut self, config: HostConfig) -> Self {
56        self.configs.insert(config.tenant.clone(), config);
57        self
58    }
59
60    #[cfg(feature = "telemetry")]
61    pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
62        self.telemetry = Some(telemetry);
63        self
64    }
65
66    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
67        self.wasi_policy = policy;
68        self
69    }
70
71    pub fn with_secrets_manager(mut self, manager: DynSecretsManager) -> Self {
72        self.secrets = Some(manager);
73        self
74    }
75
76    pub fn build(self) -> Result<RunnerHost> {
77        if self.configs.is_empty() {
78            bail!("at least one tenant configuration is required");
79        }
80        let wasi_policy = Arc::new(self.wasi_policy);
81        let configs = self
82            .configs
83            .into_iter()
84            .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
85            .collect();
86        let session_store = new_session_store();
87        let session_host = session_host_from(Arc::clone(&session_store));
88        let state_store = new_state_store();
89        let state_host = state_host_from(Arc::clone(&state_store));
90        let secrets = match self.secrets {
91            Some(manager) => manager,
92            None => default_manager().context("failed to initialise default secrets backend")?,
93        };
94        Ok(RunnerHost {
95            configs,
96            active: Arc::new(ActivePacks::new()),
97            health: Arc::new(HealthState::new()),
98            session_store,
99            state_store,
100            session_host,
101            state_host,
102            wasi_policy,
103            secrets_manager: secrets,
104            #[cfg(feature = "telemetry")]
105            telemetry: self.telemetry,
106        })
107    }
108}
109
110impl Default for HostBuilder {
111    fn default() -> Self {
112        Self::new()
113    }
114}
115
116/// Runtime host that manages tenant-bound packs and flow execution.
117pub struct RunnerHost {
118    configs: HashMap<String, Arc<HostConfig>>,
119    active: Arc<ActivePacks>,
120    health: Arc<HealthState>,
121    session_store: DynSessionStore,
122    state_store: DynStateStore,
123    session_host: Arc<dyn SessionHost>,
124    state_host: Arc<dyn StateHost>,
125    wasi_policy: Arc<RunnerWasiPolicy>,
126    secrets_manager: DynSecretsManager,
127    #[cfg(feature = "telemetry")]
128    telemetry: Option<TelemetryCfg>,
129}
130
131/// Handle exposing tenant internals for embedding hosts (e.g. CLI server).
132#[derive(Clone)]
133pub struct TenantHandle {
134    runtime: Arc<TenantRuntime>,
135}
136
137impl RunnerHost {
138    pub async fn start(&self) -> Result<()> {
139        #[cfg(feature = "telemetry")]
140        {
141            boot::init(&self.health, self.telemetry.as_ref())?;
142        }
143        #[cfg(not(feature = "telemetry"))]
144        {
145            boot::init(&self.health, None)?;
146        }
147        Ok(())
148    }
149
150    pub async fn stop(&self) -> Result<()> {
151        self.active.replace(HashMap::new());
152        Ok(())
153    }
154
155    pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
156        let archive_source = if is_pack_archive(pack_path) {
157            Some(pack_path)
158        } else {
159            None
160        };
161        let runtime = self
162            .prepare_runtime(tenant, pack_path, archive_source)
163            .await
164            .with_context(|| format!("failed to load tenant {tenant}"))?;
165        let mut next = (*self.active.snapshot()).clone();
166        next.insert(tenant.to_string(), runtime);
167        self.active.replace(next);
168        tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
169        Ok(())
170    }
171
172    pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
173        let runtime = self
174            .active
175            .load(tenant)
176            .with_context(|| format!("tenant {tenant} not loaded"))?;
177        let (pack_id, flow_id) = resolve_flow_id(&runtime, &activity)?;
178        let action = activity.action().map(|value| value.to_string());
179        let session = activity.session_id().map(|value| value.to_string());
180        let provider = activity.provider_id().map(|value| value.to_string());
181        let channel = activity.channel().map(|value| value.to_string());
182        let conversation = activity.conversation().map(|value| value.to_string());
183        let user = activity.user().map(|value| value.to_string());
184        let flow_type = activity
185            .flow_type()
186            .map(|value| value.to_string())
187            .or_else(|| {
188                runtime
189                    .engine()
190                    .flow_by_key(&pack_id, &flow_id)
191                    .map(|desc| desc.flow_type.clone())
192            });
193        let payload = activity.into_payload();
194
195        let envelope = IngressEnvelope {
196            tenant: tenant.to_string(),
197            env: std::env::var("GREENTIC_ENV").ok(),
198            pack_id: Some(pack_id.clone()),
199            flow_id: flow_id.clone(),
200            flow_type,
201            action,
202            session_hint: session,
203            provider,
204            channel,
205            conversation,
206            user,
207            activity_id: None,
208            timestamp: None,
209            payload,
210            metadata: None,
211            reply_scope: None,
212        }
213        .canonicalize();
214
215        let result = runtime.state_machine().handle(envelope).await?;
216        Ok(normalize_replies(result, tenant))
217    }
218
219    pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
220        self.active
221            .load(tenant)
222            .map(|runtime| TenantHandle { runtime })
223    }
224
225    pub fn active_packs(&self) -> Arc<ActivePacks> {
226        Arc::clone(&self.active)
227    }
228
229    pub fn health_state(&self) -> Arc<HealthState> {
230        Arc::clone(&self.health)
231    }
232
233    pub fn wasi_policy(&self) -> Arc<RunnerWasiPolicy> {
234        Arc::clone(&self.wasi_policy)
235    }
236
237    pub fn session_store(&self) -> DynSessionStore {
238        Arc::clone(&self.session_store)
239    }
240
241    pub fn state_store(&self) -> DynStateStore {
242        Arc::clone(&self.state_store)
243    }
244
245    pub fn session_host(&self) -> Arc<dyn SessionHost> {
246        Arc::clone(&self.session_host)
247    }
248
249    pub fn state_host(&self) -> Arc<dyn StateHost> {
250        Arc::clone(&self.state_host)
251    }
252
253    pub fn secrets_manager(&self) -> DynSecretsManager {
254        Arc::clone(&self.secrets_manager)
255    }
256
257    pub fn tenant_configs(&self) -> HashMap<String, Arc<HostConfig>> {
258        self.configs.clone()
259    }
260
261    async fn prepare_runtime(
262        &self,
263        tenant: &str,
264        pack_path: &Path,
265        archive_source: Option<&Path>,
266    ) -> Result<Arc<TenantRuntime>> {
267        let config = self
268            .configs
269            .get(tenant)
270            .cloned()
271            .with_context(|| format!("tenant {tenant} not registered"))?;
272        if config.tenant != tenant {
273            bail!(
274                "tenant mismatch: config declares '{}' but '{tenant}' was requested",
275                config.tenant
276            );
277        }
278        let runtime = TenantRuntime::load(
279            pack_path,
280            Arc::clone(&config),
281            None,
282            archive_source,
283            None,
284            self.wasi_policy(),
285            self.session_host(),
286            self.session_store(),
287            self.state_store(),
288            self.state_host(),
289            self.secrets_manager(),
290        )
291        .await?;
292        let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
293        runtime.register_timers(timers);
294        Ok(runtime)
295    }
296}
297
298impl TenantHandle {
299    pub fn config(&self) -> Arc<HostConfig> {
300        Arc::clone(self.runtime.config())
301    }
302
303    pub fn pack(&self) -> Arc<PackRuntime> {
304        self.runtime.pack()
305    }
306
307    pub fn engine(&self) -> Arc<FlowEngine> {
308        Arc::clone(self.runtime.engine())
309    }
310
311    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
312        self.runtime.overlays()
313    }
314
315    pub fn overlay_digests(&self) -> Vec<Option<String>> {
316        self.runtime.overlay_digests()
317    }
318}
319
320fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<(String, String)> {
321    let engine = runtime.engine();
322    if let Some(flow_id) = activity.flow_id() {
323        if let Some(pack_id) = activity.pack_id() {
324            if engine.flow_by_key(pack_id, flow_id).is_none() {
325                bail!("flow {flow_id} not registered for pack {pack_id}");
326            }
327            return Ok((pack_id.to_string(), flow_id.to_string()));
328        }
329        if let Some(flow) = engine.flow_by_id(flow_id) {
330            return Ok((flow.pack_id.clone(), flow.id.clone()));
331        }
332        bail!("flow {flow_id} is ambiguous; pack_id is required");
333    }
334
335    if let Some(flow_type) = activity.flow_type() {
336        if let Some(pack_id) = activity.pack_id() {
337            if let Some(flow) = engine
338                .flows()
339                .iter()
340                .find(|flow| flow.pack_id == pack_id && flow.flow_type == flow_type)
341            {
342                return Ok((pack_id.to_string(), flow.id.clone()));
343            }
344            bail!("flow type {flow_type} not registered for pack {pack_id}");
345        }
346        if let Some(flow) = engine.flow_by_type(flow_type) {
347            return Ok((flow.pack_id.clone(), flow.id.clone()));
348        }
349        bail!("flow type {flow_type} is ambiguous; pack_id is required");
350    }
351
352    let pack = runtime.pack();
353    let flow_id = pack
354        .metadata()
355        .entry_flows
356        .first()
357        .cloned()
358        .ok_or_else(|| anyhow!("no entry flows registered for tenant {}", runtime.tenant()))?;
359    Ok((pack.metadata().pack_id.clone(), flow_id))
360}
361
362fn normalize_replies(result: Value, tenant: &str) -> Vec<Activity> {
363    result
364        .as_array()
365        .cloned()
366        .unwrap_or_else(|| vec![result])
367        .into_iter()
368        .map(|payload| Activity::from_output(payload, tenant))
369        .collect()
370}
371
/// Reports whether `path` has the `gtpack` extension, compared ASCII case-insensitively.
///
/// Paths without an extension, or whose extension is not valid UTF-8, yield `false`.
fn is_pack_archive(path: &Path) -> bool {
    match path.extension().and_then(|raw| raw.to_str()) {
        Some(extension) => extension.eq_ignore_ascii_case("gtpack"),
        None => false,
    }
}