greentic_runner_host/
host.rs1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7use tokio::sync::RwLock;
8
9use crate::activity::Activity;
10use crate::config::HostConfig;
11use crate::pack::PackRuntime;
12use crate::runner::engine::{FlowContext, FlowEngine};
13use crate::runner::mocks::MockLayer;
14
15#[cfg(feature = "telemetry")]
16pub use greentic_telemetry::OtlpConfig as TelemetryCfg;
17#[cfg(feature = "telemetry")]
18use greentic_telemetry::init_otlp;
19
20pub struct HostBuilder {
22 configs: HashMap<String, HostConfig>,
23 #[cfg(feature = "telemetry")]
24 telemetry: Option<TelemetryCfg>,
25}
26
27impl HostBuilder {
28 pub fn new() -> Self {
30 Self {
31 configs: HashMap::new(),
32 #[cfg(feature = "telemetry")]
33 telemetry: None,
34 }
35 }
36
37 pub fn with_config(mut self, config: HostConfig) -> Self {
39 self.configs.insert(config.tenant.clone(), config);
40 self
41 }
42
43 #[cfg(feature = "telemetry")]
45 pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
46 self.telemetry = Some(telemetry);
47 self
48 }
49
50 pub fn build(self) -> Result<RunnerHost> {
52 if self.configs.is_empty() {
53 bail!("at least one tenant configuration is required");
54 }
55 let configs = self
56 .configs
57 .into_iter()
58 .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
59 .collect();
60 Ok(RunnerHost {
61 configs,
62 tenants: RwLock::new(HashMap::new()),
63 #[cfg(feature = "telemetry")]
64 telemetry: self.telemetry,
65 })
66 }
67}
68
69impl Default for HostBuilder {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75pub struct RunnerHost {
77 configs: HashMap<String, Arc<HostConfig>>,
78 tenants: RwLock<HashMap<String, Arc<TenantRuntime>>>,
79 #[cfg(feature = "telemetry")]
80 telemetry: Option<TelemetryCfg>,
81}
82
83struct TenantRuntime {
84 config: Arc<HostConfig>,
85 pack: Arc<PackRuntime>,
86 engine: Arc<FlowEngine>,
87 mocks: Option<Arc<MockLayer>>,
88}
89
90#[derive(Clone)]
92pub struct TenantHandle {
93 config: Arc<HostConfig>,
94 pack: Arc<PackRuntime>,
95 engine: Arc<FlowEngine>,
96}
97
98impl RunnerHost {
99 pub async fn start(&self) -> Result<()> {
101 #[cfg(feature = "telemetry")]
102 if let Some(cfg) = &self.telemetry {
103 init_otlp(cfg.clone(), Vec::new()).map_err(|err| anyhow!(err.to_string()))?;
104 }
105 Ok(())
106 }
107
108 pub async fn stop(&self) -> Result<()> {
110 self.tenants.write().await.clear();
111 Ok(())
112 }
113
114 pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
116 let config = self
117 .configs
118 .get(tenant)
119 .cloned()
120 .with_context(|| format!("tenant {tenant} not registered"))?;
121 if config.tenant != tenant {
122 bail!(
123 "tenant mismatch: config declares '{}' but '{tenant}' was requested",
124 config.tenant
125 );
126 }
127
128 let pack = Arc::new(
129 PackRuntime::load(pack_path, Arc::clone(&config), None, None)
130 .await
131 .with_context(|| format!("failed to load pack {}", pack_path.display()))?,
132 );
133 let engine = Arc::new(
134 FlowEngine::new(Arc::clone(&pack), Arc::clone(&config))
135 .await
136 .context("failed to prime flow engine")?,
137 );
138 let runtime = Arc::new(TenantRuntime {
139 config,
140 pack,
141 engine,
142 mocks: None,
143 });
144 self.tenants
145 .write()
146 .await
147 .insert(tenant.to_string(), runtime);
148 tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
149 Ok(())
150 }
151
152 pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
154 let runtime = self
155 .tenants
156 .read()
157 .await
158 .get(tenant)
159 .cloned()
160 .with_context(|| format!("tenant {tenant} not loaded"))?;
161 let flow_id = resolve_flow_id(&runtime, &activity)?;
162 let action = activity.action().map(|value| value.to_string());
163 let session = activity.session_id().map(|value| value.to_string());
164 let provider = activity.provider_id().map(|value| value.to_string());
165 let payload = activity.into_payload();
166
167 let result = runtime
168 .engine
169 .execute(
170 FlowContext {
171 tenant,
172 flow_id: &flow_id,
173 node_id: None,
174 tool: None,
175 action: action.as_deref(),
176 session_id: session.as_deref(),
177 provider_id: provider.as_deref(),
178 retry_config: runtime.config.mcp_retry_config().into(),
179 observer: None,
180 mocks: runtime.mocks.as_deref(),
181 },
182 payload,
183 )
184 .await?;
185
186 Ok(normalize_replies(result, tenant))
187 }
188
189 pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
191 self.tenants
192 .read()
193 .await
194 .get(tenant)
195 .map(|runtime| TenantHandle {
196 config: Arc::clone(&runtime.config),
197 pack: Arc::clone(&runtime.pack),
198 engine: Arc::clone(&runtime.engine),
199 })
200 }
201}
202
203impl TenantHandle {
204 pub fn config(&self) -> Arc<HostConfig> {
206 Arc::clone(&self.config)
207 }
208
209 pub fn pack(&self) -> Arc<PackRuntime> {
211 Arc::clone(&self.pack)
212 }
213
214 pub fn engine(&self) -> Arc<FlowEngine> {
216 Arc::clone(&self.engine)
217 }
218}
219
220fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<String> {
221 if let Some(flow_id) = activity.flow_id() {
222 return Ok(flow_id.to_string());
223 }
224
225 let flow_type = activity
226 .flow_type()
227 .ok_or_else(|| anyhow!("activity is missing a flow type hint"))?;
228 runtime
229 .engine
230 .flow_by_type(flow_type)
231 .map(|descriptor| descriptor.id.clone())
232 .ok_or_else(|| anyhow!("no flow registered for type {flow_type}"))
233}
234
235fn normalize_replies(value: Value, tenant: &str) -> Vec<Activity> {
236 match value {
237 Value::Array(values) => values
238 .into_iter()
239 .map(|payload| Activity::from_output(payload, tenant))
240 .collect(),
241 other => vec![Activity::from_output(other, tenant)],
242 }
243}