tembo_stacks/apps/
app.rs

1use anyhow::Error;
2use lazy_static::lazy_static;
3use std::collections::{BTreeMap, HashMap};
4use tembo_controller::{
5    apis::postgres_parameters::PgConfig,
6    app_service::types::{AppService, EnvVar},
7    extensions::types::{Extension, ExtensionInstallLocation, TrunkInstall},
8};
9use tracing::{instrument, warn};
10
11use crate::apps::types::{App, AppConfig, AppType, MergedConfigs};
12
13lazy_static! {
14    pub static ref HTTP: App =
15        serde_yaml::from_str(include_str!("http.yaml")).expect("http.yaml not found");
16    pub static ref RESTAPI: App =
17        serde_yaml::from_str(include_str!("restapi.yaml")).expect("restapi.yaml not found");
18    pub static ref MQ: App =
19        serde_yaml::from_str(include_str!("mq.yaml")).expect("mq.yaml not found");
20    pub static ref EMBEDDINGS: App =
21        serde_yaml::from_str(include_str!("embeddings.yaml")).expect("embeddings.yaml not found");
22    pub static ref PGANALYZE: App =
23        serde_yaml::from_str(include_str!("pganalyze.yaml")).expect("pganalyze.yaml not found");
24    pub static ref SQLRUNNER: App =
25        serde_yaml::from_str(include_str!("sql-runner.yaml")).expect("sql-runner.yaml not found");
26}
27
28// handling merging requirements coming from an App into the final
29#[instrument(skip(user_apps, stack_apps, extensions, trunk_installs, pg_configs))]
30pub fn merge_app_reqs(
31    user_apps: Option<Vec<AppType>>,
32    stack_apps: Option<Vec<AppService>>,
33    extensions: Option<Vec<Extension>>,
34    trunk_installs: Option<Vec<TrunkInstall>>,
35    pg_configs: Option<Vec<PgConfig>>,
36) -> Result<MergedConfigs, Error> {
37    let mut fin_app_extensions: Vec<Extension> = vec![];
38    let mut fin_app_trunk_installs: Vec<TrunkInstall> = vec![];
39    let mut final_pg_configs: Vec<PgConfig> = vec![];
40
41    let mut user_app_services: Vec<AppService> = vec![];
42    // generates a Vec<AppService> from the user provided Apps
43    if let Some(apps) = user_apps {
44        for app in apps {
45            match app {
46                AppType::RestAPI(config) => {
47                    // there is only 1 app_service in the restAPI
48                    let mut restapi = RESTAPI.clone().app_services.unwrap().clone()[0].clone();
49                    // if there are user provided configs, overwrite the defaults with them
50                    if let Some(cfg) = config {
51                        restapi = merge_app_configs(restapi, cfg);
52                    };
53                    user_app_services.push(restapi);
54                    // restAPI only has app_service containers
55                    // no extensions or trunk installs
56                }
57                AppType::HTTP(config) => {
58                    let http = HTTP.clone();
59                    let mut http_app_svc = http.app_services.unwrap()[0].clone();
60                    if let Some(cfg) = config {
61                        http_app_svc = merge_app_configs(http_app_svc, cfg);
62                    };
63                    user_app_services.push(http_app_svc);
64                    if let Some(extensions) = http.extensions {
65                        fin_app_extensions.extend(extensions);
66                    }
67                    if let Some(trunks) = http.trunk_installs {
68                        fin_app_trunk_installs.extend(trunks);
69                    }
70                }
71                AppType::MQ(config) => {
72                    let mq = MQ.clone();
73                    let mut mq_app_svc = mq.app_services.unwrap()[0].clone();
74                    if let Some(cfg) = config {
75                        mq_app_svc = merge_app_configs(mq_app_svc, cfg);
76                    }
77                    user_app_services.push(mq_app_svc);
78                }
79                AppType::Embeddings(config) => {
80                    let embedding_app = EMBEDDINGS.clone();
81                    // handle the app container from embeddings app
82                    let mut embedding_app_svc = embedding_app.app_services.unwrap()[0].clone();
83                    if let Some(cfg) = config {
84                        embedding_app_svc = merge_app_configs(embedding_app_svc, cfg);
85                    }
86                    user_app_services.push(embedding_app_svc);
87                    // handle extensions from embeddings app
88                    if let Some(extensions) = embedding_app.extensions {
89                        fin_app_extensions.extend(extensions);
90                    }
91                    // handle the trunk installs from embeddings app
92                    if let Some(trunks) = embedding_app.trunk_installs {
93                        fin_app_trunk_installs.extend(trunks);
94                    }
95
96                    if let Some(pg_cfg) = embedding_app.postgres_config {
97                        final_pg_configs.extend(pg_cfg);
98                    }
99                }
100                AppType::PgAnalyze(config) => {
101                    // There is only 1 app_service in the pganalyze app
102                    let pg_analyze = PGANALYZE.clone();
103                    let mut pg_analyze_app_svc = pg_analyze.app_services.unwrap()[0].clone();
104                    // If there are user provided configs, overwrite the defaults with them
105                    if let Some(cfg) = config {
106                        pg_analyze_app_svc = merge_app_configs(pg_analyze_app_svc, cfg);
107                    }
108                    user_app_services.push(pg_analyze_app_svc);
109                    // Handle extensions from pganalyze app
110                    if let Some(extensions) = pg_analyze.extensions {
111                        fin_app_extensions.extend(extensions);
112                    }
113                    // Handle trunk installs from pganalyze app
114                    if let Some(trunks) = pg_analyze.trunk_installs {
115                        fin_app_trunk_installs.extend(trunks);
116                    }
117                    // Handle postgres_config from pganalyze app
118                    if let Some(pg_cfg) = pg_analyze.postgres_config {
119                        final_pg_configs.extend(pg_cfg);
120                    }
121                }
122                AppType::SqlRunner(_) => {
123                    // there is only 1 app_service in the restAPI
124                    let sqlrunner = SQLRUNNER.clone().app_services.unwrap().clone()[0].clone();
125                    user_app_services.push(sqlrunner);
126                    // sqlrunner only has app_service containers
127                    // no extensions or trunk installs
128                }
129                AppType::Custom(custom_app) => {
130                    user_app_services.push(custom_app);
131                }
132            }
133        }
134    }
135
136    // merge stack apps into final app services
137    let final_apps = match stack_apps {
138        Some(s_apps) => {
139            let merged_apps = merge_apps(user_app_services, s_apps)?;
140            Some(merged_apps)
141        }
142        None => {
143            if user_app_services.is_empty() {
144                None
145            } else {
146                Some(user_app_services)
147            }
148        }
149    };
150
151    let mut final_extensions: Vec<Extension> = match extensions {
152        Some(exts) => exts.clone(),
153        None => vec![],
154    };
155
156    for app_ext in fin_app_extensions {
157        for loc in app_ext.locations {
158            final_extensions =
159                merge_location_into_extensions(&app_ext.name, &loc, final_extensions);
160        }
161    }
162    // final extensions
163    let fe = if final_extensions.is_empty() {
164        None
165    } else {
166        Some(final_extensions)
167    };
168
169    let final_trunks = match trunk_installs {
170        Some(trunks) => Some(merge_trunk_installs(trunks, fin_app_trunk_installs)),
171        None => {
172            if fin_app_trunk_installs.is_empty() {
173                None
174            } else {
175                Some(fin_app_trunk_installs)
176            }
177        }
178    };
179
180    // merge all the pg configs coming from Apps into the existing Instance configs
181    let final_pg_configs = match pg_configs {
182        Some(cfgs) => Some(merge_pg_configs(cfgs, final_pg_configs)),
183        None => {
184            if final_pg_configs.is_empty() {
185                None
186            } else {
187                Some(final_pg_configs)
188            }
189        }
190    };
191
192    Ok(MergedConfigs {
193        extensions: fe,
194        trunk_installs: final_trunks,
195        app_services: final_apps,
196        pg_configs: final_pg_configs,
197    })
198}
199
200// used for merging Vec of requested with Vec in Stack spec
201#[instrument(skip(opt1, opt2))]
202pub fn merge_options<T>(opt1: Option<Vec<T>>, opt2: Option<Vec<T>>) -> Option<Vec<T>>
203where
204    T: Clone,
205{
206    match (opt1, opt2) {
207        (Some(mut vec1), Some(vec2)) => {
208            vec1.extend(vec2);
209            Some(vec1)
210        }
211        (Some(vec), None) | (None, Some(vec)) => Some(vec),
212        (None, None) => None,
213    }
214}
215
216#[instrument]
217pub fn merge_location_into_extensions(
218    extension_name: &str,
219    new_location: &ExtensionInstallLocation,
220    current_extensions: Vec<Extension>,
221) -> Vec<Extension> {
222    let mut new_extensions = current_extensions.clone();
223    for extension in &mut new_extensions {
224        // If the extension is already in the list
225        if extension.name == extension_name {
226            for location in &mut extension.locations {
227                // If the location is already in the extension
228                if location.database == new_location.database {
229                    // Then replace it
230                    *location = new_location.clone();
231                    return new_extensions;
232                }
233            }
234            // If we never found the location, append it to existing extension status
235            extension.locations.push(new_location.clone());
236            // Then sort the locations alphabetically by database and schema
237            // sort locations by database and schema so the order is deterministic
238            extension
239                .locations
240                .sort_by(|a, b| a.database.cmp(&b.database).then(a.schema.cmp(&b.schema)));
241            return new_extensions;
242        }
243    }
244    // If we never found the extension status, append it
245    new_extensions.push(Extension {
246        name: extension_name.to_string(),
247        description: None,
248        locations: vec![new_location.clone()],
249    });
250    // Then sort alphabetically by name
251    new_extensions.sort_by(|a, b| a.name.cmp(&b.name));
252    new_extensions
253}
254
255// merge user Apps and Stack Apps
256#[instrument]
257fn merge_apps(
258    user_apps: Vec<AppService>,
259    stack_apps: Vec<AppService>,
260) -> Result<Vec<AppService>, Error> {
261    // when user provides configuration for app with same name as another app,
262    // the user provided configuration overrides the existing configuration
263    let mut final_apps: HashMap<String, AppService> = HashMap::new();
264    for app in stack_apps {
265        final_apps.insert(app.name.clone(), app);
266    }
267    for app in user_apps {
268        final_apps.insert(app.name.clone(), app);
269    }
270    Ok(final_apps.into_values().collect())
271}
272
273// merges 2 vecs of PgConfigs
274// except for multivalue configs, cfg2 overrides cfg2 if they match on name
275#[instrument(skip(cfg1, cfg2))]
276pub fn merge_pg_configs(cfg1: Vec<PgConfig>, cfg2: Vec<PgConfig>) -> Vec<PgConfig> {
277    let mut map: BTreeMap<String, PgConfig> = BTreeMap::new();
278    for cfg in cfg1 {
279        map.insert(cfg.name.clone(), cfg);
280    }
281    for cfg in cfg2 {
282        map.insert(cfg.name.clone(), cfg);
283    }
284    map.into_values().collect()
285}
286
287// merge two vecs of extensions
288// vec2 overrides vec1 if they match on Extension.name
289pub fn merge_extensions(vec1: Vec<Extension>, vec2: Vec<Extension>) -> Vec<Extension> {
290    let mut map = HashMap::new();
291
292    for ext in vec1 {
293        map.insert(ext.name.clone(), ext);
294    }
295
296    for ext in vec2 {
297        map.insert(ext.name.clone(), ext);
298    }
299
300    map.into_values().collect()
301}
302
303#[instrument(skip(vec1, vec2))]
304pub fn merge_trunk_installs(vec1: Vec<TrunkInstall>, vec2: Vec<TrunkInstall>) -> Vec<TrunkInstall> {
305    let mut map = HashMap::new();
306
307    for ext in vec1 {
308        map.insert(ext.name.clone(), ext);
309    }
310
311    for ext in vec2 {
312        map.insert(ext.name.clone(), ext);
313    }
314
315    map.into_values().collect()
316}
317
318// handles overriding any default AppService configurations with user specified configurations
319pub fn merge_app_configs(mut default_app: AppService, cfgs: AppConfig) -> AppService {
320    // append override envs, if any, with the required env vars
321    default_app.env = match (default_app.env, cfgs.env) {
322        (Some(defaults), Some(overrides)) => {
323            let envs = merge_env_defaults(defaults, overrides);
324            Some(envs)
325        }
326        (None, Some(overrides)) => Some(overrides),
327        (Some(defaults), None) => Some(defaults),
328        (None, None) => None,
329    };
330
331    // override resources if present
332    if let Some(resources) = cfgs.resources {
333        default_app.resources = resources;
334    }
335
336    // override other configs as they become supported
337    default_app
338}
339
340// overrides default env vars with user provided env vars
341fn merge_env_defaults(defaults: Vec<EnvVar>, overrides: Vec<EnvVar>) -> Vec<EnvVar> {
342    let mut default_map: HashMap<String, EnvVar> = defaults
343        .into_iter()
344        .map(|var| (var.name.clone(), var))
345        .collect();
346    for var in overrides {
347        default_map.insert(var.name.clone(), var);
348    }
349    default_map.into_values().collect()
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use crate::apps::types::AppConfig;
    use tembo_controller::app_service::types::EnvVar;

    /// User overrides for the embeddings app must win over both the bundled
    /// defaults and a stack app with the same name.
    #[test]
    fn test_merge_app_reqs() {
        let app_config = AppConfig {
            env: Some(vec![
                EnvVar {
                    name: "APP_ENV".to_string(),
                    value: Some("user".to_string()),
                    value_from_platform: None,
                },
                EnvVar {
                    name: "TMPDIR".to_string(),
                    value: Some("/custom_dir".to_string()),
                    value_from_platform: None,
                },
            ]),
            resources: None,
        };
        let user_embedding_app = AppType::Embeddings(Some(app_config));
        let user_apps = vec![user_embedding_app];
        let stack_apps = vec![AppService {
            name: "embeddings".to_string(),
            env: Some(vec![EnvVar {
                name: "APP_ENV".to_string(),
                value: Some("stack".to_string()),
                value_from_platform: None,
            }]),
            ..AppService::default()
        }];
        let merged_configs: MergedConfigs =
            merge_app_reqs(Some(user_apps), Some(stack_apps), None, None, None).unwrap();

        // filter for embedding app
        let embedding_app = merged_configs
            .app_services
            .unwrap()
            .iter()
            .find(|a| a.name == "embeddings")
            .unwrap()
            .clone();
        let mut to_find = 2;
        // 4 embedding app defaults + 1 custom
        assert_eq!(embedding_app.env.as_ref().unwrap().len(), 5);
        for e in embedding_app.env.unwrap() {
            match e.name.as_str() {
                // custom env var is found
                "APP_ENV" => {
                    assert_eq!(e.value.unwrap(), "user".to_string());
                    to_find -= 1;
                }
                // overridden TMPDIR value is found
                "TMPDIR" => {
                    assert_eq!(e.value.unwrap(), "/custom_dir".to_string());
                    to_find -= 1;
                }
                _ => {}
            }
        }
        assert_eq!(to_find, 0);

        // validate metrics end up in final_app
        let metrics = embedding_app.metrics.expect("metrics not found");
        assert_eq!(metrics.path, "/metrics".to_string());
        assert_eq!(metrics.port, 3000);
    }

    /// Every bundled app spec must define app services, since
    /// `merge_app_reqs` relies on them.
    #[test]
    fn test_app_specs() {
        assert!(EMBEDDINGS.app_services.is_some());
        assert!(HTTP.app_services.is_some());
        assert!(MQ.app_services.is_some());
        assert!(PGANALYZE.app_services.is_some());
        assert!(RESTAPI.app_services.is_some());
        assert!(SQLRUNNER.app_services.is_some());
    }

    /// Checks the `log_line_prefix` value of the pganalyze spec. The value is
    /// only asserted when the spec contains that config.
    #[test]
    fn test_pganalyze_spec() {
        let cfg = PGANALYZE.postgres_config.clone().unwrap();
        for c in cfg {
            if c.name == "log_line_prefix" {
                assert_eq!(c.value.to_string(), "'%m [%p] %q[user=%u,app=%a] ',db=%d")
            }
        }
    }

    /// User apps override stack apps with the same name; all other apps from
    /// both lists are kept untouched.
    #[test]
    fn test_merge_apps() {
        let user_apps = vec![
            AppService {
                name: "app1".to_string(),
                image: "user_image".to_string(),
                ..AppService::default()
            },
            AppService {
                name: "app2".to_string(),
                image: "user_image".to_string(),
                ..AppService::default()
            },
        ];
        let stack_apps = vec![
            AppService {
                name: "app1".to_string(),
                image: "stack_image".to_string(),
                ..AppService::default()
            },
            AppService {
                name: "app3".to_string(),
                image: "stack_image".to_string(),
                ..AppService::default()
            },
        ];
        let merged_apps = merge_apps(user_apps, stack_apps).unwrap();
        assert_eq!(merged_apps.len(), 3);
        for app in merged_apps {
            match app.name.as_str() {
                // app1 should be overwritten with the user provided image
                "app1" => assert_eq!(app.image, "user_image"),
                // app2 only exists in the user apps
                "app2" => assert_eq!(app.image, "user_image"),
                // app3 only exists in the stack apps and must not be overwritten
                "app3" => assert_eq!(app.image, "stack_image"),
                other => panic!("unexpected app in merged result: {other}"),
            }
        }
    }

    /// Overrides replace defaults by name, and the last duplicate override wins.
    #[test]
    fn test_merge_env_vars() {
        let e0 = EnvVar {
            name: "e0".to_string(),
            value: Some("e0".to_string()),
            value_from_platform: None,
        };
        let e1 = EnvVar {
            name: "e1".to_string(),
            value: Some("e1".to_string()),
            value_from_platform: None,
        };
        let e1_override = EnvVar {
            name: "e1".to_string(),
            value: Some("e1-override".to_string()),
            value_from_platform: None,
        };
        let e2 = EnvVar {
            name: "e2".to_string(),
            value: Some("e2".to_string()),
            value_from_platform: None,
        };
        let v0 = vec![e0.clone(), e1.clone()];
        let v1 = vec![e1.clone(), e1_override.clone(), e2.clone()];
        let merged = crate::apps::app::merge_env_defaults(v0.clone(), v1.clone());
        assert_eq!(merged.len(), 3);
        let merged_envs: HashMap<String, EnvVar> = merged
            .into_iter()
            .map(|var| (var.name.clone(), var))
            .collect();
        let same = merged_envs.get("e0").unwrap();
        assert_eq!(same.value, Some("e0".to_string()));
        let overridden = merged_envs.get("e1").unwrap();
        assert_eq!(overridden.value, Some("e1-override".to_string()));
        let same = merged_envs.get("e2").unwrap();
        assert_eq!(same.value, Some("e2".to_string()));
    }
}