use crate::resources::{ClickHouseResource, KafkaResource, PostgresResource};
use crate::{E2eConfig, E2eError, Result};
use tracing::info;
pub async fn inspect_test(test_uuid: &str) -> Result<()> {
let uuid = test_uuid.strip_prefix("test_").unwrap_or(test_uuid);
println!("\n=== Inspecting resources for test UUID: {} ===\n", uuid);
let config = E2eConfig::from_env()?;
println!("=== Kafka Topic: test_{}_topic ===", uuid);
let topic = format!("test_{}_topic", uuid);
info!("Checking topic and fetching messages...");
let (messages, highest_offset) = KafkaResource::inspect_topic_messages(
&config.kafka_broker,
&config.schema_registry_url,
&topic,
100,
20,
)
.await?;
if messages.is_empty() {
println!("[WARN] Topic {} does not exist or has no messages", topic);
} else {
for (offset, key, id_str) in &messages {
println!("{}\t{}\t{}", offset, key, id_str);
}
if let Some(max_offset) = highest_offset {
println!("[INFO] Highest offset: {}", max_offset);
}
}
println!("\n=== PostgreSQL Database: test_{} ===", uuid);
let pg_db = format!("test_{}", uuid);
let parsed_url = url::Url::parse(&config.postgres_url)
.map_err(|e| E2eError::Postgres(sqlx::Error::Configuration(e.to_string().into())))?;
let mut admin_url = parsed_url.clone();
admin_url.set_path("/postgres");
let admin_url_str = admin_url.as_str();
match PostgresResource::connect_existing(admin_url_str, &pg_db).await {
Ok(postgres) => {
info!("Database exists. Listing tables...");
let tables = postgres.list_tables().await?;
if tables.is_empty() {
println!("[WARN] No tables found in database {}", pg_db);
} else {
for table in &tables {
println!("\nTable: {}", table);
let count = postgres
.count(&format!("SELECT COUNT(*) FROM public.\"{}\"", table))
.await?;
println!("Row count: {}", count);
if count > 0 {
println!("Sample data (first 5 rows):");
let columns = postgres.get_column_names(table).await?;
println!("{}", columns.join("\t"));
let sample_data = postgres.get_sample_data(table, 5).await?;
for row in sample_data {
println!("{}", row.join("\t"));
}
}
}
}
}
Err(e) => {
println!("[WARN] PostgreSQL database {} does not exist: {}", pg_db, e);
}
}
println!("\n=== ClickHouse Database: test_{} ===", uuid);
let ch_db = format!("test_{}", uuid);
match ClickHouseResource::connect_existing(&config.clickhouse_url, &ch_db).await {
Ok(clickhouse) => {
info!("Database exists. Listing tables...");
let tables = clickhouse.list_tables().await?;
if tables.is_empty() {
println!("[WARN] No tables found in database {}", ch_db);
} else {
for table in &tables {
println!("\nTable: {}", table);
let count = clickhouse
.count(&format!("SELECT COUNT(*) FROM {}", table))
.await?;
println!("Row count: {}", count);
if count > 0 {
println!("Sample data (first 5 rows):");
match clickhouse.get_sample_data_formatted(table, 5).await {
Ok(formatted) => println!("{}", formatted),
Err(e) => println!("[WARN] Could not fetch sample data: {}", e),
}
}
}
}
}
Err(e) => {
println!("[WARN] ClickHouse database {} does not exist: {}", ch_db, e);
}
}
println!("\n=== Inspection Complete ===");
Ok(())
}