1use crate::gtbind::PackBinding;
2use crate::gtbind::TenantBindings;
3use crate::oauth::OAuthBrokerConfig;
4use crate::runner::mocks::MocksConfig;
5use crate::trace::TraceConfig;
6use crate::validate::ValidationConfig;
7use anyhow::{Context, Result};
8use serde::Deserialize;
9use serde_yaml_bw as serde_yaml;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::{Path, PathBuf};
13use std::str::FromStr;
14
15#[derive(Debug, Clone)]
16pub struct HostConfig {
17 pub tenant: String,
18 pub bindings_path: PathBuf,
19 pub flow_type_bindings: HashMap<String, FlowBinding>,
20 pub rate_limits: RateLimits,
21 pub retry: FlowRetryConfig,
22 pub http_enabled: bool,
23 pub secrets_policy: SecretsPolicy,
24 pub state_store_policy: StateStorePolicy,
25 pub webhook_policy: WebhookPolicy,
26 pub timers: Vec<TimerBinding>,
27 pub oauth: Option<OAuthConfig>,
28 pub mocks: Option<MocksConfig>,
29 pub pack_bindings: Vec<PackBinding>,
30 pub env_passthrough: Vec<String>,
31 pub trace: TraceConfig,
32 pub validation: ValidationConfig,
33}
34
35#[derive(Debug, Clone, Deserialize)]
36pub struct BindingsFile {
37 pub tenant: String,
38 #[serde(default)]
39 pub flow_type_bindings: HashMap<String, FlowBinding>,
40 #[serde(default)]
41 pub rate_limits: RateLimits,
42 #[serde(default)]
43 pub retry: FlowRetryConfig,
44 #[serde(default)]
45 pub timers: Vec<TimerBinding>,
46 #[serde(default)]
47 pub oauth: Option<OAuthConfig>,
48 #[serde(default)]
49 pub mocks: Option<MocksConfig>,
50 #[serde(default)]
51 pub state_store: StateStorePolicy,
52}
53
54#[derive(Debug, Clone, Deserialize)]
55pub struct FlowBinding {
56 pub adapter: String,
57 #[serde(default)]
58 pub config: serde_yaml::Value,
59 #[serde(default)]
60 pub secrets: Vec<String>,
61}
62
63#[derive(Debug, Clone, Deserialize)]
64pub struct RateLimits {
65 #[serde(default = "default_messaging_qps")]
66 pub messaging_send_qps: u32,
67 #[serde(default = "default_messaging_burst")]
68 pub messaging_burst: u32,
69}
70
71#[derive(Debug, Clone)]
72pub struct SecretsPolicy {
73 allowed: HashSet<String>,
74 allow_all: bool,
75}
76
77#[derive(Debug, Clone, Deserialize)]
78pub struct FlowRetryConfig {
79 #[serde(default = "default_retry_attempts")]
80 pub max_attempts: u32,
81 #[serde(default = "default_retry_base_delay_ms")]
82 pub base_delay_ms: u64,
83}
84
85#[derive(Debug, Clone, Default)]
86pub struct WebhookPolicy {
87 allow_paths: Vec<String>,
88 deny_paths: Vec<String>,
89}
90
91#[derive(Debug, Clone, Deserialize)]
92pub struct StateStorePolicy {
93 #[serde(default = "default_state_store_allow")]
94 pub allow: bool,
95}
96
97#[derive(Debug, Clone, Deserialize)]
98pub struct WebhookBindingConfig {
99 #[serde(default)]
100 pub allow_paths: Vec<String>,
101 #[serde(default)]
102 pub deny_paths: Vec<String>,
103}
104
105#[derive(Debug, Clone, Deserialize)]
106pub struct TimerBinding {
107 pub flow_id: String,
108 pub cron: String,
109 #[serde(default)]
110 pub schedule_id: Option<String>,
111}
112
113#[derive(Debug, Clone, Deserialize)]
114pub struct OAuthConfig {
115 pub http_base_url: String,
116 pub nats_url: String,
117 pub provider: String,
118 #[serde(default)]
119 pub env: Option<String>,
120 #[serde(default)]
121 pub team: Option<String>,
122}
123
124impl HostConfig {
125 pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
126 let path = path.as_ref();
127 let content = fs::read_to_string(path)
128 .with_context(|| format!("failed to read bindings file {path:?}"))?;
129 let bindings: BindingsFile = serde_yaml::from_str(&content)
130 .with_context(|| format!("failed to parse bindings file {path:?}"))?;
131
132 let secrets_policy = SecretsPolicy::from_bindings(&bindings);
133 let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
134 let webhook_policy = bindings
135 .flow_type_bindings
136 .get("webhook")
137 .and_then(|binding| {
138 serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
139 .map(WebhookPolicy::from)
140 .map_err(|err| {
141 tracing::warn!(error = %err, "failed to parse webhook binding config");
142 err
143 })
144 .ok()
145 })
146 .unwrap_or_default();
147
148 Ok(Self {
149 tenant: bindings.tenant.clone(),
150 bindings_path: path.to_path_buf(),
151 flow_type_bindings: bindings.flow_type_bindings.clone(),
152 rate_limits: bindings.rate_limits.clone(),
153 retry: bindings.retry.clone(),
154 http_enabled,
155 secrets_policy,
156 state_store_policy: bindings.state_store.clone(),
157 webhook_policy,
158 timers: bindings.timers.clone(),
159 oauth: bindings.oauth.clone(),
160 mocks: bindings.mocks.clone(),
161 pack_bindings: Vec::new(),
162 env_passthrough: Vec::new(),
163 trace: TraceConfig::from_env(),
164 validation: ValidationConfig::from_env(),
165 })
166 }
167
168 pub fn from_gtbind(bindings: TenantBindings) -> Self {
169 Self {
170 tenant: bindings.tenant,
171 bindings_path: PathBuf::from("<gtbind>"),
172 flow_type_bindings: HashMap::new(),
173 rate_limits: RateLimits::default(),
174 retry: FlowRetryConfig::default(),
175 http_enabled: false,
176 secrets_policy: SecretsPolicy::allow_all(),
177 state_store_policy: StateStorePolicy::default(),
178 webhook_policy: WebhookPolicy::default(),
179 timers: Vec::new(),
180 oauth: None,
181 mocks: None,
182 pack_bindings: bindings.packs,
183 env_passthrough: bindings.env_passthrough,
184 trace: TraceConfig::from_env(),
185 validation: ValidationConfig::from_env(),
186 }
187 }
188
189 pub fn messaging_binding(&self) -> Option<&FlowBinding> {
190 self.flow_type_bindings.get("messaging")
191 }
192
193 pub fn retry_config(&self) -> FlowRetryConfig {
194 self.retry.clone()
195 }
196
197 pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
198 let oauth = self.oauth.as_ref()?;
199 let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
200 if !oauth.provider.is_empty() {
201 cfg.default_provider = Some(oauth.provider.clone());
202 }
203 if let Some(team) = &oauth.team
204 && !team.is_empty()
205 {
206 cfg.team = Some(team.clone());
207 }
208 Some(cfg)
209 }
210
211 pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
214 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
215 let env_id = greentic_types::EnvId::from_str(&env)
216 .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
217 let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
218 .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
219 greentic_types::TenantCtx::new(env_id, tenant_id)
220 }
221}
222
223impl SecretsPolicy {
224 fn from_bindings(bindings: &BindingsFile) -> Self {
225 let allowed = bindings
226 .flow_type_bindings
227 .values()
228 .flat_map(|binding| binding.secrets.iter().cloned())
229 .collect::<HashSet<_>>();
230 Self {
231 allowed,
232 allow_all: false,
233 }
234 }
235
236 pub fn is_allowed(&self, key: &str) -> bool {
237 self.allow_all || self.allowed.contains(key)
238 }
239
240 pub fn allow_all() -> Self {
241 Self {
242 allowed: HashSet::new(),
243 allow_all: true,
244 }
245 }
246}
247
248impl Default for RateLimits {
249 fn default() -> Self {
250 Self {
251 messaging_send_qps: default_messaging_qps(),
252 messaging_burst: default_messaging_burst(),
253 }
254 }
255}
256
257impl Default for StateStorePolicy {
258 fn default() -> Self {
259 Self {
260 allow: default_state_store_allow(),
261 }
262 }
263}
264
265fn default_messaging_qps() -> u32 {
266 10
267}
268
269fn default_messaging_burst() -> u32 {
270 20
271}
272
273fn default_state_store_allow() -> bool {
274 true
275}
276
277impl From<WebhookBindingConfig> for WebhookPolicy {
278 fn from(value: WebhookBindingConfig) -> Self {
279 Self {
280 allow_paths: value.allow_paths,
281 deny_paths: value.deny_paths,
282 }
283 }
284}
285
286impl WebhookPolicy {
287 pub fn is_allowed(&self, path: &str) -> bool {
288 if self
289 .deny_paths
290 .iter()
291 .any(|prefix| path.starts_with(prefix))
292 {
293 return false;
294 }
295
296 if self.allow_paths.is_empty() {
297 return true;
298 }
299
300 self.allow_paths
301 .iter()
302 .any(|prefix| path.starts_with(prefix))
303 }
304}
305
306impl TimerBinding {
307 pub fn schedule_id(&self) -> &str {
308 self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
309 }
310}
311
312impl Default for FlowRetryConfig {
313 fn default() -> Self {
314 Self {
315 max_attempts: default_retry_attempts(),
316 base_delay_ms: default_retry_base_delay_ms(),
317 }
318 }
319}
320
321fn default_retry_attempts() -> u32 {
322 3
323}
324
325fn default_retry_base_delay_ms() -> u64 {
326 250
327}
328
329#[cfg(test)]
330#[allow(clippy::items_after_test_module)]
331mod tests {
332 use super::*;
333 use std::collections::HashMap;
334 use std::path::PathBuf;
335
336 fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
337 HostConfig {
338 tenant: "tenant-a".to_string(),
339 bindings_path: PathBuf::from("/tmp/bindings.yaml"),
340 flow_type_bindings: HashMap::new(),
341 rate_limits: RateLimits::default(),
342 retry: FlowRetryConfig::default(),
343 http_enabled: false,
344 secrets_policy: SecretsPolicy::allow_all(),
345 state_store_policy: StateStorePolicy::default(),
346 webhook_policy: WebhookPolicy::default(),
347 timers: Vec::new(),
348 oauth,
349 mocks: None,
350 pack_bindings: Vec::new(),
351 env_passthrough: Vec::new(),
352 trace: TraceConfig::from_env(),
353 validation: ValidationConfig::from_env(),
354 }
355 }
356
357 #[test]
358 fn oauth_broker_config_absent_without_block() {
359 let cfg = host_config_with_oauth(None);
360 assert!(cfg.oauth_broker_config().is_none());
361 }
362
363 #[test]
364 fn oauth_broker_config_maps_fields() {
365 let cfg = host_config_with_oauth(Some(OAuthConfig {
366 http_base_url: "https://oauth.example/".into(),
367 nats_url: "nats://broker:4222".into(),
368 provider: "demo".into(),
369 env: None,
370 team: Some("ops".into()),
371 }));
372 let broker = cfg.oauth_broker_config().expect("missing broker config");
373 assert_eq!(broker.http_base_url, "https://oauth.example/");
374 assert_eq!(broker.nats_url, "nats://broker:4222");
375 assert_eq!(broker.default_provider.as_deref(), Some("demo"));
376 assert_eq!(broker.team.as_deref(), Some("ops"));
377 }
378}