1use crate::gtbind::PackBinding;
2use crate::gtbind::TenantBindings;
3use crate::oauth::OAuthBrokerConfig;
4use crate::runner::mocks::MocksConfig;
5use crate::trace::TraceConfig;
6use crate::validate::ValidationConfig;
7use anyhow::{Context, Result};
8use serde::Deserialize;
9use serde_yaml_bw as serde_yaml;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::{Path, PathBuf};
13use std::str::FromStr;
14
/// Fully resolved host configuration for a single tenant.
///
/// Built either from a YAML bindings file (`HostConfig::load_from_path`) or
/// from gtbind tenant bindings (`HostConfig::from_gtbind`).
#[derive(Debug, Clone)]
pub struct HostConfig {
    /// Tenant identifier taken from the bindings source.
    pub tenant: String,
    /// File the bindings were read from; `from_gtbind` stores the placeholder `<gtbind>`.
    pub bindings_path: PathBuf,
    /// Flow bindings keyed by flow type (this file looks up `"messaging"` and `"webhook"`).
    pub flow_type_bindings: HashMap<String, FlowBinding>,
    /// Messaging rate limits.
    pub rate_limits: RateLimits,
    /// Retry settings handed out by `retry_config`.
    pub retry: FlowRetryConfig,
    /// `true` when a loaded bindings file contains a `"messaging"` flow binding;
    /// always `false` when built through `from_gtbind`.
    pub http_enabled: bool,
    /// Allow-list deciding which secret keys may be read.
    pub secrets_policy: SecretsPolicy,
    /// Policy flag for the state store.
    pub state_store_policy: StateStorePolicy,
    /// Path allow/deny rules derived from the `"webhook"` flow binding, if any.
    pub webhook_policy: WebhookPolicy,
    /// Timer bindings declared in the bindings file.
    pub timers: Vec<TimerBinding>,
    /// Optional OAuth settings; `None` disables `oauth_broker_config`.
    pub oauth: Option<OAuthConfig>,
    /// Optional mocks configuration.
    pub mocks: Option<MocksConfig>,
    /// Pack bindings; only populated by `from_gtbind`.
    pub pack_bindings: Vec<PackBinding>,
    /// Environment passthrough list; only populated by `from_gtbind`.
    pub env_passthrough: Vec<String>,
    /// Trace settings obtained from `TraceConfig::from_env()`.
    pub trace: TraceConfig,
    /// Validation settings obtained from `ValidationConfig::from_env()`.
    pub validation: ValidationConfig,
    /// Provider/operation allow-list for operators.
    pub operator_policy: OperatorPolicy,
}
35
/// Raw shape of the YAML bindings file parsed by `HostConfig::load_from_path`.
///
/// Only `tenant` is required; every other section falls back to its default
/// when it is missing from the document.
#[derive(Debug, Clone, Deserialize)]
pub struct BindingsFile {
    /// Tenant identifier (required).
    pub tenant: String,
    /// Flow bindings keyed by flow type; empty when omitted.
    #[serde(default)]
    pub flow_type_bindings: HashMap<String, FlowBinding>,
    /// Messaging rate limits; `RateLimits::default()` when omitted.
    #[serde(default)]
    pub rate_limits: RateLimits,
    /// Retry settings; `FlowRetryConfig::default()` when omitted.
    #[serde(default)]
    pub retry: FlowRetryConfig,
    /// Timer bindings; empty when omitted.
    #[serde(default)]
    pub timers: Vec<TimerBinding>,
    /// Optional OAuth block.
    #[serde(default)]
    pub oauth: Option<OAuthConfig>,
    /// Optional mocks block.
    #[serde(default)]
    pub mocks: Option<MocksConfig>,
    /// State store policy; `StateStorePolicy::default()` when omitted.
    #[serde(default)]
    pub state_store: StateStorePolicy,
    /// Operator allow-lists; an empty configuration when omitted.
    #[serde(default)]
    pub operator: OperatorPolicyConfig,
}
56
/// Binding of one flow type to an adapter, as written in the bindings file.
#[derive(Debug, Clone, Deserialize)]
pub struct FlowBinding {
    /// Adapter name (required). NOTE(review): how the name is resolved is not
    /// visible in this file.
    pub adapter: String,
    /// Free-form adapter configuration. For the `"webhook"` flow type it is
    /// parsed as `WebhookBindingConfig` by `HostConfig::load_from_path`.
    #[serde(default)]
    pub config: serde_yaml::Value,
    /// Secret keys this binding declares; `SecretsPolicy::from_bindings`
    /// collects them into the allow-list.
    #[serde(default)]
    pub secrets: Vec<String>,
}
65
/// Messaging rate-limit settings.
#[derive(Debug, Clone, Deserialize)]
pub struct RateLimits {
    /// Send rate limit; defaults to `default_messaging_qps()` (10).
    /// Presumably queries per second, judging by the field name.
    #[serde(default = "default_messaging_qps")]
    pub messaging_send_qps: u32,
    /// Burst allowance; defaults to `default_messaging_burst()` (20).
    #[serde(default = "default_messaging_burst")]
    pub messaging_burst: u32,
}
73
/// Allow-list deciding which secret keys may be accessed.
#[derive(Debug, Clone)]
pub struct SecretsPolicy {
    /// Secret keys explicitly permitted.
    allowed: HashSet<String>,
    /// When `true`, every key is permitted regardless of `allowed`.
    allow_all: bool,
}
79
/// Serialized form of the operator allow-lists (the `operator` section of the
/// bindings file).
///
/// When both lists are empty, `OperatorPolicy::from_config` produces a policy
/// that allows everything.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct OperatorPolicyConfig {
    /// Provider ids or provider types that are permitted.
    #[serde(default)]
    pub allowed_providers: Vec<String>,
    /// Permitted operation ids, keyed by provider id or provider type.
    #[serde(default)]
    pub allowed_ops: HashMap<String, Vec<String>>,
}
87
/// Runtime operator policy, queried through `allows_provider` / `allows_op`.
#[derive(Debug, Clone)]
pub struct OperatorPolicy {
    /// When `true`, every provider and operation is permitted.
    allow_all: bool,
    /// Permitted provider ids or provider types.
    allowed_providers: HashSet<String>,
    /// Permitted operation ids, keyed by provider id or provider type.
    allowed_ops: HashMap<String, HashSet<String>>,
}
94
/// Retry settings for flows.
#[derive(Debug, Clone, Deserialize)]
pub struct FlowRetryConfig {
    /// Maximum number of attempts; defaults to `default_retry_attempts()` (3).
    #[serde(default = "default_retry_attempts")]
    pub max_attempts: u32,
    /// Base delay between attempts, in milliseconds per the field name;
    /// defaults to `default_retry_base_delay_ms()` (250).
    #[serde(default = "default_retry_base_delay_ms")]
    pub base_delay_ms: u64,
}
102
/// Prefix-based allow/deny rules for webhook paths (see `is_allowed`).
///
/// The default value has both lists empty, which permits every path.
#[derive(Debug, Clone, Default)]
pub struct WebhookPolicy {
    /// Path prefixes that are permitted; an empty list permits any path that
    /// is not denied.
    allow_paths: Vec<String>,
    /// Path prefixes that are rejected; checked before `allow_paths`.
    deny_paths: Vec<String>,
}
108
/// Policy flag for the state store.
#[derive(Debug, Clone, Deserialize)]
pub struct StateStorePolicy {
    /// Whether state store use is allowed; defaults to
    /// `default_state_store_allow()` (`true`).
    /// NOTE(review): the enforcement point is not visible in this file.
    #[serde(default = "default_state_store_allow")]
    pub allow: bool,
}
114
/// Shape of the `config` value of the `"webhook"` flow binding; converted
/// into a `WebhookPolicy` via `From`.
#[derive(Debug, Clone, Deserialize)]
pub struct WebhookBindingConfig {
    /// Path prefixes to permit; empty when omitted.
    #[serde(default)]
    pub allow_paths: Vec<String>,
    /// Path prefixes to reject; empty when omitted.
    #[serde(default)]
    pub deny_paths: Vec<String>,
}
122
/// Timer entry from the bindings file, tying a flow to a schedule.
#[derive(Debug, Clone, Deserialize)]
pub struct TimerBinding {
    /// Identifier of the flow this timer refers to (required).
    pub flow_id: String,
    /// Schedule expression (required). Presumably cron syntax per the field
    /// name; it is not validated in this file.
    pub cron: String,
    /// Optional explicit schedule identifier; `schedule_id()` falls back to
    /// `flow_id` when it is absent.
    #[serde(default)]
    pub schedule_id: Option<String>,
}
130
/// OAuth block of the bindings file; turned into an `OAuthBrokerConfig` by
/// `HostConfig::oauth_broker_config`.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthConfig {
    /// First argument passed to `OAuthBrokerConfig::new` (required).
    pub http_base_url: String,
    /// Second argument passed to `OAuthBrokerConfig::new` (required).
    pub nats_url: String,
    /// Provider name (required); used as the broker's default provider when
    /// non-empty.
    pub provider: String,
    /// Optional environment name. NOTE(review): not read by
    /// `oauth_broker_config` in this file.
    #[serde(default)]
    pub env: Option<String>,
    /// Optional team; forwarded to the broker config when non-empty.
    #[serde(default)]
    pub team: Option<String>,
}
141
142impl HostConfig {
143 pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
144 let path = path.as_ref();
145 let content = fs::read_to_string(path)
146 .with_context(|| format!("failed to read bindings file {path:?}"))?;
147 let bindings: BindingsFile = serde_yaml::from_str(&content)
148 .with_context(|| format!("failed to parse bindings file {path:?}"))?;
149
150 let secrets_policy = SecretsPolicy::from_bindings(&bindings);
151 let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
152 let webhook_policy = bindings
153 .flow_type_bindings
154 .get("webhook")
155 .and_then(|binding| {
156 serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
157 .map(WebhookPolicy::from)
158 .map_err(|err| {
159 tracing::warn!(error = %err, "failed to parse webhook binding config");
160 err
161 })
162 .ok()
163 })
164 .unwrap_or_default();
165
166 Ok(Self {
167 tenant: bindings.tenant.clone(),
168 bindings_path: path.to_path_buf(),
169 flow_type_bindings: bindings.flow_type_bindings.clone(),
170 rate_limits: bindings.rate_limits.clone(),
171 retry: bindings.retry.clone(),
172 http_enabled,
173 secrets_policy,
174 state_store_policy: bindings.state_store.clone(),
175 webhook_policy,
176 timers: bindings.timers.clone(),
177 oauth: bindings.oauth.clone(),
178 mocks: bindings.mocks.clone(),
179 pack_bindings: Vec::new(),
180 env_passthrough: Vec::new(),
181 trace: TraceConfig::from_env(),
182 validation: ValidationConfig::from_env(),
183 operator_policy: OperatorPolicy::from_config(bindings.operator.clone()),
184 })
185 }
186
187 pub fn from_gtbind(bindings: TenantBindings) -> Self {
188 Self {
189 tenant: bindings.tenant,
190 bindings_path: PathBuf::from("<gtbind>"),
191 flow_type_bindings: HashMap::new(),
192 rate_limits: RateLimits::default(),
193 retry: FlowRetryConfig::default(),
194 http_enabled: false,
195 secrets_policy: SecretsPolicy::allow_all(),
196 state_store_policy: StateStorePolicy::default(),
197 webhook_policy: WebhookPolicy::default(),
198 timers: Vec::new(),
199 oauth: None,
200 mocks: None,
201 pack_bindings: bindings.packs,
202 env_passthrough: bindings.env_passthrough,
203 trace: TraceConfig::from_env(),
204 validation: ValidationConfig::from_env(),
205 operator_policy: OperatorPolicy::allow_all(),
206 }
207 }
208
209 pub fn messaging_binding(&self) -> Option<&FlowBinding> {
210 self.flow_type_bindings.get("messaging")
211 }
212
213 pub fn retry_config(&self) -> FlowRetryConfig {
214 self.retry.clone()
215 }
216
217 pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
218 let oauth = self.oauth.as_ref()?;
219 let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
220 if !oauth.provider.is_empty() {
221 cfg.default_provider = Some(oauth.provider.clone());
222 }
223 if let Some(team) = &oauth.team
224 && !team.is_empty()
225 {
226 cfg.team = Some(team.clone());
227 }
228 Some(cfg)
229 }
230
231 pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
234 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
235 let env_id = greentic_types::EnvId::from_str(&env)
236 .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
237 let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
238 .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
239 greentic_types::TenantCtx::new(env_id, tenant_id)
240 }
241}
242
243impl SecretsPolicy {
244 fn from_bindings(bindings: &BindingsFile) -> Self {
245 let allowed = bindings
246 .flow_type_bindings
247 .values()
248 .flat_map(|binding| binding.secrets.iter().cloned())
249 .collect::<HashSet<_>>();
250 Self {
251 allowed,
252 allow_all: false,
253 }
254 }
255
256 pub fn is_allowed(&self, key: &str) -> bool {
257 self.allow_all || self.allowed.contains(key)
258 }
259
260 pub fn allow_all() -> Self {
261 Self {
262 allowed: HashSet::new(),
263 allow_all: true,
264 }
265 }
266}
267
268impl OperatorPolicy {
269 pub fn from_config(config: OperatorPolicyConfig) -> Self {
270 let allowed_providers = config.allowed_providers.into_iter().collect::<HashSet<_>>();
271 let allowed_ops = config
272 .allowed_ops
273 .into_iter()
274 .map(|(provider, ops)| (provider, ops.into_iter().collect::<HashSet<_>>()))
275 .collect::<HashMap<_, _>>();
276 let allow_all = allowed_providers.is_empty() && allowed_ops.is_empty();
277 Self {
278 allow_all,
279 allowed_providers,
280 allowed_ops,
281 }
282 }
283
284 pub fn allow_all() -> Self {
285 Self {
286 allow_all: true,
287 allowed_providers: HashSet::new(),
288 allowed_ops: HashMap::new(),
289 }
290 }
291
292 pub fn allows_provider(&self, provider_id: Option<&str>, provider_type: &str) -> bool {
293 if self.allow_all {
294 return true;
295 }
296 provider_id
297 .map(|id| self.allowed_providers.contains(id))
298 .unwrap_or(false)
299 || self.allowed_providers.contains(provider_type)
300 }
301
302 pub fn allows_op(&self, provider_id: Option<&str>, provider_type: &str, op_id: &str) -> bool {
303 if self.allow_all {
304 return true;
305 }
306 if let Some(ops) = provider_id.and_then(|id| self.allowed_ops.get(id)) {
307 return ops.contains(op_id);
308 }
309 if let Some(ops) = self.allowed_ops.get(provider_type) {
310 return ops.contains(op_id);
311 }
312 self.allows_provider(provider_id, provider_type)
313 }
314}
315
316impl Default for RateLimits {
317 fn default() -> Self {
318 Self {
319 messaging_send_qps: default_messaging_qps(),
320 messaging_burst: default_messaging_burst(),
321 }
322 }
323}
324
325impl Default for StateStorePolicy {
326 fn default() -> Self {
327 Self {
328 allow: default_state_store_allow(),
329 }
330 }
331}
332
/// Default for `RateLimits::messaging_send_qps` when the field is omitted.
fn default_messaging_qps() -> u32 {
    const DEFAULT_MESSAGING_SEND_QPS: u32 = 10;
    DEFAULT_MESSAGING_SEND_QPS
}
336
/// Default for `RateLimits::messaging_burst` when the field is omitted.
fn default_messaging_burst() -> u32 {
    const DEFAULT_MESSAGING_BURST: u32 = 20;
    DEFAULT_MESSAGING_BURST
}
340
/// Default for `StateStorePolicy::allow` when the field is omitted.
fn default_state_store_allow() -> bool {
    const DEFAULT_STATE_STORE_ALLOW: bool = true;
    DEFAULT_STATE_STORE_ALLOW
}
344
345impl From<WebhookBindingConfig> for WebhookPolicy {
346 fn from(value: WebhookBindingConfig) -> Self {
347 Self {
348 allow_paths: value.allow_paths,
349 deny_paths: value.deny_paths,
350 }
351 }
352}
353
354impl WebhookPolicy {
355 pub fn is_allowed(&self, path: &str) -> bool {
356 if self
357 .deny_paths
358 .iter()
359 .any(|prefix| path.starts_with(prefix))
360 {
361 return false;
362 }
363
364 if self.allow_paths.is_empty() {
365 return true;
366 }
367
368 self.allow_paths
369 .iter()
370 .any(|prefix| path.starts_with(prefix))
371 }
372}
373
374impl TimerBinding {
375 pub fn schedule_id(&self) -> &str {
376 self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
377 }
378}
379
380impl Default for FlowRetryConfig {
381 fn default() -> Self {
382 Self {
383 max_attempts: default_retry_attempts(),
384 base_delay_ms: default_retry_base_delay_ms(),
385 }
386 }
387}
388
#[cfg(test)]
mod operator_policy_tests {
    use super::{OperatorPolicy, OperatorPolicyConfig};
    use std::collections::HashMap;

