use arrow_schema::{DataType, Field, Schema, TimeUnit};
use axum::body::Body;
use axum::http::{Request, StatusCode};
use kyma_catalog::PostgresCatalog;
use kyma_core::catalog::{Catalog, TableConfig};
use kyma_core::tenant::DEFAULT_TENANT;
use serde_json::Value;
use std::sync::Arc;
use testcontainers::{runners::AsyncRunner, ImageExt};
use tower::ServiceExt;
/// Builds the cleanup write router for `catalog`, guarded by the role-checking
/// auth middleware with `Role::Write` required.
///
/// Authentication is backed by an `EnvAuthBackend` configured from the string
/// `"test-write-token:write"`; requests in these tests authenticate with the
/// bearer token `test-write-token`.
fn cleanup_write_app(
    catalog: Arc<dyn Catalog>,
) -> impl tower::Service<
    axum::http::Request<axum::body::Body>,
    Response = axum::http::Response<axum::body::Body>,
    Error = std::convert::Infallible,
    Future = impl std::future::Future<
        Output = Result<axum::http::Response<axum::body::Body>, std::convert::Infallible>,
    >,
> {
    let auth_backend: Arc<dyn kyma_server::auth::AuthBackend> = Arc::new(
        kyma_server::auth::EnvAuthBackend::from_str("test-write-token:write"),
    );
    let auth_state = kyma_server::auth::AuthLayerState {
        backend: auth_backend,
        required: kyma_server::auth::Role::Write,
    };
    let router = kyma_server::cleanup_write_router(catalog);
    router.layer(axum::middleware::from_fn_with_state(
        auth_state,
        kyma_server::auth::require_role_middleware,
    ))
}
/// Percent-encodes `raw` for use as a single URL query-parameter value.
///
/// Only RFC 3986 "unreserved" characters (ASCII alphanumerics and `-._~`) pass
/// through unchanged. Every other byte, including `:`, `+`, `&`, `#`, `%` and
/// space, is emitted as `%XX`. Escaping `+` matters for RFC 3339 offsets such as
/// `+02:00`, because form-style query parsers decode a bare `+` as a space.
fn encode_query_value(raw: &str) -> String {
    let mut encoded = String::with_capacity(raw.len());
    for byte in raw.bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~') {
            encoded.push(char::from(byte));
        } else {
            encoded.push_str(&format!("%{byte:02X}"));
        }
    }
    encoded
}

