tembo_stacks/stacks/
types.rs

1use crate::stacks::config_engines::{
2    mq_config_engine, olap_config_engine, paradedb_config_engine, standard_config_engine,
3    ConfigEngine,
4};
5use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
6use schemars::JsonSchema;
7use serde::{Deserialize, Serialize};
8use tembo_controller::{
9    apis::{coredb_types::CoreDBSpec, postgres_parameters::PgConfig},
10    app_service::types::AppService,
11    defaults::{
12        default_images, default_postgres_exporter_image, default_repository, ImagePerPgVersion,
13    },
14    extensions::types::{Extension, TrunkInstall},
15    postgres_exporter::{PostgresMetrics, QueryConfig},
16};
17use utoipa::ToSchema;
18
19#[derive(
20    Clone,
21    Debug,
22    Default,
23    Serialize,
24    Deserialize,
25    JsonSchema,
26    PartialEq,
27    ToSchema,
28    strum_macros::EnumIter,
29    strum_macros::Display,
30)]
31pub enum StackType {
32    Analytics,
33    Geospatial,
34    MachineLearning,
35    MessageQueue,
36    MongoAlternative,
37    #[default]
38    OLTP,
39    ParadeDB,
40    Standard,
41    Timeseries,
42    VectorDB,
43}
44
45impl std::str::FromStr for StackType {
46    type Err = &'static str;
47
48    fn from_str(value: &str) -> Result<Self, Self::Err> {
49        match value {
50            "Analytics" => Ok(StackType::Analytics),
51            "Geospatial" => Ok(StackType::Geospatial),
52            "MachineLearning" => Ok(StackType::MachineLearning),
53            "MessageQueue" => Ok(StackType::MessageQueue),
54            "MongoAlternative" => Ok(StackType::MongoAlternative),
55            "OLTP" => Ok(StackType::OLTP),
56            "ParadeDB" => Ok(StackType::ParadeDB),
57            "Standard" => Ok(StackType::Standard),
58            "Timeseries" => Ok(StackType::Timeseries),
59            "VectorDB" => Ok(StackType::VectorDB),
60            _ => Err("invalid value"),
61        }
62    }
63}
64
65impl StackType {
66    pub fn as_str(&self) -> &str {
67        match self {
68            StackType::Analytics => "Analytics",
69            StackType::Geospatial => "Geospatial",
70            StackType::MachineLearning => "MachineLearning",
71            StackType::MessageQueue => "MessageQueue",
72            StackType::MongoAlternative => "MongoAlternative",
73            StackType::OLTP => "OLTP",
74            StackType::ParadeDB => "ParadeDB",
75            StackType::Standard => "Standard",
76            StackType::Timeseries => "Timeseries",
77            StackType::VectorDB => "VectorDB",
78        }
79    }
80}
81
82#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq, ToSchema)]
83pub struct Stack {
84    pub name: String,
85    /// specifies any resource constraints that should be applied to an instance of the Stack
86    pub compute_constraints: Option<ComputeConstraint>,
87    pub description: Option<String>,
88    /// Organization hosting the Docker images used in this stack
89    /// Default: "tembo"
90    #[serde(default = "default_organization")]
91    pub organization: String,
92    #[serde(default = "default_stack_repository")]
93    pub repository: String,
94    /// The Docker images to use for each supported Postgres versions
95    ///
96    /// Default:
97    ///     14: "standard-cnpg:14-bffd097"
98    ///     15: "standard-cnpg:15-bffd097"
99    ///     16: "standard-cnpg:16-bffd097"
100    ///     17: "standard-cnpg:17-bffd097"
101    #[serde(default = "default_images")]
102    pub images: ImagePerPgVersion,
103    pub stack_version: Option<String>,
104    pub trunk_installs: Option<Vec<TrunkInstall>>,
105    pub extensions: Option<Vec<Extension>>,
106    /// Postgres metric definition specific to the Stack
107    pub postgres_metrics: Option<QueryConfig>,
108    /// configs are strongly typed so that they can be programmatically transformed
109    pub postgres_config: Option<Vec<PgConfig>>,
110    #[serde(default = "default_config_engine")]
111    pub postgres_config_engine: Option<ConfigEngine>,
112    /// external application services
113    pub infrastructure: Option<Infrastructure>,
114    #[serde(rename = "appServices")]
115    pub app_services: Option<Vec<AppService>>,
116}
117
118impl Stack {
119    // warning: for development purposes only
120    pub fn to_coredb(self, cpu: String, memory: String, storage: String) -> CoreDBSpec {
121        let metrics = PostgresMetrics {
122            image: default_postgres_exporter_image(),
123            enabled: true,
124            queries: self.postgres_metrics.clone(),
125        };
126        let mut mut_self = self.clone();
127        mut_self.infrastructure = Some(Infrastructure {
128            cpu,
129            memory,
130            storage,
131        });
132        let runtime_config = mut_self.runtime_config();
133        CoreDBSpec {
134            image: format!(
135                "{repo}/{image}",
136                repo = self.repository,
137                image = self.images.pg16.as_ref().unwrap()
138            ),
139            extensions: self.extensions.unwrap_or_default(),
140            trunk_installs: self.trunk_installs.unwrap_or_default(),
141            app_services: self.app_services,
142            stack: Some(tembo_controller::apis::coredb_types::Stack {
143                name: self.name,
144                postgres_config: self.postgres_config,
145            }),
146            metrics: Some(metrics),
147            runtime_config,
148            replicas: 1,
149            storage: Quantity("10Gi".to_string()),
150            ..CoreDBSpec::default()
151        }
152    }
153}
154
155#[derive(Clone, Debug, Serialize, Deserialize, ToSchema, JsonSchema, PartialEq)]
156pub struct ComputeConstraint {
157    pub min: Option<ComputeResource>,
158}
159
160#[derive(Clone, Debug, Serialize, Deserialize, ToSchema, JsonSchema, PartialEq)]
161pub struct ComputeResource {
162    pub cpu: Option<String>,
163    pub memory: Option<String>,
164}
165
166impl Stack {
167    // https://www.postgresql.org/docs/current/runtime-config-resource.html#RUNTIME-CONFIG-RESOURCE-MEMORY
168    pub fn runtime_config(&self) -> Option<Vec<PgConfig>> {
169        match &self.postgres_config_engine {
170            Some(ConfigEngine::Standard) => Some(standard_config_engine(self)),
171            Some(ConfigEngine::OLAP) => Some(olap_config_engine(self)),
172            Some(ConfigEngine::MQ) => Some(mq_config_engine(self)),
173            Some(ConfigEngine::ParadeDB) => Some(paradedb_config_engine(self)),
174            None => Some(standard_config_engine(self)),
175        }
176    }
177}
178
/// Serde default for `Stack::organization`.
fn default_organization() -> String {
    String::from("tembo")
}
182
183fn default_stack_repository() -> String {
184    default_repository()
185}
186
187fn default_config_engine() -> Option<ConfigEngine> {
188    Some(ConfigEngine::Standard)
189}
190
191#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq, ToSchema)]
192pub struct Infrastructure {
193    // generic specs
194    #[serde(default = "default_cpu")]
195    pub cpu: String,
196    #[serde(default = "default_memory")]
197    pub memory: String,
198    #[serde(default = "default_storage")]
199    pub storage: String,
200}
201
/// Serde default for `Infrastructure::cpu`.
fn default_cpu() -> String {
    String::from("1")
}
205
/// Serde default for `Infrastructure::memory`.
fn default_memory() -> String {
    String::from("1Gi")
}
209
/// Serde default for `Infrastructure::storage`.
fn default_storage() -> String {
    String::from("10Gi")
}
213
/// Merges two optional vectors into one.
///
/// When both are present the elements of `opt2` are appended to `opt1`,
/// preserving order. When only one is present it is returned unchanged, even
/// if it is empty. When neither is present the result is `None`.
///
/// Elements are moved, never copied, so `T` needs no `Clone` bound.
pub fn merge_options<T>(opt1: Option<Vec<T>>, opt2: Option<Vec<T>>) -> Option<Vec<T>> {
    match (opt1, opt2) {
        (Some(mut merged), Some(tail)) => {
            merged.extend(tail);
            Some(merged)
        }
        // At most one side is `Some` here, so `or` picks whichever exists.
        (first, second) => first.or(second),
    }
}
227
#[cfg(test)]
mod tests {
    use crate::stacks::{
        get_stack,
        types::{Infrastructure, StackType},
    };
    use std::collections::HashMap;
    use strum::IntoEnumIterator;
    use tembo_controller::apis::postgres_parameters::PgConfig;

