greentic_runner_host/
config.rs

1use crate::oauth::OAuthBrokerConfig;
2use crate::runner::mocks::MocksConfig;
3use anyhow::{Context, Result};
4use serde::Deserialize;
5use serde_yaml_bw as serde_yaml;
6use std::collections::{HashMap, HashSet};
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::str::FromStr;
10
11#[derive(Debug, Clone)]
12pub struct HostConfig {
13    pub tenant: String,
14    pub bindings_path: PathBuf,
15    pub flow_type_bindings: HashMap<String, FlowBinding>,
16    pub rate_limits: RateLimits,
17    pub retry: FlowRetryConfig,
18    pub http_enabled: bool,
19    pub secrets_policy: SecretsPolicy,
20    pub state_store_policy: StateStorePolicy,
21    pub webhook_policy: WebhookPolicy,
22    pub timers: Vec<TimerBinding>,
23    pub oauth: Option<OAuthConfig>,
24    pub mocks: Option<MocksConfig>,
25}
26
27#[derive(Debug, Clone, Deserialize)]
28pub struct BindingsFile {
29    pub tenant: String,
30    #[serde(default)]
31    pub flow_type_bindings: HashMap<String, FlowBinding>,
32    #[serde(default)]
33    pub rate_limits: RateLimits,
34    #[serde(default)]
35    pub retry: FlowRetryConfig,
36    #[serde(default)]
37    pub timers: Vec<TimerBinding>,
38    #[serde(default)]
39    pub oauth: Option<OAuthConfig>,
40    #[serde(default)]
41    pub mocks: Option<MocksConfig>,
42    #[serde(default)]
43    pub state_store: StateStorePolicy,
44}
45
46#[derive(Debug, Clone, Deserialize)]
47pub struct FlowBinding {
48    pub adapter: String,
49    #[serde(default)]
50    pub config: serde_yaml::Value,
51    #[serde(default)]
52    pub secrets: Vec<String>,
53}
54
55#[derive(Debug, Clone, Deserialize)]
56pub struct RateLimits {
57    #[serde(default = "default_messaging_qps")]
58    pub messaging_send_qps: u32,
59    #[serde(default = "default_messaging_burst")]
60    pub messaging_burst: u32,
61}
62
63#[derive(Debug, Clone)]
64pub struct SecretsPolicy {
65    allowed: HashSet<String>,
66    allow_all: bool,
67}
68
69#[derive(Debug, Clone, Deserialize)]
70pub struct FlowRetryConfig {
71    #[serde(default = "default_retry_attempts")]
72    pub max_attempts: u32,
73    #[serde(default = "default_retry_base_delay_ms")]
74    pub base_delay_ms: u64,
75}
76
77#[derive(Debug, Clone, Default)]
78pub struct WebhookPolicy {
79    allow_paths: Vec<String>,
80    deny_paths: Vec<String>,
81}
82
83#[derive(Debug, Clone, Deserialize)]
84pub struct StateStorePolicy {
85    #[serde(default = "default_state_store_allow")]
86    pub allow: bool,
87}
88
89#[derive(Debug, Clone, Deserialize)]
90pub struct WebhookBindingConfig {
91    #[serde(default)]
92    pub allow_paths: Vec<String>,
93    #[serde(default)]
94    pub deny_paths: Vec<String>,
95}
96
97#[derive(Debug, Clone, Deserialize)]
98pub struct TimerBinding {
99    pub flow_id: String,
100    pub cron: String,
101    #[serde(default)]
102    pub schedule_id: Option<String>,
103}
104
105#[derive(Debug, Clone, Deserialize)]
106pub struct OAuthConfig {
107    pub http_base_url: String,
108    pub nats_url: String,
109    pub provider: String,
110    #[serde(default)]
111    pub env: Option<String>,
112    #[serde(default)]
113    pub team: Option<String>,
114}
115
116impl HostConfig {
117    pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
118        let path = path.as_ref();
119        let content = fs::read_to_string(path)
120            .with_context(|| format!("failed to read bindings file {path:?}"))?;
121        let bindings: BindingsFile = serde_yaml::from_str(&content)
122            .with_context(|| format!("failed to parse bindings file {path:?}"))?;
123
124        let secrets_policy = SecretsPolicy::from_bindings(&bindings);
125        let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
126        let webhook_policy = bindings
127            .flow_type_bindings
128            .get("webhook")
129            .and_then(|binding| {
130                serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
131                    .map(WebhookPolicy::from)
132                    .map_err(|err| {
133                        tracing::warn!(error = %err, "failed to parse webhook binding config");
134                        err
135                    })
136                    .ok()
137            })
138            .unwrap_or_default();
139
140        Ok(Self {
141            tenant: bindings.tenant.clone(),
142            bindings_path: path.to_path_buf(),
143            flow_type_bindings: bindings.flow_type_bindings.clone(),
144            rate_limits: bindings.rate_limits.clone(),
145            retry: bindings.retry.clone(),
146            http_enabled,
147            secrets_policy,
148            state_store_policy: bindings.state_store.clone(),
149            webhook_policy,
150            timers: bindings.timers.clone(),
151            oauth: bindings.oauth.clone(),
152            mocks: bindings.mocks.clone(),
153        })
154    }
155
156    pub fn messaging_binding(&self) -> Option<&FlowBinding> {
157        self.flow_type_bindings.get("messaging")
158    }
159
160    pub fn retry_config(&self) -> FlowRetryConfig {
161        self.retry.clone()
162    }
163
164    pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
165        let oauth = self.oauth.as_ref()?;
166        let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
167        if !oauth.provider.is_empty() {
168            cfg.default_provider = Some(oauth.provider.clone());
169        }
170        if let Some(team) = &oauth.team
171            && !team.is_empty()
172        {
173            cfg.team = Some(team.clone());
174        }
175        Some(cfg)
176    }
177
178    /// Derive a tenant context for the current configuration. This is used when
179    /// evaluating scoped requirements (e.g. secrets).
180    pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
181        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
182        let env_id = greentic_types::EnvId::from_str(&env)
183            .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
184        let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
185            .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
186        greentic_types::TenantCtx::new(env_id, tenant_id)
187    }
188}
189
190impl SecretsPolicy {
191    fn from_bindings(bindings: &BindingsFile) -> Self {
192        let allowed = bindings
193            .flow_type_bindings
194            .values()
195            .flat_map(|binding| binding.secrets.iter().cloned())
196            .collect::<HashSet<_>>();
197        Self {
198            allowed,
199            allow_all: false,
200        }
201    }
202
203    pub fn is_allowed(&self, key: &str) -> bool {
204        self.allow_all || self.allowed.contains(key)
205    }
206
207    pub fn allow_all() -> Self {
208        Self {
209            allowed: HashSet::new(),
210            allow_all: true,
211        }
212    }
213}
214
215impl Default for RateLimits {
216    fn default() -> Self {
217        Self {
218            messaging_send_qps: default_messaging_qps(),
219            messaging_burst: default_messaging_burst(),
220        }
221    }
222}
223
224impl Default for StateStorePolicy {
225    fn default() -> Self {
226        Self {
227            allow: default_state_store_allow(),
228        }
229    }
230}
231
232fn default_messaging_qps() -> u32 {
233    10
234}
235
236fn default_messaging_burst() -> u32 {
237    20
238}
239
240fn default_state_store_allow() -> bool {
241    true
242}
243
244impl From<WebhookBindingConfig> for WebhookPolicy {
245    fn from(value: WebhookBindingConfig) -> Self {
246        Self {
247            allow_paths: value.allow_paths,
248            deny_paths: value.deny_paths,
249        }
250    }
251}
252
253impl WebhookPolicy {
254    pub fn is_allowed(&self, path: &str) -> bool {
255        if self
256            .deny_paths
257            .iter()
258            .any(|prefix| path.starts_with(prefix))
259        {
260            return false;
261        }
262
263        if self.allow_paths.is_empty() {
264            return true;
265        }
266
267        self.allow_paths
268            .iter()
269            .any(|prefix| path.starts_with(prefix))
270    }
271}
272
273impl TimerBinding {
274    pub fn schedule_id(&self) -> &str {
275        self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
276    }
277}
278
279impl Default for FlowRetryConfig {
280    fn default() -> Self {
281        Self {
282            max_attempts: default_retry_attempts(),
283            base_delay_ms: default_retry_base_delay_ms(),
284        }
285    }
286}
287
288fn default_retry_attempts() -> u32 {
289    3
290}
291
292fn default_retry_base_delay_ms() -> u64 {
293    250
294}
295
296#[cfg(test)]
297#[allow(clippy::items_after_test_module)]
298mod tests {
299    use super::*;
300    use std::collections::HashMap;
301    use std::path::PathBuf;
302
303    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
304        HostConfig {
305            tenant: "tenant-a".to_string(),
306            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
307            flow_type_bindings: HashMap::new(),
308            rate_limits: RateLimits::default(),
309            retry: FlowRetryConfig::default(),
310            http_enabled: false,
311            secrets_policy: SecretsPolicy::allow_all(),
312            state_store_policy: StateStorePolicy::default(),
313            webhook_policy: WebhookPolicy::default(),
314            timers: Vec::new(),
315            oauth,
316            mocks: None,
317        }
318    }
319
320    #[test]
321    fn oauth_broker_config_absent_without_block() {
322        let cfg = host_config_with_oauth(None);
323        assert!(cfg.oauth_broker_config().is_none());
324    }
325
326    #[test]
327    fn oauth_broker_config_maps_fields() {
328        let cfg = host_config_with_oauth(Some(OAuthConfig {
329            http_base_url: "https://oauth.example/".into(),
330            nats_url: "nats://broker:4222".into(),
331            provider: "demo".into(),
332            env: None,
333            team: Some("ops".into()),
334        }));
335        let broker = cfg.oauth_broker_config().expect("missing broker config");
336        assert_eq!(broker.http_base_url, "https://oauth.example/");
337        assert_eq!(broker.nats_url, "nats://broker:4222");
338        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
339        assert_eq!(broker.team.as_deref(), Some("ops"));
340    }
341}