// greentic_runner_host/config.rs

1use crate::oauth::OAuthBrokerConfig;
2use crate::runner::mocks::MocksConfig;
3use anyhow::{Context, Result};
4use serde::Deserialize;
5use serde_yaml_bw as serde_yaml;
6use std::collections::{HashMap, HashSet};
7use std::fs;
8use std::path::{Path, PathBuf};
9
10#[derive(Debug, Clone)]
11pub struct HostConfig {
12    pub tenant: String,
13    pub bindings_path: PathBuf,
14    pub flow_type_bindings: HashMap<String, FlowBinding>,
15    pub rate_limits: RateLimits,
16    pub retry: FlowRetryConfig,
17    pub http_enabled: bool,
18    pub secrets_policy: SecretsPolicy,
19    pub webhook_policy: WebhookPolicy,
20    pub timers: Vec<TimerBinding>,
21    pub oauth: Option<OAuthConfig>,
22    pub mocks: Option<MocksConfig>,
23}
24
25#[derive(Debug, Clone, Deserialize)]
26pub struct BindingsFile {
27    pub tenant: String,
28    #[serde(default)]
29    pub flow_type_bindings: HashMap<String, FlowBinding>,
30    #[serde(default)]
31    pub rate_limits: RateLimits,
32    #[serde(default)]
33    pub retry: FlowRetryConfig,
34    #[serde(default)]
35    pub timers: Vec<TimerBinding>,
36    #[serde(default)]
37    pub oauth: Option<OAuthConfig>,
38    #[serde(default)]
39    pub mocks: Option<MocksConfig>,
40}
41
42#[derive(Debug, Clone, Deserialize)]
43pub struct FlowBinding {
44    pub adapter: String,
45    #[serde(default)]
46    pub config: serde_yaml::Value,
47    #[serde(default)]
48    pub secrets: Vec<String>,
49}
50
51#[derive(Debug, Clone, Deserialize)]
52pub struct RateLimits {
53    #[serde(default = "default_messaging_qps")]
54    pub messaging_send_qps: u32,
55    #[serde(default = "default_messaging_burst")]
56    pub messaging_burst: u32,
57}
58
/// Decides which secret names may be accessed.
///
/// Fields are private; build one with `SecretsPolicy::allow_all` or let
/// `HostConfig::load_from_path` derive it from the bindings file.
#[derive(Clone, Debug)]
pub struct SecretsPolicy {
    /// Explicitly permitted secret names.
    allowed: HashSet<String>,
    /// When `true`, every name is permitted regardless of `allowed`.
    allow_all: bool,
}
64
65#[derive(Debug, Clone, Deserialize)]
66pub struct FlowRetryConfig {
67    #[serde(default = "default_retry_attempts")]
68    pub max_attempts: u32,
69    #[serde(default = "default_retry_base_delay_ms")]
70    pub base_delay_ms: u64,
71}
72
/// Prefix-based allow/deny filter for webhook paths.
///
/// The derived `Default` has both lists empty, which `is_allowed` treats as
/// "allow every path".
#[derive(Clone, Debug, Default)]
pub struct WebhookPolicy {
    /// Path prefixes that are permitted; an empty list permits any path that
    /// is not denied.
    allow_paths: Vec<String>,
    /// Path prefixes that are always rejected.
    deny_paths: Vec<String>,
}
78
79#[derive(Debug, Clone, Deserialize)]
80pub struct WebhookBindingConfig {
81    #[serde(default)]
82    pub allow_paths: Vec<String>,
83    #[serde(default)]
84    pub deny_paths: Vec<String>,
85}
86
87#[derive(Debug, Clone, Deserialize)]
88pub struct TimerBinding {
89    pub flow_id: String,
90    pub cron: String,
91    #[serde(default)]
92    pub schedule_id: Option<String>,
93}
94
95#[derive(Debug, Clone, Deserialize)]
96pub struct OAuthConfig {
97    pub http_base_url: String,
98    pub nats_url: String,
99    pub provider: String,
100    #[serde(default)]
101    pub env: Option<String>,
102    #[serde(default)]
103    pub team: Option<String>,
104}
105
106impl HostConfig {
107    pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
108        let path = path.as_ref();
109        let content = fs::read_to_string(path)
110            .with_context(|| format!("failed to read bindings file {path:?}"))?;
111        let bindings: BindingsFile = serde_yaml::from_str(&content)
112            .with_context(|| format!("failed to parse bindings file {path:?}"))?;
113
114        let secrets_policy = SecretsPolicy::from_bindings(&bindings);
115        let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
116        let webhook_policy = bindings
117            .flow_type_bindings
118            .get("webhook")
119            .and_then(|binding| {
120                serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
121                    .map(WebhookPolicy::from)
122                    .map_err(|err| {
123                        tracing::warn!(error = %err, "failed to parse webhook binding config");
124                        err
125                    })
126                    .ok()
127            })
128            .unwrap_or_default();
129
130        Ok(Self {
131            tenant: bindings.tenant.clone(),
132            bindings_path: path.to_path_buf(),
133            flow_type_bindings: bindings.flow_type_bindings.clone(),
134            rate_limits: bindings.rate_limits.clone(),
135            retry: bindings.retry.clone(),
136            http_enabled,
137            secrets_policy,
138            webhook_policy,
139            timers: bindings.timers.clone(),
140            oauth: bindings.oauth.clone(),
141            mocks: bindings.mocks.clone(),
142        })
143    }
144
145    pub fn messaging_binding(&self) -> Option<&FlowBinding> {
146        self.flow_type_bindings.get("messaging")
147    }
148
149    pub fn retry_config(&self) -> FlowRetryConfig {
150        self.retry.clone()
151    }
152
153    pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
154        let oauth = self.oauth.as_ref()?;
155        let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
156        if !oauth.provider.is_empty() {
157            cfg.default_provider = Some(oauth.provider.clone());
158        }
159        if let Some(team) = &oauth.team
160            && !team.is_empty()
161        {
162            cfg.team = Some(team.clone());
163        }
164        Some(cfg)
165    }
166}
167
168impl SecretsPolicy {
169    fn from_bindings(bindings: &BindingsFile) -> Self {
170        let allowed = bindings
171            .flow_type_bindings
172            .values()
173            .flat_map(|binding| binding.secrets.iter().cloned())
174            .collect::<HashSet<_>>();
175        Self {
176            allowed,
177            allow_all: false,
178        }
179    }
180
181    pub fn is_allowed(&self, key: &str) -> bool {
182        self.allow_all || self.allowed.contains(key)
183    }
184
185    pub fn allow_all() -> Self {
186        Self {
187            allowed: HashSet::new(),
188            allow_all: true,
189        }
190    }
191}
192
193impl Default for RateLimits {
194    fn default() -> Self {
195        Self {
196            messaging_send_qps: default_messaging_qps(),
197            messaging_burst: default_messaging_burst(),
198        }
199    }
200}
201
/// Fallback for `RateLimits::messaging_send_qps` when the bindings file omits it.
fn default_messaging_qps() -> u32 {
    const DEFAULT_MESSAGING_SEND_QPS: u32 = 10;
    DEFAULT_MESSAGING_SEND_QPS
}
205
/// Fallback for `RateLimits::messaging_burst` when the bindings file omits it.
fn default_messaging_burst() -> u32 {
    const DEFAULT_MESSAGING_BURST: u32 = 20;
    DEFAULT_MESSAGING_BURST
}
209
210impl From<WebhookBindingConfig> for WebhookPolicy {
211    fn from(value: WebhookBindingConfig) -> Self {
212        Self {
213            allow_paths: value.allow_paths,
214            deny_paths: value.deny_paths,
215        }
216    }
217}
218
219impl WebhookPolicy {
220    pub fn is_allowed(&self, path: &str) -> bool {
221        if self
222            .deny_paths
223            .iter()
224            .any(|prefix| path.starts_with(prefix))
225        {
226            return false;
227        }
228
229        if self.allow_paths.is_empty() {
230            return true;
231        }
232
233        self.allow_paths
234            .iter()
235            .any(|prefix| path.starts_with(prefix))
236    }
237}
238
239impl TimerBinding {
240    pub fn schedule_id(&self) -> &str {
241        self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
242    }
243}
244
245impl Default for FlowRetryConfig {
246    fn default() -> Self {
247        Self {
248            max_attempts: default_retry_attempts(),
249            base_delay_ms: default_retry_base_delay_ms(),
250        }
251    }
252}
253
/// Fallback for `FlowRetryConfig::max_attempts` when the bindings file omits it.
fn default_retry_attempts() -> u32 {
    const DEFAULT_RETRY_ATTEMPTS: u32 = 3;
    DEFAULT_RETRY_ATTEMPTS
}
257
/// Fallback for `FlowRetryConfig::base_delay_ms` when the bindings file omits it.
fn default_retry_base_delay_ms() -> u64 {
    const DEFAULT_RETRY_BASE_DELAY_MS: u64 = 250;
    DEFAULT_RETRY_BASE_DELAY_MS
}
261
#[cfg(test)]
// NOTE(review): no items follow this module in the visible part of the file;
// presumably the lint is silenced for items further down — confirm it is still
// needed.
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Builds an otherwise empty `HostConfig` whose only interesting field is
    /// the supplied `oauth` block.
    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
        let tenant = String::from("tenant-a");
        let bindings_path = PathBuf::from("/tmp/bindings.yaml");
        HostConfig {
            tenant,
            bindings_path,
            flow_type_bindings: HashMap::new(),
            rate_limits: RateLimits::default(),
            retry: FlowRetryConfig::default(),
            http_enabled: false,
            secrets_policy: SecretsPolicy::allow_all(),
            webhook_policy: WebhookPolicy::default(),
            timers: Vec::new(),
            oauth,
            mocks: None,
        }
    }

    /// Without an `oauth` block there is no broker configuration.
    #[test]
    fn oauth_broker_config_absent_without_block() {
        let host = host_config_with_oauth(None);
        let broker = host.oauth_broker_config();
        assert!(broker.is_none());
    }

    /// URLs, provider and team are carried over into the broker configuration.
    #[test]
    fn oauth_broker_config_maps_fields() {
        let oauth = OAuthConfig {
            http_base_url: String::from("https://oauth.example/"),
            nats_url: String::from("nats://broker:4222"),
            provider: String::from("demo"),
            env: None,
            team: Some(String::from("ops")),
        };
        let host = host_config_with_oauth(Some(oauth));

        let broker = host.oauth_broker_config().expect("missing broker config");

        assert_eq!(broker.http_base_url, "https://oauth.example/");
        assert_eq!(broker.nats_url, "nats://broker:4222");
        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
        assert_eq!(broker.team.as_deref(), Some("ops"));
    }
}