    /// Builds the 1 CPU / 10Gi storage infrastructure used by these tests,
    /// varying only the memory size.
    fn infra_with_memory(memory: &str) -> Infrastructure {
        Infrastructure {
            cpu: "1".to_string(),
            memory: memory.to_string(),
            storage: "10Gi".to_string(),
        }
    }

    /// Indexes configs by parameter name because their order is not guaranteed.
    fn configs_by_name(configs: Vec<PgConfig>) -> HashMap<String, PgConfig> {
        configs
            .into_iter()
            .map(|config| (config.name.clone(), config))
            .collect()
    }

    #[test]
    fn test_stacks_definitions() {
        let mut mq = get_stack(StackType::MessageQueue);
        mq.infrastructure = Some(infra_with_memory("1Gi"));

        // testing the default instance configurations
        let mq_configs = configs_by_name(mq.runtime_config().expect("expected configs"));
        let shared_buffers = mq_configs
            .get("shared_buffers")
            .expect("missing shared_buffers");
        assert_eq!(shared_buffers.name, "shared_buffers");
        assert_eq!(shared_buffers.value.to_string(), "307MB");
        let max_connections = mq_configs
            .get("max_connections")
            .expect("missing max_connections");
        assert_eq!(max_connections.name, "max_connections");
        assert_eq!(max_connections.value.to_string(), "107");

        assert!(mq.postgres_config.is_some());
        let mq_metrics = mq.postgres_metrics.expect("missing MQ postgres metrics");
        assert_eq!(mq_metrics.queries.len(), 1);
        assert!(mq_metrics.queries.contains_key("pgmq"));
        assert!(mq_metrics.queries["pgmq"].master);
        assert_eq!(mq_metrics.queries["pgmq"].metrics.len(), 6);

        // Named `standard` rather than `std` to avoid shadowing the crate name.
        let mut standard = get_stack(StackType::Standard);
        standard.infrastructure = Some(infra_with_memory("2Gi"));

        let standard_configs =
            configs_by_name(standard.runtime_config().expect("expected configs"));
        let shared_buffers = standard_configs
            .get("shared_buffers")
            .expect("missing shared_buffers");
        assert_eq!(shared_buffers.name, "shared_buffers");
        assert_eq!(shared_buffers.value.to_string(), "512MB");
    }

