controller/extensions/
database_queries.rs

1use crate::{
2    apis::{
3        coredb_types::CoreDB,
4        postgres_parameters::{ConfigValue, PgConfig},
5    },
6    extensions::{
7        types,
8        types::{ExtensionInstallLocation, ExtensionInstallLocationStatus, ExtensionStatus},
9    },
10    Context, RESTARTED_AT,
11};
12use chrono::{DateTime, Utc};
13use k8s_openapi::api::core::v1::Pod;
14use kube::{api::DeleteParams, runtime::controller::Action, Api, ResourceExt};
15use lazy_static::lazy_static;
16use regex::Regex;
17use std::{
18    collections::{BTreeSet, HashMap},
19    sync::Arc,
20    time::Duration,
21};
22use tracing::{debug, error, info, instrument, trace, warn};
23
24lazy_static! {
25    static ref VALID_INPUT: Regex =
26        Regex::new(r"^[a-zA-Z]([a-zA-Z0-9]*[-_]?)*[a-zA-Z0-9]+$").unwrap();
27}
28
29pub fn check_input(input: &str) -> bool {
30    VALID_INPUT.is_match(input)
31}
32
/// SQL statement that shows the server's `shared_preload_libraries` setting.
pub const LIST_SHARED_PRELOAD_LIBRARIES_QUERY: &str = "SHOW shared_preload_libraries;";
34
/// SQL statement selecting every `datname` in `pg_database` except `template0`.
pub const LIST_DATABASES_QUERY: &str =
    "SELECT datname FROM pg_database WHERE datname != 'template0';";
