Skip to main content

streamling_e2e/
inspect.rs

1//! Test inspection utilities for debugging e2e tests.
2
3use crate::resources::{ClickHouseResource, KafkaResource, PostgresResource};
4use crate::{E2eConfig, E2eError, Result};
5use tracing::info;
6
7/// Inspect resources for a specific test UUID
8pub async fn inspect_test(test_uuid: &str) -> Result<()> {
9    // Extract UUID part (remove 'test_' prefix if present)
10    let uuid = test_uuid.strip_prefix("test_").unwrap_or(test_uuid);
11
12    println!("\n=== Inspecting resources for test UUID: {} ===\n", uuid);
13
14    let config = E2eConfig::from_env()?;
15
16    // ============================================================================
17    // Kafka Topic Inspection
18    // ============================================================================
19
20    println!("=== Kafka Topic: test_{}_topic ===", uuid);
21    let topic = format!("test_{}_topic", uuid);
22
23    // Inspect messages in the topic (without creating it)
24    info!("Checking topic and fetching messages...");
25    let (messages, highest_offset) = KafkaResource::inspect_topic_messages(
26        &config.kafka_broker,
27        &config.schema_registry_url,
28        &topic,
29        100,
30        20,
31    )
32    .await?;
33
34    if messages.is_empty() {
35        println!("[WARN] Topic {} does not exist or has no messages", topic);
36    } else {
37        for (offset, key, id_str) in &messages {
38            println!("{}\t{}\t{}", offset, key, id_str);
39        }
40        if let Some(max_offset) = highest_offset {
41            println!("[INFO] Highest offset: {}", max_offset);
42        }
43    }
44
45    // ============================================================================
46    // PostgreSQL Database Inspection
47    // ============================================================================
48
49    println!("\n=== PostgreSQL Database: test_{} ===", uuid);
50    let pg_db = format!("test_{}", uuid);
51
52    // Try to connect to the database
53    // E2E_POSTGRES_URL format: postgres://user:pass@host:port/db?sslmode=disable
54    // We need to extract the base URL and connect to 'postgres' database first
55    let parsed_url = url::Url::parse(&config.postgres_url)
56        .map_err(|e| E2eError::Postgres(sqlx::Error::Configuration(e.to_string().into())))?;
57
58    // Build admin URL pointing to 'postgres' database
59    let mut admin_url = parsed_url.clone();
60    admin_url.set_path("/postgres");
61    let admin_url_str = admin_url.as_str();
62
63    match PostgresResource::connect_existing(admin_url_str, &pg_db).await {
64        Ok(postgres) => {
65            info!("Database exists. Listing tables...");
66
67            let tables = postgres.list_tables().await?;
68
69            if tables.is_empty() {
70                println!("[WARN] No tables found in database {}", pg_db);
71            } else {
72                for table in &tables {
73                    println!("\nTable: {}", table);
74                    let count = postgres
75                        .count(&format!("SELECT COUNT(*) FROM public.\"{}\"", table))
76                        .await?;
77                    println!("Row count: {}", count);
78
79                    if count > 0 {
80                        println!("Sample data (first 5 rows):");
81                        let columns = postgres.get_column_names(table).await?;
82                        println!("{}", columns.join("\t"));
83
84                        let sample_data = postgres.get_sample_data(table, 5).await?;
85                        for row in sample_data {
86                            println!("{}", row.join("\t"));
87                        }
88                    }
89                }
90            }
91        }
92        Err(e) => {
93            println!("[WARN] PostgreSQL database {} does not exist: {}", pg_db, e);
94        }
95    }
96
97    // ============================================================================
98    // ClickHouse Database Inspection
99    // ============================================================================
100
101    println!("\n=== ClickHouse Database: test_{} ===", uuid);
102    let ch_db = format!("test_{}", uuid);
103
104    match ClickHouseResource::connect_existing(&config.clickhouse_url, &ch_db).await {
105        Ok(clickhouse) => {
106            info!("Database exists. Listing tables...");
107
108            let tables = clickhouse.list_tables().await?;
109
110            if tables.is_empty() {
111                println!("[WARN] No tables found in database {}", ch_db);
112            } else {
113                for table in &tables {
114                    println!("\nTable: {}", table);
115                    let count = clickhouse
116                        .count(&format!("SELECT COUNT(*) FROM {}", table))
117                        .await?;
118                    println!("Row count: {}", count);
119
120                    if count > 0 {
121                        println!("Sample data (first 5 rows):");
122                        match clickhouse.get_sample_data_formatted(table, 5).await {
123                            Ok(formatted) => println!("{}", formatted),
124                            Err(e) => println!("[WARN] Could not fetch sample data: {}", e),
125                        }
126                    }
127                }
128            }
129        }
130        Err(e) => {
131            println!("[WARN] ClickHouse database {} does not exist: {}", ch_db, e);
132        }
133    }
134
135    println!("\n=== Inspection Complete ===");
136    Ok(())
137}