    #[test]
    fn test_all_stack_deserialization() {
        // `iter()` already yields every variant, so loading each one is enough.
        for stack_type in StackType::iter() {
            get_stack(stack_type);
        }
    }

    #[test]
    fn test_all_stack_variants() {
        for variant in StackType::iter() {
            let stack_str = variant.as_str();
            // from string back to StackType: must round-trip to the same variant
            assert_eq!(
                stack_str.parse::<StackType>(),
                Ok(variant.clone()),
                "stack type missing from_str {:?}",
                stack_str
            );
        }
    }

    #[test]
    fn test_compute_constraints() {
        for variant in StackType::iter() {
            let maybe_constraints = get_stack(variant.clone()).compute_constraints;
            if variant == StackType::MachineLearning {
                // ML stack is only stack currently with constraints
                let min_constraint = maybe_constraints
                    .expect("missing ML constraints")
                    .min
                    .expect("missing min constraint");
                assert_eq!(min_constraint.cpu, Some("2".to_string()));
                assert_eq!(min_constraint.memory, Some("4Gi".to_string()));
            } else {
                // only ML has compute constraints
                assert!(maybe_constraints.is_none());
            }
        }
    }

    #[test]
    fn test_app_metrics() {
        let vdb = get_stack(StackType::VectorDB);

        let embedding_app = vdb
            .app_services
            .unwrap()
            .into_iter()
            .find(|app| app.name == "embeddings")
            .expect("missing embedding app");

        let metrics = embedding_app.metrics.expect("missing metrics");
        assert_eq!(metrics.path, "/metrics");
        assert_eq!(metrics.port, 3000);
    }
}