1use crate::gtbind::PackBinding;
2use crate::gtbind::TenantBindings;
3use crate::oauth::OAuthBrokerConfig;
4use crate::runner::mocks::MocksConfig;
5use anyhow::{Context, Result};
6use serde::Deserialize;
7use serde_yaml_bw as serde_yaml;
8use std::collections::{HashMap, HashSet};
9use std::fs;
10use std::path::{Path, PathBuf};
11use std::str::FromStr;
12
13#[derive(Debug, Clone)]
14pub struct HostConfig {
15 pub tenant: String,
16 pub bindings_path: PathBuf,
17 pub flow_type_bindings: HashMap<String, FlowBinding>,
18 pub rate_limits: RateLimits,
19 pub retry: FlowRetryConfig,
20 pub http_enabled: bool,
21 pub secrets_policy: SecretsPolicy,
22 pub state_store_policy: StateStorePolicy,
23 pub webhook_policy: WebhookPolicy,
24 pub timers: Vec<TimerBinding>,
25 pub oauth: Option<OAuthConfig>,
26 pub mocks: Option<MocksConfig>,
27 pub pack_bindings: Vec<PackBinding>,
28 pub env_passthrough: Vec<String>,
29}
30
31#[derive(Debug, Clone, Deserialize)]
32pub struct BindingsFile {
33 pub tenant: String,
34 #[serde(default)]
35 pub flow_type_bindings: HashMap<String, FlowBinding>,
36 #[serde(default)]
37 pub rate_limits: RateLimits,
38 #[serde(default)]
39 pub retry: FlowRetryConfig,
40 #[serde(default)]
41 pub timers: Vec<TimerBinding>,
42 #[serde(default)]
43 pub oauth: Option<OAuthConfig>,
44 #[serde(default)]
45 pub mocks: Option<MocksConfig>,
46 #[serde(default)]
47 pub state_store: StateStorePolicy,
48}
49
50#[derive(Debug, Clone, Deserialize)]
51pub struct FlowBinding {
52 pub adapter: String,
53 #[serde(default)]
54 pub config: serde_yaml::Value,
55 #[serde(default)]
56 pub secrets: Vec<String>,
57}
58
59#[derive(Debug, Clone, Deserialize)]
60pub struct RateLimits {
61 #[serde(default = "default_messaging_qps")]
62 pub messaging_send_qps: u32,
63 #[serde(default = "default_messaging_burst")]
64 pub messaging_burst: u32,
65}
66
67#[derive(Debug, Clone)]
68pub struct SecretsPolicy {
69 allowed: HashSet<String>,
70 allow_all: bool,
71}
72
73#[derive(Debug, Clone, Deserialize)]
74pub struct FlowRetryConfig {
75 #[serde(default = "default_retry_attempts")]
76 pub max_attempts: u32,
77 #[serde(default = "default_retry_base_delay_ms")]
78 pub base_delay_ms: u64,
79}
80
81#[derive(Debug, Clone, Default)]
82pub struct WebhookPolicy {
83 allow_paths: Vec<String>,
84 deny_paths: Vec<String>,
85}
86
87#[derive(Debug, Clone, Deserialize)]
88pub struct StateStorePolicy {
89 #[serde(default = "default_state_store_allow")]
90 pub allow: bool,
91}
92
93#[derive(Debug, Clone, Deserialize)]
94pub struct WebhookBindingConfig {
95 #[serde(default)]
96 pub allow_paths: Vec<String>,
97 #[serde(default)]
98 pub deny_paths: Vec<String>,
99}
100
101#[derive(Debug, Clone, Deserialize)]
102pub struct TimerBinding {
103 pub flow_id: String,
104 pub cron: String,
105 #[serde(default)]
106 pub schedule_id: Option<String>,
107}
108
109#[derive(Debug, Clone, Deserialize)]
110pub struct OAuthConfig {
111 pub http_base_url: String,
112 pub nats_url: String,
113 pub provider: String,
114 #[serde(default)]
115 pub env: Option<String>,
116 #[serde(default)]
117 pub team: Option<String>,
118}
119
120impl HostConfig {
121 pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self> {
122 let path = path.as_ref();
123 let content = fs::read_to_string(path)
124 .with_context(|| format!("failed to read bindings file {path:?}"))?;
125 let bindings: BindingsFile = serde_yaml::from_str(&content)
126 .with_context(|| format!("failed to parse bindings file {path:?}"))?;
127
128 let secrets_policy = SecretsPolicy::from_bindings(&bindings);
129 let http_enabled = bindings.flow_type_bindings.contains_key("messaging");
130 let webhook_policy = bindings
131 .flow_type_bindings
132 .get("webhook")
133 .and_then(|binding| {
134 serde_yaml::from_value::<WebhookBindingConfig>(binding.config.clone())
135 .map(WebhookPolicy::from)
136 .map_err(|err| {
137 tracing::warn!(error = %err, "failed to parse webhook binding config");
138 err
139 })
140 .ok()
141 })
142 .unwrap_or_default();
143
144 Ok(Self {
145 tenant: bindings.tenant.clone(),
146 bindings_path: path.to_path_buf(),
147 flow_type_bindings: bindings.flow_type_bindings.clone(),
148 rate_limits: bindings.rate_limits.clone(),
149 retry: bindings.retry.clone(),
150 http_enabled,
151 secrets_policy,
152 state_store_policy: bindings.state_store.clone(),
153 webhook_policy,
154 timers: bindings.timers.clone(),
155 oauth: bindings.oauth.clone(),
156 mocks: bindings.mocks.clone(),
157 pack_bindings: Vec::new(),
158 env_passthrough: Vec::new(),
159 })
160 }
161
162 pub fn from_gtbind(bindings: TenantBindings) -> Self {
163 Self {
164 tenant: bindings.tenant,
165 bindings_path: PathBuf::from("<gtbind>"),
166 flow_type_bindings: HashMap::new(),
167 rate_limits: RateLimits::default(),
168 retry: FlowRetryConfig::default(),
169 http_enabled: false,
170 secrets_policy: SecretsPolicy::allow_all(),
171 state_store_policy: StateStorePolicy::default(),
172 webhook_policy: WebhookPolicy::default(),
173 timers: Vec::new(),
174 oauth: None,
175 mocks: None,
176 pack_bindings: bindings.packs,
177 env_passthrough: bindings.env_passthrough,
178 }
179 }
180
181 pub fn messaging_binding(&self) -> Option<&FlowBinding> {
182 self.flow_type_bindings.get("messaging")
183 }
184
185 pub fn retry_config(&self) -> FlowRetryConfig {
186 self.retry.clone()
187 }
188
189 pub fn oauth_broker_config(&self) -> Option<OAuthBrokerConfig> {
190 let oauth = self.oauth.as_ref()?;
191 let mut cfg = OAuthBrokerConfig::new(&oauth.http_base_url, &oauth.nats_url);
192 if !oauth.provider.is_empty() {
193 cfg.default_provider = Some(oauth.provider.clone());
194 }
195 if let Some(team) = &oauth.team
196 && !team.is_empty()
197 {
198 cfg.team = Some(team.clone());
199 }
200 Some(cfg)
201 }
202
203 pub fn tenant_ctx(&self) -> greentic_types::TenantCtx {
206 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
207 let env_id = greentic_types::EnvId::from_str(&env)
208 .unwrap_or_else(|_| greentic_types::EnvId::new("local").expect("local env id"));
209 let tenant_id = greentic_types::TenantId::from_str(&self.tenant)
210 .unwrap_or_else(|_| greentic_types::TenantId::new("local").expect("tenant id"));
211 greentic_types::TenantCtx::new(env_id, tenant_id)
212 }
213}
214
215impl SecretsPolicy {
216 fn from_bindings(bindings: &BindingsFile) -> Self {
217 let allowed = bindings
218 .flow_type_bindings
219 .values()
220 .flat_map(|binding| binding.secrets.iter().cloned())
221 .collect::<HashSet<_>>();
222 Self {
223 allowed,
224 allow_all: false,
225 }
226 }
227
228 pub fn is_allowed(&self, key: &str) -> bool {
229 self.allow_all || self.allowed.contains(key)
230 }
231
232 pub fn allow_all() -> Self {
233 Self {
234 allowed: HashSet::new(),
235 allow_all: true,
236 }
237 }
238}
239
240impl Default for RateLimits {
241 fn default() -> Self {
242 Self {
243 messaging_send_qps: default_messaging_qps(),
244 messaging_burst: default_messaging_burst(),
245 }
246 }
247}
248
249impl Default for StateStorePolicy {
250 fn default() -> Self {
251 Self {
252 allow: default_state_store_allow(),
253 }
254 }
255}
256
257fn default_messaging_qps() -> u32 {
258 10
259}
260
261fn default_messaging_burst() -> u32 {
262 20
263}
264
265fn default_state_store_allow() -> bool {
266 true
267}
268
269impl From<WebhookBindingConfig> for WebhookPolicy {
270 fn from(value: WebhookBindingConfig) -> Self {
271 Self {
272 allow_paths: value.allow_paths,
273 deny_paths: value.deny_paths,
274 }
275 }
276}
277
278impl WebhookPolicy {
279 pub fn is_allowed(&self, path: &str) -> bool {
280 if self
281 .deny_paths
282 .iter()
283 .any(|prefix| path.starts_with(prefix))
284 {
285 return false;
286 }
287
288 if self.allow_paths.is_empty() {
289 return true;
290 }
291
292 self.allow_paths
293 .iter()
294 .any(|prefix| path.starts_with(prefix))
295 }
296}
297
298impl TimerBinding {
299 pub fn schedule_id(&self) -> &str {
300 self.schedule_id.as_deref().unwrap_or(self.flow_id.as_str())
301 }
302}
303
304impl Default for FlowRetryConfig {
305 fn default() -> Self {
306 Self {
307 max_attempts: default_retry_attempts(),
308 base_delay_ms: default_retry_base_delay_ms(),
309 }
310 }
311}
312
313fn default_retry_attempts() -> u32 {
314 3
315}
316
317fn default_retry_base_delay_ms() -> u64 {
318 250
319}
320
321#[cfg(test)]
322#[allow(clippy::items_after_test_module)]
323mod tests {
324 use super::*;
325 use std::collections::HashMap;
326 use std::path::PathBuf;
327
328 fn host_config_with_oauth(oauth: Option<OAuthConfig>) -> HostConfig {
329 HostConfig {
330 tenant: "tenant-a".to_string(),
331 bindings_path: PathBuf::from("/tmp/bindings.yaml"),
332 flow_type_bindings: HashMap::new(),
333 rate_limits: RateLimits::default(),
334 retry: FlowRetryConfig::default(),
335 http_enabled: false,
336 secrets_policy: SecretsPolicy::allow_all(),
337 state_store_policy: StateStorePolicy::default(),
338 webhook_policy: WebhookPolicy::default(),
339 timers: Vec::new(),
340 oauth,
341 mocks: None,
342 pack_bindings: Vec::new(),
343 env_passthrough: Vec::new(),
344 }
345 }
346
347 #[test]
348 fn oauth_broker_config_absent_without_block() {
349 let cfg = host_config_with_oauth(None);
350 assert!(cfg.oauth_broker_config().is_none());
351 }
352
353 #[test]
354 fn oauth_broker_config_maps_fields() {
355 let cfg = host_config_with_oauth(Some(OAuthConfig {
356 http_base_url: "https://oauth.example/".into(),
357 nats_url: "nats://broker:4222".into(),
358 provider: "demo".into(),
359 env: None,
360 team: Some("ops".into()),
361 }));
362 let broker = cfg.oauth_broker_config().expect("missing broker config");
363 assert_eq!(broker.http_base_url, "https://oauth.example/");
364 assert_eq!(broker.nats_url, "nats://broker:4222");
365 assert_eq!(broker.default_provider.as_deref(), Some("demo"));
366 assert_eq!(broker.team.as_deref(), Some("ops"));
367 }
368}