use arrow_schema::{DataType, Field, Schema, TimeUnit};
use kyma_catalog::PostgresCatalog;
use kyma_core::catalog::{Catalog, TableConfig};
use kyma_format_tlm::TelemetryFormat;
use object_store::memory::InMemory;
use std::sync::Arc;
use testcontainers::{runners::AsyncRunner, ImageExt};
use testcontainers_modules::postgres::Postgres;
use crate::catalog_handler::SchemaCache;
use crate::QueryState;
pub struct TestServer {
base_url: String,
shutdown_tx: tokio::sync::oneshot::Sender<()>,
handle: tokio::task::JoinHandle<()>,
}
impl TestServer {
pub fn http_base_url(&self) -> &str {
&self.base_url
}
pub async fn shutdown(self) {
let _ = self.shutdown_tx.send(());
let _ = self.handle.await;
}
}
pub async fn seeded_state_empty() -> QueryState {
let container = Postgres::default()
.with_user("kyma")
.with_password("kyma_dev")
.with_db_name("kyma")
.with_name("pgvector/pgvector")
.with_tag("pg16")
.start()
.await
.expect("testcontainers: failed to start Postgres");
let port = container
.get_host_port_ipv4(5432)
.await
.expect("testcontainers: failed to get mapped port");
let url = format!("postgres://kyma:kyma_dev@localhost:{port}/kyma");
std::env::set_var("KYMA_TEST_DATABASE_URL", &url);
let pg_catalog = PostgresCatalog::connect(&url)
.await
.expect("catalog connect + migrate");
let pg_pool = Arc::new(pg_catalog.pool().clone());
let catalog: Arc<dyn Catalog> = Arc::new(pg_catalog);
let store = Arc::new(InMemory::new());
let format = Arc::new(TelemetryFormat::new(store, "kyma-test"));
std::mem::forget(container);
QueryState {
catalog,
format,
schema_cache: Arc::new(SchemaCache::default()),
node_id: None,
pg_pool: Some(pg_pool),
}
}
pub async fn seeded_state_with_obs_otel_logs() -> QueryState {
let container = Postgres::default()
.with_user("kyma")
.with_password("kyma_dev")
.with_db_name("kyma")
.with_name("pgvector/pgvector")
.with_tag("pg16")
.start()
.await
.expect("testcontainers: failed to start Postgres");
let port = container
.get_host_port_ipv4(5432)
.await
.expect("testcontainers: failed to get mapped port");
let url = format!("postgres://kyma:kyma_dev@localhost:{port}/kyma");
std::env::set_var("KYMA_TEST_DATABASE_URL", &url);
let pg_catalog = PostgresCatalog::connect(&url)
.await
.expect("catalog connect + migrate");
let pg_pool = Arc::new(pg_catalog.pool().clone());
let catalog: Arc<dyn Catalog> = Arc::new(pg_catalog);
let db_id = catalog
.create_database("obs")
.await
.expect("create_database obs");
let schema = Arc::new(Schema::new(vec![
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("service.name", DataType::Utf8, false),
Field::new("severity_text", DataType::Utf8, true),
]));
catalog
.create_table(db_id, "otel_logs", schema, TableConfig::default())
.await
.expect("create_table otel_logs");
let store = Arc::new(InMemory::new());
let format = Arc::new(TelemetryFormat::new(store, "kyma-test"));
std::mem::forget(container);
QueryState {
catalog,
format,
schema_cache: Arc::new(SchemaCache::default()),
node_id: None,
pg_pool: Some(pg_pool),
}
}
pub async fn seeded_state_two_databases() -> QueryState {
let state = seeded_state_with_obs_otel_logs().await;
let db_id = state
.catalog
.create_database("stg")
.await
.expect("create_database stg");
let schema = Arc::new(Schema::new(vec![
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("status", DataType::Int64, false),
Field::new("path", DataType::Utf8, false),
]));
state
.catalog
.create_table(db_id, "http_reqs", schema, TableConfig::default())
.await
.expect("create_table stg.http_reqs");
state
}
#[cfg(feature = "web-ui")]
pub async fn start_test_server_with_seeded_data() -> TestServer {
let state = seeded_state_with_obs_otel_logs().await;
let backend: std::sync::Arc<dyn crate::auth::AuthBackend> = std::sync::Arc::new(
crate::auth::EnvAuthBackend::from_str("test-read-token:read"),
);
let query_router = crate::router(state.clone()).layer(axum::middleware::from_fn_with_state(
crate::auth::AuthLayerState {
backend: backend.clone(),
required: crate::auth::Role::Read,
},
crate::auth::require_role_middleware,
));
let flight_router =
crate::flight_web_router(state).layer(axum::middleware::from_fn_with_state(
crate::auth::AuthLayerState {
backend: backend.clone(),
required: crate::auth::Role::Read,
},
crate::auth::require_role_middleware,
));
let app = query_router.merge(flight_router);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind ephemeral port");
let addr = listener.local_addr().expect("local_addr");
let base_url = format!("http://{addr}");
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let handle = tokio::spawn(async move {
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.await;
})
.await
.ok();
});
TestServer {
base_url,
shutdown_tx,
handle,
}
}