greentic_runner_host/
config.rs

1use crate::gtbind::PackBinding;
2use crate::gtbind::TenantBindings;
3use crate::oauth::OAuthBrokerConfig;
4use crate::runner::mocks::MocksConfig;
5use crate::trace::TraceConfig;
6use crate::validate::ValidationConfig;
7use anyhow::{Context, Result};
8use serde::Deserialize;
9use serde_yaml_bw as serde_yaml;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::{Path, PathBuf};
13use std::str::FromStr;
14
15#[derive(Debug, Clone)]
16pub struct HostConfig {
17    pub tenant: String,
18    pub bindings_path: PathBuf,
19    pub flow_type_bindings: HashMap<String, FlowBinding>,
20    pub rate_limits: RateLimits,
21    pub retry: FlowRetryConfig,
22    pub http_enabled: bool,
23    pub secrets_policy: SecretsPolicy,
24    pub state_store_policy: StateStorePolicy,
25    pub webhook_policy: WebhookPolicy,
26    pub timers: Vec<TimerBinding>,
27    pub oauth: Option<OAuthConfig>,
28    pub mocks: Option<MocksConfig>,
29    pub pack_bindings: Vec<PackBinding>,
30    pub env_passthrough: Vec<String>,
31    pub trace: TraceConfig,
32    pub validation: ValidationConfig,
33}
34
35#[derive(Debug, Clone, Deserialize)]
36pub struct BindingsFile {
37    pub tenant: String,
38    #[serde(default)]
39    pub flow_type_bindings: HashMap<String, FlowBinding>,
40    #[serde(default)]
41    pub rate_limits: RateLimits,
42    #[serde(default)]
43    pub retry: FlowRetryConfig,
44    #[serde(default)]
45    pub timers: Vec<TimerBinding>,
46    #[serde(default)]
47    pub oauth: Option<OAuthConfig>,
48    #[serde(default)]
49    pub mocks: Option<MocksConfig>,
50    #[serde(default)]
51    pub state_store: StateStorePolicy,
52}
53
54#[derive(Debug, Clone, Deserialize)]
55pub struct FlowBinding {
56    pub adapter: String,
57    #[serde(default)]
58    pub config: serde_yaml::Value,
59    #[serde(default)]
60    pub secrets: Vec<String>,
61}
62
63#[derive(Debug, Clone, Deserialize)]
64pub struct RateLimits {
65    #[serde(default = "default_messaging_qps")]
66    pub messaging_send_qps: u32,
67    #[serde(default = "default_messaging_burst")]
68    pub messaging_burst: u32,
69}
70
71#[derive(Debug, Clone)]
72pub struct SecretsPolicy {
73    allowed: HashSet<String>,
74    allow_all: bool,
75}
76
77#[derive(Debug, Clone, Deserialize)]
78pub struct FlowRetryConfig {
79    #[serde(default = "default_retry_attempts")]
80    pub max_attempts: u32,
81    #[serde(default = "default_retry_base_delay_ms")]
82    pub base_delay_ms: u64,
83}
84
85#[derive(Debug, Clone, Default)]
86pub struct WebhookPolicy {
87    allow_paths: Vec<String>,
88    deny_paths: Vec<String>,
89}
90
91#[derive(Debug, Clone, Deserialize)]
92pub struct StateStorePolicy {
93    #[serde(default = "default_state_store_allow")]
94    pub allow: bool,
95}
96
97#[derive(Debug, Clone, Deserialize)]
98pub struct WebhookBindingConfig {
99    #[serde(default)]
100    pub allow_paths: Vec<String>,
101    #[serde(default)]
102    pub deny_paths: Vec<String>,
103}
104
105#[derive(Debug, Clone, Deserialize)]
106pub struct TimerBinding {
107    pub flow_id: String,
108    pub cron: String,
109    #[serde(default)]
110    pub schedule_id: Option<String>,
111}
112
113#[derive(Debug, Clone, Deserialize)]
114pub struct OAuthConfig {
115    pub http_base_url: String,
116    pub nats_url: String,
117    pub provider: String,
118    #[serde(default)]
119    pub env: Option<String>,
120    #[serde(default)]
121    pub team: Option<String>,
122}
123
124impl HostConfig {
125    pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
126        let path = path.as_ref();
127        let content = fs::read_to_string(path)
128            .with_context(|| format!("failed to read bindings file {path:?}"))?;
129        let bindings: BindingsFile = serde_yaml::from_str(&content)
130            .with_context(|| format!("failed to parse bindings file {path:?}"))?;
131
132        let secrets_policy = SecretsPolicy::from_bindings(&bindings);
133        let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
134        let webhook_policy = bindings
135            .flow_type_bindings
136            .get("webhook")
137            .and_then(|binding| {
138                serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
139                    .map(WebhookPolicy::from)
140                    .map_err(|err| {
141                        tracing::warn!(error = %err, "failed to parse webhook binding config");
142                        err
143                    })
144                    .ok()
145            })
146            .unwrap_or_default();
147
148        Ok(Self {
149            tenant: bindings.tenant.clone(),
150            bindings_path: path.to_path_buf(),
151            flow_type_bindings: bindings.flow_type_bindings.clone(),
152            rate_limits: bindings.rate_limits.clone(),
153            retry: bindings.retry.clone(),
154            http_enabled,
155            secrets_policy,
156            state_store_policy: bindings.state_store.clone(),
157            webhook_policy,
158            timers: bindings.timers.clone(),
159            oauth: bindings.oauth.clone(),
160            mocks: bindings.mocks.clone(),
161            pack_bindings: Vec::new(),
162            env_passthrough: Vec::new(),
163            trace: TraceConfig::from_env(),
164            validation: ValidationConfig::from_env(),
165        })
166    }
167
168    pub fn from_gtbind(bindings: TenantBindings) -> Self {
169        Self {
170            tenant: bindings.tenant,
171            bindings_path: PathBuf::from("<gtbind>"),
172            flow_type_bindings: HashMap::new(),
173            rate_limits: RateLimits::default(),
174            retry: FlowRetryConfig::default(),
175            http_enabled: false,
176            secrets_policy: SecretsPolicy::allow_all(),
177            state_store_policy: StateStorePolicy::default(),
178            webhook_policy: WebhookPolicy::default(),
179            timers: Vec::new(),
180            oauth: None,
181            mocks: None,
182            pack_bindings: bindings.packs,
183            env_passthrough: bindings.env_passthrough,
184            trace: TraceConfig::from_env(),
185            validation: ValidationConfig::from_env(),
186        }
187    }
188
189    pub fn messaging_binding(&self) -> Option<&FlowBinding> {
190        self.flow_type_bindings.get("messaging")
191    }
192
193    pub fn retry_config(&self) -> FlowRetryConfig {
194        self.retry.clone()
195    }
196
197    pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
198        let oauth = self.oauth.as_ref()?;
199        let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
200        if !oauth.provider.is_empty() {
201            cfg.default_provider = Some(oauth.provider.clone());
202        }
203        if let Some(team) = &oauth.team
204            && !team.is_empty()
205        {
206            cfg.team = Some(team.clone());
207        }
208        Some(cfg)
209    }
210
211    /// Derive a tenant context for the current configuration. This is used when
212    /// evaluating scoped requirements (e.g. secrets).
213    pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
214        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
215        let env_id = greentic_types::EnvId::from_str(&env)
216            .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
217        let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
218            .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
219        greentic_types::TenantCtx::new(env_id, tenant_id)
220    }
221}
222
223impl SecretsPolicy {
224    fn from_bindings(bindings: &BindingsFile) -> Self {
225        let allowed = bindings
226            .flow_type_bindings
227            .values()
228            .flat_map(|binding| binding.secrets.iter().cloned())
229            .collect::<HashSet<_>>();
230        Self {
231            allowed,
232            allow_all: false,
233        }
234    }
235
236    pub fn is_allowed(&self, key: &str) -> bool {
237        self.allow_all || self.allowed.contains(key)
238    }
239
240    pub fn allow_all() -> Self {
241        Self {
242            allowed: HashSet::new(),
243            allow_all: true,
244        }
245    }
246}
247
248impl Default for RateLimits {
249    fn default() -> Self {
250        Self {
251            messaging_send_qps: default_messaging_qps(),
252            messaging_burst: default_messaging_burst(),
253        }
254    }
255}
256
257impl Default for StateStorePolicy {
258    fn default() -> Self {
259        Self {
260            allow: default_state_store_allow(),
261        }
262    }
263}
264
265fn default_messaging_qps() -> u32 {
266    10
267}
268
269fn default_messaging_burst() -> u32 {
270    20
271}
272
273fn default_state_store_allow() -> bool {
274    true
275}
276
277impl From<WebhookBindingConfig> for WebhookPolicy {
278    fn from(value: WebhookBindingConfig) -> Self {
279        Self {
280            allow_paths: value.allow_paths,
281            deny_paths: value.deny_paths,
282        }
283    }
284}
285
286impl WebhookPolicy {
287    pub fn is_allowed(&self, path: &str) -> bool {
288        if self
289            .deny_paths
290            .iter()
291            .any(|prefix| path.starts_with(prefix))
292        {
293            return false;
294        }
295
296        if self.allow_paths.is_empty() {
297            return true;
298        }
299
300        self.allow_paths
301            .iter()
302            .any(|prefix| path.starts_with(prefix))
303    }
304}
305
306impl TimerBinding {
307    pub fn schedule_id(&self) -> &str {
308        self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
309    }
310}
311
312impl Default for FlowRetryConfig {
313    fn default() -> Self {
314        Self {
315            max_attempts: default_retry_attempts(),
316            base_delay_ms: default_retry_base_delay_ms(),
317        }
318    }
319}
320
321fn default_retry_attempts() -> u32 {
322    3
323}
324
325fn default_retry_base_delay_ms() -> u64 {
326    250
327}
328
329#[cfg(test)]
330#[allow(clippy::items_after_test_module)]
331mod tests {
332    use super::*;
333    use std::collections::HashMap;
334    use std::path::PathBuf;
335
336    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
337        HostConfig {
338            tenant: "tenant-a".to_string(),
339            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
340            flow_type_bindings: HashMap::new(),
341            rate_limits: RateLimits::default(),
342            retry: FlowRetryConfig::default(),
343            http_enabled: false,
344            secrets_policy: SecretsPolicy::allow_all(),
345            state_store_policy: StateStorePolicy::default(),
346            webhook_policy: WebhookPolicy::default(),
347            timers: Vec::new(),
348            oauth,
349            mocks: None,
350            pack_bindings: Vec::new(),
351            env_passthrough: Vec::new(),
352            trace: TraceConfig::from_env(),
353            validation: ValidationConfig::from_env(),
354        }
355    }
356
357    #[test]
358    fn oauth_broker_config_absent_without_block() {
359        let cfg = host_config_with_oauth(None);
360        assert!(cfg.oauth_broker_config().is_none());
361    }
362
363    #[test]
364    fn oauth_broker_config_maps_fields() {
365        let cfg = host_config_with_oauth(Some(OAuthConfig {
366            http_base_url: "https://oauth.example/".into(),
367            nats_url: "nats://broker:4222".into(),
368            provider: "demo".into(),
369            env: None,
370            team: Some("ops".into()),
371        }));
372        let broker = cfg.oauth_broker_config().expect("missing broker config");
373        assert_eq!(broker.http_base_url, "https://oauth.example/");
374        assert_eq!(broker.nats_url, "nats://broker:4222");
375        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
376        assert_eq!(broker.team.as_deref(), Some("ops"));
377    }
378}