1use crate::gtbind::PackBinding;
2use crate::gtbind::TenantBindings;
3use crate::oauth::OAuthBrokerConfig;
4use crate::runner::mocks::MocksConfig;
5use crate::trace::TraceConfig;
6use crate::validate::ValidationConfig;
7use anyhow::{Context, Result};
8use parking_lot::RwLock;
9use serde::Deserialize;
10use serde_json::Value;
11use serde_yaml_bw as serde_yaml;
12use std::collections::{HashMap, HashSet};
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::str::FromStr;
16use std::sync::Arc;
17
/// Host configuration for a single tenant.
///
/// Produced by [`HostConfig::load_from_path`] (YAML bindings file) or by
/// [`HostConfig::from_gtbind`] (gtbind tenant bindings).
#[derive(Debug, Clone)]
pub struct HostConfig {
    /// Tenant name taken from the bindings source.
    pub tenant: String,
    /// Path of the bindings file; the placeholder `<gtbind>` when built from gtbind.
    pub bindings_path: PathBuf,
    /// Adapter bindings keyed by flow type; empty when built from gtbind.
    pub flow_type_bindings: HashMap<String, FlowBinding>,
    /// Messaging rate limits.
    pub rate_limits: RateLimits,
    /// Retry settings handed out by [`HostConfig::retry_config`].
    pub retry: FlowRetryConfig,
    /// HTTP toggle: always `true` for gtbind configs, otherwise `true` only when a
    /// `messaging` flow binding is present in the bindings file.
    pub http_enabled: bool,
    /// Decides which secret names may be accessed.
    pub secrets_policy: SecretsPolicy,
    /// Whether the state store may be used.
    pub state_store_policy: StateStorePolicy,
    /// Allow/deny path prefixes for webhooks.
    pub webhook_policy: WebhookPolicy,
    /// Cron timer bindings; empty when built from gtbind.
    pub timers: Vec<TimerBinding>,
    /// Optional OAuth settings; see [`HostConfig::oauth_broker_config`].
    pub oauth: Option<OAuthConfig>,
    /// Optional mocks configuration.
    pub mocks: Option<MocksConfig>,
    /// Pack bindings; only populated by [`HostConfig::from_gtbind`].
    pub pack_bindings: Vec<PackBinding>,
    /// Only populated by [`HostConfig::from_gtbind`].
    /// NOTE(review): presumably names of environment variables to pass through — confirm.
    pub env_passthrough: Vec<String>,
    /// Trace settings, read from the environment by both constructors.
    pub trace: TraceConfig,
    /// Validation settings, read from the environment by both constructors.
    pub validation: ValidationConfig,
    /// Provider/operation allow-list for operators.
    pub operator_policy: OperatorPolicy,
    /// fast2flow routing settings.
    pub fast2flow: Fast2FlowRoutingConfig,
}
39
/// Deserialized shape of the YAML bindings file read by
/// [`HostConfig::load_from_path`].
///
/// Only `tenant` is required; every other key falls back to its type's default
/// when missing.
#[derive(Debug, Clone, Deserialize)]
pub struct BindingsFile {
    /// Tenant name (required).
    pub tenant: String,
    /// Adapter bindings keyed by flow type (the loader looks up `messaging` and `webhook`).
    #[serde(default)]
    pub flow_type_bindings: HashMap<String, FlowBinding>,
    /// Messaging rate limits.
    #[serde(default)]
    pub rate_limits: RateLimits,
    /// Flow retry settings.
    #[serde(default)]
    pub retry: FlowRetryConfig,
    /// Cron timer bindings.
    #[serde(default)]
    pub timers: Vec<TimerBinding>,
    /// Optional OAuth settings.
    #[serde(default)]
    pub oauth: Option<OAuthConfig>,
    /// Optional mocks configuration.
    #[serde(default)]
    pub mocks: Option<MocksConfig>,
    /// State-store policy; read from the `state_store` key.
    #[serde(default)]
    pub state_store: StateStorePolicy,
    /// Operator allow-lists; read from the `operator` key.
    #[serde(default)]
    pub operator: OperatorPolicyConfig,
    /// fast2flow routing settings.
    #[serde(default)]
    pub fast2flow: Fast2FlowRoutingConfig,
}
62
/// Binding of one flow type to an adapter, as written in the bindings file.
#[derive(Debug, Clone, Deserialize)]
pub struct FlowBinding {
    /// Adapter name (required).
    pub adapter: String,
    /// Free-form adapter configuration; for the `webhook` flow type it is parsed
    /// as a [`WebhookBindingConfig`].
    #[serde(default)]
    pub config: serde_yaml::Value,
    /// Secret names declared by this binding; they seed the secrets allow-list.
    #[serde(default)]
    pub secrets: Vec<String>,
}
71
/// Messaging rate-limit settings.
#[derive(Debug, Clone, Deserialize)]
pub struct RateLimits {
    /// Messaging send rate; falls back to `default_messaging_qps()` when missing.
    #[serde(default = "default_messaging_qps")]
    pub messaging_send_qps: u32,
    /// Messaging burst size; falls back to `default_messaging_burst()` when missing.
    #[serde(default = "default_messaging_burst")]
    pub messaging_burst: u32,
}
79
/// Allow-list deciding which secret names may be accessed.
///
/// A name is allowed when `allow_all` is set, when it was declared by a flow
/// binding, or when it was registered later through `register_flow_secret`.
#[derive(Debug, Clone)]
pub struct SecretsPolicy {
    /// Names collected from the `secrets` lists of the flow bindings.
    binding_allowed: HashSet<String>,
    /// Names registered after construction; behind `Arc<RwLock<..>>`, so clones of
    /// the policy share the same set.
    flow_discovered: Arc<RwLock<HashSet<String>>>,
    /// When `true`, every name is allowed.
    allow_all: bool,
}
91
/// Operator allow-lists as written in the bindings file; converted into an
/// [`OperatorPolicy`] by `OperatorPolicy::from_config`.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct OperatorPolicyConfig {
    /// Allowed providers; entries are matched against both provider ids and provider types.
    #[serde(default)]
    pub allowed_providers: Vec<String>,
    /// Allowed operation ids, keyed by provider id or provider type.
    #[serde(default)]
    pub allowed_ops: HashMap<String, Vec<String>>,
}
99
/// Resolved operator allow-list, queried through `allows_provider` and `allows_op`.
#[derive(Debug, Clone)]
pub struct OperatorPolicy {
    /// When `true`, every provider and operation is allowed.
    allow_all: bool,
    /// Provider ids or provider types that are allowed.
    allowed_providers: HashSet<String>,
    /// Allowed operation ids, keyed by provider id or provider type.
    allowed_ops: HashMap<String, HashSet<String>>,
}
106
/// Settings of the `fast2flow` block in the bindings file.
///
/// Every key is optional; missing keys take the values documented per field.
#[derive(Debug, Clone, Deserialize)]
pub struct Fast2FlowRoutingConfig {
    /// Routing switch; `false` when missing.
    #[serde(default)]
    pub enabled: bool,
    /// Component reference; falls back to `default_fast2flow_component_ref()`.
    #[serde(default = "default_fast2flow_component_ref")]
    pub component_ref: String,
    /// Operation name; falls back to `default_fast2flow_operation()`.
    #[serde(default = "default_fast2flow_operation")]
    pub operation: String,
    /// Optional scope; `None` when missing.
    #[serde(default)]
    pub scope: Option<String>,
    /// Registry path; empty string when missing.
    #[serde(default)]
    pub registry_path: String,
    /// Indexes path; empty string when missing.
    #[serde(default)]
    pub indexes_path: String,
    /// Time budget in milliseconds; falls back to `default_fast2flow_time_budget_ms()`.
    #[serde(default = "default_fast2flow_time_budget_ms")]
    pub time_budget_ms: u64,
}
124
125impl Default for Fast2FlowRoutingConfig {
126 fn default() -> Self {
127 Self {
128 enabled: false,
129 component_ref: default_fast2flow_component_ref(),
130 operation: default_fast2flow_operation(),
131 scope: None,
132 registry_path: String::new(),
133 indexes_path: String::new(),
134 time_budget_ms: default_fast2flow_time_budget_ms(),
135 }
136 }
137}
138
/// Fallback for `Fast2FlowRoutingConfig::component_ref` when the key is missing.
fn default_fast2flow_component_ref() -> String {
    String::from("fast2flow-routing")
}
142
/// Fallback for `Fast2FlowRoutingConfig::operation` when the key is missing.
fn default_fast2flow_operation() -> String {
    String::from("route")
}
146
/// Fallback for `Fast2FlowRoutingConfig::time_budget_ms` when the key is missing.
fn default_fast2flow_time_budget_ms() -> u64 {
    const DEFAULT_TIME_BUDGET_MS: u64 = 250;
    DEFAULT_TIME_BUDGET_MS
}
150
/// Retry settings for flows.
#[derive(Debug, Clone, Deserialize)]
pub struct FlowRetryConfig {
    /// Maximum number of attempts; falls back to `default_retry_attempts()` when missing.
    #[serde(default = "default_retry_attempts")]
    pub max_attempts: u32,
    /// Base delay in milliseconds; falls back to `default_retry_base_delay_ms()` when missing.
    #[serde(default = "default_retry_base_delay_ms")]
    pub base_delay_ms: u64,
}
158
/// Prefix-based allow/deny policy for webhook paths; see `WebhookPolicy::is_allowed`.
///
/// The derived default has both lists empty, which allows every path.
#[derive(Debug, Clone, Default)]
pub struct WebhookPolicy {
    /// Path prefixes that are allowed; an empty list allows every path that is not denied.
    allow_paths: Vec<String>,
    /// Path prefixes that are rejected; checked before `allow_paths`.
    deny_paths: Vec<String>,
}
164
/// Policy for state-store usage, read from the `state_store` key of the bindings file.
#[derive(Debug, Clone, Deserialize)]
pub struct StateStorePolicy {
    /// Whether the state store is allowed; falls back to `default_state_store_allow()`
    /// when missing.
    #[serde(default = "default_state_store_allow")]
    pub allow: bool,
}
170
/// Shape of the `config` value of the `webhook` flow binding; converted into a
/// [`WebhookPolicy`] via `From`.
#[derive(Debug, Clone, Deserialize)]
pub struct WebhookBindingConfig {
    /// Allowed path prefixes; empty when missing.
    #[serde(default)]
    pub allow_paths: Vec<String>,
    /// Denied path prefixes; empty when missing.
    #[serde(default)]
    pub deny_paths: Vec<String>,
}
178
/// A timer entry from the bindings file, tying a flow to a cron expression.
#[derive(Debug, Clone, Deserialize)]
pub struct TimerBinding {
    /// Flow this timer refers to (required).
    pub flow_id: String,
    /// Cron expression (required); it is stored as-is and not validated here.
    pub cron: String,
    /// Optional explicit schedule id; `TimerBinding::schedule_id()` falls back to
    /// `flow_id` when this is `None`.
    #[serde(default)]
    pub schedule_id: Option<String>,
}
186
/// OAuth settings from the bindings file; turned into an `OAuthBrokerConfig` by
/// `HostConfig::oauth_broker_config`.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthConfig {
    /// HTTP base URL handed to the broker config (required).
    pub http_base_url: String,
    /// NATS URL handed to the broker config (required).
    pub nats_url: String,
    /// Provider name (required); used as the broker's default provider when non-empty.
    pub provider: String,
    /// Optional environment name.
    /// NOTE(review): not read by `HostConfig::oauth_broker_config` — confirm whether
    /// it is consumed elsewhere.
    #[serde(default)]
    pub env: Option<String>,
    /// Optional team; forwarded to the broker config when present and non-empty.
    #[serde(default)]
    pub team: Option<String>,
}
197
198impl HostConfig {
199 pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
200 let path = path.as_ref();
201 let content = fs::read_to_string(path)
202 .with_context(|| format!("failed to read bindings file {path:?}"))?;
203 let bindings: BindingsFile = serde_yaml::from_str(&content)
204 .with_context(|| format!("failed to parse bindings file {path:?}"))?;
205
206 let secrets_policy = SecretsPolicy::from_bindings(&bindings);
207 let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
208 let webhook_policy = bindings
209 .flow_type_bindings
210 .get("webhook")
211 .and_then(|binding| {
212 serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
213 .map(WebhookPolicy::from)
214 .map_err(|err| {
215 tracing::warn!(error = %err, "failed to parse webhook binding config");
216 err
217 })
218 .ok()
219 })
220 .unwrap_or_default();
221
222 Ok(Self {
223 tenant: bindings.tenant.clone(),
224 bindings_path: path.to_path_buf(),
225 flow_type_bindings: bindings.flow_type_bindings.clone(),
226 rate_limits: bindings.rate_limits.clone(),
227 retry: bindings.retry.clone(),
228 http_enabled,
229 secrets_policy,
230 state_store_policy: bindings.state_store.clone(),
231 webhook_policy,
232 timers: bindings.timers.clone(),
233 oauth: bindings.oauth.clone(),
234 mocks: bindings.mocks.clone(),
235 pack_bindings: Vec::new(),
236 env_passthrough: Vec::new(),
237 trace: TraceConfig::from_env(),
238 validation: ValidationConfig::from_env(),
239 operator_policy: OperatorPolicy::from_config(bindings.operator.clone()),
240 fast2flow: bindings.fast2flow.clone(),
241 })
242 }
243
244 pub fn from_gtbind(bindings: TenantBindings) -> Self {
245 Self {
246 tenant: bindings.tenant,
247 bindings_path: PathBuf::from("<gtbind>"),
248 flow_type_bindings: HashMap::new(),
249 rate_limits: RateLimits::default(),
250 retry: FlowRetryConfig::default(),
251 http_enabled: true,
256 secrets_policy: SecretsPolicy::allow_all(),
257 state_store_policy: StateStorePolicy::default(),
258 webhook_policy: WebhookPolicy::default(),
259 timers: Vec::new(),
260 oauth: None,
261 mocks: None,
262 pack_bindings: bindings.packs,
263 env_passthrough: bindings.env_passthrough,
264 trace: TraceConfig::from_env(),
265 validation: ValidationConfig::from_env(),
266 operator_policy: OperatorPolicy::allow_all(),
267 fast2flow: Fast2FlowRoutingConfig::default(),
268 }
269 }
270
271 pub fn messaging_binding(&self) -> Option<&FlowBinding> {
272 self.flow_type_bindings.get("messaging")
273 }
274
275 pub fn retry_config(&self) -> FlowRetryConfig {
276 self.retry.clone()
277 }
278
279 pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
280 let oauth = self.oauth.as_ref()?;
281 let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
282 if !oauth.provider.is_empty() {
283 cfg.default_provider = Some(oauth.provider.clone());
284 }
285 if let Some(team) = &oauth.team
286 && !team.is_empty()
287 {
288 cfg.team = Some(team.clone());
289 }
290 Some(cfg)
291 }
292
293 pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
296 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
297 let env_id = greentic_types::EnvId::from_str(&env)
298 .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
299 let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
300 .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
301 greentic_types::TenantCtx::new(env_id, tenant_id)
302 }
303}
304
305impl SecretsPolicy {
306 fn from_bindings(bindings: &BindingsFile) -> Self {
307 let binding_allowed = bindings
308 .flow_type_bindings
309 .values()
310 .flat_map(|binding| binding.secrets.iter().cloned())
311 .collect::<HashSet<_>>();
312 Self {
313 binding_allowed,
314 flow_discovered: Arc::new(RwLock::new(HashSet::new())),
315 allow_all: false,
316 }
317 }
318
319 pub fn is_allowed(&self, key: &str) -> bool {
320 if self.allow_all || self.binding_allowed.contains(key) {
321 return true;
322 }
323 self.flow_discovered.read().contains(key)
324 }
325
326 pub fn allow_all() -> Self {
327 Self {
328 binding_allowed: HashSet::new(),
329 flow_discovered: Arc::new(RwLock::new(HashSet::new())),
330 allow_all: true,
331 }
332 }
333
334 pub fn register_flow_secret(&self, name: &str) {
343 if name.is_empty() {
344 return;
345 }
346 self.flow_discovered.write().insert(name.to_string());
347 }
348
349 pub fn register_flow_secret_refs(&self, value: &Value) {
353 match value {
354 Value::Object(map) => {
355 for (key, val) in map {
356 if key.ends_with("_secret")
357 && let Value::String(name) = val
358 {
359 self.register_flow_secret(name);
360 } else {
361 self.register_flow_secret_refs(val);
362 }
363 }
364 }
365 Value::Array(items) => {
366 for item in items {
367 self.register_flow_secret_refs(item);
368 }
369 }
370 _ => {}
371 }
372 }
373}
374
375impl OperatorPolicy {
376 pub fn from_config(config: OperatorPolicyConfig) -> Self {
377 let allowed_providers = config.allowed_providers.into_iter().collect::<HashSet<_>>();
378 let allowed_ops = config
379 .allowed_ops
380 .into_iter()
381 .map(|(provider, ops)| (provider, ops.into_iter().collect::<HashSet<_>>()))
382 .collect::<HashMap<_, _>>();
383 let allow_all = allowed_providers.is_empty() && allowed_ops.is_empty();
384 Self {
385 allow_all,
386 allowed_providers,
387 allowed_ops,
388 }
389 }
390
391 pub fn allow_all() -> Self {
392 Self {
393 allow_all: true,
394 allowed_providers: HashSet::new(),
395 allowed_ops: HashMap::new(),
396 }
397 }
398
399 pub fn allows_provider(&self, provider_id: Option<&str>, provider_type: &str) -> bool {
400 if self.allow_all {
401 return true;
402 }
403 provider_id
404 .map(|id| self.allowed_providers.contains(id))
405 .unwrap_or(false)
406 || self.allowed_providers.contains(provider_type)
407 }
408
409 pub fn allows_op(&self, provider_id: Option<&str>, provider_type: &str, op_id: &str) -> bool {
410 if self.allow_all {
411 return true;
412 }
413 if let Some(ops) = provider_id.and_then(|id| self.allowed_ops.get(id)) {
414 return ops.contains(op_id);
415 }
416 if let Some(ops) = self.allowed_ops.get(provider_type) {
417 return ops.contains(op_id);
418 }
419 self.allows_provider(provider_id, provider_type)
420 }
421}
422
423impl Default for RateLimits {
424 fn default() -> Self {
425 Self {
426 messaging_send_qps: default_messaging_qps(),
427 messaging_burst: default_messaging_burst(),
428 }
429 }
430}
431
432impl Default for StateStorePolicy {
433 fn default() -> Self {
434 Self {
435 allow: default_state_store_allow(),
436 }
437 }
438}
439
/// Fallback for `RateLimits::messaging_send_qps` when the key is missing.
fn default_messaging_qps() -> u32 {
    const DEFAULT_MESSAGING_QPS: u32 = 10;
    DEFAULT_MESSAGING_QPS
}
443
/// Fallback for `RateLimits::messaging_burst` when the key is missing.
fn default_messaging_burst() -> u32 {
    const DEFAULT_MESSAGING_BURST: u32 = 20;
    DEFAULT_MESSAGING_BURST
}
447
/// Fallback for `StateStorePolicy::allow` when the key is missing: allowed.
fn default_state_store_allow() -> bool {
    const ALLOW_BY_DEFAULT: bool = true;
    ALLOW_BY_DEFAULT
}
451
452impl From<WebhookBindingConfig> for WebhookPolicy {
453 fn from(value: WebhookBindingConfig) -> Self {
454 Self {
455 allow_paths: value.allow_paths,
456 deny_paths: value.deny_paths,
457 }
458 }
459}
460
461impl WebhookPolicy {
462 pub fn is_allowed(&self, path: &str) -> bool {
463 if self
464 .deny_paths
465 .iter()
466 .any(|prefix| path.starts_with(prefix))
467 {
468 return false;
469 }
470
471 if self.allow_paths.is_empty() {
472 return true;
473 }
474
475 self.allow_paths
476 .iter()
477 .any(|prefix| path.starts_with(prefix))
478 }
479}
480
481impl TimerBinding {
482 pub fn schedule_id(&self) -> &str {
483 self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
484 }
485}
486
487impl Default for FlowRetryConfig {
488 fn default() -> Self {
489 Self {
490 max_attempts: default_retry_attempts(),
491 base_delay_ms: default_retry_base_delay_ms(),
492 }
493 }
494}
495
#[cfg(test)]
mod operator_policy_tests {
    use super::{OperatorPolicy, OperatorPolicyConfig};
    use std::collections::HashMap;

