greentic_runner_host/
config.rs

1use std::collections::{HashMap, HashSet};
2use std::fs;
3use std::path::{Path, PathBuf};
4#[cfg(feature = "mcp")]
5use std::time::Duration;
6
7use crate::oauth::OAuthBrokerConfig;
8use anyhow::{Context, Result};
9#[cfg(feature = "mcp")]
10use greentic_mcp::{ExecConfig, RuntimePolicy, ToolStore, VerifyPolicy};
11use serde::Deserialize;
12use serde_yaml_bw as serde_yaml;
13use std::env;
14
15#[derive(Debug, Clone)]
16pub struct HostConfig {
17    pub tenant: String,
18    pub bindings_path: PathBuf,
19    pub flow_type_bindings: HashMap<String, FlowBinding>,
20    pub mcp: McpConfig,
21    pub rate_limits: RateLimits,
22    pub http_enabled: bool,
23    pub secrets_policy: SecretsPolicy,
24    pub webhook_policy: WebhookPolicy,
25    pub timers: Vec<TimerBinding>,
26    pub oauth: Option<OAuthConfig>,
27}
28
29#[derive(Debug, Clone, Deserialize)]
30pub struct BindingsFile {
31    pub tenant: String,
32    #[serde(default)]
33    pub flow_type_bindings: HashMap<String, FlowBinding>,
34    pub mcp: McpConfig,
35    #[serde(default)]
36    pub rate_limits: RateLimits,
37    #[serde(default)]
38    pub timers: Vec<TimerBinding>,
39    #[serde(default)]
40    pub oauth: Option<OAuthConfig>,
41}
42
43#[derive(Debug, Clone, Deserialize)]
44pub struct FlowBinding {
45    pub adapter: String,
46    #[serde(default)]
47    pub config: serde_yaml::Value,
48    #[serde(default)]
49    pub secrets: Vec<String>,
50}
51
52#[derive(Debug, Clone, Deserialize)]
53pub struct McpConfig {
54    pub store: serde_yaml::Value,
55    #[serde(default)]
56    pub security: serde_yaml::Value,
57    #[serde(default)]
58    pub runtime: serde_yaml::Value,
59    #[serde(default)]
60    pub http_enabled: Option<bool>,
61    #[serde(default)]
62    pub retry: Option<McpRetryConfig>,
63}
64
65#[derive(Debug, Clone, Deserialize)]
66pub struct RateLimits {
67    #[serde(default = "default_messaging_qps")]
68    pub messaging_send_qps: u32,
69    #[serde(default = "default_messaging_burst")]
70    pub messaging_burst: u32,
71}
72
/// Decides which secret keys may be accessed.
///
/// A key is permitted when `allow_all` is set or when it is contained in `allowed`.
#[derive(Debug, Clone)]
pub struct SecretsPolicy {
    /// Explicitly permitted secret keys.
    allowed: HashSet<String>,
    /// When `true`, every key is permitted regardless of `allowed`.
    allow_all: bool,
}
78
79#[derive(Debug, Clone, Deserialize)]
80pub struct McpRetryConfig {
81    #[serde(default = "default_mcp_retry_attempts")]
82    pub max_attempts: u32,
83    #[serde(default = "default_mcp_retry_base_delay_ms")]
84    pub base_delay_ms: u64,
85}
86
/// Prefix-based allow/deny policy for webhook paths.
///
/// The default value has both lists empty.
#[derive(Debug, Clone, Default)]
pub struct WebhookPolicy {
    /// Path prefixes that are permitted.
    allow_paths: Vec<String>,
    /// Path prefixes that are rejected.
    deny_paths: Vec<String>,
}
92
93#[derive(Debug, Clone, Deserialize)]
94pub struct WebhookBindingConfig {
95    #[serde(default)]
96    pub allow_paths: Vec<String>,
97    #[serde(default)]
98    pub deny_paths: Vec<String>,
99}
100
101#[derive(Debug, Clone, Deserialize)]
102pub struct TimerBinding {
103    pub flow_id: String,
104    pub cron: String,
105    #[serde(default)]
106    pub schedule_id: Option<String>,
107}
108
109#[derive(Debug, Clone, Deserialize)]
110pub struct OAuthConfig {
111    pub http_base_url: String,
112    pub nats_url: String,
113    pub provider: String,
114    #[serde(default)]
115    pub env: Option<String>,
116    #[serde(default)]
117    pub team: Option<String>,
118}
119
120impl HostConfig {
121    pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
122        let path = path.as_ref();
123        let content = fs::read_to_string(path)
124            .with_context(|| format!("failed to read bindings file {path:?}"))?;
125        let bindings: BindingsFile = serde_yaml::from_str(&content)
126            .with_context(|| format!("failed to parse bindings file {path:?}"))?;
127
128        let secrets_policy = SecretsPolicy::from_bindings(&bindings);
129        let http_enabled = bindings
130            .mcp
131            .http_enabled
132            .unwrap_or(bindings.flow_type_bindings.contains_key("messaging"));
133        let webhook_policy = bindings
134            .flow_type_bindings
135            .get("webhook")
136            .and_then(|binding| {
137                serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
138                    .map(WebhookPolicy::from)
139                    .map_err(|err| {
140                        tracing::warn!(error = %err, "failed to parse webhook binding config");
141                        err
142                    })
143                    .ok()
144            })
145            .unwrap_or_default();
146
147        Ok(Self {
148            tenant: bindings.tenant.clone(),
149            bindings_path: path.to_path_buf(),
150            flow_type_bindings: bindings.flow_type_bindings.clone(),
151            mcp: bindings.mcp.clone(),
152            rate_limits: bindings.rate_limits.clone(),
153            http_enabled,
154            secrets_policy,
155            webhook_policy,
156            timers: bindings.timers.clone(),
157            oauth: bindings.oauth.clone(),
158        })
159    }
160
161    pub fn messaging_binding(&self) -> Option<&FlowBinding> {
162        self.flow_type_bindings.get("messaging")
163    }
164
165    pub fn mcp_retry_config(&self) -> McpRetryConfig {
166        self.mcp.retry.clone().unwrap_or_default()
167    }
168
169    #[cfg(feature = "mcp")]
170    pub fn mcp_exec_config(&self) -> Result<ExecConfig> {
171        self.mcp
172            .to_exec_config(self.bindings_path.parent())
173            .context("failed to build MCP exec configuration")
174    }
175
176    pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
177        let oauth = self.oauth.as_ref()?;
178        let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
179        if !oauth.provider.is_empty() {
180            cfg.default_provider = Some(oauth.provider.clone());
181        }
182        if let Some(team) = &oauth.team
183            && !team.is_empty()
184        {
185            cfg.team = Some(team.clone());
186        }
187        Some(cfg)
188    }
189}
190
191impl SecretsPolicy {
192    fn from_bindings(bindings: &BindingsFile) -> Self {
193        let allowed = bindings
194            .flow_type_bindings
195            .values()
196            .flat_map(|binding| binding.secrets.iter().cloned())
197            .collect::<HashSet<_>>();
198        Self {
199            allowed,
200            allow_all: false,
201        }
202    }
203
204    pub fn is_allowed(&self, key: &str) -> bool {
205        self.allow_all || self.allowed.contains(key)
206    }
207
208    pub fn allow_all() -> Self {
209        Self {
210            allowed: HashSet::new(),
211            allow_all: true,
212        }
213    }
214
215    pub fn from_allowed<I, S>(iter: I) -> Self
216    where
217        I: IntoIterator<Item = S>,
218        S: Into<String>,
219    {
220        Self {
221            allowed: iter.into_iter().map(Into::into).collect(),
222            allow_all: false,
223        }
224    }
225}
226
227impl Default for RateLimits {
228    fn default() -> Self {
229        Self {
230            messaging_send_qps: default_messaging_qps(),
231            messaging_burst: default_messaging_burst(),
232        }
233    }
234}
235
/// Default for `RateLimits::messaging_send_qps`.
fn default_messaging_qps() -> u32 {
    const DEFAULT_MESSAGING_QPS: u32 = 10;
    DEFAULT_MESSAGING_QPS
}
239
/// Default for `RateLimits::messaging_burst`.
fn default_messaging_burst() -> u32 {
    const DEFAULT_MESSAGING_BURST: u32 = 20;
    DEFAULT_MESSAGING_BURST
}
243
244impl From<WebhookBindingConfig> for WebhookPolicy {
245    fn from(value: WebhookBindingConfig) -> Self {
246        Self {
247            allow_paths: value.allow_paths,
248            deny_paths: value.deny_paths,
249        }
250    }
251}
252
253impl WebhookPolicy {
254    pub fn is_allowed(&self, path: &str) -> bool {
255        if self
256            .deny_paths
257            .iter()
258            .any(|prefix| path.starts_with(prefix))
259        {
260            return false;
261        }
262
263        if self.allow_paths.is_empty() {
264            return true;
265        }
266
267        self.allow_paths
268            .iter()
269            .any(|prefix| path.starts_with(prefix))
270    }
271}
272
273impl TimerBinding {
274    pub fn schedule_id(&self) -> &str {
275        self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use serde_yaml::Value;
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Builds a minimal host configuration whose only varying part is the OAuth block.
    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
        let mcp = McpConfig {
            store: Value::Null(None),
            security: Value::Null(None),
            runtime: Value::Null(None),
            http_enabled: Some(false),
            retry: None,
        };

        HostConfig {
            tenant: "tenant-a".to_string(),
            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
            flow_type_bindings: HashMap::new(),
            mcp,
            rate_limits: RateLimits::default(),
            http_enabled: false,
            secrets_policy: SecretsPolicy::allow_all(),
            webhook_policy: WebhookPolicy::default(),
            timers: Vec::new(),
            oauth,
        }
    }

    /// Without an `oauth` block there is no broker configuration.
    #[test]
    fn oauth_broker_config_absent_without_block() {
        let host = host_config_with_oauth(None);
        assert!(host.oauth_broker_config().is_none());
    }

    /// Every populated OAuth field is carried over to the broker configuration.
    #[test]
    fn oauth_broker_config_maps_fields() {
        let oauth = OAuthConfig {
            http_base_url: "https://oauth.example/".into(),
            nats_url: "nats://broker:4222".into(),
            provider: "demo".into(),
            env: None,
            team: Some("ops".into()),
        };
        let host = host_config_with_oauth(Some(oauth));

        let broker = host.oauth_broker_config().expect("missing broker config");
        assert_eq!(broker.http_base_url, "https://oauth.example/");
        assert_eq!(broker.nats_url, "nats://broker:4222");
        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
        assert_eq!(broker.team.as_deref(), Some("ops"));
    }
}
329
/// Typed view of the `mcp.store` value, selected by its `kind` field.
///
/// Variant names are mapped to kebab-case by the container attribute, so the
/// accepted `kind` values are `http-single` and `local-dir`.
#[cfg(feature = "mcp")]
#[derive(Debug, Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
enum StoreBinding {
    /// A single tool fetched over HTTP.
    HttpSingle {
        /// Tool name.
        name: String,
        /// URL the tool is fetched from.
        url: String,
        /// Optional cache directory; a default location is chosen when omitted.
        #[serde(default)]
        cache_dir: Option<String>,
    },
    /// Tools read from a local directory.
    LocalDir {
        /// Directory path; relative paths are resolved against the bindings file's directory.
        path: String,
    },
}
344
/// Typed view of the `mcp.runtime` value; every field is optional.
///
/// Fields left as `None` receive their fallback when the exec configuration is
/// built (see `McpConfig::to_exec_config`).
#[cfg(feature = "mcp")]
#[derive(Debug, Default, Deserialize)]
struct RuntimeBinding {
    /// Memory limit in mebibytes (converted to bytes when applied).
    #[serde(default)]
    max_memory_mb: Option<u64>,
    /// Wall-clock timeout in milliseconds.
    #[serde(default)]
    timeout_ms: Option<u64>,
    /// Fuel budget, passed through unchanged.
    #[serde(default)]
    fuel: Option<u64>,
    /// Per-call timeout in milliseconds.
    #[serde(default)]
    per_call_timeout_ms: Option<u64>,
    /// Maximum number of attempts.
    #[serde(default)]
    max_attempts: Option<u32>,
    /// Base backoff in milliseconds.
    #[serde(default)]
    base_backoff_ms: Option<u64>,
}
361
/// Typed view of the `mcp.security` value; every field is optional.
#[cfg(feature = "mcp")]
#[derive(Debug, Default, Deserialize)]
struct SecurityBinding {
    /// When `false` (the default), unverified tools are allowed.
    #[serde(default)]
    require_signature: bool,
    /// Required digests, passed through to the verify policy unchanged.
    #[serde(default)]
    required_digests: HashMap<String, String>,
    /// Trusted signers, passed through to the verify policy unchanged.
    #[serde(default)]
    trusted_signers: Vec<String>,
}
372
#[cfg(feature = "mcp")]
impl McpConfig {
    /// Converts the raw `mcp` section into an `ExecConfig`.
    ///
    /// `base_dir` is the directory relative store and cache paths are resolved
    /// against; with `None`, relative paths are used as written.
    ///
    /// Fallbacks for omitted runtime settings: 30 000 ms wall-clock timeout,
    /// 10 000 ms per-call timeout, 1 attempt, 100 ms base backoff. HTTP is
    /// disabled unless `http_enabled` is explicitly set.
    ///
    /// # Errors
    ///
    /// Fails when the `store`, `runtime` or `security` value is present but does
    /// not match its expected shape. A malformed `security` block in particular
    /// must not silently degrade to "signatures not required".
    fn to_exec_config(&self, base_dir: Option<&Path>) -> Result<ExecConfig> {
        let store_cfg: StoreBinding = serde_yaml::from_value(self.store.clone())
            .context("invalid MCP store configuration")?;
        // Going through `Option` keeps an omitted (null) section mapping to the
        // defaults while a present-but-invalid section is reported as an error.
        let runtime_cfg = serde_yaml::from_value::<Option<RuntimeBinding>>(self.runtime.clone())
            .context("invalid MCP runtime configuration")?
            .unwrap_or_default();
        let security_cfg =
            serde_yaml::from_value::<Option<SecurityBinding>>(self.security.clone())
                .context("invalid MCP security configuration")?
                .unwrap_or_default();

        let store = match store_cfg {
            StoreBinding::HttpSingle {
                name,
                url,
                cache_dir,
            } => ToolStore::HttpSingleFile {
                name,
                url,
                cache_dir: resolve_optional_path(base_dir, cache_dir)
                    .unwrap_or_else(|| default_cache_dir(base_dir)),
            },
            StoreBinding::LocalDir { path } => {
                ToolStore::LocalDir(resolve_required_path(base_dir, path))
            }
        };

        let runtime = RuntimePolicy {
            fuel: runtime_cfg.fuel,
            // Saturate instead of overflowing on absurdly large MiB values.
            max_memory: runtime_cfg
                .max_memory_mb
                .map(|mb| mb.saturating_mul(1024 * 1024)),
            wallclock_timeout: Duration::from_millis(runtime_cfg.timeout_ms.unwrap_or(30_000)),
            per_call_timeout: Duration::from_millis(
                runtime_cfg.per_call_timeout_ms.unwrap_or(10_000),
            ),
            max_attempts: runtime_cfg.max_attempts.unwrap_or(1),
            base_backoff: Duration::from_millis(runtime_cfg.base_backoff_ms.unwrap_or(100)),
        };

        let security = VerifyPolicy {
            allow_unverified: !security_cfg.require_signature,
            required_digests: security_cfg.required_digests,
            trusted_signers: security_cfg.trusted_signers,
        };

        Ok(ExecConfig {
            store,
            security,
            runtime,
            http_enabled: self.http_enabled.unwrap_or(false),
        })
    }
}
424
425impl Default for McpRetryConfig {
426    fn default() -> Self {
427        Self {
428            max_attempts: default_mcp_retry_attempts(),
429            base_delay_ms: default_mcp_retry_base_delay_ms(),
430        }
431    }
432}
433
/// Default for `McpRetryConfig::max_attempts`.
fn default_mcp_retry_attempts() -> u32 {
    const DEFAULT_MAX_ATTEMPTS: u32 = 3;
    DEFAULT_MAX_ATTEMPTS
}
437
/// Default for `McpRetryConfig::base_delay_ms`, in milliseconds.
fn default_mcp_retry_base_delay_ms() -> u64 {
    const DEFAULT_BASE_DELAY_MS: u64 = 250;
    DEFAULT_BASE_DELAY_MS
}
441
442#[cfg(feature = "mcp")]
/// Resolves `value` to a path, anchoring relative paths at `base` when one is given.
///
/// Absolute paths are returned unchanged; without a `base`, `value` is used as written.
fn resolve_required_path(base: Option<&Path>, value: String) -> PathBuf {
    let requested = PathBuf::from(value);
    match base {
        Some(root) if requested.is_relative() => root.join(requested),
        _ => requested,
    }
}
453
/// Like `resolve_required_path`, but passes an absent value through as `None`.
#[cfg(feature = "mcp")]
fn resolve_optional_path(base: Option<&Path>, value: Option<String>) -> Option<PathBuf> {
    let raw = value?;
    Some(resolve_required_path(base, raw))
}
458
459#[cfg(feature = "mcp")]
/// Chooses the tool cache directory used when the store does not configure one.
///
/// Order of precedence:
/// 1. the `GREENTIC_CACHE_DIR` environment variable, when set to a non-empty value;
/// 2. `.greentic/tool-cache` under `base`, when a base directory is known;
/// 3. `greentic-tool-cache` under the system temporary directory.
fn default_cache_dir(base: Option<&Path>) -> PathBuf {
    // An empty value cannot name a directory, so it is treated like an unset variable.
    if let Some(dir) = env::var_os("GREENTIC_CACHE_DIR").filter(|dir| !dir.is_empty()) {
        return PathBuf::from(dir);
    }

    match base {
        Some(base) => base.join(".greentic/tool-cache"),
        None => env::temp_dir().join("greentic-tool-cache"),
    }
}
468}