greentic_runner_host/
config.rs

1use std::collections::{HashMap, HashSet};
2#[cfg(feature = "mcp")]
3use std::env;
4use std::fs;
5use std::path::{Path, PathBuf};
6#[cfg(feature = "mcp")]
7use std::time::Duration;
8
9use anyhow::{Context, Result};
10#[cfg(feature = "mcp")]
11use greentic_mcp::{ExecConfig, RuntimePolicy, ToolStore, VerifyPolicy};
12use serde::Deserialize;
13use serde_yaml_bw as serde_yaml;
14
/// Resolved host configuration assembled from a bindings file.
///
/// Produced by [`HostConfig::load_from_path`], which copies the parsed
/// sections of the file and adds a few values derived from them.
#[derive(Debug, Clone)]
pub struct HostConfig {
    /// Tenant identifier taken from the bindings file.
    pub tenant: String,
    /// Path of the bindings file this configuration was loaded from.
    pub bindings_path: PathBuf,
    /// Flow bindings keyed by flow type; the loader looks up the
    /// `"messaging"` and `"webhook"` keys.
    pub flow_type_bindings: HashMap<String, FlowBinding>,
    /// The `mcp` section of the bindings file.
    pub mcp: McpConfig,
    /// Messaging rate limits.
    pub rate_limits: RateLimits,
    /// `mcp.http_enabled` when it is set; otherwise `true` exactly when a
    /// `"messaging"` flow binding exists.
    pub http_enabled: bool,
    /// Secrets allow-list collected from the flow bindings.
    pub secrets_policy: SecretsPolicy,
    /// Webhook path policy parsed from the `"webhook"` binding's `config`.
    pub webhook_policy: WebhookPolicy,
    /// Timer bindings declared in the bindings file.
    pub timers: Vec<TimerBinding>,
}
27
/// On-disk shape of the bindings file, as deserialized from YAML.
///
/// `tenant` and `mcp` are required; every other section falls back to its
/// default when omitted.
#[derive(Debug, Clone, Deserialize)]
pub struct BindingsFile {
    /// Tenant identifier (required).
    pub tenant: String,
    /// Flow bindings keyed by flow type; empty when omitted.
    #[serde(default)]
    pub flow_type_bindings: HashMap<String, FlowBinding>,
    /// MCP section (required).
    pub mcp: McpConfig,
    /// Messaging rate limits; `RateLimits::default()` when omitted.
    #[serde(default)]
    pub rate_limits: RateLimits,
    /// Timer bindings; empty when omitted.
    #[serde(default)]
    pub timers: Vec<TimerBinding>,
}
39
/// Binding of one flow type to an adapter.
#[derive(Debug, Clone, Deserialize)]
pub struct FlowBinding {
    /// Adapter name (required); not interpreted in this file.
    pub adapter: String,
    /// Free-form YAML configuration for the binding. For the `"webhook"`
    /// flow type it is parsed as a `WebhookBindingConfig` by the loader.
    #[serde(default)]
    pub config: serde_yaml::Value,
    /// Secret keys named by this binding; the union over all bindings forms
    /// the host's `SecretsPolicy`.
    #[serde(default)]
    pub secrets: Vec<String>,
}
48
/// The `mcp` section of the bindings file.
///
/// `store`, `security` and `runtime` are kept as raw YAML here and are only
/// interpreted when the exec configuration is built (behind the `mcp`
/// feature).
#[derive(Debug, Clone, Deserialize)]
pub struct McpConfig {
    /// Raw tool-store description (required); interpreted as a `kind`-tagged
    /// store binding when the exec configuration is built.
    pub store: serde_yaml::Value,
    /// Raw security settings; the YAML value's default when omitted.
    #[serde(default)]
    pub security: serde_yaml::Value,
    /// Raw runtime limits; the YAML value's default when omitted.
    #[serde(default)]
    pub runtime: serde_yaml::Value,
    /// Explicit HTTP switch. When set it decides `HostConfig::http_enabled`;
    /// the exec configuration treats an unset value as `false`.
    #[serde(default)]
    pub http_enabled: Option<bool>,
    /// Optional retry settings; `HostConfig::mcp_retry_config` substitutes
    /// the defaults when this is absent.
    #[serde(default)]
    pub retry: Option<McpRetryConfig>,
}
61
/// Rate limits applied to messaging sends.
///
/// Each field has its own serde default, so a partially specified section
/// keeps the defaults for the fields it leaves out.
#[derive(Debug, Clone, Deserialize)]
pub struct RateLimits {
    /// Messaging send rate (`qps`); defaults to 10.
    #[serde(default = "default_messaging_qps")]
    pub messaging_send_qps: u32,
    /// Messaging burst size; defaults to 20.
    #[serde(default = "default_messaging_burst")]
    pub messaging_burst: u32,
}
69
/// Decides which secret keys may be accessed.
///
/// A key is allowed when `allow_all` is set or when it is a member of
/// `allowed` (see `SecretsPolicy::is_allowed`).
#[derive(Debug, Clone)]
pub struct SecretsPolicy {
    /// Explicitly allowed secret keys.
    allowed: HashSet<String>,
    /// When `true`, every key is allowed regardless of `allowed`.
    allow_all: bool,
}
75
/// Retry settings for MCP calls, read from `mcp.retry`.
///
/// How these values are applied is decided by the consumer of
/// `HostConfig::mcp_retry_config`; this file only carries them.
#[derive(Debug, Clone, Deserialize)]
pub struct McpRetryConfig {
    /// Maximum number of attempts; defaults to 3.
    #[serde(default = "default_mcp_retry_attempts")]
    pub max_attempts: u32,
    /// Base delay in milliseconds; defaults to 250.
    #[serde(default = "default_mcp_retry_base_delay_ms")]
    pub base_delay_ms: u64,
}
83
/// Prefix-based allow/deny policy for webhook paths.
///
/// The default value has both lists empty, which `WebhookPolicy::is_allowed`
/// treats as "every path is allowed".
#[derive(Debug, Clone, Default)]
pub struct WebhookPolicy {
    /// Path prefixes that are allowed; an empty list allows any path that is
    /// not denied.
    allow_paths: Vec<String>,
    /// Path prefixes that are always rejected.
    deny_paths: Vec<String>,
}
89
/// Shape of the `config` value on the `"webhook"` flow binding.
///
/// Converted into a `WebhookPolicy` by the loader.
#[derive(Debug, Clone, Deserialize)]
pub struct WebhookBindingConfig {
    /// Allowed path prefixes; empty when omitted.
    #[serde(default)]
    pub allow_paths: Vec<String>,
    /// Denied path prefixes; empty when omitted.
    #[serde(default)]
    pub deny_paths: Vec<String>,
}
97
/// Binds a flow to a cron schedule.
#[derive(Debug, Clone, Deserialize)]
pub struct TimerBinding {
    /// Identifier of the flow to run (required).
    pub flow_id: String,
    /// Cron expression (required); stored as given, not validated here.
    pub cron: String,
    /// Optional explicit schedule identifier; `TimerBinding::schedule_id`
    /// falls back to `flow_id` when this is absent.
    #[serde(default)]
    pub schedule_id: Option<String>,
}
105
106impl HostConfig {
107    pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
108        let path = path.as_ref();
109        let content = fs::read_to_string(path)
110            .with_context(|| format!("failed to read bindings file {path:?}"))?;
111        let bindings: BindingsFile = serde_yaml::from_str(&content)
112            .with_context(|| format!("failed to parse bindings file {path:?}"))?;
113
114        let secrets_policy = SecretsPolicy::from_bindings(&bindings);
115        let http_enabled = bindings
116            .mcp
117            .http_enabled
118            .unwrap_or(bindings.flow_type_bindings.contains_key("messaging"));
119        let webhook_policy = bindings
120            .flow_type_bindings
121            .get("webhook")
122            .and_then(|binding| {
123                serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
124                    .map(WebhookPolicy::from)
125                    .map_err(|err| {
126                        tracing::warn!(error = %err, "failed to parse webhook binding config");
127                        err
128                    })
129                    .ok()
130            })
131            .unwrap_or_default();
132
133        Ok(Self {
134            tenant: bindings.tenant.clone(),
135            bindings_path: path.to_path_buf(),
136            flow_type_bindings: bindings.flow_type_bindings.clone(),
137            mcp: bindings.mcp.clone(),
138            rate_limits: bindings.rate_limits.clone(),
139            http_enabled,
140            secrets_policy,
141            webhook_policy,
142            timers: bindings.timers.clone(),
143        })
144    }
145
146    pub fn messaging_binding(&self) -> Option<&FlowBinding> {
147        self.flow_type_bindings.get("messaging")
148    }
149
150    pub fn mcp_retry_config(&self) -> McpRetryConfig {
151        self.mcp.retry.clone().unwrap_or_default()
152    }
153
154    #[cfg(feature = "mcp")]
155    pub fn mcp_exec_config(&self) -> Result<ExecConfig> {
156        self.mcp
157            .to_exec_config(self.bindings_path.parent())
158            .context("failed to build MCP exec configuration")
159    }
160}
161
162impl SecretsPolicy {
163    fn from_bindings(bindings: &BindingsFile) -> Self {
164        let allowed = bindings
165            .flow_type_bindings
166            .values()
167            .flat_map(|binding| binding.secrets.iter().cloned())
168            .collect::<HashSet<_>>();
169        Self {
170            allowed,
171            allow_all: false,
172        }
173    }
174
175    pub fn is_allowed(&self, key: &str) -> bool {
176        self.allow_all || self.allowed.contains(key)
177    }
178
179    pub fn allow_all() -> Self {
180        Self {
181            allowed: HashSet::new(),
182            allow_all: true,
183        }
184    }
185
186    pub fn from_allowed<I, S>(iter: I) -> Self
187    where
188        I: IntoIterator<Item = S>,
189        S: Into<String>,
190    {
191        Self {
192            allowed: iter.into_iter().map(Into::into).collect(),
193            allow_all: false,
194        }
195    }
196}
197
198impl Default for RateLimits {
199    fn default() -> Self {
200        Self {
201            messaging_send_qps: default_messaging_qps(),
202            messaging_burst: default_messaging_burst(),
203        }
204    }
205}
206
207fn default_messaging_qps() -> u32 {
208    10
209}
210
211fn default_messaging_burst() -> u32 {
212    20
213}
214
215impl From<WebhookBindingConfig> for WebhookPolicy {
216    fn from(value: WebhookBindingConfig) -> Self {
217        Self {
218            allow_paths: value.allow_paths,
219            deny_paths: value.deny_paths,
220        }
221    }
222}
223
224impl WebhookPolicy {
225    pub fn is_allowed(&self, path: &str) -> bool {
226        if self
227            .deny_paths
228            .iter()
229            .any(|prefix| path.starts_with(prefix))
230        {
231            return false;
232        }
233
234        if self.allow_paths.is_empty() {
235            return true;
236        }
237
238        self.allow_paths
239            .iter()
240            .any(|prefix| path.starts_with(prefix))
241    }
242}
243
244impl TimerBinding {
245    pub fn schedule_id(&self) -> &str {
246        self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
247    }
248}
249
/// Typed view of `McpConfig::store`, selected by its `kind` field.
#[cfg(feature = "mcp")]
#[derive(Debug, Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
enum StoreBinding {
    /// `kind: http-single` — a named tool fetched from a single URL.
    #[serde(rename = "http-single")]
    HttpSingle {
        /// Tool name.
        name: String,
        /// URL of the tool.
        url: String,
        /// Optional cache directory; a default location is chosen when it is
        /// omitted.
        #[serde(default)]
        cache_dir: Option<String>,
    },
    /// `kind: local-dir` — tools read from a local directory at `path`.
    #[serde(rename = "local-dir")]
    LocalDir { path: String },
}
264
/// Typed view of `McpConfig::runtime`. Every field is optional; the values
/// used for omitted fields are chosen in `McpConfig::to_exec_config`.
#[cfg(feature = "mcp")]
#[derive(Debug, Default, Deserialize)]
struct RuntimeBinding {
    /// Memory limit in mebibytes (converted to bytes for the runtime policy).
    #[serde(default)]
    max_memory_mb: Option<u64>,
    /// Wall-clock timeout in milliseconds; 30 000 when omitted.
    #[serde(default)]
    timeout_ms: Option<u64>,
    /// Fuel budget, passed through to the runtime policy unchanged.
    #[serde(default)]
    fuel: Option<u64>,
    /// Per-call timeout in milliseconds; 10 000 when omitted.
    #[serde(default)]
    per_call_timeout_ms: Option<u64>,
    /// Maximum number of attempts; 1 when omitted.
    #[serde(default)]
    max_attempts: Option<u32>,
    /// Base backoff in milliseconds; 100 when omitted.
    #[serde(default)]
    base_backoff_ms: Option<u64>,
}
281
/// Typed view of `McpConfig::security`.
///
/// The default value does not require signatures, which the exec
/// configuration translates into "unverified tools are allowed".
#[cfg(feature = "mcp")]
#[derive(Debug, Default, Deserialize)]
struct SecurityBinding {
    /// When `true`, unverified tools are not allowed; `false` when omitted.
    #[serde(default)]
    require_signature: bool,
    /// Required digests, passed through to the verify policy; the meaning of
    /// keys and values is defined by `greentic_mcp`.
    #[serde(default)]
    required_digests: HashMap<String, String>,
    /// Trusted signers, passed through to the verify policy.
    #[serde(default)]
    trusted_signers: Vec<String>,
}
292
#[cfg(feature = "mcp")]
impl McpConfig {
    /// Builds the `greentic_mcp` exec configuration from this section.
    ///
    /// `base_dir` is the directory that relative store paths are resolved
    /// against; `None` leaves relative paths as they are.
    ///
    /// The `runtime` and `security` sections are optional: when a section is
    /// absent (YAML null) its defaults are used. A section that is present
    /// must parse. A malformed section is reported as an error rather than
    /// being silently replaced by defaults, because the security defaults
    /// allow unverified tools and the runtime defaults impose no memory or
    /// fuel limit.
    ///
    /// # Errors
    ///
    /// Returns an error when `store` is not a valid store binding, or when a
    /// present `runtime` or `security` section cannot be parsed.
    fn to_exec_config(&self, base_dir: Option<&Path>) -> Result<ExecConfig> {
        let store_cfg: StoreBinding = serde_yaml::from_value(self.store.clone())
            .context("invalid MCP store configuration")?;
        let runtime_cfg: RuntimeBinding = if self.runtime.is_null() {
            RuntimeBinding::default()
        } else {
            serde_yaml::from_value(self.runtime.clone())
                .context("invalid MCP runtime configuration")?
        };
        let security_cfg: SecurityBinding = if self.security.is_null() {
            SecurityBinding::default()
        } else {
            serde_yaml::from_value(self.security.clone())
                .context("invalid MCP security configuration")?
        };

        let store = match store_cfg {
            StoreBinding::HttpSingle {
                name,
                url,
                cache_dir,
            } => ToolStore::HttpSingleFile {
                name,
                url,
                cache_dir: resolve_optional_path(base_dir, cache_dir)
                    .unwrap_or_else(|| default_cache_dir(base_dir)),
            },
            StoreBinding::LocalDir { path } => {
                ToolStore::LocalDir(resolve_required_path(base_dir, path))
            }
        };

        let runtime = RuntimePolicy {
            fuel: runtime_cfg.fuel,
            // MiB -> bytes; saturate so an oversized configured value cannot
            // overflow (panic in debug builds, wrap to a tiny limit in release).
            max_memory: runtime_cfg
                .max_memory_mb
                .map(|mb| mb.saturating_mul(1024 * 1024)),
            wallclock_timeout: Duration::from_millis(runtime_cfg.timeout_ms.unwrap_or(30_000)),
            per_call_timeout: Duration::from_millis(
                runtime_cfg.per_call_timeout_ms.unwrap_or(10_000),
            ),
            max_attempts: runtime_cfg.max_attempts.unwrap_or(1),
            base_backoff: Duration::from_millis(runtime_cfg.base_backoff_ms.unwrap_or(100)),
        };

        let security = VerifyPolicy {
            allow_unverified: !security_cfg.require_signature,
            required_digests: security_cfg.required_digests,
            trusted_signers: security_cfg.trusted_signers,
        };

        Ok(ExecConfig {
            store,
            security,
            runtime,
            http_enabled: self.http_enabled.unwrap_or(false),
        })
    }
}
344
345impl Default for McpRetryConfig {
346    fn default() -> Self {
347        Self {
348            max_attempts: default_mcp_retry_attempts(),
349            base_delay_ms: default_mcp_retry_base_delay_ms(),
350        }
351    }
352}
353
354fn default_mcp_retry_attempts() -> u32 {
355    3
356}
357
358fn default_mcp_retry_base_delay_ms() -> u64 {
359    250
360}
361
362#[cfg(feature = "mcp")]
/// Resolves a configured path: absolute paths are returned as they are,
/// relative paths are joined onto `base` when one is given and otherwise
/// returned unchanged.
fn resolve_required_path(base: Option<&Path>, value: String) -> PathBuf {
    let candidate = PathBuf::from(value);
    match base {
        Some(base) if candidate.is_relative() => base.join(candidate),
        _ => candidate,
    }
}
373
374#[cfg(feature = "mcp")]
375fn resolve_optional_path(base: Option<&Path>, value: Option<String>) -> Option<PathBuf> {
376    value.map(|v| resolve_required_path(base, v))
377}
378
379#[cfg(feature = "mcp")]
/// Chooses the tool cache directory used when the store binding does not
/// name one.
///
/// Precedence:
/// 1. the `GREENTIC_CACHE_DIR` environment variable, when set and non-empty;
/// 2. `.greentic/tool-cache` under `base`, when a base directory is given;
/// 3. `greentic-tool-cache` under the system temporary directory.
fn default_cache_dir(base: Option<&Path>) -> PathBuf {
    // An empty variable is treated like an unset one: taking it literally
    // would produce an empty path as the cache directory.
    let from_env = env::var_os("GREENTIC_CACHE_DIR").filter(|dir| !dir.is_empty());
    match (from_env, base) {
        (Some(dir), _) => PathBuf::from(dir),
        (None, Some(base)) => base.join(".greentic/tool-cache"),
        (None, None) => env::temp_dir().join("greentic-tool-cache"),
    }
}
388}