1use crate::{
2 apis::{
3 coredb_types::CoreDB,
4 postgres_parameters::{ConfigValue, PgConfig},
5 },
6 extensions::{
7 types,
8 types::{ExtensionInstallLocation, ExtensionInstallLocationStatus, ExtensionStatus},
9 },
10 Context, RESTARTED_AT,
11};
12use chrono::{DateTime, Utc};
13use k8s_openapi::api::core::v1::Pod;
14use kube::{api::DeleteParams, runtime::controller::Action, Api, ResourceExt};
15use lazy_static::lazy_static;
16use regex::Regex;
17use std::{
18 collections::{BTreeSet, HashMap},
19 sync::Arc,
20 time::Duration,
21};
22use tracing::{debug, error, info, instrument, trace, warn};
23
lazy_static! {
    // Validator for identifiers (extension/database/schema names) that get
    // interpolated into SQL: must start with an ASCII letter, end with an
    // ASCII alphanumeric, and contain only ASCII alphanumerics, `-`, or `_`
    // in between. Rejects empty strings and trailing separators.
    static ref VALID_INPUT: Regex =
        Regex::new(r"^[a-zA-Z]([a-zA-Z0-9]*[-_]?)*[a-zA-Z0-9]+$").unwrap();
}
28
/// Returns `true` if `input` is a safe identifier to splice into SQL commands:
/// starts with an ASCII letter, ends with an ASCII alphanumeric, and uses only
/// ASCII alphanumerics, `-`, or `_` in between (see `VALID_INPUT`).
pub fn check_input(input: &str) -> bool {
    VALID_INPUT.is_match(input)
}
32
/// Shows the server's current `shared_preload_libraries` setting.
pub const LIST_SHARED_PRELOAD_LIBRARIES_QUERY: &str = r#"SHOW shared_preload_libraries;"#;

/// Lists every database except `template0` (which does not accept connections).
pub const LIST_DATABASES_QUERY: &str =
    r#"SELECT datname FROM pg_database WHERE datname != 'template0';"#;

