use crate::app::{app_resources, apps_that_use_topic};
use crate::application_types::ApplicationValues;
use crate::dsh_api_client::DshApiClient;
use crate::error::DshApiResult;
use crate::parse::TopicString;
use crate::types::{AppCatalogApp, AppCatalogAppResourcesValue, Application, Topic};
use crate::{Dependant, DependantApp, DependantApplication};
use futures::try_join;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
/// Describes the way an application refers to a topic.
///
/// Two kinds of references are distinguished in this file: a reference through the value of
/// an environment variable and a reference through an entry in the application's `topics` list
/// (see `topic_injections_from_application`).
///
/// The derived ordering follows the declaration order, so all `EnvVar` values sort before
/// all `Topic` values; within a variant the contained string decides.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum TopicInjection {
/// Reference via an environment variable; `env_var_name` is the name (key) of that variable.
/// The variant is named `env` in its serialized form.
#[serde(rename = "env")]
EnvVar { env_var_name: String },
/// Reference via the application's `topics` list; `topic_name` holds the string that was
/// passed to `TopicInjection::topic`. The variant is named `topic` in its serialized form.
#[serde(rename = "topic")]
Topic { topic_name: String },
}
impl TopicInjection {
pub(crate) fn env_var<T>(env_var: T) -> Self
where
T: Into<String>,
{
Self::EnvVar { env_var_name: env_var.into() }
}
pub(crate) fn topic<T>(topic_name: T) -> Self
where
T: Into<String>,
{
Self::Topic { topic_name: topic_name.into() }
}
}
impl Display for TopicInjection {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
TopicInjection::EnvVar { env_var_name } => write!(f, "{}", env_var_name),
TopicInjection::Topic { topic_name } => write!(f, "{{ topic('{}') }}", topic_name),
}
}
}
impl DshApiClient {
    /// Returns the applications that reference a topic through an environment variable.
    ///
    /// The application configuration map is fetched and scanned with
    /// [`topic_env_vars_from_applications`]. Every matching application yields one
    /// [`DependantApplication`] that carries a [`TopicInjection::EnvVar`] for each matching
    /// environment variable. Entries in an application's `topics` list are not considered.
    ///
    /// # Parameters
    /// * `topic_id` - name of the topic to look for
    ///
    /// # Returns
    /// * `Ok(..)` - the dependant applications, in the order produced by
    ///   [`topic_env_vars_from_applications`]
    /// * `Err(..)` - when fetching the application configuration map fails
    pub async fn topic_dependant_applications(&self, topic_id: &str) -> DshApiResult<Vec<DependantApplication<TopicInjection>>> {
        let applications = self.get_application_configuration_map().await?;
        let dependants = topic_env_vars_from_applications(topic_id, &applications)
            .into_iter()
            .map(|matched| {
                DependantApplication::new(
                    matched.id.to_string(),
                    matched.application.instances,
                    matched.values.iter().map(|(env_var, _)| TopicInjection::env_var(*env_var)).collect_vec(),
                )
            })
            .collect_vec();
        Ok(dependants)
    }

    /// Returns the App Catalog apps that use a topic.
    ///
    /// The App Catalog app configuration map is fetched and passed to `apps_that_use_topic`;
    /// each result is converted into a [`DependantApp`] holding the app id and the resource
    /// ids as owned strings.
    ///
    /// # Parameters
    /// * `topic_id` - identifier of the topic to look for
    ///
    /// # Returns
    /// * `Ok(..)` - the dependant apps, in the order produced by `apps_that_use_topic`
    /// * `Err(..)` - when fetching the App Catalog app configuration map fails
    pub async fn topic_dependant_apps(&self, topic_id: &str) -> DshApiResult<Vec<DependantApp>> {
        let catalog_apps = self.get_appcatalogapp_configuration_map().await?;
        let dependants = apps_that_use_topic(topic_id, &catalog_apps)
            .into_iter()
            .map(|(app_id, _, resource_ids)| {
                DependantApp::new(
                    app_id.to_string(),
                    resource_ids.iter().map(|resource_id| resource_id.to_string()).collect_vec(),
                )
            })
            .collect_vec();
        Ok(dependants)
    }

