1use crate::oauth::OAuthBrokerConfig;
2use crate::runner::mocks::MocksConfig;
3use anyhow::{Context, Result};
4use serde::Deserialize;
5use serde_yaml_bw as serde_yaml;
6use std::collections::{HashMap, HashSet};
7use std::fs;
8use std::path::{Path, PathBuf};
9
/// Host configuration for one tenant.
///
/// `HostConfig::load_from_path` builds this from a bindings file: most fields are
/// taken directly from the parsed [`BindingsFile`], while `http_enabled`,
/// `secrets_policy` and `webhook_policy` are derived from its flow-type bindings.
#[derive(Debug, Clone)]
pub struct HostConfig {
    /// Tenant name as given in the bindings file.
    pub tenant: String,
    /// Path of the bindings file this configuration was loaded from.
    pub bindings_path: PathBuf,
    /// Bindings keyed by flow type; this file looks up the `"messaging"` and
    /// `"webhook"` keys.
    pub flow_type_bindings: HashMap<String, FlowBinding>,
    /// Messaging rate limits.
    pub rate_limits: RateLimits,
    /// Flow retry settings.
    pub retry: FlowRetryConfig,
    /// `load_from_path` sets this to whether a `"messaging"` binding is present.
    pub http_enabled: bool,
    /// Which secret keys may be accessed.
    pub secrets_policy: SecretsPolicy,
    /// Path allow/deny rules for webhooks.
    pub webhook_policy: WebhookPolicy,
    /// Timer bindings declared in the bindings file.
    pub timers: Vec<TimerBinding>,
    /// Optional OAuth settings; `None` when the bindings file has no `oauth` block.
    pub oauth: Option<OAuthConfig>,
    /// Optional mocks configuration.
    pub mocks: Option<MocksConfig>,
}
24
/// Deserialized shape of the bindings file.
///
/// Only `tenant` is required; every other field falls back to its default
/// when the key is absent.
#[derive(Debug, Clone, Deserialize)]
pub struct BindingsFile {
    /// Tenant name (required).
    pub tenant: String,
    /// Bindings keyed by flow type; empty when omitted.
    #[serde(default)]
    pub flow_type_bindings: HashMap<String, FlowBinding>,
    /// Messaging rate limits; `RateLimits::default()` when omitted.
    #[serde(default)]
    pub rate_limits: RateLimits,
    /// Retry settings; `FlowRetryConfig::default()` when omitted.
    #[serde(default)]
    pub retry: FlowRetryConfig,
    /// Timer bindings; empty when omitted.
    #[serde(default)]
    pub timers: Vec<TimerBinding>,
    /// Optional OAuth block.
    #[serde(default)]
    pub oauth: Option<OAuthConfig>,
    /// Optional mocks block.
    #[serde(default)]
    pub mocks: Option<MocksConfig>,
}
41
/// Binding of one flow type to an adapter.
#[derive(Debug, Clone, Deserialize)]
pub struct FlowBinding {
    /// Adapter identifier (required).
    pub adapter: String,
    /// Free-form YAML configuration for the binding; the default YAML value when
    /// omitted. For the `"webhook"` binding it is parsed as [`WebhookBindingConfig`].
    #[serde(default)]
    pub config: serde_yaml::Value,
    /// Secret keys this binding declares; the union across all bindings forms
    /// the secrets allowlist built by `SecretsPolicy::from_bindings`.
    #[serde(default)]
    pub secrets: Vec<String>,
}
50
/// Rate-limit settings for messaging.
#[derive(Debug, Clone, Deserialize)]
pub struct RateLimits {
    /// Messaging send QPS limit; defaults to 10 when omitted.
    #[serde(default = "default_messaging_qps")]
    pub messaging_send_qps: u32,
    /// Messaging burst allowance; defaults to 20 when omitted.
    #[serde(default = "default_messaging_burst")]
    pub messaging_burst: u32,
}
58
/// Decides which secret keys may be accessed.
///
/// A key is allowed when `allow_all` is set or the key is in `allowed`.
#[derive(Debug, Clone)]
pub struct SecretsPolicy {
    // Explicit allowlist of secret keys.
    allowed: HashSet<String>,
    // When true, every key is allowed regardless of `allowed`.
    allow_all: bool,
}
64
/// Retry settings for flows.
#[derive(Debug, Clone, Deserialize)]
pub struct FlowRetryConfig {
    /// Maximum number of attempts; defaults to 3 when omitted.
    #[serde(default = "default_retry_attempts")]
    pub max_attempts: u32,
    /// Base delay in milliseconds; defaults to 250 when omitted.
    #[serde(default = "default_retry_base_delay_ms")]
    pub base_delay_ms: u64,
}
72
/// Prefix-based allow/deny rules for webhook paths.
///
/// The default value has both lists empty, which allows every path.
#[derive(Debug, Clone, Default)]
pub struct WebhookPolicy {
    // Path prefixes that are allowed; an empty list allows any non-denied path.
    allow_paths: Vec<String>,
    // Path prefixes that are rejected; checked before `allow_paths`.
    deny_paths: Vec<String>,
}
78
/// Shape of the `config` value of the `"webhook"` flow binding.
///
/// Converted into a [`WebhookPolicy`] via `From`.
#[derive(Debug, Clone, Deserialize)]
pub struct WebhookBindingConfig {
    /// Allowed path prefixes; empty when omitted.
    #[serde(default)]
    pub allow_paths: Vec<String>,
    /// Denied path prefixes; empty when omitted.
    #[serde(default)]
    pub deny_paths: Vec<String>,
}
86
/// Timer entry from the bindings file, associating a flow with a schedule.
#[derive(Debug, Clone, Deserialize)]
pub struct TimerBinding {
    /// Identifier of the flow (required).
    pub flow_id: String,
    /// Schedule string (required); presumably a cron expression — it is not
    /// validated in this file.
    pub cron: String,
    /// Optional explicit schedule id; `TimerBinding::schedule_id()` falls back
    /// to `flow_id` when this is `None`.
    #[serde(default)]
    pub schedule_id: Option<String>,
}
94
/// OAuth block of the bindings file.
///
/// `HostConfig::oauth_broker_config` maps this onto an `OAuthBrokerConfig`.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthConfig {
    /// HTTP base URL passed to `OAuthBrokerConfig::new` (required).
    pub http_base_url: String,
    /// NATS URL passed to `OAuthBrokerConfig::new` (required).
    pub nats_url: String,
    /// Provider name (required); used as the broker's default provider when non-empty.
    pub provider: String,
    /// Optional environment name.
    /// NOTE(review): not read by `oauth_broker_config` in this file — confirm where it is consumed.
    #[serde(default)]
    pub env: Option<String>,
    /// Optional team; forwarded to the broker config when present and non-empty.
    #[serde(default)]
    pub team: Option<String>,
}
105
106impl HostConfig {
107 pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
108 let path = path.as_ref();
109 let content = fs::read_to_string(path)
110 .with_context(|| format!("failed to read bindings file {path:?}"))?;
111 let bindings: BindingsFile = serde_yaml::from_str(&content)
112 .with_context(|| format!("failed to parse bindings file {path:?}"))?;
113
114 let secrets_policy = SecretsPolicy::from_bindings(&bindings);
115 let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
116 let webhook_policy = bindings
117 .flow_type_bindings
118 .get("webhook")
119 .and_then(|binding| {
120 serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
121 .map(WebhookPolicy::from)
122 .map_err(|err| {
123 tracing::warn!(error = %err, "failed to parse webhook binding config");
124 err
125 })
126 .ok()
127 })
128 .unwrap_or_default();
129
130 Ok(Self {
131 tenant: bindings.tenant.clone(),
132 bindings_path: path.to_path_buf(),
133 flow_type_bindings: bindings.flow_type_bindings.clone(),
134 rate_limits: bindings.rate_limits.clone(),
135 retry: bindings.retry.clone(),
136 http_enabled,
137 secrets_policy,
138 webhook_policy,
139 timers: bindings.timers.clone(),
140 oauth: bindings.oauth.clone(),
141 mocks: bindings.mocks.clone(),
142 })
143 }
144
145 pub fn messaging_binding(&self) -> Option<&FlowBinding> {
146 self.flow_type_bindings.get("messaging")
147 }
148
149 pub fn retry_config(&self) -> FlowRetryConfig {
150 self.retry.clone()
151 }
152
153 pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
154 let oauth = self.oauth.as_ref()?;
155 let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
156 if !oauth.provider.is_empty() {
157 cfg.default_provider = Some(oauth.provider.clone());
158 }
159 if let Some(team) = &oauth.team
160 && !team.is_empty()
161 {
162 cfg.team = Some(team.clone());
163 }
164 Some(cfg)
165 }
166}
167
168impl SecretsPolicy {
169 fn from_bindings(bindings: &BindingsFile) -> Self {
170 let allowed = bindings
171 .flow_type_bindings
172 .values()
173 .flat_map(|binding| binding.secrets.iter().cloned())
174 .collect::<HashSet<_>>();
175 Self {
176 allowed,
177 allow_all: false,
178 }
179 }
180
181 pub fn is_allowed(&self, key: &str) -> bool {
182 self.allow_all || self.allowed.contains(key)
183 }
184
185 pub fn allow_all() -> Self {
186 Self {
187 allowed: HashSet::new(),
188 allow_all: true,
189 }
190 }
191}
192
193impl Default for RateLimits {
194 fn default() -> Self {
195 Self {
196 messaging_send_qps: default_messaging_qps(),
197 messaging_burst: default_messaging_burst(),
198 }
199 }
200}
201
/// Default for `RateLimits::messaging_send_qps` when the key is omitted.
fn default_messaging_qps() -> u32 {
    const DEFAULT_QPS: u32 = 10;
    DEFAULT_QPS
}
205
/// Default for `RateLimits::messaging_burst` when the key is omitted.
fn default_messaging_burst() -> u32 {
    const DEFAULT_BURST: u32 = 20;
    DEFAULT_BURST
}
209
210impl From<WebhookBindingConfig> for WebhookPolicy {
211 fn from(value: WebhookBindingConfig) -> Self {
212 Self {
213 allow_paths: value.allow_paths,
214 deny_paths: value.deny_paths,
215 }
216 }
217}
218
219impl WebhookPolicy {
220 pub fn is_allowed(&self, path: &str) -> bool {
221 if self
222 .deny_paths
223 .iter()
224 .any(|prefix| path.starts_with(prefix))
225 {
226 return false;
227 }
228
229 if self.allow_paths.is_empty() {
230 return true;
231 }
232
233 self.allow_paths
234 .iter()
235 .any(|prefix| path.starts_with(prefix))
236 }
237}
238
239impl TimerBinding {
240 pub fn schedule_id(&self) -> &str {
241 self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
242 }
243}
244
245impl Default for FlowRetryConfig {
246 fn default() -> Self {
247 Self {
248 max_attempts: default_retry_attempts(),
249 base_delay_ms: default_retry_base_delay_ms(),
250 }
251 }
252}
253
/// Default for `FlowRetryConfig::max_attempts` when the key is omitted.
fn default_retry_attempts() -> u32 {
    const DEFAULT_ATTEMPTS: u32 = 3;
    DEFAULT_ATTEMPTS
}
257
/// Default for `FlowRetryConfig::base_delay_ms` when the key is omitted.
fn default_retry_base_delay_ms() -> u64 {
    const DEFAULT_BASE_DELAY_MS: u64 = 250;
    DEFAULT_BASE_DELAY_MS
}
261
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Builds a minimal `HostConfig` in which only the `oauth` block varies.
    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
        let tenant = String::from("tenant-a");
        let bindings_path = PathBuf::from("/tmp/bindings.yaml");
        HostConfig {
            tenant,
            bindings_path,
            flow_type_bindings: HashMap::new(),
            rate_limits: RateLimits::default(),
            retry: FlowRetryConfig::default(),
            http_enabled: false,
            secrets_policy: SecretsPolicy::allow_all(),
            webhook_policy: WebhookPolicy::default(),
            timers: Vec::new(),
            oauth,
            mocks: None,
        }
    }

    /// Without an `oauth` block there is no broker configuration.
    #[test]
    fn oauth_broker_config_absent_without_block() {
        let host = host_config_with_oauth(None);
        assert!(host.oauth_broker_config().is_none());
    }

    /// URLs, provider and team are carried over to the broker configuration.
    #[test]
    fn oauth_broker_config_maps_fields() {
        let oauth = OAuthConfig {
            http_base_url: String::from("https://oauth.example/"),
            nats_url: String::from("nats://broker:4222"),
            provider: String::from("demo"),
            env: None,
            team: Some(String::from("ops")),
        };
        let host = host_config_with_oauth(Some(oauth));
        let broker = host.oauth_broker_config().expect("missing broker config");
        assert_eq!(broker.http_base_url, "https://oauth.example/");
        assert_eq!(broker.nats_url, "nats://broker:4222");
        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
        assert_eq!(broker.team.as_deref(), Some("ops"));
    }
}