use std::borrow::Cow;
use testcontainers::{core::WaitFor, Image};
const NAME: &str = "voltrondata/flight-sql";
const TAG: &str = "v1.4.1-slim";
#[derive(Clone, Debug, Default)]
pub struct ArrowFlightSQL {}
impl Image for ArrowFlightSQL {
fn name(&self) -> &str {
NAME
}
fn tag(&self) -> &str {
TAG
}
fn ready_conditions(&self) -> Vec<WaitFor> {
vec![WaitFor::message_on_stdout("Flight SQL server - started")]
}
fn env_vars(
&self,
) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
[
("FLIGHT_PASSWORD", "test"),
("DATABASE_FILENAME", "test.duckdb"),
]
}
}
#[cfg(test)]
mod tests {
use arrow_flight::{
flight_service_client::FlightServiceClient, sql::client::FlightSqlServiceClient,
};
use futures::TryStreamExt;
use testcontainers::runners::AsyncRunner;
use crate::arrow_flightsql::ArrowFlightSQL;
#[tokio::test]
async fn arrow_flightsql_select_version() -> Result<(), Box<dyn std::error::Error + 'static>> {
let image = ArrowFlightSQL::default();
let instance = image.start().await?;
let host = instance.get_host().await?;
let port = instance.get_host_port_ipv4(31337).await?;
let url = format!("http://{host}:{port}");
let service_client = FlightServiceClient::connect(url).await?;
let mut client = FlightSqlServiceClient::new_from_inner(service_client);
let _ = client.handshake("flight_username", "test").await?;
let mut statement = client
.prepare("SELECT VERSION();".to_string(), None)
.await?;
let flight_info = statement.execute().await?;
let ticket = flight_info.endpoint[0]
.ticket
.as_ref()
.expect("Ticket not present")
.clone();
let flight_data = client.do_get(ticket).await?;
let batches: Vec<_> = flight_data.try_collect().await?;
let batch = batches.first().expect("batch 0 not present");
let array = batch.columns().first().expect("column not present");
let data = array.to_data();
let buffers = data.buffers();
let buffer = buffers.get(1).expect("buffer not present");
let values = buffer.to_vec();
let version = String::from_utf8(values)?;
assert_eq!(version, "v1.0.0");
Ok(())
}
}