    /// A provider listed with an explicit op set is allowed, but only for the
    /// ops in that set; an unlisted provider is rejected.
    #[test]
    fn policy_allows_configured_provider_op() {
        let listed = "provider.allowed";
        let unlisted = "provider.denied";

        let policy = OperatorPolicy::from_config(OperatorPolicyConfig {
            allowed_providers: vec![listed.to_string()],
            allowed_ops: HashMap::from([(
                listed.to_string(),
                vec!["op1".to_string(), "op2".to_string()],
            )]),
        });

        assert!(policy.allows_provider(Some(listed), listed));
        assert!(policy.allows_op(Some(listed), listed, "op1"));
        assert!(!policy.allows_op(Some(listed), listed, "other"));
        assert!(!policy.allows_provider(Some(unlisted), unlisted));
    }

    /// The allow-all policy accepts any provider and any op, even without an id.
    #[test]
    fn policy_allow_all_defaults_true() {
        let permissive = OperatorPolicy::allow_all();
        assert!(permissive.allows_provider(None, "any"));
        assert!(permissive.allows_op(None, "any", "op"));
    }
}
523
/// Fallback for `FlowRetryConfig::max_attempts` when the key is missing.
fn default_retry_attempts() -> u32 {
    const DEFAULT_MAX_ATTEMPTS: u32 = 3;
    DEFAULT_MAX_ATTEMPTS
}
527
/// Fallback for `FlowRetryConfig::base_delay_ms` when the key is missing.
fn default_retry_base_delay_ms() -> u64 {
    const DEFAULT_BASE_DELAY_MS: u64 = 250;
    DEFAULT_BASE_DELAY_MS
}
531
// Unit tests for config loading, the secrets policy and the OAuth mapping.
// NOTE(review): the clippy allow below is presumably needed because helper
// functions are declared after a test module in this file — confirm.
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use crate::gtbind::{PackBinding, TenantBindings};
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Builds a minimal `HostConfig` whose only interesting field is `oauth`.
    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
        HostConfig {
            tenant: "tenant-a".to_string(),
            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
            flow_type_bindings: HashMap::new(),
            rate_limits: RateLimits::default(),
            retry: FlowRetryConfig::default(),
            http_enabled: false,
            secrets_policy: SecretsPolicy::allow_all(),
            state_store_policy: StateStorePolicy::default(),
            webhook_policy: WebhookPolicy::default(),
            timers: Vec::new(),
            oauth,
            mocks: None,
            pack_bindings: Vec::new(),
            env_passthrough: Vec::new(),
            trace: TraceConfig::from_env(),
            validation: ValidationConfig::from_env(),
            operator_policy: OperatorPolicy::allow_all(),
            fast2flow: Fast2FlowRoutingConfig::default(),
        }
    }

    /// Every key of the `fast2flow` block must survive a round trip through a
    /// bindings file on disk.
    #[test]
    fn host_config_loads_fast2flow_routing_block() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let path = temp.path().join("bindings.yaml");
        // The fixture is whitespace-sensitive YAML: the keys under `fast2flow:`
        // must stay indented relative to it.
        std::fs::write(
            &path,
            r#"
tenant: demo
fast2flow:
 enabled: true
 component_ref: router.fast2flow
 operation: handle-hook
 scope: tenant-a
 registry_path: /mnt/registry
 indexes_path: /mnt/indexes
 time_budget_ms: 750
"#,
        )
        .expect("write bindings");

        let cfg = HostConfig::load_from_path(&path).expect("load config");

        assert!(cfg.fast2flow.enabled);
        assert_eq!(cfg.fast2flow.component_ref, "router.fast2flow");
        assert_eq!(cfg.fast2flow.operation, "handle-hook");
        assert_eq!(cfg.fast2flow.scope.as_deref(), Some("tenant-a"));
        assert_eq!(cfg.fast2flow.registry_path, "/mnt/registry");
        assert_eq!(cfg.fast2flow.indexes_path, "/mnt/indexes");
        assert_eq!(cfg.fast2flow.time_budget_ms, 750);
    }

    /// `*_secret` string values are registered at any nesting depth, inside
    /// both objects and arrays; other fields are ignored.
    #[test]
    fn secrets_policy_register_flow_secret_refs_walks_nested_objects() {
        let policy = SecretsPolicy {
            binding_allowed: HashSet::new(),
            flow_discovered: Arc::new(RwLock::new(HashSet::new())),
            allow_all: false,
        };

        // Nothing has been registered yet, so the name must be rejected.
        assert!(!policy.is_allowed("llm-api-key"));

        let node_config = serde_json::json!({
            "api_key_secret": "llm-api-key",
            "provider": "openai",
            "fallback": {
                "secondary_api_key_secret": "openrouter-key",
                "model": "gpt-4o",
            },
            "list": [
                { "tertiary_secret": "another-key" },
                { "non_secret_field": "ignored" },
            ],
            "ignored_field": ""
        });

        policy.register_flow_secret_refs(&node_config);

        assert!(policy.is_allowed("llm-api-key"));
        assert!(policy.is_allowed("openrouter-key"));
        assert!(policy.is_allowed("another-key"));
        // Neither the key nor the value of a non-`_secret` field is registered.
        assert!(!policy.is_allowed("non_secret_field"));
        assert!(!policy.is_allowed("ignored"));
    }

    /// Empty strings, `null` and numbers under `*_secret` keys register nothing.
    #[test]
    fn secrets_policy_ignores_empty_or_non_string_secret_values() {
        let policy = SecretsPolicy {
            binding_allowed: HashSet::new(),
            flow_discovered: Arc::new(RwLock::new(HashSet::new())),
            allow_all: false,
        };

        let node_config = serde_json::json!({
            "api_key_secret": "",
            "fallback_secret": null,
            "numeric_secret": 42,
            "real_secret": "good-key",
        });

        policy.register_flow_secret_refs(&node_config);

        assert!(policy.is_allowed("good-key"));
        assert!(!policy.is_allowed(""));
    }

    /// Without an `oauth` block there is no broker configuration.
    #[test]
    fn oauth_broker_config_absent_without_block() {
        let cfg = host_config_with_oauth(None);
        assert!(cfg.oauth_broker_config().is_none());
    }

    /// URLs, provider and team are carried over into the broker configuration.
    #[test]
    fn oauth_broker_config_maps_fields() {
        let cfg = host_config_with_oauth(Some(OAuthConfig {
            http_base_url: "https://oauth.example/".into(),
            nats_url: "nats://broker:4222".into(),
            provider: "demo".into(),
            env: None,
            team: Some("ops".into()),
        }));
        let broker = cfg.oauth_broker_config().expect("missing broker config");
        assert_eq!(broker.http_base_url, "https://oauth.example/");
        assert_eq!(broker.nats_url, "nats://broker:4222");
        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
        assert_eq!(broker.team.as_deref(), Some("ops"));
    }

    /// Configs built from gtbind always have `http_enabled` set.
    #[test]
    fn gtbind_configs_enable_outbound_http() {
        let cfg = HostConfig::from_gtbind(TenantBindings {
            tenant: "demo".into(),
            packs: vec![PackBinding {
                pack_id: "deep-research-demo".into(),
                pack_ref: "deep-research-demo@0.1.0".into(),
                pack_locator: None,
                flows: vec!["main".into()],
            }],
            env_passthrough: Vec::new(),
        });

        assert!(
            cfg.http_enabled,
            "gtbind-backed tenants should allow outbound component HTTP"
        );
    }
}