    /// Returns all dependants (applications and App Catalog apps) of a topic.
    ///
    /// Both configuration maps are fetched concurrently. Applications are matched with
    /// [`topic_injections_from_applications`], so both environment variable references and
    /// `topics` list references are reported. Apps are matched with `apps_that_use_topic`.
    /// The combined list is sorted before it is returned.
    ///
    /// # Parameters
    /// * `topic_id` - name of the topic to look for
    ///
    /// # Returns
    /// * `Ok(..)` - sorted list of dependants
    /// * `Err(..)` - when either of the two fetches fails
    pub async fn topic_dependants(&self, topic_id: &str) -> DshApiResult<Vec<Dependant<TopicInjection>>> {
        let (applications, catalog_apps) = try_join!(self.get_application_configuration_map(), self.get_appcatalogapp_configuration_map())?;
        let services = topic_injections_from_applications(topic_id, &applications)
            .into_iter()
            .map(|matched| Dependant::service(matched.id, matched.application.instances, matched.values.to_vec()));
        let apps = apps_that_use_topic(topic_id, &catalog_apps).into_iter().map(|(app_id, _, resource_ids)| {
            Dependant::app(
                app_id.to_string(),
                resource_ids.iter().map(|resource_id| resource_id.to_string()).collect_vec(),
            )
        });
        // Services first, then apps; the final sort determines the returned order.
        let mut dependants: Vec<Dependant<TopicInjection>> = services.chain(apps).collect();
        dependants.sort();
        Ok(dependants)
    }

    /// Returns every topic together with the applications that reference it through an
    /// environment variable.
    ///
    /// Topic ids and the application configuration map are fetched concurrently. For each
    /// topic id the applications are scanned with [`topic_env_vars_from_applications`].
    /// Topics without dependant applications are included with an empty list.
    ///
    /// # Returns
    /// * `Ok(..)` - `(topic id, dependant applications)` pairs, in the order of the fetched
    ///   topic ids
    /// * `Err(..)` - when either of the two fetches fails
    pub async fn topics_with_dependant_applications(&self) -> DshApiResult<Vec<(String, Vec<DependantApplication<TopicInjection>>)>> {
        let (topic_ids, applications) = try_join!(self.get_topic_ids(), self.get_application_configuration_map())?;
        let topics = topic_ids
            .into_iter()
            .map(|topic_id| {
                let dependants = topic_env_vars_from_applications(topic_id.as_str(), &applications)
                    .into_iter()
                    .map(|matched| {
                        DependantApplication::new(
                            matched.id.to_string(),
                            matched.application.instances,
                            matched.values.iter().map(|(env_var, _)| TopicInjection::env_var(*env_var)).collect_vec(),
                        )
                    })
                    .collect_vec();
                (topic_id, dependants)
            })
            .collect_vec();
        Ok(topics)
    }

    /// Returns every topic together with the App Catalog apps that use it.
    ///
    /// Topic ids and the App Catalog app configuration map are fetched concurrently. For each
    /// topic id the apps are determined with `apps_that_use_topic`. Topics without dependant
    /// apps are included with an empty list.
    ///
    /// # Returns
    /// * `Ok(..)` - `(topic id, dependant apps)` pairs, in the order of the fetched topic ids
    /// * `Err(..)` - when either of the two fetches fails
    pub async fn topics_with_dependant_apps(&self) -> DshApiResult<Vec<(String, Vec<DependantApp>)>> {
        let (topic_ids, catalog_apps) = try_join!(self.get_topic_ids(), self.get_appcatalogapp_configuration_map())?;
        let topics = topic_ids
            .into_iter()
            .map(|topic_id| {
                let dependants = apps_that_use_topic(topic_id.as_str(), &catalog_apps)
                    .into_iter()
                    .map(|(app_id, _, resource_ids)| {
                        DependantApp::new(
                            app_id.to_string(),
                            resource_ids.iter().map(|resource_id| resource_id.to_string()).collect_vec(),
                        )
                    })
                    .collect_vec();
                (topic_id, dependants)
            })
            .collect_vec();
        Ok(topics)
    }

