greentic_runner_host/
host.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7use tokio::sync::RwLock;
8
9use crate::activity::Activity;
10use crate::config::HostConfig;
11use crate::pack::PackRuntime;
12use crate::runner::engine::{FlowContext, FlowEngine};
13use crate::runner::mocks::MockLayer;
14
15#[cfg(feature = "telemetry")]
16pub use greentic_telemetry::OtlpConfig as TelemetryCfg;
17#[cfg(feature = "telemetry")]
18use greentic_telemetry::init_otlp;
19
20/// Builder for composing multi-tenant host instances.
21pub struct HostBuilder {
22    configs: HashMap<String, HostConfig>,
23    #[cfg(feature = "telemetry")]
24    telemetry: Option<TelemetryCfg>,
25}
26
27impl HostBuilder {
28    /// Create an empty builder.
29    pub fn new() -> Self {
30        Self {
31            configs: HashMap::new(),
32            #[cfg(feature = "telemetry")]
33            telemetry: None,
34        }
35    }
36
37    /// Register a tenant configuration.
38    pub fn with_config(mut self, config: HostConfig) -> Self {
39        self.configs.insert(config.tenant.clone(), config);
40        self
41    }
42
43    /// Attach telemetry configuration (requires the `telemetry` feature).
44    #[cfg(feature = "telemetry")]
45    pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
46        self.telemetry = Some(telemetry);
47        self
48    }
49
50    /// Build a [`RunnerHost`].
51    pub fn build(self) -> Result<RunnerHost> {
52        if self.configs.is_empty() {
53            bail!("at least one tenant configuration is required");
54        }
55        let configs = self
56            .configs
57            .into_iter()
58            .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
59            .collect();
60        Ok(RunnerHost {
61            configs,
62            tenants: RwLock::new(HashMap::new()),
63            #[cfg(feature = "telemetry")]
64            telemetry: self.telemetry,
65        })
66    }
67}
68
69impl Default for HostBuilder {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75/// Runtime host that manages tenant-bound packs and flow execution.
76pub struct RunnerHost {
77    configs: HashMap<String, Arc<HostConfig>>,
78    tenants: RwLock<HashMap<String, Arc<TenantRuntime>>>,
79    #[cfg(feature = "telemetry")]
80    telemetry: Option<TelemetryCfg>,
81}
82
83struct TenantRuntime {
84    config: Arc<HostConfig>,
85    pack: Arc<PackRuntime>,
86    engine: Arc<FlowEngine>,
87    mocks: Option<Arc<MockLayer>>,
88}
89
90/// Handle exposing tenant internals for embedding hosts (e.g. CLI server).
91#[derive(Clone)]
92pub struct TenantHandle {
93    config: Arc<HostConfig>,
94    pack: Arc<PackRuntime>,
95    engine: Arc<FlowEngine>,
96}
97
98impl RunnerHost {
99    /// Initialise telemetry sinks (no-op when the `telemetry` feature is disabled).
100    pub async fn start(&self) -> Result<()> {
101        #[cfg(feature = "telemetry")]
102        if let Some(cfg) = &self.telemetry {
103            init_otlp(cfg.clone(), Vec::new()).map_err(|err| anyhow!(err.to_string()))?;
104        }
105        Ok(())
106    }
107
108    /// Drop loaded packs and release resources.
109    pub async fn stop(&self) -> Result<()> {
110        self.tenants.write().await.clear();
111        Ok(())
112    }
113
114    /// Load a pack for the given tenant.
115    pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
116        let config = self
117            .configs
118            .get(tenant)
119            .cloned()
120            .with_context(|| format!("tenant {tenant} not registered"))?;
121        if config.tenant != tenant {
122            bail!(
123                "tenant mismatch: config declares '{}' but '{tenant}' was requested",
124                config.tenant
125            );
126        }
127
128        let pack = Arc::new(
129            PackRuntime::load(pack_path, Arc::clone(&config), None, None)
130                .await
131                .with_context(|| format!("failed to load pack {}", pack_path.display()))?,
132        );
133        let engine = Arc::new(
134            FlowEngine::new(Arc::clone(&pack), Arc::clone(&config))
135                .await
136                .context("failed to prime flow engine")?,
137        );
138        let runtime = Arc::new(TenantRuntime {
139            config,
140            pack,
141            engine,
142            mocks: None,
143        });
144        self.tenants
145            .write()
146            .await
147            .insert(tenant.to_string(), runtime);
148        tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
149        Ok(())
150    }
151
152    /// Dispatch an activity to the appropriate flow and capture responses.
153    pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
154        let runtime = self
155            .tenants
156            .read()
157            .await
158            .get(tenant)
159            .cloned()
160            .with_context(|| format!("tenant {tenant} not loaded"))?;
161        let flow_id = resolve_flow_id(&runtime, &activity)?;
162        let action = activity.action().map(|value| value.to_string());
163        let session = activity.session_id().map(|value| value.to_string());
164        let provider = activity.provider_id().map(|value| value.to_string());
165        let payload = activity.into_payload();
166
167        let result = runtime
168            .engine
169            .execute(
170                FlowContext {
171                    tenant,
172                    flow_id: &flow_id,
173                    node_id: None,
174                    tool: None,
175                    action: action.as_deref(),
176                    session_id: session.as_deref(),
177                    provider_id: provider.as_deref(),
178                    retry_config: runtime.config.mcp_retry_config().into(),
179                    observer: None,
180                    mocks: runtime.mocks.as_deref(),
181                },
182                payload,
183            )
184            .await?;
185
186        Ok(normalize_replies(result, tenant))
187    }
188
189    /// Retrieve a handle to the tenant runtime.
190    pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
191        self.tenants
192            .read()
193            .await
194            .get(tenant)
195            .map(|runtime| TenantHandle {
196                config: Arc::clone(&runtime.config),
197                pack: Arc::clone(&runtime.pack),
198                engine: Arc::clone(&runtime.engine),
199            })
200    }
201}
202
203impl TenantHandle {
204    /// Borrow the tenant configuration.
205    pub fn config(&self) -> Arc<HostConfig> {
206        Arc::clone(&self.config)
207    }
208
209    /// Borrow the loaded pack runtime.
210    pub fn pack(&self) -> Arc<PackRuntime> {
211        Arc::clone(&self.pack)
212    }
213
214    /// Borrow the flow engine.
215    pub fn engine(&self) -> Arc<FlowEngine> {
216        Arc::clone(&self.engine)
217    }
218}
219
220fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<String> {
221    if let Some(flow_id) = activity.flow_id() {
222        return Ok(flow_id.to_string());
223    }
224
225    let flow_type = activity
226        .flow_type()
227        .ok_or_else(|| anyhow!("activity is missing a flow type hint"))?;
228    runtime
229        .engine
230        .flow_by_type(flow_type)
231        .map(|descriptor| descriptor.id.clone())
232        .ok_or_else(|| anyhow!("no flow registered for type {flow_type}"))
233}
234
235fn normalize_replies(value: Value, tenant: &str) -> Vec<Activity> {
236    match value {
237        Value::Array(values) => values
238            .into_iter()
239            .map(|payload| Activity::from_output(payload, tenant))
240            .collect(),
241        other => vec![Activity::from_output(other, tenant)],
242    }
243}