Skip to main content

greentic_runner_host/
config.rs

1use crate::gtbind::PackBinding;
2use crate::gtbind::TenantBindings;
3use crate::oauth::OAuthBrokerConfig;
4use crate::runner::mocks::MocksConfig;
5use crate::trace::TraceConfig;
6use crate::validate::ValidationConfig;
7use anyhow::{Context, Result};
8use parking_lot::RwLock;
9use serde::Deserialize;
10use serde_json::Value;
11use serde_yaml_bw as serde_yaml;
12use std::collections::{HashMap, HashSet};
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::str::FromStr;
16use std::sync::Arc;
17
18#[derive(Debug, Clone)]
19pub struct HostConfig {
20    pub tenant: String,
21    pub bindings_path: PathBuf,
22    pub flow_type_bindings: HashMap<String, FlowBinding>,
23    pub rate_limits: RateLimits,
24    pub retry: FlowRetryConfig,
25    pub http_enabled: bool,
26    pub secrets_policy: SecretsPolicy,
27    pub state_store_policy: StateStorePolicy,
28    pub webhook_policy: WebhookPolicy,
29    pub timers: Vec<TimerBinding>,
30    pub oauth: Option<OAuthConfig>,
31    pub mocks: Option<MocksConfig>,
32    pub pack_bindings: Vec<PackBinding>,
33    pub env_passthrough: Vec<String>,
34    pub trace: TraceConfig,
35    pub validation: ValidationConfig,
36    pub operator_policy: OperatorPolicy,
37    pub fast2flow: Fast2FlowRoutingConfig,
38}
39
40#[derive(Debug, Clone, Deserialize)]
41pub struct BindingsFile {
42    pub tenant: String,
43    #[serde(default)]
44    pub flow_type_bindings: HashMap<String, FlowBinding>,
45    #[serde(default)]
46    pub rate_limits: RateLimits,
47    #[serde(default)]
48    pub retry: FlowRetryConfig,
49    #[serde(default)]
50    pub timers: Vec<TimerBinding>,
51    #[serde(default)]
52    pub oauth: Option<OAuthConfig>,
53    #[serde(default)]
54    pub mocks: Option<MocksConfig>,
55    #[serde(default)]
56    pub state_store: StateStorePolicy,
57    #[serde(default)]
58    pub operator: OperatorPolicyConfig,
59    #[serde(default)]
60    pub fast2flow: Fast2FlowRoutingConfig,
61}
62
63#[derive(Debug, Clone, Deserialize)]
64pub struct FlowBinding {
65    pub adapter: String,
66    #[serde(default)]
67    pub config: serde_yaml::Value,
68    #[serde(default)]
69    pub secrets: Vec<String>,
70}
71
72#[derive(Debug, Clone, Deserialize)]
73pub struct RateLimits {
74    #[serde(default = "default_messaging_qps")]
75    pub messaging_send_qps: u32,
76    #[serde(default = "default_messaging_burst")]
77    pub messaging_burst: u32,
78}
79
80#[derive(Debug, Clone)]
81pub struct SecretsPolicy {
82    /// Secret names allowed by the bindings file.
83    binding_allowed: HashSet<String>,
84    /// Secret names discovered at flow-load time from node configs that
85    /// reference secrets via fields ending in `_secret` (e.g.
86    /// `api_key_secret: "llm-api-key"`). Shared across all clones so that
87    /// flow loading can register names after policy construction.
88    flow_discovered: Arc<RwLock<HashSet<String>>>,
89    allow_all: bool,
90}
91
92#[derive(Debug, Clone, Deserialize, Default)]
93pub struct OperatorPolicyConfig {
94    #[serde(default)]
95    pub allowed_providers: Vec<String>,
96    #[serde(default)]
97    pub allowed_ops: HashMap<String, Vec<String>>,
98}
99
100#[derive(Debug, Clone)]
101pub struct OperatorPolicy {
102    allow_all: bool,
103    allowed_providers: HashSet<String>,
104    allowed_ops: HashMap<String, HashSet<String>>,
105}
106
107#[derive(Debug, Clone, Deserialize)]
108pub struct Fast2FlowRoutingConfig {
109    #[serde(default)]
110    pub enabled: bool,
111    #[serde(default = "default_fast2flow_component_ref")]
112    pub component_ref: String,
113    #[serde(default = "default_fast2flow_operation")]
114    pub operation: String,
115    #[serde(default)]
116    pub scope: Option<String>,
117    #[serde(default)]
118    pub registry_path: String,
119    #[serde(default)]
120    pub indexes_path: String,
121    #[serde(default = "default_fast2flow_time_budget_ms")]
122    pub time_budget_ms: u64,
123}
124
125impl Default for Fast2FlowRoutingConfig {
126    fn default() -> Self {
127        Self {
128            enabled: false,
129            component_ref: default_fast2flow_component_ref(),
130            operation: default_fast2flow_operation(),
131            scope: None,
132            registry_path: String::new(),
133            indexes_path: String::new(),
134            time_budget_ms: default_fast2flow_time_budget_ms(),
135        }
136    }
137}
138
139fn default_fast2flow_component_ref() -> String {
140    "fast2flow-routing".to_owned()
141}
142
143fn default_fast2flow_operation() -> String {
144    "route".to_owned()
145}
146
147fn default_fast2flow_time_budget_ms() -> u64 {
148    250
149}
150
151#[derive(Debug, Clone, Deserialize)]
152pub struct FlowRetryConfig {
153    #[serde(default = "default_retry_attempts")]
154    pub max_attempts: u32,
155    #[serde(default = "default_retry_base_delay_ms")]
156    pub base_delay_ms: u64,
157}
158
159#[derive(Debug, Clone, Default)]
160pub struct WebhookPolicy {
161    allow_paths: Vec<String>,
162    deny_paths: Vec<String>,
163}
164
165#[derive(Debug, Clone, Deserialize)]
166pub struct StateStorePolicy {
167    #[serde(default = "default_state_store_allow")]
168    pub allow: bool,
169}
170
171#[derive(Debug, Clone, Deserialize)]
172pub struct WebhookBindingConfig {
173    #[serde(default)]
174    pub allow_paths: Vec<String>,
175    #[serde(default)]
176    pub deny_paths: Vec<String>,
177}
178
179#[derive(Debug, Clone, Deserialize)]
180pub struct TimerBinding {
181    pub flow_id: String,
182    pub cron: String,
183    #[serde(default)]
184    pub schedule_id: Option<String>,
185}
186
187#[derive(Debug, Clone, Deserialize)]
188pub struct OAuthConfig {
189    pub http_base_url: String,
190    pub nats_url: String,
191    pub provider: String,
192    #[serde(default)]
193    pub env: Option<String>,
194    #[serde(default)]
195    pub team: Option<String>,
196}
197
198impl HostConfig {
199    pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
200        let path = path.as_ref();
201        let content = fs::read_to_string(path)
202            .with_context(|| format!("failed to read bindings file {path:?}"))?;
203        let bindings: BindingsFile = serde_yaml::from_str(&content)
204            .with_context(|| format!("failed to parse bindings file {path:?}"))?;
205
206        let secrets_policy = SecretsPolicy::from_bindings(&bindings);
207        let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
208        let webhook_policy = bindings
209            .flow_type_bindings
210            .get("webhook")
211            .and_then(|binding| {
212                serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
213                    .map(WebhookPolicy::from)
214                    .map_err(|err| {
215                        tracing::warn!(error = %err, "failed to parse webhook binding config");
216                        err
217                    })
218                    .ok()
219            })
220            .unwrap_or_default();
221
222        Ok(Self {
223            tenant: bindings.tenant.clone(),
224            bindings_path: path.to_path_buf(),
225            flow_type_bindings: bindings.flow_type_bindings.clone(),
226            rate_limits: bindings.rate_limits.clone(),
227            retry: bindings.retry.clone(),
228            http_enabled,
229            secrets_policy,
230            state_store_policy: bindings.state_store.clone(),
231            webhook_policy,
232            timers: bindings.timers.clone(),
233            oauth: bindings.oauth.clone(),
234            mocks: bindings.mocks.clone(),
235            pack_bindings: Vec::new(),
236            env_passthrough: Vec::new(),
237            trace: TraceConfig::from_env(),
238            validation: ValidationConfig::from_env(),
239            operator_policy: OperatorPolicy::from_config(bindings.operator.clone()),
240            fast2flow: bindings.fast2flow.clone(),
241        })
242    }
243
244    pub fn from_gtbind(bindings: TenantBindings) -> Self {
245        Self {
246            tenant: bindings.tenant,
247            bindings_path: PathBuf::from("<gtbind>"),
248            flow_type_bindings: HashMap::new(),
249            rate_limits: RateLimits::default(),
250            retry: FlowRetryConfig::default(),
251            // GTBind-backed embedded hosts do not populate flow_type_bindings,
252            // so the YAML-path heuristic cannot be used here. Keep outbound
253            // host HTTP enabled so components like component-llm-openai can
254            // reach their configured providers.
255            http_enabled: true,
256            secrets_policy: SecretsPolicy::allow_all(),
257            state_store_policy: StateStorePolicy::default(),
258            webhook_policy: WebhookPolicy::default(),
259            timers: Vec::new(),
260            oauth: None,
261            mocks: None,
262            pack_bindings: bindings.packs,
263            env_passthrough: bindings.env_passthrough,
264            trace: TraceConfig::from_env(),
265            validation: ValidationConfig::from_env(),
266            operator_policy: OperatorPolicy::allow_all(),
267            fast2flow: Fast2FlowRoutingConfig::default(),
268        }
269    }
270
271    pub fn messaging_binding(&self) -> Option<&FlowBinding> {
272        self.flow_type_bindings.get("messaging")
273    }
274
275    pub fn retry_config(&self) -> FlowRetryConfig {
276        self.retry.clone()
277    }
278
279    pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
280        let oauth = self.oauth.as_ref()?;
281        let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
282        if !oauth.provider.is_empty() {
283            cfg.default_provider = Some(oauth.provider.clone());
284        }
285        if let Some(team) = &oauth.team
286            && !team.is_empty()
287        {
288            cfg.team = Some(team.clone());
289        }
290        Some(cfg)
291    }
292
293    /// Derive a tenant context for the current configuration. This is used when
294    /// evaluating scoped requirements (e.g. secrets).
295    pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
296        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
297        let env_id = greentic_types::EnvId::from_str(&env)
298            .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
299        let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
300            .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
301        greentic_types::TenantCtx::new(env_id, tenant_id)
302    }
303}
304
305impl SecretsPolicy {
306    fn from_bindings(bindings: &BindingsFile) -> Self {
307        let binding_allowed = bindings
308            .flow_type_bindings
309            .values()
310            .flat_map(|binding| binding.secrets.iter().cloned())
311            .collect::<HashSet<_>>();
312        Self {
313            binding_allowed,
314            flow_discovered: Arc::new(RwLock::new(HashSet::new())),
315            allow_all: false,
316        }
317    }
318
319    pub fn is_allowed(&self, key: &str) -> bool {
320        if self.allow_all || self.binding_allowed.contains(key) {
321            return true;
322        }
323        self.flow_discovered.read().contains(key)
324    }
325
326    pub fn allow_all() -> Self {
327        Self {
328            binding_allowed: HashSet::new(),
329            flow_discovered: Arc::new(RwLock::new(HashSet::new())),
330            allow_all: true,
331        }
332    }
333
334    /// Register a secret name discovered while loading a flow's node config.
335    ///
336    /// Components are gated by [`is_allowed`]; the bindings file is the
337    /// authoritative source, but flows that reference secrets directly via
338    /// node config fields (`api_key_secret: "llm-api-key"`) would otherwise
339    /// have their lookups denied even when the secret is provisioned. Calling
340    /// this from the pack flow loader closes that gap without changing the
341    /// component manifest contract.
342    pub fn register_flow_secret(&self, name: &str) {
343        if name.is_empty() {
344            return;
345        }
346        self.flow_discovered.write().insert(name.to_string());
347    }
348
349    /// Walk a JSON value (typically a flow node's config block) and register
350    /// any string-valued field whose key ends in `_secret`. Recurses through
351    /// nested objects and arrays.
352    pub fn register_flow_secret_refs(&self, value: &Value) {
353        match value {
354            Value::Object(map) => {
355                for (key, val) in map {
356                    if key.ends_with("_secret")
357                        && let Value::String(name) = val
358                    {
359                        self.register_flow_secret(name);
360                    } else {
361                        self.register_flow_secret_refs(val);
362                    }
363                }
364            }
365            Value::Array(items) => {
366                for item in items {
367                    self.register_flow_secret_refs(item);
368                }
369            }
370            _ => {}
371        }
372    }
373}
374
375impl OperatorPolicy {
376    pub fn from_config(config: OperatorPolicyConfig) -> Self {
377        let allowed_providers = config.allowed_providers.into_iter().collect::<HashSet<_>>();
378        let allowed_ops = config
379            .allowed_ops
380            .into_iter()
381            .map(|(provider, ops)| (provider, ops.into_iter().collect::<HashSet<_>>()))
382            .collect::<HashMap<_, _>>();
383        let allow_all = allowed_providers.is_empty() && allowed_ops.is_empty();
384        Self {
385            allow_all,
386            allowed_providers,
387            allowed_ops,
388        }
389    }
390
391    pub fn allow_all() -> Self {
392        Self {
393            allow_all: true,
394            allowed_providers: HashSet::new(),
395            allowed_ops: HashMap::new(),
396        }
397    }
398
399    pub fn allows_provider(&self, provider_id: Option<&str>, provider_type: &str) -> bool {
400        if self.allow_all {
401            return true;
402        }
403        provider_id
404            .map(|id| self.allowed_providers.contains(id))
405            .unwrap_or(false)
406            || self.allowed_providers.contains(provider_type)
407    }
408
409    pub fn allows_op(&self, provider_id: Option<&str>, provider_type: &str, op_id: &str) -> bool {
410        if self.allow_all {
411            return true;
412        }
413        if let Some(ops) = provider_id.and_then(|id| self.allowed_ops.get(id)) {
414            return ops.contains(op_id);
415        }
416        if let Some(ops) = self.allowed_ops.get(provider_type) {
417            return ops.contains(op_id);
418        }
419        self.allows_provider(provider_id, provider_type)
420    }
421}
422
423impl Default for RateLimits {
424    fn default() -> Self {
425        Self {
426            messaging_send_qps: default_messaging_qps(),
427            messaging_burst: default_messaging_burst(),
428        }
429    }
430}
431
432impl Default for StateStorePolicy {
433    fn default() -> Self {
434        Self {
435            allow: default_state_store_allow(),
436        }
437    }
438}
439
440fn default_messaging_qps() -> u32 {
441    10
442}
443
444fn default_messaging_burst() -> u32 {
445    20
446}
447
448fn default_state_store_allow() -> bool {
449    true
450}
451
452impl From<WebhookBindingConfig> for WebhookPolicy {
453    fn from(value: WebhookBindingConfig) -> Self {
454        Self {
455            allow_paths: value.allow_paths,
456            deny_paths: value.deny_paths,
457        }
458    }
459}
460
461impl WebhookPolicy {
462    pub fn is_allowed(&self, path: &str) -> bool {
463        if self
464            .deny_paths
465            .iter()
466            .any(|prefix| path.starts_with(prefix))
467        {
468            return false;
469        }
470
471        if self.allow_paths.is_empty() {
472            return true;
473        }
474
475        self.allow_paths
476            .iter()
477            .any(|prefix| path.starts_with(prefix))
478    }
479}
480
481impl TimerBinding {
482    pub fn schedule_id(&self) -> &str {
483        self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
484    }
485}
486
487impl Default for FlowRetryConfig {
488    fn default() -> Self {
489        Self {
490            max_attempts: default_retry_attempts(),
491            base_delay_ms: default_retry_base_delay_ms(),
492        }
493    }
494}
495
496#[cfg(test)]
497mod operator_policy_tests {
498    use super::{OperatorPolicy, OperatorPolicyConfig};
499    use std::collections::HashMap;
500
501    #[test]
502    fn policy_allows_configured_provider_op() {
503        let mut allowed_ops = HashMap::new();
504        allowed_ops.insert("provider.allowed".into(), vec!["op1".into(), "op2".into()]);
505        let config = OperatorPolicyConfig {
506            allowed_providers: vec!["provider.allowed".into()],
507            allowed_ops,
508        };
509        let policy = OperatorPolicy::from_config(config);
510        assert!(policy.allows_provider(Some("provider.allowed"), "provider.allowed"));
511        assert!(policy.allows_op(Some("provider.allowed"), "provider.allowed", "op1"));
512        assert!(!policy.allows_op(Some("provider.allowed"), "provider.allowed", "other"));
513        assert!(!policy.allows_provider(Some("provider.denied"), "provider.denied"));
514    }
515
516    #[test]
517    fn policy_allow_all_defaults_true() {
518        let policy = OperatorPolicy::allow_all();
519        assert!(policy.allows_provider(None, "any"));
520        assert!(policy.allows_op(None, "any", "op"));
521    }
522}
523
524fn default_retry_attempts() -> u32 {
525    3
526}
527
528fn default_retry_base_delay_ms() -> u64 {
529    250
530}
531
532#[cfg(test)]
533#[allow(clippy::items_after_test_module)]
534mod tests {
535    use super::*;
536    use crate::gtbind::{PackBinding, TenantBindings};
537    use std::collections::HashMap;
538    use std::path::PathBuf;
539
540    fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
541        HostConfig {
542            tenant: "tenant-a".to_string(),
543            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
544            flow_type_bindings: HashMap::new(),
545            rate_limits: RateLimits::default(),
546            retry: FlowRetryConfig::default(),
547            http_enabled: false,
548            secrets_policy: SecretsPolicy::allow_all(),
549            state_store_policy: StateStorePolicy::default(),
550            webhook_policy: WebhookPolicy::default(),
551            timers: Vec::new(),
552            oauth,
553            mocks: None,
554            pack_bindings: Vec::new(),
555            env_passthrough: Vec::new(),
556            trace: TraceConfig::from_env(),
557            validation: ValidationConfig::from_env(),
558            operator_policy: OperatorPolicy::allow_all(),
559            fast2flow: Fast2FlowRoutingConfig::default(),
560        }
561    }
562
563    #[test]
564    fn host_config_loads_fast2flow_routing_block() {
565        let temp = tempfile::TempDir::new().expect("tempdir");
566        let path = temp.path().join("bindings.yaml");
567        std::fs::write(
568            &path,
569            r#"
570tenant: demo
571fast2flow:
572  enabled: true
573  component_ref: router.fast2flow
574  operation: handle-hook
575  scope: tenant-a
576  registry_path: /mnt/registry
577  indexes_path: /mnt/indexes
578  time_budget_ms: 750
579"#,
580        )
581        .expect("write bindings");
582
583        let cfg = HostConfig::load_from_path(&path).expect("load config");
584
585        assert!(cfg.fast2flow.enabled);
586        assert_eq!(cfg.fast2flow.component_ref, "router.fast2flow");
587        assert_eq!(cfg.fast2flow.operation, "handle-hook");
588        assert_eq!(cfg.fast2flow.scope.as_deref(), Some("tenant-a"));
589        assert_eq!(cfg.fast2flow.registry_path, "/mnt/registry");
590        assert_eq!(cfg.fast2flow.indexes_path, "/mnt/indexes");
591        assert_eq!(cfg.fast2flow.time_budget_ms, 750);
592    }
593
594    #[test]
595    fn secrets_policy_register_flow_secret_refs_walks_nested_objects() {
596        let policy = SecretsPolicy {
597            binding_allowed: HashSet::new(),
598            flow_discovered: Arc::new(RwLock::new(HashSet::new())),
599            allow_all: false,
600        };
601
602        // Bare bindings deny anything by default.
603        assert!(!policy.is_allowed("llm-api-key"));
604
605        // Simulate a flow node config block carrying both a top-level
606        // *_secret reference and a nested one inside an arbitrary subtree.
607        let node_config = serde_json::json!({
608            "api_key_secret": "llm-api-key",
609            "provider": "openai",
610            "fallback": {
611                "secondary_api_key_secret": "openrouter-key",
612                "model": "gpt-4o",
613            },
614            "list": [
615                { "tertiary_secret": "another-key" },
616                { "non_secret_field": "ignored" },
617            ],
618            "ignored_field": ""
619        });
620
621        policy.register_flow_secret_refs(&node_config);
622
623        assert!(policy.is_allowed("llm-api-key"));
624        assert!(policy.is_allowed("openrouter-key"));
625        assert!(policy.is_allowed("another-key"));
626        assert!(!policy.is_allowed("non_secret_field"));
627        assert!(!policy.is_allowed("ignored"));
628    }
629
630    #[test]
631    fn secrets_policy_ignores_empty_or_non_string_secret_values() {
632        let policy = SecretsPolicy {
633            binding_allowed: HashSet::new(),
634            flow_discovered: Arc::new(RwLock::new(HashSet::new())),
635            allow_all: false,
636        };
637
638        let node_config = serde_json::json!({
639            "api_key_secret": "",
640            "fallback_secret": null,
641            "numeric_secret": 42,
642            "real_secret": "good-key",
643        });
644
645        policy.register_flow_secret_refs(&node_config);
646
647        assert!(policy.is_allowed("good-key"));
648        assert!(!policy.is_allowed(""));
649    }
650
651    #[test]
652    fn oauth_broker_config_absent_without_block() {
653        let cfg = host_config_with_oauth(None);
654        assert!(cfg.oauth_broker_config().is_none());
655    }
656
657    #[test]
658    fn oauth_broker_config_maps_fields() {
659        let cfg = host_config_with_oauth(Some(OAuthConfig {
660            http_base_url: "https://oauth.example/".into(),
661            nats_url: "nats://broker:4222".into(),
662            provider: "demo".into(),
663            env: None,
664            team: Some("ops".into()),
665        }));
666        let broker = cfg.oauth_broker_config().expect("missing broker config");
667        assert_eq!(broker.http_base_url, "https://oauth.example/");
668        assert_eq!(broker.nats_url, "nats://broker:4222");
669        assert_eq!(broker.default_provider.as_deref(), Some("demo"));
670        assert_eq!(broker.team.as_deref(), Some("ops"));
671    }
672
673    #[test]
674    fn gtbind_configs_enable_outbound_http() {
675        let cfg = HostConfig::from_gtbind(TenantBindings {
676            tenant: "demo".into(),
677            packs: vec![PackBinding {
678                pack_id: "deep-research-demo".into(),
679                pack_ref: "deep-research-demo@0.1.0".into(),
680                pack_locator: None,
681                flows: vec!["main".into()],
682            }],
683            env_passthrough: Vec::new(),
684        });
685
686        assert!(
687            cfg.http_enabled,
688            "gtbind-backed tenants should allow outbound component HTTP"
689        );
690    }
691}