37
/// SQL statement producing one row per extension name with the columns
/// `name`, `version`, `enabled`, `schema` and `description`.
///
/// It unions the installed extensions (`pg_extension` joined with `pg_namespace`
/// and `pg_available_extensions`, reported with `enabled = true`) with every
/// entry of `pg_available_extensions` (reported with `enabled = false`, the
/// default version and schema `'public'`). `distinct on (name)` combined with
/// the final `enabled desc` ordering keeps the installed row when both exist.
pub const LIST_EXTENSIONS_QUERY: &str = "select
distinct on
(name) *
from
(
select
    name,
    version,
    enabled,
    schema,
    description
from
    (
    select
        t0.extname as name,
        t0.extversion as version,
        true as enabled,
        t1.nspname as schema,
        comment as description
    from
        (
        select
            extnamespace,
            extname,
            extversion
        from
            pg_extension
) t0,
        (
        select
            oid,
            nspname
        from
            pg_namespace
) t1,
        (
        select
            name,
            comment
        from
            pg_catalog.pg_available_extensions
) t2
    where
        t1.oid = t0.extnamespace
        and t2.name = t0.extname
) installed
union
select
    name,
    default_version as version,
    false as enabled,
    'public' as schema,
    comment as description
from
    pg_catalog.pg_available_extensions
order by
    enabled asc
) combined
order by
name asc,
enabled desc
";
100
/// One parsed row of extension-listing output.
///
/// The fields mirror the five columns read by `parse_extensions`:
/// `name | version | enabled | schema | description`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExtRow {
    /// Extension name.
    pub name: String,
    /// Human-readable description (the extension's comment).
    pub description: String,
    /// Version string reported for the extension.
    pub version: String,
    /// Whether the row was reported as enabled.
    pub enabled: bool,
    /// Schema reported for the extension.
    pub schema: String,
}
109
110#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
111pub async fn list_shared_preload_libraries(
112    cdb: &CoreDB,
113    ctx: Arc<Context>,
114) -> Result<Vec<String>, Action> {
115    let psql_out = cdb
116        .psql(
117            LIST_SHARED_PRELOAD_LIBRARIES_QUERY.to_owned(),
118            "postgres".to_owned(),
119            ctx,
120        )
121        .await?;
122    let result_string = match psql_out.stdout {
123        None => {
124            error!(
125                "No stdout from psql when looking for shared_preload_libraries for {}",
126                cdb.metadata.name.clone().unwrap()
127            );
128            return Err(Action::requeue(Duration::from_secs(300)));
129        }
130        Some(out) => out,
131    };
132    let result = parse_sql_output(&result_string);
133    let mut libraries: Vec<String> = vec![];
134    if result.len() == 1 {
135        libraries = result[0].split(',').map(|s| s.trim().to_string()).collect();
136    }
137    debug!(
138        "{}: Found shared_preload_libraries: {:?}",
139        cdb.metadata.name.clone().unwrap(),
140        libraries.clone()
141    );
142    Ok(libraries)
143}
144
145/// lists all extensions in a single database
146#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
147pub async fn list_extensions(
148    cdb: &CoreDB,
149    ctx: Arc<Context>,
150    database: &str,
151) -> Result<Vec<ExtRow>, Action> {
152    let psql_out = cdb
153        .psql(LIST_EXTENSIONS_QUERY.to_owned(), database.to_owned(), ctx)
154        .await?;
155    let result_string = psql_out.stdout.unwrap();
156    Ok(parse_extensions(&result_string))
157}
158
159/// List all configuration parameters
160#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
161pub async fn list_config_params(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<PgConfig>, Action> {
162    let psql_out = cdb
163        .psql("SHOW ALL;".to_owned(), "postgres".to_owned(), ctx)
164        .await?;
165    let result_string = match psql_out.stdout {
166        None => {
167            error!(
168                "No stdout from psql when looking for config values for {}",
169                cdb.metadata.name.clone().unwrap()
170            );
171            return Err(Action::requeue(Duration::from_secs(300)));
172        }
173        Some(out) => out,
174    };
175    Ok(parse_config_params(&result_string))
176}
177
178/// Returns Ok if the given database is running (i.e. not restarting)
179#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
180pub async fn is_not_restarting(
181    cdb: &CoreDB,
182    ctx: Arc<Context>,
183    database: &str,
184) -> Result<Option<DateTime<Utc>>, Action> {
185    // chrono strftime declaration to parse Postgres timestamps
186    const PG_TIMESTAMP_DECL: &str = "%Y-%m-%d %H:%M:%S.%f%#z";
187
188    fn parse_psql_output(output: &str) -> Option<&str> {
189        output.lines().nth(2).map(str::trim)
190    }
191
192    let cdb_name = cdb.name_any();
193
194    let pg_postmaster_result = cdb
195        .psql(
196            "select pg_postmaster_start_time();".to_owned(),
197            database.to_owned(),
198            ctx.clone(),
199        )
200        .await;
201
202    let Some(restarted_at) = cdb.annotations().get(RESTARTED_AT) else {
203        // We don't have the annotation, so we are not restarting
204        // return pg_postmaster_start_time if we have it.
205        let result = pg_postmaster_result
206            .ok()
207            .and_then(|result| result.stdout)
208            .as_ref() // Convert String to &str for parse_psql_output
209            .and_then(|stdout| parse_psql_output(stdout))
210            .and_then(|pg_postmaster_start_time_str| {
211                let pg_postmaster_start_time = pg_postmaster_start_time_str.to_string();
212                DateTime::parse_from_str(&pg_postmaster_start_time, PG_TIMESTAMP_DECL)
213                    .ok()
214                    .map(|dt_with_offset| dt_with_offset.with_timezone(&Utc))
215            });
216        return Ok(result);
217    };
218
219    let restarted_requested_at: DateTime<Utc> = DateTime::parse_from_rfc3339(restarted_at)
220        .map_err(|err| {
221            error!("{cdb_name}: Failed to deserialize DateTime from `restartedAt`: {err}");
222
223            Action::requeue(Duration::from_secs(300))
224        })?
225        .into();
226
227    let pg_postmaster = match pg_postmaster_result {
228        Ok(result) => result.stdout.ok_or_else(|| {
229            error!("{cdb_name}: select pg_postmaster_start_time() had no stdout");
230            Action::requeue(Duration::from_secs(300))
231        })?,
232        Err(_) => {
233            let pod = cdb
234                .primary_pod_cnpg_ready_or_not(ctx.client.clone())
235                .await?;
236
237            let pod_not_ready_duration = match get_pod_not_ready_duration(pod.clone()) {
238                Ok(Some(duration)) => {
239                    warn!("Primary pod has not been ready for {:?}", duration);
240                    duration
241                }
242                Ok(None) => {
243                    warn!("{cdb_name}: Primary pod is ready or doesn't have a Ready condition, but we could not execute a command.");
244                    return Err(Action::requeue(Duration::from_secs(5)));
245                }
246                Err(_e) => {
247                    error!(
248                        "{cdb_name}: Failed to determine how long the primary has not been ready"
249                    );
250                    return Err(Action::requeue(Duration::from_secs(300)));
251                }
252            };
253
254            let pod_creation_timestamp = pod.metadata.creation_timestamp.ok_or_else(|| {
255                error!("{cdb_name}: Pod has no creation timestamp");
256                Action::requeue(Duration::from_secs(300))
257            })?;
258
259            let pod_age = Utc::now() - pod_creation_timestamp.0;
260
261            // Check if the pod is older than restarted_at, and the pod has been not ready for over 30 seconds
262            if pod_age > restarted_requested_at - Utc::now()
263                && pod_not_ready_duration > Duration::from_secs(30)
264            {
265                error!("{cdb_name}: Primary pod is older than restarted_at and has been not ready for over 30 seconds. Deleting the pod");
266                let pods_api =
267                    Api::<Pod>::namespaced(ctx.client.clone(), &pod.metadata.namespace.unwrap());
268                let delete_result = pods_api
269                    .delete(&pod.metadata.name.unwrap(), &DeleteParams::default())
270                    .await;
271                if let Err(e) = delete_result {
272                    error!("{cdb_name}: Failed to delete primary pod: {:?}", e);
273                    return Err(Action::requeue(Duration::from_secs(300)));
274                }
275                return Err(Action::requeue(Duration::from_secs(10)));
276            }
277            return Err(Action::requeue(Duration::from_secs(15)));
278        }
279    };
280
281    let pg_postmaster_start_time = parse_psql_output(&pg_postmaster).ok_or_else(|| {
282        error!("{cdb_name}: failed to parse pg_postmaster_start_time() output");
283
284        Action::requeue(Duration::from_secs(300))
285    })?;
286
287    let server_started_at: DateTime<Utc> = DateTime::parse_from_str(pg_postmaster_start_time, PG_TIMESTAMP_DECL)
288        .map_err(|err| {
289            tracing::error!(
290                "{cdb_name}: Failed to deserialize DateTime from `pg_postmaster_start_time`: {err}, received '{pg_postmaster_start_time}'"
291            );
292
293            Action::requeue(Duration::from_secs(300))
294        })?
295        .into();
296
297    if server_started_at >= restarted_requested_at {
298        // Server started after the moment we requested it to restart,
299        // meaning the restart is done
300        debug!("Restart is complete for {}", cdb_name);
301        Ok(Some(server_started_at))
302    } else {
303        // Server hasn't even started restarting yet
304        warn!("Restart is not complete for {}, requeuing", cdb_name);
305        Err(Action::requeue(Duration::from_secs(5)))
306    }
307}
308
309fn get_pod_not_ready_duration(pod: Pod) -> Result<Option<Duration>, Box<dyn std::error::Error>> {
310    let status = pod.status.ok_or("Pod has no status information")?;
311    if let Some(conditions) = status.conditions {
312        for condition in conditions {
313            if condition.type_ == "Ready" {
314                if condition.status == "False" {
315                    // Extract the last transition time when the pod was not ready
316                    let last_transition = condition
317                        .last_transition_time
318                        .ok_or("No last transition time for Ready condition")?;
319                    let last_not_ready_time = last_transition.0;
320                    let duration_since_not_ready = Utc::now() - last_not_ready_time;
321
322                    let std_duration = match duration_since_not_ready.to_std() {
323                        Ok(duration) => duration,
324                        Err(_) => {
325                            error!("Failed to convert duration to std::time::Duration");
326                            return Ok(None);
327                        }
328                    };
329
330                    return Ok(Some(std_duration));
331                }
332                break;
333            }
334        }
335    }
336    Ok(None)
337}
338
339#[instrument(skip(psql_str))]
340pub fn parse_extensions(psql_str: &str) -> Vec<ExtRow> {
341    let mut extensions = vec![];
342    for line in psql_str.lines().skip(2) {
343        let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect();
344        if fields.len() < 5 {
345            debug!("Done:{:?}", fields);
346            continue;
347        }
348        let package = ExtRow {
349            name: fields[0].to_owned(),
350            version: fields[1].to_owned(),
351            enabled: fields[2] == "t",
352            schema: fields[3].to_owned(),
353            description: fields[4].to_owned(),
354        };
355        extensions.push(package);
356    }
357    let num_extensions = extensions.len();
358    debug!("Found {} extensions", num_extensions);
359    extensions
360}
361
362/// returns all the databases in an instance
363#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
364pub async fn list_databases(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<String>, Action> {
365    let _client = ctx.client.clone();
366    let psql_out = cdb
367        .psql(LIST_DATABASES_QUERY.to_owned(), "postgres".to_owned(), ctx)
368        .await?;
369    let result_string = psql_out.stdout.unwrap();
370    Ok(parse_sql_output(&result_string))
371}
372
373#[instrument(skip(psql_str))]
374pub fn parse_sql_output(psql_str: &str) -> Vec<String> {
375    let mut results = vec![];
376    for line in psql_str.lines().skip(2) {
377        let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect();
378        if fields.is_empty()
379            || fields[0].is_empty()
380            || fields[0].contains("rows)")
381            || fields[0].contains("row)")
382        {
383            debug!("Done:{:?}", fields);
384            continue;
385        }
386        results.push(fields[0].to_string());
387    }
388    let num_results = results.len();
389    debug!("Found {} results", num_results);
390    results
391}
392
393/// Parse the output of `SHOW ALL` to get the parameter and its value. Return Vec<PgConfig>
394#[instrument(skip(psql_str))]
395pub fn parse_config_params(psql_str: &str) -> Vec<PgConfig> {
396    let mut results = vec![];
397    for line in psql_str.lines().skip(2) {
398        let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect();
399        if fields.len() < 2 {
400            debug!("Skipping last line:{:?}", fields);
401            continue;
402        }
403        // If value is multiple, Set as ConfigValue::Multiple
404        if fields[1].contains(',') {
405            let values: BTreeSet<String> =
406                fields[1].split(',').map(|s| s.trim().to_owned()).collect();
407            let config = PgConfig {
408                name: fields[0].to_owned(),
409                value: ConfigValue::Multiple(values),
410            };
411            results.push(config);
412            continue;
413        }
414        let config = PgConfig {
415            name: fields[0].to_owned(),
416            value: ConfigValue::Single(fields[1].to_owned()),
417        };
418        results.push(config);
419    }
420    let num_results = results.len();
421    debug!("Found {} config values", num_results);
422    // Log config values to debug
423    for result in &results {
424        trace!("Config value: {:?}", result);
425    }
426    results
427}
428
429/// list databases then get all extensions from each database
430#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
431pub async fn get_all_extensions(
432    cdb: &CoreDB,
433    ctx: Arc<Context>,
434) -> Result<Vec<ExtensionStatus>, Action> {
435    let databases = list_databases(cdb, ctx.clone()).await?;
436    debug!("databases: {:?}", databases);
437
438    let mut ext_hashmap: HashMap<(String, String), Vec<ExtensionInstallLocationStatus>> =
439        HashMap::new();
440    // query every database for extensions
441    // transform results by extension name, rather than by database
442    for db in databases {
443        let extensions = list_extensions(cdb, ctx.clone(), &db).await?;
444        for ext in extensions {
445            let extlocation = ExtensionInstallLocationStatus {
446                database: db.clone(),
447                version: Some(ext.version),
448                enabled: Some(ext.enabled),
449                schema: Some(ext.schema),
450                error: None,
451                error_message: None,
452            };
453            ext_hashmap
454                .entry((ext.name, ext.description))
455                // .or_insert_with(Vec::new)
456                .or_default()
457                .push(extlocation);
458        }
459    }
460
461    let mut ext_spec: Vec<ExtensionStatus> = Vec::new();
462    for ((extname, extdescr), ext_locations) in &ext_hashmap {
463        ext_spec.push(ExtensionStatus {
464            name: extname.clone(),
465            description: Some(extdescr.clone()),
466            locations: ext_locations.clone(),
467        });
468    }
469    // put them in order
470    ext_spec.sort_by_key(|e| e.name.clone());
471
472    Ok(ext_spec)
473}
474
475pub enum ToggleError {
476    WithDescription(String),
477    WithAction(Action),
478}
479
480/// Handles create/drop an extension location
481/// On failure, returns an error message
482#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any(), ext_name, ext_loc))]
483pub async fn toggle_extension(
484    cdb: &CoreDB,
485    ext_name: &str,
486    ext_loc: ExtensionInstallLocation,
487    ctx: Arc<Context>,
488) -> Result<(), ToggleError> {
489    let coredb_name = cdb.name_any();
490    if !check_input(ext_name) {
491        warn!(
492            "Extension is not formatted properly. Skipping operation. {}",
493            &coredb_name
494        );
495        return Err(ToggleError::WithDescription(
496            "Extension name is not formatted properly".into(),
497        ));
498    }
499    let database_name = ext_loc.database.to_owned();
500    if !check_input(&database_name) {
501        warn!(
502            "Database name is not formatted properly. Skipping operation. {}",
503            &coredb_name
504        );
505        return Err(ToggleError::WithDescription(
506            "Database name is not formatted properly".into(),
507        ));
508    }
509
510    let command = types::generate_extension_enable_cmd(ext_name, &ext_loc)
511        .map_err(ToggleError::WithDescription)?;
512
513    let result = cdb
514        .psql(command.clone(), database_name.clone(), ctx.clone())
515        .await;
516
517    match result {
518        Ok(psql_output) => match psql_output.success {
519            true => {
520                info!(
521                    "Successfully toggled extension {} in database {}, instance {}",
522                    ext_name, database_name, &coredb_name
523                );
524            }
525            false => {
526                warn!(
527                    "Failed to toggle extension {} in database {}, instance {}",
528                    ext_name, database_name, &coredb_name
529                );
530                match psql_output.stderr {
531                    Some(stderr) => {
532                        return Err(ToggleError::WithDescription(stderr));
533                    }
534                    None => {
535                        return Err(ToggleError::WithDescription("Failed to enable extension, and found no output. Please try again. If this issue persists, contact support.".to_string()));
536                    }
537                }
538            }
539        },
540        Err(e) => {
541            error!(
542                "Failed to reconcile extension because of kube exec error: {:?}",
543                e
544            );
545            return Err(ToggleError::WithAction(e));
546        }
547    }
548    Ok(())
549}
550
#[cfg(test)]
mod tests {
    use crate::{
        apis::postgres_parameters::PgConfig,
        extensions::database_queries::{
            check_input, parse_config_params, parse_extensions, parse_sql_output,
        },
    };

    /// Single-column psql output yields one entry per data row, footer excluded.
    #[test]
    fn test_parse_databases() {
        let three_db = " datname
        ----------
         postgres
         cat
         dog
        (3 rows)

         ";

        let parsed = parse_sql_output(three_db);
        println!("{:?}", parsed);
        assert_eq!(parsed, ["postgres", "cat", "dog"]);

        let one_db = " datname
        ----------
         postgres
        (1 row)

         ";

        let parsed = parse_sql_output(one_db);
        println!("{:?}", parsed);
        assert_eq!(parsed, ["postgres"]);
    }

    /// Five-column extension output is mapped onto `ExtRow` fields.
    #[test]
    fn test_parse_extensions() {
        let ext_psql = "        name        | version | enabled |   schema   |                              description
        --------------------+---------+---------+------------+------------------------------------------------------------------------
         adminpack          | 2.1     | f       | public     | administrative functions for PostgreSQL
         amcheck            | 1.3     | f       | public     | functions for verifying relation integrity
         autoinc            | 1.0     | f       | public     | functions for autoincrementing fields
         bloom              | 1.0     | f       | public     | bloom access method - signature file based index
         btree_gin          | 1.3     | f       | public     | support for indexing common datatypes in GIN
         btree_gist         | 1.7     | f       | public     | support for indexing common datatypes in GiST
         citext             | 1.6     | f       | public     | data type for case-insensitive character strings
         cube               | 1.5     | f       | public     | data type for multidimensional cubes
         dblink             | 1.2     | f       | public     | connect to other PostgreSQL databases from within a database
         (9 rows)";

        let ext = parse_extensions(ext_psql);
        assert_eq!(ext.len(), 9);

        // (index, name, version, description) of the first and last rows.
        let expected = [
            (
                0usize,
                "adminpack",
                "2.1",
                "administrative functions for PostgreSQL",
            ),
            (
                8usize,
                "dblink",
                "1.2",
                "connect to other PostgreSQL databases from within a database",
            ),
        ];
        for (index, name, version, description) in expected {
            let row = &ext[index];
            assert_eq!(row.name, name);
            assert!(!row.enabled);
            assert_eq!(row.version, version.to_owned());
            assert_eq!(row.schema, "public".to_owned());
            assert_eq!(row.description, description.to_owned());
        }
    }

    /// The first two columns of each row become the parameter name and value.
    #[test]
    fn test_parse_config_params() {
        let config_psql = "        name        | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart
        ---------------------+---------+------+----------+------------+------------+---------+---------+--------+---------+---------+----------+----------+-----------+------------+------------+-----------------
         allow_system_table_mods | off     |      | Developer |            |            | postmas | bool    |        |         |         |          | off      | off       |            |            | f
         application_name      |         |      |          |            |            | user    | string  |        |         |         |          |          |           |            |            |
         archive_command       |         |      |          |            |            | sighup  | string  |        |         |         |          |          |           |            |            |
         archive_mode          | off     |      |          |            |            | sighup  | enum    |        |         |         | on,off   | off      | off       |            |            | f";
        let config = parse_config_params(config_psql);
        assert_eq!(config.len(), 4);

        let expected = [
            ("allow_system_table_mods", "off"),
            ("application_name", ""),
            ("archive_command", ""),
            ("archive_mode", "off"),
        ];
        for (actual, (name, value)) in config.iter().zip(expected) {
            assert_eq!(
                *actual,
                PgConfig {
                    name: name.to_owned(),
                    value: value.parse().unwrap(),
                }
            );
        }
    }

    /// Names with bad characters or bad endings are rejected; plain ones pass.
    #[test]
    fn test_check_input() {
        let invalids = [
            "extension--",
            "data;",
            "invalid^#$$characters",
            ";invalid",
            "",
        ];
        for candidate in invalids {
            assert!(
                !check_input(candidate),
                "input {} should be invalid",
                candidate
            );
        }

        let valids = [
            "extension_a",
            "schema_abc",
            "extension",
            "NewExtension",
            "NewExtension123",
            "postgis_tiger_geocoder-3",
            "address_standardizer-3",
            "xml2",
        ];
        for candidate in valids {
            assert!(check_input(candidate), "input {} should be valid", candidate);
        }
    }
}