greentic_runner_host/
config.rs

1use crate::oauth::OAuthBrokerConfig;
2use crate::runner::mocks::MocksConfig;
3use anyhow::{Context, Result};
4use serde::Deserialize;
5use serde_yaml_bw as serde_yaml;
6use std::collections::{HashMap, HashSet};
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::str::FromStr;
10
11#[derive(Debug, Clone)]
12pub struct HostConfig {
13    pub tenant: String,
14    pub bindings_path: PathBuf,
15    pub flow_type_bindings: HashMap<String, FlowBinding>,
16    pub rate_limits: RateLimits,
17    pub retry: FlowRetryConfig,
18    pub http_enabled: bool,
19    pub secrets_policy: SecretsPolicy,
20    pub webhook_policy: WebhookPolicy,
21    pub timers: Vec<TimerBinding>,
22    pub oauth: Option<OAuthConfig>,
23    pub mocks: Option<MocksConfig>,
24}
25
26#[derive(Debug, Clone, Deserialize)]
27pub struct BindingsFile {
28    pub tenant: String,
29    #[serde(default)]
30    pub flow_type_bindings: HashMap<String, FlowBinding>,
31    #[serde(default)]
32    pub rate_limits: RateLimits,
33    #[serde(default)]
34    pub retry: FlowRetryConfig,
35    #[serde(default)]
36    pub timers: Vec<TimerBinding>,
37    #[serde(default)]
38    pub oauth: Option<OAuthConfig>,
39    #[serde(default)]
40    pub mocks: Option<MocksConfig>,
41}
42
43#[derive(Debug, Clone, Deserialize)]
44pub struct FlowBinding {
45    pub adapter: String,
46    #[serde(default)]
47    pub config: serde_yaml::Value,
48    #[serde(default)]
49    pub secrets: Vec<String>,
50}
51
52#[derive(Debug, Clone, Deserialize)]
53pub struct RateLimits {
54    #[serde(default = "default_messaging_qps")]
55    pub messaging_send_qps: u32,
56    #[serde(default = "default_messaging_burst")]
57    pub messaging_burst: u32,
58}
59
60#[derive(Debug, Clone)]
61pub struct SecretsPolicy {
62    allowed: HashSet<String>,
63    allow_all: bool,
64}
65
66#[derive(Debug, Clone, Deserialize)]
67pub struct FlowRetryConfig {
68    #[serde(default = "default_retry_attempts")]
69    pub max_attempts: u32,
70    #[serde(default = "default_retry_base_delay_ms")]
71    pub base_delay_ms: u64,
72}
73
74#[derive(Debug, Clone, Default)]
75pub struct WebhookPolicy {
76    allow_paths: Vec<String>,
77    deny_paths: Vec<String>,
78}
79
80#[derive(Debug, Clone, Deserialize)]
81pub struct WebhookBindingConfig {
82    #[serde(default)]
83    pub allow_paths: Vec<String>,
84    #[serde(default)]
85    pub deny_paths: Vec<String>,
86}
87
88#[derive(Debug, Clone, Deserialize)]
89pub struct TimerBinding {
90    pub flow_id: String,
91    pub cron: String,
92    #[serde(default)]
93    pub schedule_id: Option<String>,
94}
95
96#[derive(Debug, Clone, Deserialize)]
97pub struct OAuthConfig {
98    pub http_base_url: String,
99    pub nats_url: String,
100    pub provider: String,
101    #[serde(default)]
102    pub env: Option<String>,
103    #[serde(default)]
104    pub team: Option<String>,
105}
106
107impl HostConfig {
108    pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
109        let path = path.as_ref();
110        let content = fs::read_to_string(path)
111            .with_context(|| format!("failed to read bindings file {path:?}"))?;
112        let bindings: BindingsFile = serde_yaml::from_str(&content)
113            .with_context(|| format!("failed to parse bindings file {path:?}"))?;
114
115        let secrets_policy = SecretsPolicy::from_bindings(&bindings);
116        let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
117        let webhook_policy = bindings
118            .flow_type_bindings
119            .get("webhook")
120            .and_then(|binding| {
121                serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
122                    .map(WebhookPolicy::from)
123                    .map_err(|err| {
124                        tracing::warn!(error = %err, "failed to parse webhook binding config");
125                        err
126                    })
127                    .ok()
128            })
129            .unwrap_or_default();
130
131        Ok(Self {
132            tenant: bindings.tenant.clone(),
133            bindings_path: path.to_path_buf(),
134            flow_type_bindings: bindings.flow_type_bindings.clone(),
135            rate_limits: bindings.rate_limits.clone(),
136            retry: bindings.retry.clone(),
137            http_enabled,
138            secrets_policy,
139            webhook_policy,
140            timers: bindings.timers.clone(),
141            oauth: bindings.oauth.clone(),
142            mocks: bindings.mocks.clone(),
143        })
144    }
145
146    pub fn messaging_binding(&self) -> Option<&FlowBinding> {
147        self.flow_type_bindings.get("messaging")
148    }
149
150    pub fn retry_config(&self) -> FlowRetryConfig {
151        self.retry.clone()
152    }
153
154    pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
155        let oauth = self.oauth.as_ref()?;
156        let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
157        if !oauth.provider.is_empty() {
158            cfg.default_provider = Some(oauth.provider.clone());
159        }
160        if let Some(team) = &oauth.team
161            && !team.is_empty()
162        {
163            cfg.team = Some(team.clone());
164        }
165        Some(cfg)
166    }
167
168    /// Derive a tenant context for the current configuration. This is used when
169    /// evaluating scoped requirements (e.g. secrets).
170    pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
171        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
172        let env_id = greentic_types::EnvId::from_str(&env)
173            .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
174        let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
175            .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
176        greentic_types::TenantCtx::new(env_id, tenant_id)
177    }
178}
179
180impl SecretsPolicy {
181    fn from_bindings(bindings: &BindingsFile) -> Self {
182        let allowed = bindings
183            .flow_type_bindings
184            .values()
185            .flat_map(|binding| binding.secrets.iter().cloned())
186            .collect::<HashSet<_>>();
187        Self {
188            allowed,
189            allow_all: false,
190        }
191    }
192
193    pub fn is_allowed(&self, key: &str) -> bool {
194        self.allow_all || self.allowed.contains(key)
195    }
196
197    pub fn allow_all() -> Self {
198        Self {
199            allowed: HashSet::new(),
200            allow_all: true,
201        }
202    }
203}
204
205impl Default for RateLimits {
206    fn default() -> Self {
207        Self {
208            messaging_send_qps: default_messaging_qps(),
209            messaging_burst: default_messaging_burst(),
210        }
211    }
212}
213
214fn default_messaging_qps() -> u32 {
215    10
216}
217
218fn default_messaging_burst() -> u32 {
219    20
220}
221
222impl From<WebhookBindingConfig> for WebhookPolicy {
223    fn from(value: WebhookBindingConfig) -> Self {
224        Self {
225            allow_paths: value.allow_paths,
226            deny_paths: value.deny_paths,
227        }
228    }
229}
230
231impl WebhookPolicy {
232    pub fn is_allowed(&self, path: &str) -> bool {
233        if self
234            .deny_paths
235            .iter()
236            .any(|prefix| path.starts_with(prefix))
237        {
238            return false;
239        }
240
241        if self.allow_paths.is_empty() {
242            return true;
243        }
244
245        self.allow_paths
246            .iter()
247            .any(|prefix| path.starts_with(prefix))
248    }
249}
250
251impl TimerBinding {
252    pub fn schedule_id(&self) -> &str {
253        self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
254    }
255}
256
257impl Default for FlowRetryConfig {
258    fn default() -> Self {
259        Self {
260            max_attempts: default_retry_attempts(),
261            base_delay_ms: default_retry_base_delay_ms(),
262        }
263    }
264}
265
266fn default_retry_attempts() -> u32 {
267    3
268}
269
270fn default_retry_base_delay_ms() -> u64 {
271    250
272}
273
274#[cfg(test)]
275#[allow(clippy::items_after_test_module)]
276mod tests {
277    use super::*;
278    use std::collections::HashMap;
279    use std::path::PathBuf;
280
281    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
282        HostConfig {
283            tenant: "tenant-a".to_string(),
284            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
285            flow_type_bindings: HashMap::new(),
286            rate_limits: RateLimits::default(),
287            retry: FlowRetryConfig::default(),
288            http_enabled: false,
289            secrets_policy: SecretsPolicy::allow_all(),
290            webhook_policy: WebhookPolicy::default(),
291            timers: Vec::new(),
292            oauth,
293            mocks: None,
294        }
295    }
296
297    #[test]
298    fn oauth_broker_config_absent_without_block() {
299        let cfg = host_config_with_oauth(None);
300        assert!(cfg.oauth_broker_config().is_none());
301    }
302
303    #[test]
304    fn oauth_broker_config_maps_fields() {
305        let cfg = host_config_with_oauth(Some(OAuthConfig {
306            http_base_url: "https://oauth.example/".into(),
307            nats_url: "nats://broker:4222".into(),
308            provider: "demo".into(),
309            env: None,
310            team: Some("ops".into()),
311        }));
312        let broker = cfg.oauth_broker_config().expect("missing broker config");
313        assert_eq!(broker.http_base_url, "https://oauth.example/");
314        assert_eq!(broker.nats_url, "nats://broker:4222");
315        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
316        assert_eq!(broker.team.as_deref(), Some("ops"));
317    }
318}