/// Lists all extensions, one row per name: installed extensions (joined from
/// `pg_extension`/`pg_namespace`/`pg_available_extensions`, enabled = true,
/// with their actual schema) unioned with available-but-not-installed ones
/// (enabled = false, schema 'public'). `DISTINCT ON (name)` with
/// `ORDER BY name asc, enabled desc` keeps the enabled row when both exist.
pub const LIST_EXTENSIONS_QUERY: &str = r#"select
distinct on
(name) *
from
(
select
    name,
    version,
    enabled,
    schema,
    description
from
    (
    select
        t0.extname as name,
        t0.extversion as version,
        true as enabled,
        t1.nspname as schema,
        comment as description
    from
        (
        select
            extnamespace,
            extname,
            extversion
        from
            pg_extension
) t0,
        (
        select
            oid,
            nspname
        from
            pg_namespace
) t1,
        (
        select
            name,
            comment
        from
            pg_catalog.pg_available_extensions
) t2
    where
        t1.oid = t0.extnamespace
        and t2.name = t0.extname
) installed
union
select
    name,
    default_version as version,
    false as enabled,
    'public' as schema,
    comment as description
from
    pg_catalog.pg_available_extensions
order by
    enabled asc
) combined
order by
name asc,
enabled desc
"#;
100
/// One row of [`LIST_EXTENSIONS_QUERY`] output: a single extension as seen
/// from one database.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExtRow {
    /// Extension name (`pg_available_extensions.name`).
    pub name: String,
    /// Human-readable description (`comment` column).
    pub description: String,
    /// Installed version, or default version when not installed.
    pub version: String,
    /// `true` when the extension is installed (present in `pg_extension`).
    pub enabled: bool,
    /// Schema the extension is installed into; `'public'` when not installed.
    pub schema: String,
}
109
110#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
111pub async fn list_shared_preload_libraries(
112 cdb: &CoreDB,
113 ctx: Arc<Context>,
114) -> Result<Vec<String>, Action> {
115 let psql_out = cdb
116 .psql(
117 LIST_SHARED_PRELOAD_LIBRARIES_QUERY.to_owned(),
118 "postgres".to_owned(),
119 ctx,
120 )
121 .await?;
122 let result_string = match psql_out.stdout {
123 None => {
124 error!(
125 "No stdout from psql when looking for shared_preload_libraries for {}",
126 cdb.metadata.name.clone().unwrap()
127 );
128 return Err(Action::requeue(Duration::from_secs(300)));
129 }
130 Some(out) => out,
131 };
132 let result = parse_sql_output(&result_string);
133 let mut libraries: Vec<String> = vec![];
134 if result.len() == 1 {
135 libraries = result[0].split(',').map(|s| s.trim().to_string()).collect();
136 }
137 debug!(
138 "{}: Found shared_preload_libraries: {:?}",
139 cdb.metadata.name.clone().unwrap(),
140 libraries.clone()
141 );
142 Ok(libraries)
143}
144
145#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
147pub async fn list_extensions(
148 cdb: &CoreDB,
149 ctx: Arc<Context>,
150 database: &str,
151) -> Result<Vec<ExtRow>, Action> {
152 let psql_out = cdb
153 .psql(LIST_EXTENSIONS_QUERY.to_owned(), database.to_owned(), ctx)
154 .await?;
155 let result_string = psql_out.stdout.unwrap();
156 Ok(parse_extensions(&result_string))
157}
158
159#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
161pub async fn list_config_params(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<PgConfig>, Action> {
162 let psql_out = cdb
163 .psql("SHOW ALL;".to_owned(), "postgres".to_owned(), ctx)
164 .await?;
165 let result_string = match psql_out.stdout {
166 None => {
167 error!(
168 "No stdout from psql when looking for config values for {}",
169 cdb.metadata.name.clone().unwrap()
170 );
171 return Err(Action::requeue(Duration::from_secs(300)));
172 }
173 Some(out) => out,
174 };
175 Ok(parse_config_params(&result_string))
176}
177
178#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
180pub async fn is_not_restarting(
181 cdb: &CoreDB,
182 ctx: Arc<Context>,
183 database: &str,
184) -> Result<Option<DateTime<Utc>>, Action> {
185 const PG_TIMESTAMP_DECL: &str = "%Y-%m-%d %H:%M:%S.%f%#z";
187
188 fn parse_psql_output(output: &str) -> Option<&str> {
189 output.lines().nth(2).map(str::trim)
190 }
191
192 let cdb_name = cdb.name_any();
193
194 let pg_postmaster_result = cdb
195 .psql(
196 "select pg_postmaster_start_time();".to_owned(),
197 database.to_owned(),
198 ctx.clone(),
199 )
200 .await;
201
202 let Some(restarted_at) = cdb.annotations().get(RESTARTED_AT) else {
203 let result = pg_postmaster_result
206 .ok()
207 .and_then(|result| result.stdout)
208 .as_ref() .and_then(|stdout| parse_psql_output(stdout))
210 .and_then(|pg_postmaster_start_time_str| {
211 let pg_postmaster_start_time = pg_postmaster_start_time_str.to_string();
212 DateTime::parse_from_str(&pg_postmaster_start_time, PG_TIMESTAMP_DECL)
213 .ok()
214 .map(|dt_with_offset| dt_with_offset.with_timezone(&Utc))
215 });
216 return Ok(result);
217 };
218
219 let restarted_requested_at: DateTime<Utc> = DateTime::parse_from_rfc3339(restarted_at)
220 .map_err(|err| {
221 error!("{cdb_name}: Failed to deserialize DateTime from `restartedAt`: {err}");
222
223 Action::requeue(Duration::from_secs(300))
224 })?
225 .into();
226
227 let pg_postmaster = match pg_postmaster_result {
228 Ok(result) => result.stdout.ok_or_else(|| {
229 error!("{cdb_name}: select pg_postmaster_start_time() had no stdout");
230 Action::requeue(Duration::from_secs(300))
231 })?,
232 Err(_) => {
233 let pod = cdb
234 .primary_pod_cnpg_ready_or_not(ctx.client.clone())
235 .await?;
236
237 let pod_not_ready_duration = match get_pod_not_ready_duration(pod.clone()) {
238 Ok(Some(duration)) => {
239 warn!("Primary pod has not been ready for {:?}", duration);
240 duration
241 }
242 Ok(None) => {
243 warn!("{cdb_name}: Primary pod is ready or doesn't have a Ready condition, but we could not execute a command.");
244 return Err(Action::requeue(Duration::from_secs(5)));
245 }
246 Err(_e) => {
247 error!(
248 "{cdb_name}: Failed to determine how long the primary has not been ready"
249 );
250 return Err(Action::requeue(Duration::from_secs(300)));
251 }
252 };
253
254 let pod_creation_timestamp = pod.metadata.creation_timestamp.ok_or_else(|| {
255 error!("{cdb_name}: Pod has no creation timestamp");
256 Action::requeue(Duration::from_secs(300))
257 })?;
258
259 let pod_age = Utc::now() - pod_creation_timestamp.0;
260
261 if pod_age > restarted_requested_at - Utc::now()
263 && pod_not_ready_duration > Duration::from_secs(30)
264 {
265 error!("{cdb_name}: Primary pod is older than restarted_at and has been not ready for over 30 seconds. Deleting the pod");
266 let pods_api =
267 Api::<Pod>::namespaced(ctx.client.clone(), &pod.metadata.namespace.unwrap());
268 let delete_result = pods_api
269 .delete(&pod.metadata.name.unwrap(), &DeleteParams::default())
270 .await;
271 if let Err(e) = delete_result {
272 error!("{cdb_name}: Failed to delete primary pod: {:?}", e);
273 return Err(Action::requeue(Duration::from_secs(300)));
274 }
275 return Err(Action::requeue(Duration::from_secs(10)));
276 }
277 return Err(Action::requeue(Duration::from_secs(15)));
278 }
279 };
280
281 let pg_postmaster_start_time = parse_psql_output(&pg_postmaster).ok_or_else(|| {
282 error!("{cdb_name}: failed to parse pg_postmaster_start_time() output");
283
284 Action::requeue(Duration::from_secs(300))
285 })?;
286
287 let server_started_at: DateTime<Utc> = DateTime::parse_from_str(pg_postmaster_start_time, PG_TIMESTAMP_DECL)
288 .map_err(|err| {
289 tracing::error!(
290 "{cdb_name}: Failed to deserialize DateTime from `pg_postmaster_start_time`: {err}, received '{pg_postmaster_start_time}'"
291 );
292
293 Action::requeue(Duration::from_secs(300))
294 })?
295 .into();
296
297 if server_started_at >= restarted_requested_at {
298 debug!("Restart is complete for {}", cdb_name);
301 Ok(Some(server_started_at))
302 } else {
303 warn!("Restart is not complete for {}, requeuing", cdb_name);
305 Err(Action::requeue(Duration::from_secs(5)))
306 }
307}
308
309fn get_pod_not_ready_duration(pod: Pod) -> Result<Option<Duration>, Box<dyn std::error::Error>> {
310 let status = pod.status.ok_or("Pod has no status information")?;
311 if let Some(conditions) = status.conditions {
312 for condition in conditions {
313 if condition.type_ == "Ready" {
314 if condition.status == "False" {
315 let last_transition = condition
317 .last_transition_time
318 .ok_or("No last transition time for Ready condition")?;
319 let last_not_ready_time = last_transition.0;
320 let duration_since_not_ready = Utc::now() - last_not_ready_time;
321
322 let std_duration = match duration_since_not_ready.to_std() {
323 Ok(duration) => duration,
324 Err(_) => {
325 error!("Failed to convert duration to std::time::Duration");
326 return Ok(None);
327 }
328 };
329
330 return Ok(Some(std_duration));
331 }
332 break;
333 }
334 }
335 }
336 Ok(None)
337}
338
339#[instrument(skip(psql_str))]
340pub fn parse_extensions(psql_str: &str) -> Vec<ExtRow> {
341 let mut extensions = vec![];
342 for line in psql_str.lines().skip(2) {
343 let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect();
344 if fields.len() < 5 {
345 debug!("Done:{:?}", fields);
346 continue;
347 }
348 let package = ExtRow {
349 name: fields[0].to_owned(),
350 version: fields[1].to_owned(),
351 enabled: fields[2] == "t",
352 schema: fields[3].to_owned(),
353 description: fields[4].to_owned(),
354 };
355 extensions.push(package);
356 }
357 let num_extensions = extensions.len();
358 debug!("Found {} extensions", num_extensions);
359 extensions
360}
361
362#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
364pub async fn list_databases(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<String>, Action> {
365 let _client = ctx.client.clone();
366 let psql_out = cdb
367 .psql(LIST_DATABASES_QUERY.to_owned(), "postgres".to_owned(), ctx)
368 .await?;
369 let result_string = psql_out.stdout.unwrap();
370 Ok(parse_sql_output(&result_string))
371}
372
373#[instrument(skip(psql_str))]
374pub fn parse_sql_output(psql_str: &str) -> Vec<String> {
375 let mut results = vec![];
376 for line in psql_str.lines().skip(2) {
377 let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect();
378 if fields.is_empty()
379 || fields[0].is_empty()
380 || fields[0].contains("rows)")
381 || fields[0].contains("row)")
382 {
383 debug!("Done:{:?}", fields);
384 continue;
385 }
386 results.push(fields[0].to_string());
387 }
388 let num_results = results.len();
389 debug!("Found {} results", num_results);
390 results
391}
392
393#[instrument(skip(psql_str))]
395pub fn parse_config_params(psql_str: &str) -> Vec<PgConfig> {
396 let mut results = vec![];
397 for line in psql_str.lines().skip(2) {
398 let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect();
399 if fields.len() < 2 {
400 debug!("Skipping last line:{:?}", fields);
401 continue;
402 }
403 if fields[1].contains(',') {
405 let values: BTreeSet<String> =
406 fields[1].split(',').map(|s| s.trim().to_owned()).collect();
407 let config = PgConfig {
408 name: fields[0].to_owned(),
409 value: ConfigValue::Multiple(values),
410 };
411 results.push(config);
412 continue;
413 }
414 let config = PgConfig {
415 name: fields[0].to_owned(),
416 value: ConfigValue::Single(fields[1].to_owned()),
417 };
418 results.push(config);
419 }
420 let num_results = results.len();
421 debug!("Found {} config values", num_results);
422 for result in &results {
424 trace!("Config value: {:?}", result);
425 }
426 results
427}
428
429#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any()))]
431pub async fn get_all_extensions(
432 cdb: &CoreDB,
433 ctx: Arc<Context>,
434) -> Result<Vec<ExtensionStatus>, Action> {
435 let databases = list_databases(cdb, ctx.clone()).await?;
436 debug!("databases: {:?}", databases);
437
438 let mut ext_hashmap: HashMap<(String, String), Vec<ExtensionInstallLocationStatus>> =
439 HashMap::new();
440 for db in databases {
443 let extensions = list_extensions(cdb, ctx.clone(), &db).await?;
444 for ext in extensions {
445 let extlocation = ExtensionInstallLocationStatus {
446 database: db.clone(),
447 version: Some(ext.version),
448 enabled: Some(ext.enabled),
449 schema: Some(ext.schema),
450 error: None,
451 error_message: None,
452 };
453 ext_hashmap
454 .entry((ext.name, ext.description))
455 .or_default()
457 .push(extlocation);
458 }
459 }
460
461 let mut ext_spec: Vec<ExtensionStatus> = Vec::new();
462 for ((extname, extdescr), ext_locations) in &ext_hashmap {
463 ext_spec.push(ExtensionStatus {
464 name: extname.clone(),
465 description: Some(extdescr.clone()),
466 locations: ext_locations.clone(),
467 });
468 }
469 ext_spec.sort_by_key(|e| e.name.clone());
471
472 Ok(ext_spec)
473}
474
/// Error type for `toggle_extension`.
pub enum ToggleError {
    /// Human-readable failure reason (e.g. psql stderr or a validation
    /// message) suitable for surfacing in status.
    WithDescription(String),
    /// A controller `Action` (e.g. requeue) to propagate as-is, used when the
    /// failure came from the kube exec layer.
    WithAction(Action),
}
479
480#[instrument(skip(cdb, ctx), fields(cdb_name = %cdb.name_any(), ext_name, ext_loc))]
483pub async fn toggle_extension(
484 cdb: &CoreDB,
485 ext_name: &str,
486 ext_loc: ExtensionInstallLocation,
487 ctx: Arc<Context>,
488) -> Result<(), ToggleError> {
489 let coredb_name = cdb.name_any();
490 if !check_input(ext_name) {
491 warn!(
492 "Extension is not formatted properly. Skipping operation. {}",
493 &coredb_name
494 );
495 return Err(ToggleError::WithDescription(
496 "Extension name is not formatted properly".into(),
497 ));
498 }
499 let database_name = ext_loc.database.to_owned();
500 if !check_input(&database_name) {
501 warn!(
502 "Database name is not formatted properly. Skipping operation. {}",
503 &coredb_name
504 );
505 return Err(ToggleError::WithDescription(
506 "Database name is not formatted properly".into(),
507 ));
508 }
509
510 let command = types::generate_extension_enable_cmd(ext_name, &ext_loc)
511 .map_err(ToggleError::WithDescription)?;
512
513 let result = cdb
514 .psql(command.clone(), database_name.clone(), ctx.clone())
515 .await;
516
517 match result {
518 Ok(psql_output) => match psql_output.success {
519 true => {
520 info!(
521 "Successfully toggled extension {} in database {}, instance {}",
522 ext_name, database_name, &coredb_name
523 );
524 }
525 false => {
526 warn!(
527 "Failed to toggle extension {} in database {}, instance {}",
528 ext_name, database_name, &coredb_name
529 );
530 match psql_output.stderr {
531 Some(stderr) => {
532 return Err(ToggleError::WithDescription(stderr));
533 }
534 None => {
535 return Err(ToggleError::WithDescription("Failed to enable extension, and found no output. Please try again. If this issue persists, contact support.".to_string()));
536 }
537 }
538 }
539 },
540 Err(e) => {
541 error!(
542 "Failed to reconcile extension because of kube exec error: {:?}",
543 e
544 );
545 return Err(ToggleError::WithAction(e));
546 }
547 }
548 Ok(())
549}
550
#[cfg(test)]
mod tests {
    use crate::{
        apis::postgres_parameters::PgConfig,
        extensions::database_queries::{
            check_input, parse_config_params, parse_extensions, parse_sql_output,
        },
    };

    // parse_sql_output must skip the two header lines and drop the trailing
    // "(N rows)" / "(1 row)" marker and blank lines.
    #[test]
    fn test_parse_databases() {
        let three_db = " datname
 ----------
 postgres
 cat
 dog
 (3 rows)

 ";

        let rows = parse_sql_output(three_db);
        println!("{:?}", rows);
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0], "postgres");
        assert_eq!(rows[1], "cat");
        assert_eq!(rows[2], "dog");

        // Singular "(1 row)" marker must also be filtered out.
        let one_db = " datname
 ----------
 postgres
 (1 row)

 ";

        let rows = parse_sql_output(one_db);
        println!("{:?}", rows);
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0], "postgres");
    }

    // parse_extensions must map each 5-field row to an ExtRow with trimmed
    // fields and "t"/"f" converted to a bool, ignoring the row-count line.
    #[test]
    fn test_parse_extensions() {
        let ext_psql = " name | version | enabled | schema | description
 --------------------+---------+---------+------------+------------------------------------------------------------------------
 adminpack | 2.1 | f | public | administrative functions for PostgreSQL
 amcheck | 1.3 | f | public | functions for verifying relation integrity
 autoinc | 1.0 | f | public | functions for autoincrementing fields
 bloom | 1.0 | f | public | bloom access method - signature file based index
 btree_gin | 1.3 | f | public | support for indexing common datatypes in GIN
 btree_gist | 1.7 | f | public | support for indexing common datatypes in GiST
 citext | 1.6 | f | public | data type for case-insensitive character strings
 cube | 1.5 | f | public | data type for multidimensional cubes
 dblink | 1.2 | f | public | connect to other PostgreSQL databases from within a database
 (9 rows)";

        let ext = parse_extensions(ext_psql);
        assert_eq!(ext.len(), 9);
        assert_eq!(ext[0].name, "adminpack");
        assert!(!ext[0].enabled);
        assert_eq!(ext[0].version, "2.1".to_owned());
        assert_eq!(ext[0].schema, "public".to_owned());
        assert_eq!(
            ext[0].description,
            "administrative functions for PostgreSQL".to_owned()
        );

        assert_eq!(ext[8].name, "dblink");
        assert!(!ext[8].enabled);
        assert_eq!(ext[8].version, "1.2".to_owned());
        assert_eq!(ext[8].schema, "public".to_owned());
        assert_eq!(
            ext[8].description,
            "connect to other PostgreSQL databases from within a database".to_owned()
        );
    }

    // parse_config_params must take column 0 as the name and column 1 as the
    // value; none of these sample values contain commas, so all parse as
    // ConfigValue::Single (via FromStr on the expected side).
    #[test]
    fn test_parse_config_params() {
        let config_psql = " name | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart
 ---------------------+---------+------+----------+------------+------------+---------+---------+--------+---------+---------+----------+----------+-----------+------------+------------+-----------------
 allow_system_table_mods | off | | Developer | | | postmas | bool | | | | | off | off | | | f
 application_name | | | | | | user | string | | | | | | | | |
 archive_command | | | | | | sighup | string | | | | | | | | |
 archive_mode | off | | | | | sighup | enum | | | | on,off | off | off | | | f";
        let config = parse_config_params(config_psql);
        assert_eq!(config.len(), 4);
        assert_eq!(
            config[0],
            PgConfig {
                name: "allow_system_table_mods".to_owned(),
                value: "off".parse().unwrap(),
            }
        );
        assert_eq!(
            config[1],
            PgConfig {
                name: "application_name".to_owned(),
                value: "".parse().unwrap(),
            }
        );
        assert_eq!(
            config[2],
            PgConfig {
                name: "archive_command".to_owned(),
                value: "".parse().unwrap(),
            }
        );
        assert_eq!(
            config[3],
            PgConfig {
                name: "archive_mode".to_owned(),
                value: "off".parse().unwrap(),
            }
        );
    }

    // check_input must reject empty strings, trailing separators, and any
    // non-alphanumeric/-/_ characters, while accepting typical extension and
    // schema identifiers.
    #[test]
    fn test_check_input() {
        let invalids = [
            "extension--",
            "data;",
            "invalid^#$$characters",
            ";invalid",
            "",
        ];
        for i in invalids.iter() {
            assert!(!check_input(i), "input {} should be invalid", i);
        }

        let valids = [
            "extension_a",
            "schema_abc",
            "extension",
            "NewExtension",
            "NewExtension123",
            "postgis_tiger_geocoder-3",
            "address_standardizer-3",
            "xml2",
        ];
        for i in valids.iter() {
            assert!(check_input(i), "input {} should be valid", i);
        }
    }
}