/// Sends `POST /v1/database/{db}/table/{table}/cleanup?before=…` to `app` and
/// returns the response.
///
/// The request carries `Authorization: Bearer test-write-token`. `db` and `table`
/// are interpolated into the path verbatim (callers pass plain identifiers).
/// `before` is fully percent-encoded so that any timestamp form, including one
/// with a `+hh:mm` offset, survives query parsing.
///
/// Panics if the request cannot be built. The service error type is
/// `Infallible`, so the final `unwrap` cannot fail.
async fn post_cleanup<S>(
    app: S,
    db: &str,
    table: &str,
    before: &str,
) -> axum::http::Response<Body>
where
    S: tower::Service<
        Request<Body>,
        Response = axum::http::Response<Body>,
        Error = std::convert::Infallible,
    >,
{
    let uri = format!(
        "/v1/database/{db}/table/{table}/cleanup?before={}",
        encode_query_value(before)
    );
    let req = Request::builder()
        .method("POST")
        .uri(uri)
        .header("authorization", "Bearer test-write-token")
        .body(Body::empty())
        .unwrap();
    app.oneshot(req).await.unwrap()
}
/// Reads the whole response body (up to 1 MiB) and parses it as a JSON value.
///
/// Panics if the body cannot be collected within the limit or is not valid JSON.
async fn body_json(resp: axum::http::Response<Body>) -> Value {
    const MAX_BODY_BYTES: usize = 1024 * 1024;
    let raw = axum::body::to_bytes(resp.into_body(), MAX_BODY_BYTES)
        .await
        .unwrap();
    serde_json::from_slice::<Value>(&raw).unwrap()
}
/// Starts a disposable Postgres container and connects a [`PostgresCatalog`] to it.
///
/// The container uses the `pgvector/pgvector:pg16` image with user `kyma`,
/// password `kyma_dev` and database `kyma`. The catalog connects through the host
/// port mapped to container port 5432.
///
/// The container handle is returned with the catalog so the caller can keep it
/// alive for the whole test. NOTE(review): presumably dropping the handle tears
/// the container down; confirm against the testcontainers docs.
///
/// Panics if the container fails to start, the mapped port cannot be resolved,
/// or the catalog cannot connect. The expect message suggests `connect` also
/// runs migrations.
async fn raw_catalog() -> (
    PostgresCatalog,
    testcontainers::ContainerAsync<testcontainers_modules::postgres::Postgres>,
) {
    let pg_image = testcontainers_modules::postgres::Postgres::default()
        .with_user("kyma")
        .with_password("kyma_dev")
        .with_db_name("kyma")
        .with_name("pgvector/pgvector")
        .with_tag("pg16");
    let pg = pg_image
        .start()
        .await
        .expect("testcontainers: failed to start Postgres");
    let host_port = pg
        .get_host_port_ipv4(5432)
        .await
        .expect("testcontainers: failed to get mapped port");
    let database_url = format!("postgres://kyma:kyma_dev@localhost:{host_port}/kyma");
    let connected = PostgresCatalog::connect(&database_url)
        .await
        .expect("catalog connect + migrate");
    (connected, pg)
}
/// Minimal two-column schema for the test table. It has a non-nullable
/// nanosecond `timestamp` with no time zone, and a nullable UTF-8 `body`.
fn sample_schema() -> Arc<Schema> {
    let timestamp_field = Field::new(
        "timestamp",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        false,
    );
    let body_field = Field::new("body", DataType::Utf8, true);
    let fields = vec![timestamp_field, body_field];
    Arc::new(Schema::new(fields))
}
#[tokio::test]
async fn cleanup_removes_soft_deleted_extents_and_returns_counts() {
let (catalog, _container) = raw_catalog().await;
let db_id = catalog
.create_database("testdb")
.await
.expect("create_database");
let table_id = catalog
.create_table(db_id, "testtable", sample_schema(), TableConfig::default())
.await
.expect("create_table");
let pool = catalog.pool();
let table_uuid = *table_id.as_uuid();
let schema_snap_uuid: uuid::Uuid = sqlx::query_scalar(
"SELECT schema_snapshot_id FROM tables WHERE id = $1",
)
.bind(table_uuid)
.fetch_one(pool)
.await
.expect("fetch schema_snapshot_id");
sqlx::query(
"INSERT INTO extents (tenant_id, table_id, schema_snapshot_id, object_path, byte_size, row_count, deleted_at)
VALUES ($1, $2, $3, 'test/path/old.kyma', 4096, 200, now() - interval '2 days')",
)
.bind(DEFAULT_TENANT.as_uuid())
.bind(table_uuid)
.bind(schema_snap_uuid)
.execute(pool)
.await
.expect("insert soft-deleted extent");
sqlx::query(
"INSERT INTO extents (tenant_id, table_id, schema_snapshot_id, object_path, byte_size, row_count)
VALUES ($1, $2, $3, 'test/path/live.kyma', 2048, 50)",
)
.bind(DEFAULT_TENANT.as_uuid())
.bind(table_uuid)
.bind(schema_snap_uuid)
.execute(pool)
.await
.expect("insert live extent");
let pool_for_verify = catalog.pool().clone();
let catalog_arc: Arc<dyn Catalog> = Arc::new(catalog);
let app = cleanup_write_app(catalog_arc.clone());
let resp = post_cleanup(app, "testdb", "testtable", "2030-01-01T00:00:00Z").await;
assert_eq!(resp.status(), StatusCode::OK, "expected 200 OK from cleanup");
let body = body_json(resp).await;
assert_eq!(
body["extents_deleted"].as_u64().unwrap(),
1,
"exactly 1 extent should be deleted"
);
assert_eq!(
body["rows_freed"].as_u64().unwrap(),
200,
"rows_freed should equal the deleted extent's row_count"
);
assert_eq!(
body["bytes_freed"].as_u64().unwrap(),
4096,
"bytes_freed should equal the deleted extent's byte_size"
);
let remaining: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM extents WHERE table_id = $1")
.bind(table_uuid)
.fetch_one(&pool_for_verify)
.await
.expect("count remaining extents");
assert_eq!(remaining, 1, "the live extent must still exist");
}
/// A soft-deleted extent whose deleted_at is newer than `before` must be neither
/// counted nor removed by cleanup.
#[tokio::test]
async fn cleanup_respects_before_cutoff() {
    let (catalog, _container) = raw_catalog().await;
    let db_id = catalog.create_database("db2").await.expect("create_database");
    let table_id = catalog
        .create_table(db_id, "tbl2", sample_schema(), TableConfig::default())
        .await
        .expect("create_table");
    let pool = catalog.pool();
    let table_uuid = *table_id.as_uuid();
    let schema_snap_uuid: uuid::Uuid =
        sqlx::query_scalar("SELECT schema_snapshot_id FROM tables WHERE id = $1")
            .bind(table_uuid)
            .fetch_one(pool)
            .await
            .expect("fetch schema_snapshot_id");
    // Soft-deleted one hour ago, which is well after the two-days-ago cutoff below.
    sqlx::query(
        "INSERT INTO extents (tenant_id, table_id, schema_snapshot_id, object_path, byte_size, row_count, deleted_at)
VALUES ($1, $2, $3, 'test/path/recent.kyma', 512, 10, now() - interval '1 hour')",
    )
    .bind(DEFAULT_TENANT.as_uuid())
    .bind(table_uuid)
    .bind(schema_snap_uuid)
    .execute(pool)
    .await
    .expect("insert recent soft-deleted extent");
    // Keep an independent pool handle so the table can be inspected after the
    // catalog has been moved into the app.
    let pool_for_verify = catalog.pool().clone();
    let catalog_arc: Arc<dyn Catalog> = Arc::new(catalog);
    let app = cleanup_write_app(catalog_arc.clone());
    let two_days_ago = (chrono::Utc::now() - chrono::Duration::days(2))
        .format("%Y-%m-%dT%H:%M:%SZ")
        .to_string();
    let resp = post_cleanup(app, "db2", "tbl2", &two_days_ago).await;
    assert_eq!(resp.status(), StatusCode::OK);
    let body = body_json(resp).await;
    assert_eq!(
        body["extents_deleted"].as_u64(),
        Some(0),
        "no extents should be cleaned when before < deleted_at"
    );
    // The reported count is only half the contract; the row itself must still exist.
    let remaining: i64 =
        sqlx::query_scalar("SELECT COUNT(*) FROM extents WHERE table_id = $1")
            .bind(table_uuid)
            .fetch_one(&pool_for_verify)
            .await
            .expect("count remaining extents");
    assert_eq!(
        remaining, 1,
        "the recently soft-deleted extent must not be removed"
    );
}
/// Cleanup against a database/table pair that does not exist must answer 404.
#[tokio::test]
async fn cleanup_unknown_table_returns_404() {
    let seeded = kyma_server::test_support::seeded_state_empty().await;
    let response = post_cleanup(
        cleanup_write_app(seeded.catalog.clone()),
        "no_such_db",
        "no_such_table",
        "2030-01-01T00:00:00Z",
    )
    .await;
    assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn cleanup_missing_auth_returns_401() {
let state = kyma_server::test_support::seeded_state_empty().await;
let backend: std::sync::Arc<dyn kyma_server::auth::AuthBackend> = std::sync::Arc::new(
kyma_server::auth::EnvAuthBackend::from_str("test-write-token:write"),
);
let app = kyma_server::cleanup_write_router(state.catalog.clone()).layer(
axum::middleware::from_fn_with_state(
kyma_server::auth::AuthLayerState {
backend,
required: kyma_server::auth::Role::Write,
},
kyma_server::auth::require_role_middleware,
),
);
let req = Request::builder()
.method("POST")
.uri("/v1/database/db/table/tbl/cleanup?before=2030-01-01T00:00:00Z")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}