use crate::{
apis::coredb_types::CoreDB,
extensions::{
database_queries, kubernetes_queries, types,
types::{Extension, ExtensionInstallLocationStatus, ExtensionStatus},
},
get_current_coredb_resource, Context,
};
use kube::runtime::controller::Action;
use crate::{
apis::coredb_types::CoreDBStatus,
extensions::{
database_queries::list_shared_preload_libraries,
kubernetes_queries::merge_location_status_into_extension_status_list, types::get_location_status,
},
trunk::extensions_that_require_load,
};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use tracing::{error, warn};
/// Reconciles the enabled/disabled state of the extensions declared on `cdb`.
///
/// The steps run strictly in this order:
/// 1. Fetch the extensions reported by `database_queries::get_all_extensions`.
/// 2. Derive the status list with [`determine_updated_extensions_status`] and
///    write it through `kubernetes_queries::update_extensions_status`.
/// 3. Re-read the resource with `get_current_coredb_resource` and compute the
///    locations to toggle with [`determine_extension_locations_to_toggle`].
/// 4. Apply those toggles with `toggle_extensions` and return its status list.
///
/// # Errors
///
/// Any `Action` produced by one of the awaited helpers is propagated to the
/// caller and the remaining steps are skipped.
pub async fn reconcile_extension_toggle_state(
    cdb: &CoreDB,
    ctx: Arc<Context>,
) -> Result<Vec<ExtensionStatus>, Action> {
    let installed_extensions = database_queries::get_all_extensions(cdb, Arc::clone(&ctx)).await?;
    let status_snapshot = determine_updated_extensions_status(cdb, installed_extensions);
    kubernetes_queries::update_extensions_status(cdb, status_snapshot.clone(), &ctx).await?;

    // Work from a re-fetched copy of the resource from here on, after the status write above.
    let refreshed_cdb = get_current_coredb_resource(cdb, Arc::clone(&ctx)).await?;
    let pending_toggles = determine_extension_locations_to_toggle(&refreshed_cdb);

    toggle_extensions(ctx, status_snapshot, &refreshed_cdb, pending_toggles).await
}
async fn toggle_extensions(
ctx: Arc<Context>,
ext_status_updates: Vec<ExtensionStatus>,
cdb: &CoreDB,
toggle_these_extensions: Vec<Extension>,
) -> Result<Vec<ExtensionStatus>, Action> {
let current_shared_preload_libraries = list_shared_preload_libraries(cdb, ctx.clone()).await?;
let requires_load =
extensions_that_require_load(ctx.client.clone(), &cdb.metadata.namespace.clone().unwrap()).await?;
let mut ext_status_updates = ext_status_updates.clone();
for extension_to_toggle in toggle_these_extensions {
for location_to_toggle in extension_to_toggle.locations {
let expected_library_name = match requires_load.get(&extension_to_toggle.name) {
None => &extension_to_toggle.name,
Some(expected_library_name) => expected_library_name,
};
if location_to_toggle.enabled
&& requires_load.contains_key(&extension_to_toggle.name)
&& !current_shared_preload_libraries.contains(expected_library_name)
{
warn!(
"Extension {} requires load, but is not present in shared_preload_libraries for {}, checking if we should requeue.",
extension_to_toggle.name, cdb.metadata.name.clone().unwrap());
requeue_if_expecting_shared_preload_library(
cdb,
&extension_to_toggle.name,
requires_load.clone(),
)?;
}
match database_queries::toggle_extension(
cdb,
&extension_to_toggle.name,
location_to_toggle.clone(),
ctx.clone(),
)
.await
{
Ok(_) => {}
Err(error_message) => {
let mut location_status = match types::get_location_status(
cdb,
&extension_to_toggle.name,
&location_to_toggle.database,
) {
None => {
error!("There should always be an extension status for a location before attempting to toggle an extension for that location");
ExtensionInstallLocationStatus {
database: location_to_toggle.database.clone(),
schema: None,
version: None,
enabled: None,
error: Some(true),
error_message: None,
}
}
Some(location_status) => location_status,
};
location_status.error = Some(true);
location_status.error_message = Some(error_message);
ext_status_updates = kubernetes_queries::update_extension_location_in_status(
cdb,
ctx.clone(),
&extension_to_toggle.name,
&location_status,
)
.await?;
}
}
}
}
Ok(ext_status_updates)
}
/// Decides whether reconciliation should be requeued because a required
/// library is configured in `shared_preload_libraries`.
///
/// The library name is looked up in `requires_load` under `extension_to_toggle`,
/// falling back to the extension name itself when there is no entry. The
/// `shared_preload_libraries` value is read through
/// `cdb.spec.get_pg_config_by_name`.
///
/// # Returns
///
/// * `Ok(())` when no `shared_preload_libraries` value is configured, or when
///   the configured value does not contain the library name.
/// * `Err(Action::requeue(10s))` when the configured value contains the library name.
/// * `Err(Action::requeue(300s))` when reading the configuration fails.
///
/// # Panics
///
/// The warning paths unwrap `cdb.metadata.name`.
fn requeue_if_expecting_shared_preload_library(
    cdb: &CoreDB,
    extension_to_toggle: &str,
    requires_load: BTreeMap<String, String>,
) -> Result<(), Action> {
    let library_name = requires_load
        .get(extension_to_toggle)
        .map_or(extension_to_toggle, String::as_str);

    let configured = match cdb
        .spec
        .get_pg_config_by_name("shared_preload_libraries", requires_load.clone())
    {
        Ok(configured) => configured,
        Err(e) => {
            error!("Error getting shared_preload_libraries config value: {}", e);
            return Err(Action::requeue(Duration::from_secs(300)));
        }
    };

    let value = match configured {
        Some(value) => value,
        None => {
            warn!(
                "Extension {} requires load, but shared_preload_libraries is not configured for {}, so we are not expecting a restart. Continuing.",
                extension_to_toggle,
                cdb.metadata.name.clone().unwrap()
            );
            return Ok(());
        }
    };

    // NOTE(review): this is a substring test on the rendered config value, so a
    // library name that is a substring of another configured entry would also
    // match. Confirm that is acceptable.
    if value.value.to_string().contains(library_name) {
        warn!(
            "Extension {} requires load, and is present in shared_preload_libraries for {}, requeuing.",
            extension_to_toggle,
            cdb.metadata.name.clone().unwrap()
        );
        return Err(Action::requeue(Duration::from_secs(10)));
    }

    warn!(
        "Extension {} requires load, but is not present in shared_preload_libraries for {}, allowing error.",
        extension_to_toggle,
        cdb.metadata.name.clone().unwrap()
    );
    Ok(())
}
/// Builds the extension status list from what is actually installed, combined
/// with the status and spec already present on `cdb`.
///
/// For every installed location:
/// * `enabled`, `database`, `schema` and `version` are copied from the
///   installed data, starting with `error = Some(false)` and no message.
/// * If `cdb` already has a status for that extension/database and its schema
///   equals the installed schema, the recorded `error` and `error_message` are
///   carried over.
/// * If `cdb`'s spec has a location for that extension/database and the
///   installed `enabled` value equals the desired one, the error is cleared.
///
/// Afterwards, every location in `cdb.spec.extensions` that is desired enabled
/// but has no entry in the list built so far is merged in with
/// `error = Some(true)` and the message `"Extension is not installed"`.
///
/// Locations are unique per database and ordered by database within each
/// extension. Extensions are unique per name and ordered by name.
///
/// # Parameters
///
/// * `cdb` - resource whose current status and spec are consulted.
/// * `all_actually_installed_extensions` - extension state as reported by the caller.
///
/// # Returns
///
/// The sorted, de-duplicated list of extension statuses.
pub fn determine_updated_extensions_status(
    cdb: &CoreDB,
    all_actually_installed_extensions: Vec<ExtensionStatus>,
) -> Vec<ExtensionStatus> {
    let mut ext_status_updates: Vec<ExtensionStatus> = vec![];
    for actual_extension in all_actually_installed_extensions {
        let mut extension_status = ExtensionStatus {
            name: actual_extension.name.clone(),
            description: actual_extension.description.clone(),
            locations: vec![],
        };
        for actual_location in actual_extension.locations {
            let mut location_status = ExtensionInstallLocationStatus {
                enabled: actual_location.enabled,
                database: actual_location.database.clone(),
                schema: actual_location.schema.clone(),
                version: actual_location.version.clone(),
                error: Some(false),
                error_message: None,
            };
            // Carry a previously recorded error forward only while the schema is unchanged.
            if let Some(current_status) =
                types::get_location_status(cdb, &actual_extension.name, &actual_location.database)
            {
                if current_status.schema == actual_location.schema {
                    location_status.error = current_status.error;
                    location_status.error_message = current_status.error_message;
                }
            }
            // Once the installed state matches the spec, any earlier error is cleared.
            if let Some(desired_location) =
                types::get_location_spec(cdb, &actual_extension.name, &actual_location.database)
            {
                if actual_location.enabled == Some(desired_location.enabled) {
                    location_status.error = Some(false);
                    location_status.error_message = None;
                }
            }
            extension_status.locations.push(location_status);
        }
        // `dedup_by` only removes *consecutive* duplicates, so the list must be
        // sorted first for de-duplication by database to be complete.
        extension_status
            .locations
            .sort_by(|a, b| a.database.cmp(&b.database));
        extension_status
            .locations
            .dedup_by(|a, b| a.database == b.database);
        ext_status_updates.push(extension_status);
    }

    // Snapshot of the resource carrying the statuses computed so far, so that
    // desired locations can be checked against them with `get_location_status`.
    let mut cdb_with_updated_extensions_status = cdb.clone();
    cdb_with_updated_extensions_status.status = Some(CoreDBStatus {
        extensions: Some(ext_status_updates.clone()),
        ..CoreDBStatus::default()
    });

    for desired_extension in &cdb.spec.extensions {
        for desired_location in &desired_extension.locations {
            if desired_location.enabled
                && get_location_status(
                    &cdb_with_updated_extensions_status,
                    &desired_extension.name,
                    &desired_location.database,
                )
                .is_none()
            {
                // Desired enabled but absent from the installed data: report as an error.
                let location_status = ExtensionInstallLocationStatus {
                    enabled: None,
                    database: desired_location.database.clone(),
                    schema: None,
                    version: desired_location.version.clone(),
                    error: Some(true),
                    error_message: Some("Extension is not installed".to_string()),
                };
                ext_status_updates = merge_location_status_into_extension_status_list(
                    &desired_extension.name,
                    &location_status,
                    ext_status_updates,
                );
            }
        }
    }

    // Sort before de-duplicating for the same reason as above.
    ext_status_updates.sort_by(|a, b| a.name.cmp(&b.name));
    ext_status_updates.dedup_by(|a, b| a.name == b.name);
    ext_status_updates
}
pub fn determine_extension_locations_to_toggle(cdb: &CoreDB) -> Vec<Extension> {
let mut extensions_to_toggle: Vec<Extension> = vec![];
for desired_extension in &cdb.spec.extensions {
let mut needs_toggle = false;
let mut extension_to_toggle = desired_extension.clone();
extension_to_toggle.locations = vec![];
for desired_location in &desired_extension.locations {
match types::get_location_status(cdb, &desired_extension.name, &desired_location.database) {
None => {
error!("When determining extensions to toggle, there should always be a location status for the desired location, because that should be included by determine_updated_extensions_status.");
}
Some(actual_status) => {
if actual_status.error.is_some()
&& (!actual_status
.error
.expect("We just checked this is not none, so we should be able to unwrap.")
&& actual_status.enabled.is_some()
&& actual_status.enabled.unwrap() != desired_location.enabled)
{
needs_toggle = true;
extension_to_toggle.locations.push(desired_location.clone());
}
}
}
}
if needs_toggle {
extensions_to_toggle.push(extension_to_toggle);
}
}
extensions_to_toggle
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::apis::coredb_types::{CoreDB, CoreDBSpec};

    /// With no extensions in the spec and none installed, the status list is empty.
    #[test]
    fn test_determine_updated_extensions_status_empty() {
        let spec = CoreDBSpec {
            extensions: vec![],
            ..Default::default()
        };
        let cdb = CoreDB {
            metadata: Default::default(),
            spec,
            status: None,
        };

        let result = determine_updated_extensions_status(&cdb, Vec::new());

        let expected: Vec<ExtensionStatus> = Vec::new();
        assert_eq!(result, expected);
    }
}