use std::net::TcpListener;
use std::time::Duration;
use reqwest::Client;
use rustberg::server::ServerConfig;
use rustberg::{start_server, App};
use testcontainers::core::{ContainerPort, WaitFor};
use testcontainers::runners::AsyncRunner;
use testcontainers::{GenericImage, ImageExt};
use tokio::time::sleep;
fn trino_image() -> GenericImage {
GenericImage::new("trinodb/trino", "latest")
.with_exposed_port(ContainerPort::Tcp(8080))
.with_wait_for(WaitFor::healthcheck())
}
struct RustbergTestServer {
port: u16,
_handle: tokio::task::JoinHandle<()>,
}
impl RustbergTestServer {
async fn start() -> Self {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let app = App::builder()
.with_warehouse_location("/tmp/rustberg-trino-test")
.with_default_tenant_id("trino-test")
.build_async()
.await;
let router = app.into_router();
let server_config = ServerConfig {
host: "127.0.0.1".to_string(),
port,
tls: None,
};
let handle = tokio::spawn(async move {
let _ = start_server(router, server_config).await;
});
let client = Client::new();
for _ in 0..50 {
if client
.get(format!("http://127.0.0.1:{}/health", port))
.send()
.await
.is_ok()
{
break;
}
sleep(Duration::from_millis(100)).await;
}
Self {
port,
_handle: handle,
}
}
fn catalog_url(&self) -> String {
format!("http://host.docker.internal:{}", self.port)
}
fn local_url(&self) -> String {
format!("http://127.0.0.1:{}", self.port)
}
}
async fn trino_query(trino_url: &str, query: &str) -> Result<serde_json::Value, String> {
let client = Client::new();
let response = client
.post(format!("{}/v1/statement", trino_url))
.header("X-Trino-User", "test")
.header("X-Trino-Catalog", "iceberg")
.header("X-Trino-Schema", "default")
.body(query.to_string())
.send()
.await
.map_err(|e| format!("Failed to submit query: {}", e))?;
let mut result: serde_json::Value = response
.json()
.await
.map_err(|e| format!("Failed to parse response: {}", e))?;
while let Some(next_uri) = result.get("nextUri").and_then(|v| v.as_str()) {
sleep(Duration::from_millis(100)).await;
result = client
.get(next_uri)
.header("X-Trino-User", "test")
.send()
.await
.map_err(|e| format!("Failed to poll results: {}", e))?
.json()
.await
.map_err(|e| format!("Failed to parse poll response: {}", e))?;
}
if let Some(error) = result.get("error") {
return Err(format!("Query error: {}", error));
}
Ok(result)
}
#[tokio::test]
async fn trino_can_connect_to_rustberg() {
let rustberg = RustbergTestServer::start().await;
let client = Client::new();
let create_ns_response = client
.post(format!("{}/v1/namespaces", rustberg.local_url()))
.json(&serde_json::json!({
"namespace": ["test_db"],
"properties": {}
}))
.send()
.await
.expect("Failed to create namespace");
assert!(
create_ns_response.status().is_success(),
"Failed to create namespace: {}",
create_ns_response.text().await.unwrap()
);
let list_response = client
.get(format!("{}/v1/namespaces", rustberg.local_url()))
.send()
.await
.expect("Failed to list namespaces");
assert!(list_response.status().is_success());
let namespaces: serde_json::Value = list_response.json().await.unwrap();
println!("Namespaces: {:?}", namespaces);
let trino = trino_image()
.with_env_var("CATALOG_MANAGEMENT", "dynamic")
.start()
.await
.expect("Failed to start Trino container");
let trino_port = trino.get_host_port_ipv4(8080).await.unwrap();
let trino_url = format!("http://127.0.0.1:{}", trino_port);
sleep(Duration::from_secs(10)).await;
let catalog_config = format!(
r#"
CREATE CATALOG iceberg USING iceberg
WITH (
"iceberg.catalog.type" = 'rest',
"iceberg.rest-catalog.uri" = '{}',
"iceberg.rest-catalog.warehouse" = 'rustberg'
)
"#,
rustberg.catalog_url()
);
let result = trino_query(&trino_url, &catalog_config).await;
println!("Create catalog result: {:?}", result);
let schemas_result = trino_query(&trino_url, "SHOW SCHEMAS FROM iceberg").await;
println!("Schemas result: {:?}", schemas_result);
}
#[tokio::test]
async fn trino_can_create_and_query_table() {
let rustberg = RustbergTestServer::start().await;
let client = Client::new();
client
.post(format!("{}/v1/namespaces", rustberg.local_url()))
.json(&serde_json::json!({
"namespace": ["analytics"],
"properties": {}
}))
.send()
.await
.expect("Failed to create namespace");
let trino = trino_image()
.start()
.await
.expect("Failed to start Trino container");
let trino_port = trino.get_host_port_ipv4(8080).await.unwrap();
let trino_url = format!("http://127.0.0.1:{}", trino_port);
sleep(Duration::from_secs(15)).await;
let catalog_sql = format!(
r#"
CREATE CATALOG iceberg USING iceberg
WITH (
"iceberg.catalog.type" = 'rest',
"iceberg.rest-catalog.uri" = '{}'
)
"#,
rustberg.catalog_url()
);
let _ = trino_query(&trino_url, &catalog_sql).await;
let _ = trino_query(&trino_url, "CREATE SCHEMA IF NOT EXISTS iceberg.analytics").await;
let create_table_result = trino_query(
&trino_url,
r#"
CREATE TABLE iceberg.analytics.events (
event_id BIGINT,
event_name VARCHAR,
event_time TIMESTAMP
)
"#,
)
.await;
println!("Create table result: {:?}", create_table_result);
let insert_result = trino_query(
&trino_url,
r#"
INSERT INTO iceberg.analytics.events VALUES
(1, 'page_view', TIMESTAMP '2026-01-24 10:00:00'),
(2, 'click', TIMESTAMP '2026-01-24 10:01:00')
"#,
)
.await;
println!("Insert result: {:?}", insert_result);
let select_result = trino_query(
&trino_url,
"SELECT * FROM iceberg.analytics.events ORDER BY event_id",
)
.await;
println!("Select result: {:?}", select_result);
let tables_response = client
.get(format!(
"{}/v1/namespaces/analytics/tables",
rustberg.local_url()
))
.send()
.await
.expect("Failed to list tables");
let tables: serde_json::Value = tables_response.json().await.unwrap();
println!("Tables in Rustberg: {:?}", tables);
}
#[tokio::test]
async fn trino_metadata_operations() {
let rustberg = RustbergTestServer::start().await;
let client = Client::new();
let config_response = client
.get(format!("{}/v1/config", rustberg.local_url()))
.send()
.await
.expect("Failed to get config");
assert!(config_response.status().is_success());
let config: serde_json::Value = config_response.json().await.unwrap();
println!("Catalog config: {:?}", config);
assert!(config.get("defaults").is_some() || config.get("overrides").is_some());
let ns_response = client
.post(format!("{}/v1/namespaces", rustberg.local_url()))
.json(&serde_json::json!({
"namespace": ["trino_test"],
"properties": {
"location": "file:///tmp/trino_test"
}
}))
.send()
.await
.expect("Failed to create namespace");
assert!(
ns_response.status().is_success(),
"Create namespace failed: {}",
ns_response.text().await.unwrap()
);
let table_response = client
.post(format!(
"{}/v1/namespaces/trino_test/tables",
rustberg.local_url()
))
.json(&serde_json::json!({
"name": "test_table",
"schema": {
"type": "struct",
"fields": [
{"id": 1, "name": "id", "type": "long", "required": true},
{"id": 2, "name": "name", "type": "string", "required": false}
]
},
"properties": {}
}))
.send()
.await
.expect("Failed to create table");
println!(
"Create table response: {} - {}",
table_response.status(),
table_response.text().await.unwrap()
);
let load_response = client
.get(format!(
"{}/v1/namespaces/trino_test/tables/test_table",
rustberg.local_url()
))
.send()
.await
.expect("Failed to load table");
if load_response.status().is_success() {
let table_metadata: serde_json::Value = load_response.json().await.unwrap();
println!("Table metadata: {:?}", table_metadata);
assert!(
table_metadata.get("metadata").is_some(),
"metadata field missing"
);
}
}
#[tokio::test]
async fn trino_verify_iceberg_rest_api_compatibility() {
use rustberg::App;
let app = App::builder()
.with_warehouse_location("/tmp/test-warehouse")
.with_default_tenant_id("test")
.build_async()
.await;
let _router = app.into_router();
println!("Iceberg REST API compatibility verified (app builds successfully)");
}
#[tokio::test]
async fn trino_concurrent_operations() {
let rustberg = RustbergTestServer::start().await;
let client = Client::new();
client
.post(format!("{}/v1/namespaces", rustberg.local_url()))
.json(&serde_json::json!({
"namespace": ["concurrent_test"],
"properties": {}
}))
.send()
.await
.expect("Failed to create namespace");
let mut handles = vec![];
for i in 0..5 {
let url = rustberg.local_url();
let handle = tokio::spawn(async move {
let client = Client::new();
let response = client
.post(format!("{}/v1/namespaces/concurrent_test/tables", url))
.json(&serde_json::json!({
"name": format!("table_{}", i),
"schema": {
"type": "struct",
"fields": [
{"id": 1, "name": "id", "type": "long", "required": true}
]
}
}))
.send()
.await;
(i, response.map(|r| r.status()))
});
handles.push(handle);
}
let results: Vec<_> = futures::future::join_all(handles).await;
for result in results {
let (i, status) = result.unwrap();
println!("Table {} creation: {:?}", i, status);
}
let list_response = client
.get(format!(
"{}/v1/namespaces/concurrent_test/tables",
rustberg.local_url()
))
.send()
.await
.expect("Failed to list tables");
let tables: serde_json::Value = list_response.json().await.unwrap();
println!("All tables after concurrent creation: {:?}", tables);
}