use crate::AppState;
use crate::data::clients::AthenaClientRecord;
use crate::drivers::postgresql::sqlx_driver::RegisteredClient;
use crate::parser::resolve_compatible_postgres_uri;
use crate::provisioning::{
DockerManagedContainer, PostgresUriSummary, ProvisioningMetadataSummary,
ProvisioningMonitorEntry, ProvisioningMonitorNetwork, ProvisioningMonitorProbe,
ProvisioningMonitorSummary, private_catalog_pg_uri, runtime_config_from_root,
summarize_catalog_route_bindings, summarize_postgres_uri, summarize_provisioning_metadata,
};
use crate::utils::sqlx_postgres_connect_uri::sanitize_sqlx_postgres_connect_uri;
use sqlx::Row;
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use tokio::time::timeout;
const PROBE_TIMEOUT_SECS: u64 = 3;
pub(super) async fn build_provisioning_monitor_entries(
state: &AppState,
containers: &[DockerManagedContainer],
runtime_clients: &HashMap<String, RegisteredClient>,
catalog_clients: &HashMap<String, AthenaClientRecord>,
) -> Vec<ProvisioningMonitorEntry> {
let expected_tables = runtime_config_from_root().expected_tables;
let mut entries: Vec<ProvisioningMonitorEntry> = Vec::new();
let mut seen_client_keys: HashSet<String> = HashSet::new();
let mut seen_container_keys: HashSet<String> = HashSet::new();
for container in containers {
let entry = build_docker_monitor_entry(
state,
container,
runtime_clients,
catalog_clients,
&expected_tables,
)
.await;
seen_client_keys.insert(normalize_monitor_key(&entry.client_name));
if let Some(container_name) = entry.container_name.as_deref() {
seen_container_keys.insert(normalize_monitor_key(container_name));
}
entries.push(entry);
}
for catalog_client in catalog_clients.values() {
if !is_managed_catalog_client(catalog_client) {
continue;
}
let metadata = summarize_provisioning_metadata(&catalog_client.metadata);
let kind = classify_monitor_kind(&metadata);
let client_key = normalize_monitor_key(&catalog_client.client_name);
let container_key = metadata
.container_name
.as_deref()
.map(normalize_monitor_key)
.unwrap_or_default();
if kind == "docker"
&& (seen_client_keys.contains(&client_key)
|| (!container_key.is_empty() && seen_container_keys.contains(&container_key)))
{
continue;
}
let runtime_client = runtime_clients.get(&client_key);
let entry = build_catalog_monitor_entry(
state,
catalog_client,
runtime_client,
metadata,
&expected_tables,
)
.await;
seen_client_keys.insert(client_key);
if let Some(container_name) = entry.container_name.as_deref() {
seen_container_keys.insert(normalize_monitor_key(container_name));
}
entries.push(entry);
}
entries.sort_by(|left, right| {
left.kind.cmp(&right.kind).then_with(|| {
left.client_name
.to_lowercase()
.cmp(&right.client_name.to_lowercase())
})
});
entries
}
pub(super) fn summarize_provisioning_monitors(
entries: &[ProvisioningMonitorEntry],
) -> ProvisioningMonitorSummary {
let mut summary = ProvisioningMonitorSummary {
total: entries.len(),
..ProvisioningMonitorSummary::default()
};
for entry in entries {
match entry.kind.as_str() {
"docker" => summary.docker += 1,
_ => summary.managed += 1,
}
match entry.probe.status.as_str() {
"ok" => summary.healthy += 1,
"missing_tables" => summary.warning += 1,
"unreachable" => summary.unreachable += 1,
_ => summary.unknown += 1,
}
}
summary
}
async fn build_docker_monitor_entry(
state: &AppState,
container: &DockerManagedContainer,
runtime_clients: &HashMap<String, RegisteredClient>,
catalog_clients: &HashMap<String, AthenaClientRecord>,
expected_tables: &[String],
) -> ProvisioningMonitorEntry {
let candidate_keys = container_candidate_keys(container);
let runtime_client = find_runtime_client(runtime_clients, &candidate_keys);
let catalog_client = find_catalog_client(catalog_clients, &candidate_keys);
let metadata_value = catalog_client
.map(|client| &client.metadata)
.cloned()
.unwrap_or_else(|| serde_json::json!({}));
let metadata = summarize_provisioning_metadata(&metadata_value);
let client_name = runtime_client
.map(|client| client.client_name.clone())
.or_else(|| catalog_client.map(|client| client.client_name.clone()))
.or_else(|| container_client_label(container))
.or_else(|| container_client_alias(&container.container_name))
.unwrap_or_else(|| container.container_name.clone());
let private_uri = resolve_private_target_uri(catalog_client, runtime_client);
let catalog_uri = resolve_catalog_target_uri(catalog_client);
let mut private_endpoint = private_uri
.as_deref()
.and_then(summarize_postgres_uri)
.unwrap_or_default();
let mut catalog_endpoint = catalog_uri
.as_deref()
.and_then(summarize_postgres_uri)
.unwrap_or_default();
merge_endpoint_with_metadata(&mut private_endpoint, &metadata);
merge_endpoint_with_metadata(&mut catalog_endpoint, &metadata);
let probe = if !container.running {
ProvisioningMonitorProbe {
status: "unreachable".to_string(),
checked_via: "container_state".to_string(),
provisioned: None,
present_tables: Vec::new(),
missing_tables: expected_tables.to_vec(),
error: Some("Docker container is not running.".to_string()),
}
} else {
probe_client_target(
state,
runtime_client,
private_uri.as_deref().or(catalog_uri.as_deref()),
expected_tables,
)
.await
};
ProvisioningMonitorEntry {
key: normalize_monitor_key(&client_name),
kind: "docker".to_string(),
client_name,
description: catalog_client
.and_then(|client| client.description.clone())
.or_else(|| runtime_client.and_then(|client| client.description.clone())),
source: catalog_client
.map(|client| client.source.clone())
.or_else(|| runtime_client.map(|client| client.source.clone())),
managed_by: metadata.managed_by.clone(),
provider: metadata
.provider
.clone()
.or_else(|| Some("docker".to_string())),
mode: metadata
.mode
.clone()
.or_else(|| Some("dedicated_container".to_string())),
container_name: Some(container.container_name.clone()),
running: Some(container.running),
status: container.status.clone(),
image: container.image.clone().or_else(|| metadata.image.clone()),
labels: container.labels.clone(),
runtime_registered: runtime_client.is_some(),
runtime_pool_connected: runtime_client.map(|client| client.pool_connected),
catalog_registered: catalog_client.is_some(),
is_active: catalog_client
.map(|client| client.is_active)
.or_else(|| runtime_client.map(|client| client.is_active)),
is_frozen: catalog_client
.map(|client| client.is_frozen)
.or_else(|| runtime_client.map(|client| client.is_frozen)),
last_seen_at: catalog_client
.and_then(|client| client.last_seen_at.map(|value| value.to_rfc3339())),
last_synced_from_config_at: catalog_client.and_then(|client| {
client
.last_synced_from_config_at
.map(|value| value.to_rfc3339())
}),
updated_at: catalog_client.map(|client| client.updated_at.to_rfc3339()),
network: ProvisioningMonitorNetwork {
private_endpoint,
catalog_endpoint,
public_endpoints: summarize_catalog_route_bindings(&metadata_value),
},
probe,
}
}
async fn build_catalog_monitor_entry(
state: &AppState,
catalog_client: &AthenaClientRecord,
runtime_client: Option<&RegisteredClient>,
metadata: ProvisioningMetadataSummary,
expected_tables: &[String],
) -> ProvisioningMonitorEntry {
let private_uri = resolve_private_target_uri(Some(catalog_client), runtime_client);
let catalog_uri = resolve_catalog_target_uri(Some(catalog_client));
let mut private_endpoint = private_uri
.as_deref()
.and_then(summarize_postgres_uri)
.unwrap_or_default();
let mut catalog_endpoint = catalog_uri
.as_deref()
.and_then(summarize_postgres_uri)
.unwrap_or_default();
merge_endpoint_with_metadata(&mut private_endpoint, &metadata);
merge_endpoint_with_metadata(&mut catalog_endpoint, &metadata);
let probe = probe_client_target(
state,
runtime_client,
private_uri.as_deref().or(catalog_uri.as_deref()),
expected_tables,
)
.await;
ProvisioningMonitorEntry {
key: normalize_monitor_key(&catalog_client.client_name),
kind: classify_monitor_kind(&metadata).to_string(),
client_name: catalog_client.client_name.clone(),
description: catalog_client.description.clone(),
source: Some(catalog_client.source.clone()),
managed_by: metadata.managed_by.clone(),
provider: metadata.provider.clone(),
mode: metadata.mode.clone(),
container_name: metadata.container_name.clone(),
running: None,
status: None,
image: metadata.image.clone(),
labels: HashMap::new(),
runtime_registered: runtime_client.is_some(),
runtime_pool_connected: runtime_client.map(|client| client.pool_connected),
catalog_registered: true,
is_active: Some(catalog_client.is_active),
is_frozen: Some(catalog_client.is_frozen),
last_seen_at: catalog_client.last_seen_at.map(|value| value.to_rfc3339()),
last_synced_from_config_at: catalog_client
.last_synced_from_config_at
.map(|value| value.to_rfc3339()),
updated_at: Some(catalog_client.updated_at.to_rfc3339()),
network: ProvisioningMonitorNetwork {
private_endpoint,
catalog_endpoint,
public_endpoints: summarize_catalog_route_bindings(&catalog_client.metadata),
},
probe,
}
}
async fn probe_client_target(
state: &AppState,
runtime_client: Option<&RegisteredClient>,
fallback_uri: Option<&str>,
expected_tables: &[String],
) -> ProvisioningMonitorProbe {
if let Some(runtime_client) = runtime_client
&& let Some(pool) = state.pg_registry.get_pool(&runtime_client.client_name)
{
return probe_tables_with_pool(&pool, expected_tables, "runtime_pool").await;
}
let Some(uri) = fallback_uri
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return ProvisioningMonitorProbe {
status: "unknown".to_string(),
checked_via: "unavailable".to_string(),
provisioned: None,
present_tables: Vec::new(),
missing_tables: expected_tables.to_vec(),
error: Some("No private or catalog Postgres URI is available for probing.".to_string()),
};
};
probe_tables_with_uri(uri, expected_tables).await
}
async fn probe_tables_with_uri(uri: &str, expected_tables: &[String]) -> ProvisioningMonitorProbe {
let sanitized_uri = sanitize_sqlx_postgres_connect_uri(uri);
let connect_future = PgPoolOptions::new()
.max_connections(1)
.acquire_timeout(Duration::from_secs(2))
.connect(sanitized_uri.as_ref());
let pool = match timeout(Duration::from_secs(PROBE_TIMEOUT_SECS), connect_future).await {
Ok(Ok(pool)) => pool,
Ok(Err(err)) => {
return ProvisioningMonitorProbe {
status: "unreachable".to_string(),
checked_via: "direct_uri".to_string(),
provisioned: None,
present_tables: Vec::new(),
missing_tables: expected_tables.to_vec(),
error: Some(err.to_string()),
};
}
Err(_) => {
return ProvisioningMonitorProbe {
status: "unreachable".to_string(),
checked_via: "direct_uri".to_string(),
provisioned: None,
present_tables: Vec::new(),
missing_tables: expected_tables.to_vec(),
error: Some("Timed out while connecting to the Postgres target.".to_string()),
};
}
};
probe_tables_with_pool(&pool, expected_tables, "direct_uri").await
}
async fn probe_tables_with_pool(
pool: &PgPool,
expected_tables: &[String],
checked_via: &str,
) -> ProvisioningMonitorProbe {
let query = sqlx::query(
"SELECT table_name::text FROM information_schema.tables WHERE table_schema = 'public'",
)
.fetch_all(pool);
let rows = match timeout(Duration::from_secs(PROBE_TIMEOUT_SECS), query).await {
Ok(Ok(rows)) => rows,
Ok(Err(err)) => {
return ProvisioningMonitorProbe {
status: "unreachable".to_string(),
checked_via: checked_via.to_string(),
provisioned: None,
present_tables: Vec::new(),
missing_tables: expected_tables.to_vec(),
error: Some(err.to_string()),
};
}
Err(_) => {
return ProvisioningMonitorProbe {
status: "unreachable".to_string(),
checked_via: checked_via.to_string(),
provisioned: None,
present_tables: Vec::new(),
missing_tables: expected_tables.to_vec(),
error: Some("Timed out while checking Athena tables.".to_string()),
};
}
};
let existing_tables: HashSet<String> = rows
.iter()
.filter_map(|row| row.try_get::<String, _>("table_name").ok())
.collect();
let present_tables: Vec<String> = expected_tables
.iter()
.filter(|table| existing_tables.contains(table.as_str()))
.cloned()
.collect();
let missing_tables: Vec<String> = expected_tables
.iter()
.filter(|table| !existing_tables.contains(table.as_str()))
.cloned()
.collect();
let provisioned = missing_tables.is_empty();
ProvisioningMonitorProbe {
status: if provisioned {
"ok".to_string()
} else {
"missing_tables".to_string()
},
checked_via: checked_via.to_string(),
provisioned: Some(provisioned),
present_tables,
missing_tables,
error: None,
}
}
fn resolve_private_target_uri(
catalog_client: Option<&AthenaClientRecord>,
runtime_client: Option<&RegisteredClient>,
) -> Option<String> {
catalog_client
.and_then(|client| private_catalog_pg_uri(&client.metadata))
.or_else(|| {
runtime_client.and_then(|client| {
client
.pg_uri
.as_deref()
.map(resolve_compatible_postgres_uri)
.or_else(|| {
client
.config_uri_template
.as_deref()
.map(resolve_compatible_postgres_uri)
})
})
})
.or_else(|| {
catalog_client.and_then(|client| {
client
.config_uri_template
.as_deref()
.map(resolve_compatible_postgres_uri)
.or_else(|| {
client
.pg_uri
.as_deref()
.map(resolve_compatible_postgres_uri)
})
})
})
}
fn resolve_catalog_target_uri(catalog_client: Option<&AthenaClientRecord>) -> Option<String> {
catalog_client.and_then(|client| {
client
.pg_uri
.as_deref()
.map(resolve_compatible_postgres_uri)
.or_else(|| {
client
.config_uri_template
.as_deref()
.map(resolve_compatible_postgres_uri)
})
})
}
fn merge_endpoint_with_metadata(
endpoint: &mut PostgresUriSummary,
metadata: &ProvisioningMetadataSummary,
) {
if endpoint.host.is_none() {
endpoint.host = metadata.host.clone();
}
if endpoint.port.is_none() {
endpoint.port = metadata.host_port;
}
if endpoint.database_name.is_none() {
endpoint.database_name = metadata.database_name.clone();
}
if endpoint.username.is_none() {
endpoint.username = metadata.username.clone();
}
}
fn classify_monitor_kind(metadata: &ProvisioningMetadataSummary) -> &'static str {
if matches!(metadata.provider.as_deref(), Some("docker")) {
"docker"
} else {
"managed"
}
}
fn is_managed_catalog_client(client: &AthenaClientRecord) -> bool {
let metadata = summarize_provisioning_metadata(&client.metadata);
metadata.managed_by.is_some()
|| metadata.provider.is_some()
|| metadata.mode.is_some()
|| !summarize_catalog_route_bindings(&client.metadata).is_empty()
|| private_catalog_pg_uri(&client.metadata).is_some()
}
fn find_runtime_client<'a>(
runtime_clients: &'a HashMap<String, RegisteredClient>,
candidate_keys: &[String],
) -> Option<&'a RegisteredClient> {
candidate_keys
.iter()
.find_map(|candidate| runtime_clients.get(candidate))
}
fn find_catalog_client<'a>(
catalog_clients: &'a HashMap<String, AthenaClientRecord>,
candidate_keys: &[String],
) -> Option<&'a AthenaClientRecord> {
candidate_keys
.iter()
.find_map(|candidate| catalog_clients.get(candidate))
}
fn container_candidate_keys(container: &DockerManagedContainer) -> Vec<String> {
let mut keys = Vec::new();
if let Some(label) = container_client_label(container) {
keys.push(normalize_monitor_key(&label));
}
if let Some(alias) = container_client_alias(&container.container_name) {
keys.push(normalize_monitor_key(&alias));
}
keys.push(normalize_monitor_key(&container.container_name));
keys.sort();
keys.dedup();
keys
}
fn container_client_label(container: &DockerManagedContainer) -> Option<String> {
container
.labels
.get("athena.client")
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
}
fn container_client_alias(container_name: &str) -> Option<String> {
container_name
.trim()
.strip_prefix("athena-pg-")
.filter(|value| !value.is_empty())
.map(str::to_string)
}
fn normalize_monitor_key(value: &str) -> String {
value.trim().to_ascii_lowercase()
}