    /// A provider with an explicit operation list only permits the listed
    /// operations; providers that are not listed are rejected.
    #[test]
    fn policy_allows_configured_provider_op() {
        let allowed_ops = HashMap::from([(
            String::from("provider.allowed"),
            vec![String::from("op1"), String::from("op2")],
        )]);
        let policy = OperatorPolicy::from_config(OperatorPolicyConfig {
            allowed_providers: vec![String::from("provider.allowed")],
            allowed_ops,
        });

        assert!(policy.allows_provider(Some("provider.allowed"), "provider.allowed"));
        assert!(policy.allows_op(Some("provider.allowed"), "provider.allowed", "op1"));
        assert!(!policy.allows_op(Some("provider.allowed"), "provider.allowed", "other"));
        assert!(!policy.allows_provider(Some("provider.denied"), "provider.denied"));
    }

    /// The allow-all policy permits any provider and any operation.
    #[test]
    fn policy_allow_all_defaults_true() {
        let permissive = OperatorPolicy::allow_all();
        assert!(permissive.allows_provider(None, "any"));
        assert!(permissive.allows_op(None, "any", "op"));
    }
}
416
/// Default for `FlowRetryConfig::max_attempts` when the field is omitted.
fn default_retry_attempts() -> u32 {
    const DEFAULT_RETRY_ATTEMPTS: u32 = 3;
    DEFAULT_RETRY_ATTEMPTS
}
420
/// Default for `FlowRetryConfig::base_delay_ms` when the field is omitted.
fn default_retry_base_delay_ms() -> u64 {
    const DEFAULT_RETRY_BASE_DELAY_MS: u64 = 250;
    DEFAULT_RETRY_BASE_DELAY_MS
}
424
#[cfg(test)]
// NOTE(review): no items follow this module in the visible source, so this
// allow may be stale — confirm before removing.
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Builds a permissive `HostConfig` whose only variable part is the
    /// OAuth block under test.
    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
        HostConfig {
            tenant: String::from("tenant-a"),
            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
            flow_type_bindings: HashMap::new(),
            rate_limits: RateLimits::default(),
            retry: FlowRetryConfig::default(),
            http_enabled: false,
            secrets_policy: SecretsPolicy::allow_all(),
            state_store_policy: StateStorePolicy::default(),
            webhook_policy: WebhookPolicy::default(),
            timers: Vec::new(),
            oauth,
            mocks: None,
            pack_bindings: Vec::new(),
            env_passthrough: Vec::new(),
            trace: TraceConfig::from_env(),
            validation: ValidationConfig::from_env(),
            operator_policy: OperatorPolicy::allow_all(),
        }
    }

    /// Without an OAuth block there is no broker configuration.
    #[test]
    fn oauth_broker_config_absent_without_block() {
        let host = host_config_with_oauth(None);
        assert!(host.oauth_broker_config().is_none());
    }

    /// URLs, provider and team are carried over into the broker configuration.
    #[test]
    fn oauth_broker_config_maps_fields() {
        let oauth = OAuthConfig {
            http_base_url: String::from("https://oauth.example/"),
            nats_url: String::from("nats://broker:4222"),
            provider: String::from("demo"),
            env: None,
            team: Some(String::from("ops")),
        };
        let host = host_config_with_oauth(Some(oauth));

        let broker = host.oauth_broker_config().expect("missing broker config");
        assert_eq!(broker.http_base_url, "https://oauth.example/");
        assert_eq!(broker.nats_url, "nats://broker:4222");
        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
        assert_eq!(broker.team.as_deref(), Some("ops"));
    }
}