    /// Returns every topic together with its dependants (applications and App Catalog apps).
    ///
    /// Topic ids and both configuration maps are fetched concurrently. Note that, unlike
    /// [`DshApiClient::topic_dependants`], applications are matched with
    /// [`topic_env_vars_from_applications`], so only environment variable references are
    /// reported, and the per-topic list is not sorted: applications come first, followed by
    /// apps. Topics without dependants are included with an empty list.
    ///
    /// # Returns
    /// * `Ok(..)` - `(topic id, dependants)` pairs, in the order of the fetched topic ids
    /// * `Err(..)` - when any of the three fetches fails
    pub async fn topics_with_dependants(&self) -> DshApiResult<Vec<(String, Vec<Dependant<TopicInjection>>)>> {
        let (topic_ids, applications, catalog_apps) = try_join!(
            self.get_topic_ids(),
            self.get_application_configuration_map(),
            self.get_appcatalogapp_configuration_map()
        )?;
        let topics = topic_ids
            .into_iter()
            .map(|topic_id| {
                let services = topic_env_vars_from_applications(topic_id.as_str(), &applications).into_iter().map(|matched| {
                    Dependant::service(
                        matched.id,
                        matched.application.instances,
                        matched.values.iter().map(|(env_var, _)| TopicInjection::env_var(*env_var)).collect_vec(),
                    )
                });
                let apps = apps_that_use_topic(topic_id.as_str(), &catalog_apps).into_iter().map(|(app_id, _, resource_ids)| {
                    Dependant::app(
                        app_id.to_string(),
                        resource_ids.iter().map(|resource_id| resource_id.to_string()).collect_vec(),
                    )
                });
                let dependants: Vec<Dependant<TopicInjection>> = services.chain(apps).collect();
                (topic_id, dependants)
            })
            .collect_vec();
        Ok(topics)
    }
}
/// Finds the environment variables of an application whose value refers to a given topic.
///
/// Each environment variable value is parsed as a [`TopicString`]; values that do not parse
/// are skipped. A variable matches when the parsed topic's `name()` equals `topic_name`.
///
/// # Parameters
/// * `topic_name` - name of the topic to look for
/// * `application` - application whose `env` map is scanned
///
/// # Returns
/// `(environment variable name, parsed topic string)` pairs, sorted by variable name.
/// The list is empty when no variable matches.
pub fn topic_env_vars_from_application<'a>(topic_name: &str, application: &'a Application) -> Vec<(&'a str, TopicString<'a>)> {
    let mut matches: Vec<(&'a str, TopicString<'a>)> = Vec::new();
    for (env_key, env_value) in &application.env {
        if let Ok(topic) = TopicString::try_from(env_value.as_str()) {
            if topic.name() == topic_name {
                matches.push((env_key.as_str(), topic));
            }
        }
    }
    matches.sort_by(|(left_key, _), (right_key, _)| left_key.cmp(right_key));
    matches
}
/// Finds, across a set of applications, the environment variables that refer to a given topic.
///
/// Applies [`topic_env_vars_from_application`] to every application and keeps only those
/// applications with at least one matching environment variable.
///
/// # Parameters
/// * `topic_id` - name of the topic to look for
/// * `applications` - applications keyed by application id
///
/// # Returns
/// One [`ApplicationValues`] per matching application, holding the application id, the
/// application and its matching `(environment variable name, topic string)` pairs. The list
/// is sorted using the `Ord` implementation of [`ApplicationValues`].
pub fn topic_env_vars_from_applications<'a>(topic_id: &str, applications: &'a HashMap<String, Application>) -> Vec<ApplicationValues<'a, (&'a str, TopicString<'a>)>> {
    let mut matching = applications
        .iter()
        .filter_map(|(application_id, application)| {
            let env_vars = topic_env_vars_from_application(topic_id, application);
            if env_vars.is_empty() {
                None
            } else {
                Some(ApplicationValues::new(application_id, application, env_vars))
            }
        })
        .collect::<Vec<_>>();
    matching.sort();
    matching
}
/// Collects all references to a given topic from a single application.
///
/// Two sources are scanned:
/// * the `env` map: a variable whose value parses as a [`TopicString`] with a matching
///   `name()` yields a [`TopicInjection::EnvVar`] with the variable's name,
/// * the `topics` list: an entry that parses as a [`TopicString`] with a matching `name()`
///   yields a [`TopicInjection::Topic`] holding the entry as it appears in the list.
///
/// Strings that do not parse as a [`TopicString`] are ignored.
///
/// # Parameters
/// * `topic_name` - name of the topic to look for
/// * `application` - application to scan
///
/// # Returns
/// The sorted list of injections; empty when the application does not reference the topic.
pub fn topic_injections_from_application(topic_name: &str, application: &Application) -> Vec<TopicInjection> {
    // True when the candidate parses as a topic string that names the requested topic.
    let refers_to_topic = |candidate: &str| matches!(TopicString::try_from(candidate), Ok(topic) if topic.name() == topic_name);
    let from_env = application
        .env
        .iter()
        .filter(|(_, env_value)| refers_to_topic(env_value.as_str()))
        .map(|(env_key, _)| TopicInjection::env_var(env_key));
    let from_topics = application
        .topics
        .iter()
        .filter(|application_topic| refers_to_topic(application_topic.as_str()))
        .map(|application_topic| TopicInjection::topic(application_topic));
    let mut injections: Vec<TopicInjection> = from_env.chain(from_topics).collect();
    injections.sort();
    injections
}
/// Collects all references to a given topic across a set of applications.
///
/// Applies [`topic_injections_from_application`] to every application and keeps only those
/// applications that reference the topic at least once.
///
/// # Parameters
/// * `topic_id` - name of the topic to look for
/// * `applications` - applications keyed by application id
///
/// # Returns
/// One [`ApplicationValues`] per matching application, holding the application id, the
/// application and its [`TopicInjection`]s. The list is sorted using the `Ord`
/// implementation of [`ApplicationValues`].
pub fn topic_injections_from_applications<'a>(topic_id: &str, applications: &'a HashMap<String, Application>) -> Vec<ApplicationValues<'a, TopicInjection>> {
    let mut matching = applications
        .iter()
        .filter_map(|(application_id, application)| {
            let injections = topic_injections_from_application(topic_id, application);
            if injections.is_empty() {
                None
            } else {
                Some(ApplicationValues::new(application_id, application, injections))
            }
        })
        .collect::<Vec<_>>();
    matching.sort();
    matching
}
/// Collects the topic resources of an App Catalog app.
///
/// Delegates to `app_resources` with a selector that keeps only resources of kind
/// [`AppCatalogAppResourcesValue::Topic`].
///
/// # Parameters
/// * `app` - App Catalog app to inspect
///
/// # Returns
/// `(&str, &Topic)` pairs as produced by `app_resources`; the string is presumably the
/// resource id (NOTE(review): confirm against `app_resources`).
pub fn topic_resources_from_app(app: &AppCatalogApp) -> Vec<(&str, &Topic)> {
    app_resources(app, &|resource_value| {
        if let AppCatalogAppResourcesValue::Topic(topic) = resource_value {
            Some(topic)
        } else {
            None
        }
    })
}
/// Checks whether an application lists a topic in its `topics` field.
///
/// The comparison is an exact string comparison against each entry; entries are not parsed
/// and environment variables are not inspected.
///
/// # Parameters
/// * `topic_id` - topic identifier to look for
/// * `application` - application to check
///
/// # Returns
/// `true` when at least one entry of `application.topics` equals `topic_id`.
pub fn topic_used_in_application(topic_id: &str, application: &Application) -> bool {
    application.topics.iter().any(|listed_topic| listed_topic.as_str() == topic_id)
}
/// Finds the applications that list a topic in their `topics` field.
///
/// Uses [`topic_used_in_application`] as the selection criterion.
///
/// # Parameters
/// * `topic_id` - topic identifier to look for
/// * `applications` - applications keyed by application id
///
/// # Returns
/// `(application id, application)` pairs for all matching applications, sorted by
/// application id.
pub fn topic_used_in_applications<'a>(topic_id: &str, applications: &'a HashMap<String, Application>) -> Vec<(&'a str, &'a Application)> {
    let mut users: Vec<(&'a str, &'a Application)> = applications
        .iter()
        .filter(|(_, application)| topic_used_in_application(topic_id, application))
        .map(|(application_id, application)| (application_id.as_str(), application))
        .collect_vec();
    users.sort_by_key(|(application_id, _)| *application_id);
    users
}
/// Returns the entries of an application's `topics` field as string slices.
///
/// # Parameters
/// * `application` - application whose topics are returned
///
/// # Returns
/// The topic entries in the order in which `application.topics` yields them; empty when the
/// application lists no topics.
pub fn topics_from_application(application: &Application) -> Vec<&str> {
    let mut topic_ids = Vec::with_capacity(application.topics.len());
    for topic_id in &application.topics {
        topic_ids.push(topic_id.as_str());
    }
    topic_ids
}
/// Returns, for every application that lists at least one topic, its sorted topic entries.
///
/// Applications with an empty `topics` field are left out.
///
/// # Parameters
/// * `applications` - applications keyed by application id
///
/// # Returns
/// One [`ApplicationValues`] per application with topics, holding the application id, the
/// application and its topic entries in sorted order. The list itself is sorted using the
/// `Ord` implementation of [`ApplicationValues`].
pub fn topics_from_applications(applications: &HashMap<String, Application>) -> Vec<ApplicationValues<&str>> {
    let mut per_application: Vec<ApplicationValues<&str>> = Vec::new();
    for (application_id, application) in applications {
        let mut topic_ids = topics_from_application(application);
        if topic_ids.is_empty() {
            continue;
        }
        topic_ids.sort();
        per_application.push(ApplicationValues::new(application_id, application, topic_ids));
    }
    per_application.sort();
    per_application
}