Skip to main content

greentic_runner_host/
config.rs

1use crate::gtbind::PackBinding;
2use crate::gtbind::TenantBindings;
3use crate::oauth::OAuthBrokerConfig;
4use crate::runner::mocks::MocksConfig;
5use crate::trace::TraceConfig;
6use crate::validate::ValidationConfig;
7use anyhow::{Context, Result};
8use serde::Deserialize;
9use serde_yaml_bw as serde_yaml;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::{Path, PathBuf};
13use std::str::FromStr;
14
15#[derive(Debug, Clone)]
16pub struct HostConfig {
17    pub tenant: String,
18    pub bindings_path: PathBuf,
19    pub flow_type_bindings: HashMap<String, FlowBinding>,
20    pub rate_limits: RateLimits,
21    pub retry: FlowRetryConfig,
22    pub http_enabled: bool,
23    pub secrets_policy: SecretsPolicy,
24    pub state_store_policy: StateStorePolicy,
25    pub webhook_policy: WebhookPolicy,
26    pub timers: Vec<TimerBinding>,
27    pub oauth: Option<OAuthConfig>,
28    pub mocks: Option<MocksConfig>,
29    pub pack_bindings: Vec<PackBinding>,
30    pub env_passthrough: Vec<String>,
31    pub trace: TraceConfig,
32    pub validation: ValidationConfig,
33    pub operator_policy: OperatorPolicy,
34}
35
36#[derive(Debug, Clone, Deserialize)]
37pub struct BindingsFile {
38    pub tenant: String,
39    #[serde(default)]
40    pub flow_type_bindings: HashMap<String, FlowBinding>,
41    #[serde(default)]
42    pub rate_limits: RateLimits,
43    #[serde(default)]
44    pub retry: FlowRetryConfig,
45    #[serde(default)]
46    pub timers: Vec<TimerBinding>,
47    #[serde(default)]
48    pub oauth: Option<OAuthConfig>,
49    #[serde(default)]
50    pub mocks: Option<MocksConfig>,
51    #[serde(default)]
52    pub state_store: StateStorePolicy,
53    #[serde(default)]
54    pub operator: OperatorPolicyConfig,
55}
56
57#[derive(Debug, Clone, Deserialize)]
58pub struct FlowBinding {
59    pub adapter: String,
60    #[serde(default)]
61    pub config: serde_yaml::Value,
62    #[serde(default)]
63    pub secrets: Vec<String>,
64}
65
66#[derive(Debug, Clone, Deserialize)]
67pub struct RateLimits {
68    #[serde(default = "default_messaging_qps")]
69    pub messaging_send_qps: u32,
70    #[serde(default = "default_messaging_burst")]
71    pub messaging_burst: u32,
72}
73
74#[derive(Debug, Clone)]
75pub struct SecretsPolicy {
76    allowed: HashSet<String>,
77    allow_all: bool,
78}
79
80#[derive(Debug, Clone, Deserialize, Default)]
81pub struct OperatorPolicyConfig {
82    #[serde(default)]
83    pub allowed_providers: Vec<String>,
84    #[serde(default)]
85    pub allowed_ops: HashMap<String, Vec<String>>,
86}
87
88#[derive(Debug, Clone)]
89pub struct OperatorPolicy {
90    allow_all: bool,
91    allowed_providers: HashSet<String>,
92    allowed_ops: HashMap<String, HashSet<String>>,
93}
94
95#[derive(Debug, Clone, Deserialize)]
96pub struct FlowRetryConfig {
97    #[serde(default = "default_retry_attempts")]
98    pub max_attempts: u32,
99    #[serde(default = "default_retry_base_delay_ms")]
100    pub base_delay_ms: u64,
101}
102
103#[derive(Debug, Clone, Default)]
104pub struct WebhookPolicy {
105    allow_paths: Vec<String>,
106    deny_paths: Vec<String>,
107}
108
109#[derive(Debug, Clone, Deserialize)]
110pub struct StateStorePolicy {
111    #[serde(default = "default_state_store_allow")]
112    pub allow: bool,
113}
114
115#[derive(Debug, Clone, Deserialize)]
116pub struct WebhookBindingConfig {
117    #[serde(default)]
118    pub allow_paths: Vec<String>,
119    #[serde(default)]
120    pub deny_paths: Vec<String>,
121}
122
123#[derive(Debug, Clone, Deserialize)]
124pub struct TimerBinding {
125    pub flow_id: String,
126    pub cron: String,
127    #[serde(default)]
128    pub schedule_id: Option<String>,
129}
130
131#[derive(Debug, Clone, Deserialize)]
132pub struct OAuthConfig {
133    pub http_base_url: String,
134    pub nats_url: String,
135    pub provider: String,
136    #[serde(default)]
137    pub env: Option<String>,
138    #[serde(default)]
139    pub team: Option<String>,
140}
141
142impl HostConfig {
143    pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
144        let path = path.as_ref();
145        let content = fs::read_to_string(path)
146            .with_context(|| format!("failed to read bindings file {path:?}"))?;
147        let bindings: BindingsFile = serde_yaml::from_str(&content)
148            .with_context(|| format!("failed to parse bindings file {path:?}"))?;
149
150        let secrets_policy = SecretsPolicy::from_bindings(&bindings);
151        let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
152        let webhook_policy = bindings
153            .flow_type_bindings
154            .get("webhook")
155            .and_then(|binding| {
156                serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
157                    .map(WebhookPolicy::from)
158                    .map_err(|err| {
159                        tracing::warn!(error = %err, "failed to parse webhook binding config");
160                        err
161                    })
162                    .ok()
163            })
164            .unwrap_or_default();
165
166        Ok(Self {
167            tenant: bindings.tenant.clone(),
168            bindings_path: path.to_path_buf(),
169            flow_type_bindings: bindings.flow_type_bindings.clone(),
170            rate_limits: bindings.rate_limits.clone(),
171            retry: bindings.retry.clone(),
172            http_enabled,
173            secrets_policy,
174            state_store_policy: bindings.state_store.clone(),
175            webhook_policy,
176            timers: bindings.timers.clone(),
177            oauth: bindings.oauth.clone(),
178            mocks: bindings.mocks.clone(),
179            pack_bindings: Vec::new(),
180            env_passthrough: Vec::new(),
181            trace: TraceConfig::from_env(),
182            validation: ValidationConfig::from_env(),
183            operator_policy: OperatorPolicy::from_config(bindings.operator.clone()),
184        })
185    }
186
187    pub fn from_gtbind(bindings: TenantBindings) -> Self {
188        Self {
189            tenant: bindings.tenant,
190            bindings_path: PathBuf::from("<gtbind>"),
191            flow_type_bindings: HashMap::new(),
192            rate_limits: RateLimits::default(),
193            retry: FlowRetryConfig::default(),
194            http_enabled: false,
195            secrets_policy: SecretsPolicy::allow_all(),
196            state_store_policy: StateStorePolicy::default(),
197            webhook_policy: WebhookPolicy::default(),
198            timers: Vec::new(),
199            oauth: None,
200            mocks: None,
201            pack_bindings: bindings.packs,
202            env_passthrough: bindings.env_passthrough,
203            trace: TraceConfig::from_env(),
204            validation: ValidationConfig::from_env(),
205            operator_policy: OperatorPolicy::allow_all(),
206        }
207    }
208
209    pub fn messaging_binding(&self) -> Option<&FlowBinding> {
210        self.flow_type_bindings.get("messaging")
211    }
212
213    pub fn retry_config(&self) -> FlowRetryConfig {
214        self.retry.clone()
215    }
216
217    pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
218        let oauth = self.oauth.as_ref()?;
219        let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
220        if !oauth.provider.is_empty() {
221            cfg.default_provider = Some(oauth.provider.clone());
222        }
223        if let Some(team) = &oauth.team
224            && !team.is_empty()
225        {
226            cfg.team = Some(team.clone());
227        }
228        Some(cfg)
229    }
230
231    /// Derive a tenant context for the current configuration. This is used when
232    /// evaluating scoped requirements (e.g. secrets).
233    pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
234        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
235        let env_id = greentic_types::EnvId::from_str(&env)
236            .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
237        let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
238            .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
239        greentic_types::TenantCtx::new(env_id, tenant_id)
240    }
241}
242
243impl SecretsPolicy {
244    fn from_bindings(bindings: &BindingsFile) -> Self {
245        let allowed = bindings
246            .flow_type_bindings
247            .values()
248            .flat_map(|binding| binding.secrets.iter().cloned())
249            .collect::<HashSet<_>>();
250        Self {
251            allowed,
252            allow_all: false,
253        }
254    }
255
256    pub fn is_allowed(&self, key: &str) -> bool {
257        self.allow_all || self.allowed.contains(key)
258    }
259
260    pub fn allow_all() -> Self {
261        Self {
262            allowed: HashSet::new(),
263            allow_all: true,
264        }
265    }
266}
267
268impl OperatorPolicy {
269    pub fn from_config(config: OperatorPolicyConfig) -> Self {
270        let allowed_providers = config.allowed_providers.into_iter().collect::<HashSet<_>>();
271        let allowed_ops = config
272            .allowed_ops
273            .into_iter()
274            .map(|(provider, ops)| (provider, ops.into_iter().collect::<HashSet<_>>()))
275            .collect::<HashMap<_, _>>();
276        let allow_all = allowed_providers.is_empty() && allowed_ops.is_empty();
277        Self {
278            allow_all,
279            allowed_providers,
280            allowed_ops,
281        }
282    }
283
284    pub fn allow_all() -> Self {
285        Self {
286            allow_all: true,
287            allowed_providers: HashSet::new(),
288            allowed_ops: HashMap::new(),
289        }
290    }
291
292    pub fn allows_provider(&self, provider_id: Option<&str>, provider_type: &str) -> bool {
293        if self.allow_all {
294            return true;
295        }
296        provider_id
297            .map(|id| self.allowed_providers.contains(id))
298            .unwrap_or(false)
299            || self.allowed_providers.contains(provider_type)
300    }
301
302    pub fn allows_op(&self, provider_id: Option<&str>, provider_type: &str, op_id: &str) -> bool {
303        if self.allow_all {
304            return true;
305        }
306        if let Some(ops) = provider_id.and_then(|id| self.allowed_ops.get(id)) {
307            return ops.contains(op_id);
308        }
309        if let Some(ops) = self.allowed_ops.get(provider_type) {
310            return ops.contains(op_id);
311        }
312        self.allows_provider(provider_id, provider_type)
313    }
314}
315
316impl Default for RateLimits {
317    fn default() -> Self {
318        Self {
319            messaging_send_qps: default_messaging_qps(),
320            messaging_burst: default_messaging_burst(),
321        }
322    }
323}
324
325impl Default for StateStorePolicy {
326    fn default() -> Self {
327        Self {
328            allow: default_state_store_allow(),
329        }
330    }
331}
332
333fn default_messaging_qps() -> u32 {
334    10
335}
336
337fn default_messaging_burst() -> u32 {
338    20
339}
340
341fn default_state_store_allow() -> bool {
342    true
343}
344
345impl From<WebhookBindingConfig> for WebhookPolicy {
346    fn from(value: WebhookBindingConfig) -> Self {
347        Self {
348            allow_paths: value.allow_paths,
349            deny_paths: value.deny_paths,
350        }
351    }
352}
353
354impl WebhookPolicy {
355    pub fn is_allowed(&self, path: &str) -> bool {
356        if self
357            .deny_paths
358            .iter()
359            .any(|prefix| path.starts_with(prefix))
360        {
361            return false;
362        }
363
364        if self.allow_paths.is_empty() {
365            return true;
366        }
367
368        self.allow_paths
369            .iter()
370            .any(|prefix| path.starts_with(prefix))
371    }
372}
373
374impl TimerBinding {
375    pub fn schedule_id(&self) -> &str {
376        self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
377    }
378}
379
380impl Default for FlowRetryConfig {
381    fn default() -> Self {
382        Self {
383            max_attempts: default_retry_attempts(),
384            base_delay_ms: default_retry_base_delay_ms(),
385        }
386    }
387}
388
389#[cfg(test)]
390mod operator_policy_tests {
391    use super::{OperatorPolicy, OperatorPolicyConfig};
392    use std::collections::HashMap;
393
394    #[test]
395    fn policy_allows_configured_provider_op() {
396        let mut allowed_ops = HashMap::new();
397        allowed_ops.insert("provider.allowed".into(), vec!["op1".into(), "op2".into()]);
398        let config = OperatorPolicyConfig {
399            allowed_providers: vec!["provider.allowed".into()],
400            allowed_ops,
401        };
402        let policy = OperatorPolicy::from_config(config);
403        assert!(policy.allows_provider(Some("provider.allowed"), "provider.allowed"));
404        assert!(policy.allows_op(Some("provider.allowed"), "provider.allowed", "op1"));
405        assert!(!policy.allows_op(Some("provider.allowed"), "provider.allowed", "other"));
406        assert!(!policy.allows_provider(Some("provider.denied"), "provider.denied"));
407    }
408
409    #[test]
410    fn policy_allow_all_defaults_true() {
411        let policy = OperatorPolicy::allow_all();
412        assert!(policy.allows_provider(None, "any"));
413        assert!(policy.allows_op(None, "any", "op"));
414    }
415}
416
417fn default_retry_attempts() -> u32 {
418    3
419}
420
421fn default_retry_base_delay_ms() -> u64 {
422    250
423}
424
425#[cfg(test)]
426#[allow(clippy::items_after_test_module)]
427mod tests {
428    use super::*;
429    use std::collections::HashMap;
430    use std::path::PathBuf;
431
432    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
433        HostConfig {
434            tenant: "tenant-a".to_string(),
435            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
436            flow_type_bindings: HashMap::new(),
437            rate_limits: RateLimits::default(),
438            retry: FlowRetryConfig::default(),
439            http_enabled: false,
440            secrets_policy: SecretsPolicy::allow_all(),
441            state_store_policy: StateStorePolicy::default(),
442            webhook_policy: WebhookPolicy::default(),
443            timers: Vec::new(),
444            oauth,
445            mocks: None,
446            pack_bindings: Vec::new(),
447            env_passthrough: Vec::new(),
448            trace: TraceConfig::from_env(),
449            validation: ValidationConfig::from_env(),
450            operator_policy: OperatorPolicy::allow_all(),
451        }
452    }
453
454    #[test]
455    fn oauth_broker_config_absent_without_block() {
456        let cfg = host_config_with_oauth(None);
457        assert!(cfg.oauth_broker_config().is_none());
458    }
459
460    #[test]
461    fn oauth_broker_config_maps_fields() {
462        let cfg = host_config_with_oauth(Some(OAuthConfig {
463            http_base_url: "https://oauth.example/".into(),
464            nats_url: "nats://broker:4222".into(),
465            provider: "demo".into(),
466            env: None,
467            team: Some("ops".into()),
468        }));
469        let broker = cfg.oauth_broker_config().expect("missing broker config");
470        assert_eq!(broker.http_base_url, "https://oauth.example/");
471        assert_eq!(broker.nats_url, "nats://broker:4222");
472        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
473        assert_eq!(broker.team.as_deref(), Some("ops"));
474    }
475}