// streamling_e2e/inspect.rs
use crate::resources::{ClickHouseResource, KafkaResource, PostgresResource};
use crate::{E2eConfig, E2eError, Result};
use tracing::info;

7pub async fn inspect_test(test_uuid: &str) -> Result<()> {
9 let uuid = test_uuid.strip_prefix("test_").unwrap_or(test_uuid);
11
12 println!("\n=== Inspecting resources for test UUID: {} ===\n", uuid);
13
14 let config = E2eConfig::from_env()?;
15
16 println!("=== Kafka Topic: test_{}_topic ===", uuid);
21 let topic = format!("test_{}_topic", uuid);
22
23 info!("Checking topic and fetching messages...");
25 let (messages, highest_offset) = KafkaResource::inspect_topic_messages(
26 &config.kafka_broker,
27 &config.schema_registry_url,
28 &topic,
29 100,
30 20,
31 )
32 .await?;
33
34 if messages.is_empty() {
35 println!("[WARN] Topic {} does not exist or has no messages", topic);
36 } else {
37 for (offset, key, id_str) in &messages {
38 println!("{}\t{}\t{}", offset, key, id_str);
39 }
40 if let Some(max_offset) = highest_offset {
41 println!("[INFO] Highest offset: {}", max_offset);
42 }
43 }
44
45 println!("\n=== PostgreSQL Database: test_{} ===", uuid);
50 let pg_db = format!("test_{}", uuid);
51
52 let parsed_url = url::Url::parse(&config.postgres_url)
56 .map_err(|e| E2eError::Postgres(sqlx::Error::Configuration(e.to_string().into())))?;
57
58 let mut admin_url = parsed_url.clone();
60 admin_url.set_path("/postgres");
61 let admin_url_str = admin_url.as_str();
62
63 match PostgresResource::connect_existing(admin_url_str, &pg_db).await {
64 Ok(postgres) => {
65 info!("Database exists. Listing tables...");
66
67 let tables = postgres.list_tables().await?;
68
69 if tables.is_empty() {
70 println!("[WARN] No tables found in database {}", pg_db);
71 } else {
72 for table in &tables {
73 println!("\nTable: {}", table);
74 let count = postgres
75 .count(&format!("SELECT COUNT(*) FROM public.\"{}\"", table))
76 .await?;
77 println!("Row count: {}", count);
78
79 if count > 0 {
80 println!("Sample data (first 5 rows):");
81 let columns = postgres.get_column_names(table).await?;
82 println!("{}", columns.join("\t"));
83
84 let sample_data = postgres.get_sample_data(table, 5).await?;
85 for row in sample_data {
86 println!("{}", row.join("\t"));
87 }
88 }
89 }
90 }
91 }
92 Err(e) => {
93 println!("[WARN] PostgreSQL database {} does not exist: {}", pg_db, e);
94 }
95 }
96
97 println!("\n=== ClickHouse Database: test_{} ===", uuid);
102 let ch_db = format!("test_{}", uuid);
103
104 match ClickHouseResource::connect_existing(&config.clickhouse_url, &ch_db).await {
105 Ok(clickhouse) => {
106 info!("Database exists. Listing tables...");
107
108 let tables = clickhouse.list_tables().await?;
109
110 if tables.is_empty() {
111 println!("[WARN] No tables found in database {}", ch_db);
112 } else {
113 for table in &tables {
114 println!("\nTable: {}", table);
115 let count = clickhouse
116 .count(&format!("SELECT COUNT(*) FROM {}", table))
117 .await?;
118 println!("Row count: {}", count);
119
120 if count > 0 {
121 println!("Sample data (first 5 rows):");
122 match clickhouse.get_sample_data_formatted(table, 5).await {
123 Ok(formatted) => println!("{}", formatted),
124 Err(e) => println!("[WARN] Could not fetch sample data: {}", e),
125 }
126 }
127 }
128 }
129 }
130 Err(e) => {
131 println!("[WARN] ClickHouse database {} does not exist: {}", ch_db, e);
132 }
133 }
134
135 println!("\n=== Inspection Complete ===");
136 Ok(())
137}