use crate::app::app_resources;
use crate::application_types::ApplicationValues;
use crate::bucket::BucketInjection;
use crate::dsh_api_client::DshApiClient;
use crate::error::DshApiResult;
use crate::platform::CloudProvider;
use crate::query_processor::{Match, QueryProcessor};
use crate::secret::SecretInjection;
use crate::topic::TopicInjection;
use crate::types::{AllocationStatus, AppCatalogApp, AppCatalogAppResourcesValue, Application};
use crate::vhost::VhostInjection;
use crate::volume::VolumeInjection;
#[allow(unused_imports)]
use crate::DshApiError;
use crate::{bucket, secret, topic, vhost, volume, DependantApplication};
use futures::future::{join, try_join_all};
use itertools::Itertools;
use regex::Regex;
use std::collections::HashMap;
use std::sync::LazyLock;
impl DshApiClient {
pub async fn application_ids(&self) -> DshApiResult<Vec<String>> {
let mut application_ids: Vec<String> = self
.get_application_configuration_map()
.await?
.keys()
.map(|application_id| application_id.to_string())
.collect();
application_ids.sort();
Ok(application_ids)
}
pub async fn application_ids_with_allocation_statuses(&self) -> DshApiResult<Vec<(String, AllocationStatus)>> {
let application_ids: Vec<String> = self.application_ids().await?;
let allocation_statuses = try_join_all(application_ids.iter().map(|application_id| self.get_application_status(application_id.as_str()))).await?;
Ok(application_ids.into_iter().zip(allocation_statuses).collect_vec())
}
pub async fn applications(&self) -> DshApiResult<Vec<(String, Application)>> {
self.applications_filtered(&|_| true).await
}
pub async fn applications_dependant_on_bucket(&self, bucket_id: &str) -> DshApiResult<Vec<DependantApplication<BucketInjection>>> {
let (applications, bucket_name) = match self.platform().cloud_provider() {
CloudProvider::AWS => (self.get_application_configuration_map().await?, None),
CloudProvider::Azure => {
let (applications, bucket_name) = join(self.get_application_configuration_map(), self.bucket_name(bucket_id)).await;
(applications?, bucket_name.ok())
}
};
Ok(
bucket::bucket_injections_from_applications(bucket_id, bucket_name.as_deref(), &applications)
.into_iter()
.map(|application_values| {
DependantApplication::new(
application_values.id.to_string(),
application_values.application.instances,
application_values.values,
)
})
.collect_vec(),
)
}
pub async fn applications_dependant_on_secret(&self, secret_name: &str) -> DshApiResult<Vec<DependantApplication<SecretInjection>>> {
let applications = self.get_application_configuration_map().await?;
Ok(
secret::secret_env_vars_from_applications(secret_name, &applications)
.iter()
.map(|application_values| {
DependantApplication::new(
application_values.id.to_string(),
application_values.application.instances,
application_values.values.iter().map(|env_var| SecretInjection::env_var(*env_var)).collect_vec(),
)
})
.collect_vec(),
)
}
pub async fn applications_dependant_on_scratch_topic(&self, topic: &str) -> DshApiResult<Vec<DependantApplication<TopicInjection>>> {
let applications = self.get_application_configuration_map().await?;
Ok(
topic::topic_used_in_applications(topic, &applications)
.iter()
.map(|(application_id, application)| DependantApplication::new(application_id.to_string(), application.instances, vec![TopicInjection::topic(topic)]))
.collect_vec(),
)
}
pub async fn applications_dependant_on_vhost(&self, vhost: &str) -> DshApiResult<Vec<DependantApplication<VhostInjection>>> {
let applications = self.get_application_configuration_map().await?;
Ok(
vhost::vhost_port_mappings_from_applications(vhost, &applications)
.iter()
.map(|application_values| {
DependantApplication::new(
application_values.id.to_string(),
application_values.application.instances,
application_values
.values
.iter()
.map(|(port, port_mapping)| VhostInjection::vhost(*port, Some(port_mapping.to_string())))
.collect_vec(),
)
})
.collect_vec(),
)
}
pub async fn applications_dependant_on_volume(&self, volume: &str) -> DshApiResult<Vec<DependantApplication<VolumeInjection>>> {
let applications = self.get_application_configuration_map().await?;
Ok(
volume::volume_paths_from_applications(volume, &applications)
.iter()
.map(|application_values| {
DependantApplication::new(
application_values.id.to_string(),
application_values.application.instances,
application_values.values.iter().map(|path| VolumeInjection::volume(*path)).collect_vec(),
)
})
.collect_vec(),
)
}
pub async fn applications_filtered(&self, predicate: &dyn Fn(&Application) -> bool) -> DshApiResult<Vec<(String, Application)>> {
let mut matching_applications: Vec<(String, Application)> = self
.get_application_configuration_map()
.await?
.into_iter()
.filter(|(_, application)| predicate(application))
.collect_vec();
matching_applications.sort_by(|(id_a, _), (id_b, _)| id_a.cmp(id_b));
Ok(matching_applications)
}
#[allow(clippy::type_complexity)]
pub async fn applications_that_use_env_value(&self, query_processor: &dyn QueryProcessor) -> DshApiResult<Vec<(String, Application, Vec<(String, Match)>)>> {
let mut matches: Vec<(String, Application, Vec<(String, Match)>)> = vec![];
let applications: Vec<(String, Application)> = self.applications().await?;
for (application_id, application) in applications {
let mut matching_envs: Vec<(String, Match)> = vec![];
for (key, value) in &application.env {
if let Some(matching) = query_processor.matching(value.as_str()) {
matching_envs.push((key.to_string(), matching));
}
}
if !matching_envs.is_empty() {
matching_envs.sort_by(|(env_a, _), (env_b, _)| env_a.cmp(env_b));
matches.push((application_id, application, matching_envs));
}
}
Ok(matches)
}
pub async fn guid(&self) -> DshApiResult<(usize, usize)> {
static GUID_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^([0-9]+):([0-9]+)$").unwrap());
match self.get_application_configuration_map().await?.iter().take(1).last() {
Some((_, application)) => match GUID_REGEX.captures(application.user.as_str()) {
Some(captures) => Ok((
captures.get(1).unwrap().as_str().parse::<usize>().unwrap(),
captures.get(2).unwrap().as_str().parse::<usize>().unwrap(),
)),
None => Err(DshApiError::unexpected(format!("illegal user value {}", application.user))),
},
None => Err(DshApiError::unexpected("no applications deployed")),
}
}
}
pub fn application_environment_variables(query_processor: &dyn QueryProcessor, application: &Application) -> Vec<(String, Match)> {
let mut matching_envs: Vec<(String, Match)> = vec![];
for (key, value) in &application.env {
if let Some(matching) = query_processor.matching(value.as_str()) {
matching_envs.push((key.to_string(), matching));
}
}
matching_envs.sort_by(|(name_a, _), (name_b, _)| name_a.cmp(name_b));
matching_envs
}
pub fn application_resources_from_app(app: &AppCatalogApp) -> Vec<(&str, &Application)> {
app_resources(app, &|resource_value| match resource_value {
AppCatalogAppResourcesValue::Application(application) => Some(application),
_ => None,
})
}
pub fn applications_environment_variables<'a>(query_processor: &dyn QueryProcessor, applications: &'a HashMap<String, Application>) -> Vec<ApplicationValues<'a, (String, Match)>> {
let mut matching_applications: Vec<ApplicationValues<'a, (String, Match)>> = vec![];
for (application_id, application) in applications {
let matches: Vec<(String, Match)> = application_environment_variables(query_processor, application);
if !matches.is_empty() {
matching_applications.push(ApplicationValues::new(application_id, application, matches));
}
}
matching_applications.sort();
matching_applications
}