1use crate::oauth::OAuthBrokerConfig;
2use crate::runner::mocks::MocksConfig;
3use anyhow::{Context, Result};
4use serde::Deserialize;
5use serde_yaml_bw as serde_yaml;
6use std::collections::{HashMap, HashSet};
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::str::FromStr;
10
/// Resolved host configuration for a single tenant.
///
/// Produced by [`HostConfig::load_from_path`] from a YAML bindings file. Some
/// fields are copied from the file, others are derived from it (see the
/// individual field notes).
#[derive(Debug, Clone)]
pub struct HostConfig {
    /// Tenant identifier as written in the bindings file.
    pub tenant: String,
    /// Path of the bindings file this configuration was loaded from.
    pub bindings_path: PathBuf,
    /// Bindings keyed by flow type (this file looks up `"messaging"` and `"webhook"`).
    pub flow_type_bindings: HashMap<String, FlowBinding>,
    /// Messaging rate limits.
    pub rate_limits: RateLimits,
    /// Flow retry settings (attempt count and base delay).
    pub retry: FlowRetryConfig,
    /// Set by `load_from_path` to whether a `"messaging"` binding is present.
    pub http_enabled: bool,
    /// Which secret keys may be used; `load_from_path` allows exactly the
    /// secrets declared by the bindings.
    pub secrets_policy: SecretsPolicy,
    /// State-store policy, taken from the `state_store` section of the bindings file.
    pub state_store_policy: StateStorePolicy,
    /// Webhook path policy parsed from the `"webhook"` binding's `config`;
    /// the default policy when that binding is absent or its config fails to parse.
    pub webhook_policy: WebhookPolicy,
    /// Timer bindings declared in the bindings file.
    pub timers: Vec<TimerBinding>,
    /// Optional OAuth settings; `None` when the bindings file has no `oauth` block.
    pub oauth: Option<OAuthConfig>,
    /// Optional mocks configuration; `None` when the bindings file has no `mocks` block.
    pub mocks: Option<MocksConfig>,
}
26
/// Deserialized form of the bindings file.
///
/// Only `tenant` is required; every other section carries `#[serde(default)]`
/// and falls back to its type's `Default` when omitted.
#[derive(Debug, Clone, Deserialize)]
pub struct BindingsFile {
    /// Tenant identifier (required).
    pub tenant: String,
    /// Bindings keyed by flow type; empty when omitted.
    #[serde(default)]
    pub flow_type_bindings: HashMap<String, FlowBinding>,
    /// Messaging rate limits; see `RateLimits` for the defaults.
    #[serde(default)]
    pub rate_limits: RateLimits,
    /// Flow retry settings; see `FlowRetryConfig` for the defaults.
    #[serde(default)]
    pub retry: FlowRetryConfig,
    /// Timer bindings; empty when omitted.
    #[serde(default)]
    pub timers: Vec<TimerBinding>,
    /// Optional OAuth block.
    #[serde(default)]
    pub oauth: Option<OAuthConfig>,
    /// Optional mocks block.
    #[serde(default)]
    pub mocks: Option<MocksConfig>,
    /// State-store policy; see `StateStorePolicy` for the default.
    #[serde(default)]
    pub state_store: StateStorePolicy,
}
45
/// One entry of `flow_type_bindings`: the adapter bound to a flow type plus
/// its configuration and declared secrets.
#[derive(Debug, Clone, Deserialize)]
pub struct FlowBinding {
    /// Adapter name (required). NOTE(review): how this name is resolved is not
    /// visible in this file.
    pub adapter: String,
    /// Free-form YAML configuration for the binding. For the `"webhook"`
    /// binding it is parsed as a `WebhookBindingConfig`.
    #[serde(default)]
    pub config: serde_yaml::Value,
    /// Secret keys declared by this binding; they are collected into the
    /// `SecretsPolicy` allow-list.
    #[serde(default)]
    pub secrets: Vec<String>,
}
54
/// Rate-limit settings for messaging.
#[derive(Debug, Clone, Deserialize)]
pub struct RateLimits {
    /// Messaging send rate; defaults to 10.
    /// NOTE(review): presumably queries per second, going by the name — confirm with the consumer.
    #[serde(default = "default_messaging_qps")]
    pub messaging_send_qps: u32,
    /// Messaging burst allowance; defaults to 20.
    #[serde(default = "default_messaging_burst")]
    pub messaging_burst: u32,
}
62
/// Decides which secret keys may be used.
///
/// A key is allowed when `allow_all` is set or when it is a member of `allowed`.
#[derive(Debug, Clone)]
pub struct SecretsPolicy {
    // Explicit allow-list of secret keys.
    allowed: HashSet<String>,
    // When true, every key is allowed regardless of `allowed`.
    allow_all: bool,
}
68
/// Retry settings for flows.
#[derive(Debug, Clone, Deserialize)]
pub struct FlowRetryConfig {
    /// Maximum number of attempts; defaults to 3.
    #[serde(default = "default_retry_attempts")]
    pub max_attempts: u32,
    /// Base delay in milliseconds; defaults to 250.
    /// NOTE(review): how the delay grows between attempts is decided by the
    /// consumer, not visible here.
    #[serde(default = "default_retry_base_delay_ms")]
    pub base_delay_ms: u64,
}
76
/// Prefix-based allow/deny policy for webhook paths.
///
/// The derived `Default` has both lists empty, which `is_allowed` treats as
/// "every path is allowed".
#[derive(Debug, Clone, Default)]
pub struct WebhookPolicy {
    // Path prefixes that are allowed; an empty list places no restriction.
    allow_paths: Vec<String>,
    // Path prefixes that are rejected; checked before `allow_paths`.
    deny_paths: Vec<String>,
}
82
/// Policy for the state store.
#[derive(Debug, Clone, Deserialize)]
pub struct StateStorePolicy {
    /// Whether the state store is allowed; defaults to `true`.
    #[serde(default = "default_state_store_allow")]
    pub allow: bool,
}
88
/// Shape of the `"webhook"` binding's `config` value; converted into a
/// `WebhookPolicy` via `From`.
#[derive(Debug, Clone, Deserialize)]
pub struct WebhookBindingConfig {
    /// Allowed path prefixes; empty when omitted.
    #[serde(default)]
    pub allow_paths: Vec<String>,
    /// Denied path prefixes; empty when omitted.
    #[serde(default)]
    pub deny_paths: Vec<String>,
}
96
/// A timer entry from the bindings file, tying a schedule to a flow.
#[derive(Debug, Clone, Deserialize)]
pub struct TimerBinding {
    /// Identifier of the flow this timer refers to.
    pub flow_id: String,
    /// Schedule string. NOTE(review): presumably a cron expression; it is not
    /// validated in this file.
    pub cron: String,
    /// Optional explicit schedule id; the `schedule_id()` accessor falls back
    /// to `flow_id` when this is `None`.
    #[serde(default)]
    pub schedule_id: Option<String>,
}
104
/// OAuth block of the bindings file; mapped to an `OAuthBrokerConfig` by
/// `HostConfig::oauth_broker_config`.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthConfig {
    /// Broker HTTP base URL, passed to `OAuthBrokerConfig::new`.
    pub http_base_url: String,
    /// Broker NATS URL, passed to `OAuthBrokerConfig::new`.
    pub nats_url: String,
    /// Provider name; used as the broker's default provider when non-empty.
    pub provider: String,
    /// Optional environment name. NOTE(review): not read by
    /// `oauth_broker_config` in this file.
    #[serde(default)]
    pub env: Option<String>,
    /// Optional team; forwarded to the broker config when present and non-empty.
    #[serde(default)]
    pub team: Option<String>,
}
115
116impl HostConfig {
117 pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
118 let path = path.as_ref();
119 let content = fs::read_to_string(path)
120 .with_context(|| format!("failed to read bindings file {path:?}"))?;
121 let bindings: BindingsFile = serde_yaml::from_str(&content)
122 .with_context(|| format!("failed to parse bindings file {path:?}"))?;
123
124 let secrets_policy = SecretsPolicy::from_bindings(&bindings);
125 let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
126 let webhook_policy = bindings
127 .flow_type_bindings
128 .get("webhook")
129 .and_then(|binding| {
130 serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
131 .map(WebhookPolicy::from)
132 .map_err(|err| {
133 tracing::warn!(error = %err, "failed to parse webhook binding config");
134 err
135 })
136 .ok()
137 })
138 .unwrap_or_default();
139
140 Ok(Self {
141 tenant: bindings.tenant.clone(),
142 bindings_path: path.to_path_buf(),
143 flow_type_bindings: bindings.flow_type_bindings.clone(),
144 rate_limits: bindings.rate_limits.clone(),
145 retry: bindings.retry.clone(),
146 http_enabled,
147 secrets_policy,
148 state_store_policy: bindings.state_store.clone(),
149 webhook_policy,
150 timers: bindings.timers.clone(),
151 oauth: bindings.oauth.clone(),
152 mocks: bindings.mocks.clone(),
153 })
154 }
155
156 pub fn messaging_binding(&self) -> Option<&FlowBinding> {
157 self.flow_type_bindings.get("messaging")
158 }
159
160 pub fn retry_config(&self) -> FlowRetryConfig {
161 self.retry.clone()
162 }
163
164 pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
165 let oauth = self.oauth.as_ref()?;
166 let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
167 if !oauth.provider.is_empty() {
168 cfg.default_provider = Some(oauth.provider.clone());
169 }
170 if let Some(team) = &oauth.team
171 && !team.is_empty()
172 {
173 cfg.team = Some(team.clone());
174 }
175 Some(cfg)
176 }
177
178 pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
181 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
182 let env_id = greentic_types::EnvId::from_str(&env)
183 .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
184 let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
185 .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
186 greentic_types::TenantCtx::new(env_id, tenant_id)
187 }
188}
189
190impl SecretsPolicy {
191 fn from_bindings(bindings: &BindingsFile) -> Self {
192 let allowed = bindings
193 .flow_type_bindings
194 .values()
195 .flat_map(|binding| binding.secrets.iter().cloned())
196 .collect::<HashSet<_>>();
197 Self {
198 allowed,
199 allow_all: false,
200 }
201 }
202
203 pub fn is_allowed(&self, key: &str) -> bool {
204 self.allow_all || self.allowed.contains(key)
205 }
206
207 pub fn allow_all() -> Self {
208 Self {
209 allowed: HashSet::new(),
210 allow_all: true,
211 }
212 }
213}
214
215impl Default for RateLimits {
216 fn default() -> Self {
217 Self {
218 messaging_send_qps: default_messaging_qps(),
219 messaging_burst: default_messaging_burst(),
220 }
221 }
222}
223
224impl Default for StateStorePolicy {
225 fn default() -> Self {
226 Self {
227 allow: default_state_store_allow(),
228 }
229 }
230}
231
/// Default for `RateLimits::messaging_send_qps`.
fn default_messaging_qps() -> u32 {
    const DEFAULT_MESSAGING_QPS: u32 = 10;
    DEFAULT_MESSAGING_QPS
}
235
/// Default for `RateLimits::messaging_burst`.
fn default_messaging_burst() -> u32 {
    const DEFAULT_MESSAGING_BURST: u32 = 20;
    DEFAULT_MESSAGING_BURST
}
239
/// Default for `StateStorePolicy::allow`: the state store is allowed unless
/// the bindings file says otherwise.
fn default_state_store_allow() -> bool {
    const DEFAULT_STATE_STORE_ALLOW: bool = true;
    DEFAULT_STATE_STORE_ALLOW
}
243
244impl From<WebhookBindingConfig> for WebhookPolicy {
245 fn from(value: WebhookBindingConfig) -> Self {
246 Self {
247 allow_paths: value.allow_paths,
248 deny_paths: value.deny_paths,
249 }
250 }
251}
252
253impl WebhookPolicy {
254 pub fn is_allowed(&self, path: &str) -> bool {
255 if self
256 .deny_paths
257 .iter()
258 .any(|prefix| path.starts_with(prefix))
259 {
260 return false;
261 }
262
263 if self.allow_paths.is_empty() {
264 return true;
265 }
266
267 self.allow_paths
268 .iter()
269 .any(|prefix| path.starts_with(prefix))
270 }
271}
272
273impl TimerBinding {
274 pub fn schedule_id(&self) -> &str {
275 self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
276 }
277}
278
279impl Default for FlowRetryConfig {
280 fn default() -> Self {
281 Self {
282 max_attempts: default_retry_attempts(),
283 base_delay_ms: default_retry_base_delay_ms(),
284 }
285 }
286}
287
/// Default for `FlowRetryConfig::max_attempts`.
fn default_retry_attempts() -> u32 {
    const DEFAULT_RETRY_ATTEMPTS: u32 = 3;
    DEFAULT_RETRY_ATTEMPTS
}
291
/// Default for `FlowRetryConfig::base_delay_ms`, in milliseconds.
fn default_retry_base_delay_ms() -> u64 {
    const DEFAULT_RETRY_BASE_DELAY_MS: u64 = 250;
    DEFAULT_RETRY_BASE_DELAY_MS
}
295
#[cfg(test)]
// NOTE(review): kept from the original; presumably further items follow this
// module elsewhere in the file — confirm before removing.
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Builds a minimal `HostConfig` whose only variable part is the OAuth block.
    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
        let tenant = String::from("tenant-a");
        let bindings_path = PathBuf::from("/tmp/bindings.yaml");
        HostConfig {
            tenant,
            bindings_path,
            flow_type_bindings: HashMap::new(),
            rate_limits: RateLimits::default(),
            retry: FlowRetryConfig::default(),
            http_enabled: false,
            secrets_policy: SecretsPolicy::allow_all(),
            state_store_policy: StateStorePolicy::default(),
            webhook_policy: WebhookPolicy::default(),
            timers: Vec::new(),
            oauth,
            mocks: None,
        }
    }

    /// Without an `oauth` block there is no broker configuration.
    #[test]
    fn oauth_broker_config_absent_without_block() {
        let host = host_config_with_oauth(None);
        assert!(host.oauth_broker_config().is_none());
    }

    /// URLs, provider and team are carried over into the broker configuration.
    #[test]
    fn oauth_broker_config_maps_fields() {
        let oauth = OAuthConfig {
            http_base_url: String::from("https://oauth.example/"),
            nats_url: String::from("nats://broker:4222"),
            provider: String::from("demo"),
            env: None,
            team: Some(String::from("ops")),
        };
        let host = host_config_with_oauth(Some(oauth));

        let broker = host.oauth_broker_config().expect("missing broker config");

        assert_eq!(broker.http_base_url, "https://oauth.example/");
        assert_eq!(broker.nats_url, "nats://broker:4222");
        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
        assert_eq!(broker.team.as_deref(), Some("ops"));
    }
}