1use anyhow::Error;
2use lazy_static::lazy_static;
3use std::collections::{BTreeMap, HashMap};
4use tembo_controller::{
5 apis::postgres_parameters::PgConfig,
6 app_service::types::{AppService, EnvVar},
7 extensions::types::{Extension, ExtensionInstallLocation, TrunkInstall},
8};
9use tracing::{instrument, warn};
10
11use crate::apps::types::{App, AppConfig, AppType, MergedConfigs};
12
13lazy_static! {
14 pub static ref HTTP: App =
15 serde_yaml::from_str(include_str!("http.yaml")).expect("http.yaml not found");
16 pub static ref RESTAPI: App =
17 serde_yaml::from_str(include_str!("restapi.yaml")).expect("restapi.yaml not found");
18 pub static ref MQ: App =
19 serde_yaml::from_str(include_str!("mq.yaml")).expect("mq.yaml not found");
20 pub static ref EMBEDDINGS: App =
21 serde_yaml::from_str(include_str!("embeddings.yaml")).expect("embeddings.yaml not found");
22 pub static ref PGANALYZE: App =
23 serde_yaml::from_str(include_str!("pganalyze.yaml")).expect("pganalyze.yaml not found");
24 pub static ref SQLRUNNER: App =
25 serde_yaml::from_str(include_str!("sql-runner.yaml")).expect("sql-runner.yaml not found");
26}
27
28#[instrument(skip(user_apps, stack_apps, extensions, trunk_installs, pg_configs))]
30pub fn merge_app_reqs(
31 user_apps: Option<Vec<AppType>>,
32 stack_apps: Option<Vec<AppService>>,
33 extensions: Option<Vec<Extension>>,
34 trunk_installs: Option<Vec<TrunkInstall>>,
35 pg_configs: Option<Vec<PgConfig>>,
36) -> Result<MergedConfigs, Error> {
37 let mut fin_app_extensions: Vec<Extension> = vec![];
38 let mut fin_app_trunk_installs: Vec<TrunkInstall> = vec![];
39 let mut final_pg_configs: Vec<PgConfig> = vec![];
40
41 let mut user_app_services: Vec<AppService> = vec![];
42 if let Some(apps) = user_apps {
44 for app in apps {
45 match app {
46 AppType::RestAPI(config) => {
47 let mut restapi = RESTAPI.clone().app_services.unwrap().clone()[0].clone();
49 if let Some(cfg) = config {
51 restapi = merge_app_configs(restapi, cfg);
52 };
53 user_app_services.push(restapi);
54 }
57 AppType::HTTP(config) => {
58 let http = HTTP.clone();
59 let mut http_app_svc = http.app_services.unwrap()[0].clone();
60 if let Some(cfg) = config {
61 http_app_svc = merge_app_configs(http_app_svc, cfg);
62 };
63 user_app_services.push(http_app_svc);
64 if let Some(extensions) = http.extensions {
65 fin_app_extensions.extend(extensions);
66 }
67 if let Some(trunks) = http.trunk_installs {
68 fin_app_trunk_installs.extend(trunks);
69 }
70 }
71 AppType::MQ(config) => {
72 let mq = MQ.clone();
73 let mut mq_app_svc = mq.app_services.unwrap()[0].clone();
74 if let Some(cfg) = config {
75 mq_app_svc = merge_app_configs(mq_app_svc, cfg);
76 }
77 user_app_services.push(mq_app_svc);
78 }
79 AppType::Embeddings(config) => {
80 let embedding_app = EMBEDDINGS.clone();
81 let mut embedding_app_svc = embedding_app.app_services.unwrap()[0].clone();
83 if let Some(cfg) = config {
84 embedding_app_svc = merge_app_configs(embedding_app_svc, cfg);
85 }
86 user_app_services.push(embedding_app_svc);
87 if let Some(extensions) = embedding_app.extensions {
89 fin_app_extensions.extend(extensions);
90 }
91 if let Some(trunks) = embedding_app.trunk_installs {
93 fin_app_trunk_installs.extend(trunks);
94 }
95
96 if let Some(pg_cfg) = embedding_app.postgres_config {
97 final_pg_configs.extend(pg_cfg);
98 }
99 }
100 AppType::PgAnalyze(config) => {
101 let pg_analyze = PGANALYZE.clone();
103 let mut pg_analyze_app_svc = pg_analyze.app_services.unwrap()[0].clone();
104 if let Some(cfg) = config {
106 pg_analyze_app_svc = merge_app_configs(pg_analyze_app_svc, cfg);
107 }
108 user_app_services.push(pg_analyze_app_svc);
109 if let Some(extensions) = pg_analyze.extensions {
111 fin_app_extensions.extend(extensions);
112 }
113 if let Some(trunks) = pg_analyze.trunk_installs {
115 fin_app_trunk_installs.extend(trunks);
116 }
117 if let Some(pg_cfg) = pg_analyze.postgres_config {
119 final_pg_configs.extend(pg_cfg);
120 }
121 }
122 AppType::SqlRunner(_) => {
123 let sqlrunner = SQLRUNNER.clone().app_services.unwrap().clone()[0].clone();
125 user_app_services.push(sqlrunner);
126 }
129 AppType::Custom(custom_app) => {
130 user_app_services.push(custom_app);
131 }
132 }
133 }
134 }
135
136 let final_apps = match stack_apps {
138 Some(s_apps) => {
139 let merged_apps = merge_apps(user_app_services, s_apps)?;
140 Some(merged_apps)
141 }
142 None => {
143 if user_app_services.is_empty() {
144 None
145 } else {
146 Some(user_app_services)
147 }
148 }
149 };
150
151 let mut final_extensions: Vec<Extension> = match extensions {
152 Some(exts) => exts.clone(),
153 None => vec![],
154 };
155
156 for app_ext in fin_app_extensions {
157 for loc in app_ext.locations {
158 final_extensions =
159 merge_location_into_extensions(&app_ext.name, &loc, final_extensions);
160 }
161 }
162 let fe = if final_extensions.is_empty() {
164 None
165 } else {
166 Some(final_extensions)
167 };
168
169 let final_trunks = match trunk_installs {
170 Some(trunks) => Some(merge_trunk_installs(trunks, fin_app_trunk_installs)),
171 None => {
172 if fin_app_trunk_installs.is_empty() {
173 None
174 } else {
175 Some(fin_app_trunk_installs)
176 }
177 }
178 };
179
180 let final_pg_configs = match pg_configs {
182 Some(cfgs) => Some(merge_pg_configs(cfgs, final_pg_configs)),
183 None => {
184 if final_pg_configs.is_empty() {
185 None
186 } else {
187 Some(final_pg_configs)
188 }
189 }
190 };
191
192 Ok(MergedConfigs {
193 extensions: fe,
194 trunk_installs: final_trunks,
195 app_services: final_apps,
196 pg_configs: final_pg_configs,
197 })
198}
199
200#[instrument(skip(opt1, opt2))]
202pub fn merge_options<T>(opt1: Option<Vec<T>>, opt2: Option<Vec<T>>) -> Option<Vec<T>>
203where
204 T: Clone,
205{
206 match (opt1, opt2) {
207 (Some(mut vec1), Some(vec2)) => {
208 vec1.extend(vec2);
209 Some(vec1)
210 }
211 (Some(vec), None) | (None, Some(vec)) => Some(vec),
212 (None, None) => None,
213 }
214}
215
216#[instrument]
217pub fn merge_location_into_extensions(
218 extension_name: &str,
219 new_location: &ExtensionInstallLocation,
220 current_extensions: Vec<Extension>,
221) -> Vec<Extension> {
222 let mut new_extensions = current_extensions.clone();
223 for extension in &mut new_extensions {
224 if extension.name == extension_name {
226 for location in &mut extension.locations {
227 if location.database == new_location.database {
229 *location = new_location.clone();
231 return new_extensions;
232 }
233 }
234 extension.locations.push(new_location.clone());
236 extension
239 .locations
240 .sort_by(|a, b| a.database.cmp(&b.database).then(a.schema.cmp(&b.schema)));
241 return new_extensions;
242 }
243 }
244 new_extensions.push(Extension {
246 name: extension_name.to_string(),
247 description: None,
248 locations: vec![new_location.clone()],
249 });
250 new_extensions.sort_by(|a, b| a.name.cmp(&b.name));
252 new_extensions
253}
254
255#[instrument]
257fn merge_apps(
258 user_apps: Vec<AppService>,
259 stack_apps: Vec<AppService>,
260) -> Result<Vec<AppService>, Error> {
261 let mut final_apps: HashMap<String, AppService> = HashMap::new();
264 for app in stack_apps {
265 final_apps.insert(app.name.clone(), app);
266 }
267 for app in user_apps {
268 final_apps.insert(app.name.clone(), app);
269 }
270 Ok(final_apps.into_values().collect())
271}
272
273#[instrument(skip(cfg1, cfg2))]
276pub fn merge_pg_configs(cfg1: Vec<PgConfig>, cfg2: Vec<PgConfig>) -> Vec<PgConfig> {
277 let mut map: BTreeMap<String, PgConfig> = BTreeMap::new();
278 for cfg in cfg1 {
279 map.insert(cfg.name.clone(), cfg);
280 }
281 for cfg in cfg2 {
282 map.insert(cfg.name.clone(), cfg);
283 }
284 map.into_values().collect()
285}
286
287pub fn merge_extensions(vec1: Vec<Extension>, vec2: Vec<Extension>) -> Vec<Extension> {
290 let mut map = HashMap::new();
291
292 for ext in vec1 {
293 map.insert(ext.name.clone(), ext);
294 }
295
296 for ext in vec2 {
297 map.insert(ext.name.clone(), ext);
298 }
299
300 map.into_values().collect()
301}
302
303#[instrument(skip(vec1, vec2))]
304pub fn merge_trunk_installs(vec1: Vec<TrunkInstall>, vec2: Vec<TrunkInstall>) -> Vec<TrunkInstall> {
305 let mut map = HashMap::new();
306
307 for ext in vec1 {
308 map.insert(ext.name.clone(), ext);
309 }
310
311 for ext in vec2 {
312 map.insert(ext.name.clone(), ext);
313 }
314
315 map.into_values().collect()
316}
317
318pub fn merge_app_configs(mut default_app: AppService, cfgs: AppConfig) -> AppService {
320 default_app.env = match (default_app.env, cfgs.env) {
322 (Some(defaults), Some(overrides)) => {
323 let envs = merge_env_defaults(defaults, overrides);
324 Some(envs)
325 }
326 (None, Some(overrides)) => Some(overrides),
327 (Some(defaults), None) => Some(defaults),
328 (None, None) => None,
329 };
330
331 if let Some(resources) = cfgs.resources {
333 default_app.resources = resources;
334 }
335
336 default_app
338}
339
340fn merge_env_defaults(defaults: Vec<EnvVar>, overrides: Vec<EnvVar>) -> Vec<EnvVar> {
342 let mut default_map: HashMap<String, EnvVar> = defaults
343 .into_iter()
344 .map(|var| (var.name.clone(), var))
345 .collect();
346 for var in overrides {
347 default_map.insert(var.name.clone(), var);
348 }
349 default_map.into_values().collect()
350}
351
352#[cfg(test)]
353mod tests {
354 use super::*;
355 use crate::apps::types::AppConfig;
356 use tembo_controller::app_service::types::EnvVar;
357 #[test]
358 fn test_merge_app_reqs() {
359 let app_config = AppConfig {
360 env: Some(vec![
361 EnvVar {
362 name: "APP_ENV".to_string(),
363 value: Some("user".to_string()),
364 value_from_platform: None,
365 },
366 EnvVar {
367 name: "TMPDIR".to_string(),
368 value: Some("/custom_dir".to_string()),
369 value_from_platform: None,
370 },
371 ]),
372 resources: None,
373 };
374 let user_embedding_app = AppType::Embeddings(Some(app_config));
375 let user_apps = vec![user_embedding_app];
376 let stack_apps = vec![AppService {
377 name: "embeddings".to_string(),
378 env: Some(vec![EnvVar {
379 name: "APP_ENV".to_string(),
380 value: Some("stack".to_string()),
381 value_from_platform: None,
382 }]),
383 ..AppService::default()
384 }];
385 let merged_configs: MergedConfigs =
386 merge_app_reqs(Some(user_apps), Some(stack_apps), None, None, None).unwrap();
387
388 let embedding_app = merged_configs
390 .app_services
391 .unwrap()
392 .iter()
393 .find(|a| a.name == "embeddings")
394 .unwrap()
395 .clone();
396 let mut to_find = 2;
397 assert_eq!(embedding_app.env.as_ref().unwrap().len(), 5);
399 for e in embedding_app.env.unwrap() {
400 match e.name.as_str() {
401 "APP_ENV" => {
403 assert_eq!(e.value.unwrap(), "user".to_string());
404 to_find -= 1;
405 }
406 "TMPDIR" => {
408 assert_eq!(e.value.unwrap(), "/custom_dir".to_string());
409 to_find -= 1;
410 }
411 _ => {}
412 }
413 }
414 assert_eq!(to_find, 0);
415
416 let metrics = embedding_app.metrics.expect("metrics not found");
418 assert_eq!(metrics.path, "/metrics".to_string());
419 assert_eq!(metrics.port, 3000);
420 }
421
422 #[test]
423 fn test_app_specs() {
424 assert!(EMBEDDINGS.app_services.is_some());
425 assert!(HTTP.app_services.is_some());
426 assert!(MQ.app_services.is_some());
427 assert!(PGANALYZE.app_services.is_some());
428 assert!(RESTAPI.app_services.is_some());
429 }
430
431 #[test]
432 fn test_pganalyze_spec() {
433 let cfg = PGANALYZE.postgres_config.clone().unwrap();
434 for c in cfg {
435 if c.name == "log_line_prefix" {
436 assert_eq!(c.value.to_string(), "'%m [%p] %q[user=%u,app=%a] ',db=%d")
437 }
438 }
439 }
440
441 #[test]
442 fn test_merge_apps() {
443 let user_apps = vec![
444 AppService {
445 name: "app1".to_string(),
446 image: "user_image".to_string(),
447 ..AppService::default()
448 },
449 AppService {
450 name: "app2".to_string(),
451 image: "user_image".to_string(),
452 ..AppService::default()
453 },
454 ];
455 let stack_apps = vec![
456 AppService {
457 name: "app1".to_string(),
458 image: "stack_image".to_string(),
459 ..AppService::default()
460 },
461 AppService {
462 name: "app3".to_string(),
463 image: "stack_image".to_string(),
464 ..AppService::default()
465 },
466 ];
467 let merged_apps = merge_apps(user_apps, stack_apps.clone()).unwrap();
469 assert_eq!(merged_apps.len(), 3);
470 for app in merged_apps {
471 if app.name == "app1" {
472 assert_eq!(app.image, "user_image");
473 }
474 if app.name == "reserved_name_1" {
476 assert_eq!(app.image, "stack_image");
477 }
478 }
479 }
480
481 #[test]
482 fn test_merge_env_vars() {
483 let e0 = EnvVar {
484 name: "e0".to_string(),
485 value: Some("e0".to_string()),
486 value_from_platform: None,
487 };
488 let e1 = EnvVar {
489 name: "e1".to_string(),
490 value: Some("e1".to_string()),
491 value_from_platform: None,
492 };
493 let e1_override = EnvVar {
494 name: "e1".to_string(),
495 value: Some("e1-override".to_string()),
496 value_from_platform: None,
497 };
498 let e2 = EnvVar {
499 name: "e2".to_string(),
500 value: Some("e2".to_string()),
501 value_from_platform: None,
502 };
503 let v0 = vec![e0.clone(), e1.clone()];
504 let v1 = vec![e1.clone(), e1_override.clone(), e2.clone()];
505 let merged = crate::apps::app::merge_env_defaults(v0.clone(), v1.clone());
506 assert_eq!(merged.len(), 3);
507 let merged_envs: HashMap<String, EnvVar> = merged
508 .into_iter()
509 .map(|var| (var.name.clone(), var))
510 .collect();
511 let same = merged_envs.get("e0").unwrap();
512 assert_eq!(same.value, Some("e0".to_string()));
513 let overridden = merged_envs.get("e1").unwrap();
514 assert_eq!(overridden.value, Some("e1-override".to_string()));
515 let same = merged_envs.get("e2").unwrap();
516 assert_eq!(same.value, Some("e2".to_string()));
517 }
518}