greentic_runner_host/
config.rs

1use crate::gtbind::PackBinding;
2use crate::gtbind::TenantBindings;
3use crate::oauth::OAuthBrokerConfig;
4use crate::runner::mocks::MocksConfig;
5use anyhow::{Context, Result};
6use serde::Deserialize;
7use serde_yaml_bw as serde_yaml;
8use std::collections::{HashMap, HashSet};
9use std::fs;
10use std::path::{Path, PathBuf};
11use std::str::FromStr;
12
13#[derive(Debug, Clone)]
14pub struct HostConfig {
15    pub tenant: String,
16    pub bindings_path: PathBuf,
17    pub flow_type_bindings: HashMap<String, FlowBinding>,
18    pub rate_limits: RateLimits,
19    pub retry: FlowRetryConfig,
20    pub http_enabled: bool,
21    pub secrets_policy: SecretsPolicy,
22    pub state_store_policy: StateStorePolicy,
23    pub webhook_policy: WebhookPolicy,
24    pub timers: Vec<TimerBinding>,
25    pub oauth: Option<OAuthConfig>,
26    pub mocks: Option<MocksConfig>,
27    pub pack_bindings: Vec<PackBinding>,
28    pub env_passthrough: Vec<String>,
29}
30
31#[derive(Debug, Clone, Deserialize)]
32pub struct BindingsFile {
33    pub tenant: String,
34    #[serde(default)]
35    pub flow_type_bindings: HashMap<String, FlowBinding>,
36    #[serde(default)]
37    pub rate_limits: RateLimits,
38    #[serde(default)]
39    pub retry: FlowRetryConfig,
40    #[serde(default)]
41    pub timers: Vec<TimerBinding>,
42    #[serde(default)]
43    pub oauth: Option<OAuthConfig>,
44    #[serde(default)]
45    pub mocks: Option<MocksConfig>,
46    #[serde(default)]
47    pub state_store: StateStorePolicy,
48}
49
50#[derive(Debug, Clone, Deserialize)]
51pub struct FlowBinding {
52    pub adapter: String,
53    #[serde(default)]
54    pub config: serde_yaml::Value,
55    #[serde(default)]
56    pub secrets: Vec<String>,
57}
58
59#[derive(Debug, Clone, Deserialize)]
60pub struct RateLimits {
61    #[serde(default = "default_messaging_qps")]
62    pub messaging_send_qps: u32,
63    #[serde(default = "default_messaging_burst")]
64    pub messaging_burst: u32,
65}
66
/// Access policy for secret keys: either an explicit allow-list or a blanket
/// allow-all switch.
#[derive(Clone, Debug)]
pub struct SecretsPolicy {
    /// Secret keys that may be accessed when `allow_all` is off.
    allowed: HashSet<String>,
    /// When set, every key is permitted regardless of `allowed`.
    allow_all: bool,
}
72
73#[derive(Debug, Clone, Deserialize)]
74pub struct FlowRetryConfig {
75    #[serde(default = "default_retry_attempts")]
76    pub max_attempts: u32,
77    #[serde(default = "default_retry_base_delay_ms")]
78    pub base_delay_ms: u64,
79}
80
/// Path-prefix allow/deny rules for webhooks.
///
/// The derived `Default` has both lists empty.
#[derive(Clone, Debug, Default)]
pub struct WebhookPolicy {
    /// Path prefixes that are permitted.
    allow_paths: Vec<String>,
    /// Path prefixes that are rejected.
    deny_paths: Vec<String>,
}
86
87#[derive(Debug, Clone, Deserialize)]
88pub struct StateStorePolicy {
89    #[serde(default = "default_state_store_allow")]
90    pub allow: bool,
91}
92
93#[derive(Debug, Clone, Deserialize)]
94pub struct WebhookBindingConfig {
95    #[serde(default)]
96    pub allow_paths: Vec<String>,
97    #[serde(default)]
98    pub deny_paths: Vec<String>,
99}
100
101#[derive(Debug, Clone, Deserialize)]
102pub struct TimerBinding {
103    pub flow_id: String,
104    pub cron: String,
105    #[serde(default)]
106    pub schedule_id: Option<String>,
107}
108
109#[derive(Debug, Clone, Deserialize)]
110pub struct OAuthConfig {
111    pub http_base_url: String,
112    pub nats_url: String,
113    pub provider: String,
114    #[serde(default)]
115    pub env: Option<String>,
116    #[serde(default)]
117    pub team: Option<String>,
118}
119
120impl HostConfig {
121    pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
122        let path = path.as_ref();
123        let content = fs::read_to_string(path)
124            .with_context(|| format!("failed to read bindings file {path:?}"))?;
125        let bindings: BindingsFile = serde_yaml::from_str(&content)
126            .with_context(|| format!("failed to parse bindings file {path:?}"))?;
127
128        let secrets_policy = SecretsPolicy::from_bindings(&bindings);
129        let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
130        let webhook_policy = bindings
131            .flow_type_bindings
132            .get("webhook")
133            .and_then(|binding| {
134                serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
135                    .map(WebhookPolicy::from)
136                    .map_err(|err| {
137                        tracing::warn!(error = %err, "failed to parse webhook binding config");
138                        err
139                    })
140                    .ok()
141            })
142            .unwrap_or_default();
143
144        Ok(Self {
145            tenant: bindings.tenant.clone(),
146            bindings_path: path.to_path_buf(),
147            flow_type_bindings: bindings.flow_type_bindings.clone(),
148            rate_limits: bindings.rate_limits.clone(),
149            retry: bindings.retry.clone(),
150            http_enabled,
151            secrets_policy,
152            state_store_policy: bindings.state_store.clone(),
153            webhook_policy,
154            timers: bindings.timers.clone(),
155            oauth: bindings.oauth.clone(),
156            mocks: bindings.mocks.clone(),
157            pack_bindings: Vec::new(),
158            env_passthrough: Vec::new(),
159        })
160    }
161
162    pub fn from_gtbind(bindings: TenantBindings) -> Self {
163        Self {
164            tenant: bindings.tenant,
165            bindings_path: PathBuf::from("<gtbind>"),
166            flow_type_bindings: HashMap::new(),
167            rate_limits: RateLimits::default(),
168            retry: FlowRetryConfig::default(),
169            http_enabled: false,
170            secrets_policy: SecretsPolicy::allow_all(),
171            state_store_policy: StateStorePolicy::default(),
172            webhook_policy: WebhookPolicy::default(),
173            timers: Vec::new(),
174            oauth: None,
175            mocks: None,
176            pack_bindings: bindings.packs,
177            env_passthrough: bindings.env_passthrough,
178        }
179    }
180
181    pub fn messaging_binding(&self) -> Option<&FlowBinding> {
182        self.flow_type_bindings.get("messaging")
183    }
184
185    pub fn retry_config(&self) -> FlowRetryConfig {
186        self.retry.clone()
187    }
188
189    pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
190        let oauth = self.oauth.as_ref()?;
191        let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
192        if !oauth.provider.is_empty() {
193            cfg.default_provider = Some(oauth.provider.clone());
194        }
195        if let Some(team) = &oauth.team
196            && !team.is_empty()
197        {
198            cfg.team = Some(team.clone());
199        }
200        Some(cfg)
201    }
202
203    /// Derive a tenant context for the current configuration. This is used when
204    /// evaluating scoped requirements (e.g. secrets).
205    pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
206        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
207        let env_id = greentic_types::EnvId::from_str(&env)
208            .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
209        let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
210            .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
211        greentic_types::TenantCtx::new(env_id, tenant_id)
212    }
213}
214
215impl SecretsPolicy {
216    fn from_bindings(bindings: &BindingsFile) -> Self {
217        let allowed = bindings
218            .flow_type_bindings
219            .values()
220            .flat_map(|binding| binding.secrets.iter().cloned())
221            .collect::<HashSet<_>>();
222        Self {
223            allowed,
224            allow_all: false,
225        }
226    }
227
228    pub fn is_allowed(&self, key: &str) -> bool {
229        self.allow_all || self.allowed.contains(key)
230    }
231
232    pub fn allow_all() -> Self {
233        Self {
234            allowed: HashSet::new(),
235            allow_all: true,
236        }
237    }
238}
239
240impl Default for RateLimits {
241    fn default() -> Self {
242        Self {
243            messaging_send_qps: default_messaging_qps(),
244            messaging_burst: default_messaging_burst(),
245        }
246    }
247}
248
249impl Default for StateStorePolicy {
250    fn default() -> Self {
251        Self {
252            allow: default_state_store_allow(),
253        }
254    }
255}
256
/// Fallback for `RateLimits::messaging_send_qps` when the field is omitted.
fn default_messaging_qps() -> u32 {
    const DEFAULT_MESSAGING_SEND_QPS: u32 = 10;
    DEFAULT_MESSAGING_SEND_QPS
}
260
/// Fallback for `RateLimits::messaging_burst` when the field is omitted.
fn default_messaging_burst() -> u32 {
    const DEFAULT_MESSAGING_BURST: u32 = 20;
    DEFAULT_MESSAGING_BURST
}
264
/// Fallback for `StateStorePolicy::allow` when the field is omitted.
fn default_state_store_allow() -> bool {
    const DEFAULT_STATE_STORE_ALLOW: bool = true;
    DEFAULT_STATE_STORE_ALLOW
}
268
269impl From<WebhookBindingConfig> for WebhookPolicy {
270    fn from(value: WebhookBindingConfig) -> Self {
271        Self {
272            allow_paths: value.allow_paths,
273            deny_paths: value.deny_paths,
274        }
275    }
276}
277
278impl WebhookPolicy {
279    pub fn is_allowed(&self, path: &str) -> bool {
280        if self
281            .deny_paths
282            .iter()
283            .any(|prefix| path.starts_with(prefix))
284        {
285            return false;
286        }
287
288        if self.allow_paths.is_empty() {
289            return true;
290        }
291
292        self.allow_paths
293            .iter()
294            .any(|prefix| path.starts_with(prefix))
295    }
296}
297
298impl TimerBinding {
299    pub fn schedule_id(&self) -> &str {
300        self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
301    }
302}
303
304impl Default for FlowRetryConfig {
305    fn default() -> Self {
306        Self {
307            max_attempts: default_retry_attempts(),
308            base_delay_ms: default_retry_base_delay_ms(),
309        }
310    }
311}
312
/// Fallback for `FlowRetryConfig::max_attempts` when the field is omitted.
fn default_retry_attempts() -> u32 {
    const DEFAULT_RETRY_ATTEMPTS: u32 = 3;
    DEFAULT_RETRY_ATTEMPTS
}
316
/// Fallback for `FlowRetryConfig::base_delay_ms` when the field is omitted.
fn default_retry_base_delay_ms() -> u64 {
    const DEFAULT_RETRY_BASE_DELAY_MS: u64 = 250;
    DEFAULT_RETRY_BASE_DELAY_MS
}
320
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Builds a minimal `HostConfig` whose only variable part is the OAuth block.
    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
        HostConfig {
            tenant: String::from("tenant-a"),
            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
            flow_type_bindings: HashMap::new(),
            rate_limits: RateLimits::default(),
            retry: FlowRetryConfig::default(),
            http_enabled: false,
            secrets_policy: SecretsPolicy::allow_all(),
            state_store_policy: StateStorePolicy::default(),
            webhook_policy: WebhookPolicy::default(),
            timers: Vec::new(),
            oauth,
            mocks: None,
            pack_bindings: Vec::new(),
            env_passthrough: Vec::new(),
        }
    }

    /// Without an OAuth block there is no broker configuration.
    #[test]
    fn oauth_broker_config_absent_without_block() {
        let broker = host_config_with_oauth(None).oauth_broker_config();
        assert!(broker.is_none());
    }

    /// URLs, provider and team are carried over into the broker configuration.
    #[test]
    fn oauth_broker_config_maps_fields() {
        let oauth = OAuthConfig {
            http_base_url: String::from("https://oauth.example/"),
            nats_url: String::from("nats://broker:4222"),
            provider: String::from("demo"),
            env: None,
            team: Some(String::from("ops")),
        };
        let broker = host_config_with_oauth(Some(oauth))
            .oauth_broker_config()
            .expect("missing broker config");
        assert_eq!(broker.http_base_url, "https://oauth.example/");
        assert_eq!(broker.nats_url, "nats://broker:4222");
        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
        assert_eq!(broker.team.as_deref(), Some("ops"));
    }
}