use anyhow::{Context, Result};
use arrow_flight::{decode::FlightRecordBatchStream, sql::client::FlightSqlServiceClient};
use datafusion::arrow::array::RecordBatch;
use futures::stream::StreamExt;
use micromegas_analytics::time::TimeRange;
use tonic::transport::Channel;
pub struct Client {
inner: FlightSqlServiceClient<Channel>,
}
impl Client {
pub fn new(channel: Channel) -> Self {
let inner = FlightSqlServiceClient::new(channel);
Self { inner }
}
pub fn inner_mut(&mut self) -> &mut FlightSqlServiceClient<Channel> {
&mut self.inner
}
fn set_query_range(&mut self, query_range: Option<TimeRange>) {
self.inner.set_header(
"query_range_begin",
query_range.map_or(String::from(""), |r| r.begin.to_rfc3339()),
);
self.inner.set_header(
"query_range_end",
query_range.map_or(String::from(""), |r| r.end.to_rfc3339()),
);
}
pub async fn query(
&mut self,
sql: String,
query_range: Option<TimeRange>,
) -> Result<Vec<RecordBatch>> {
self.set_query_range(query_range);
let info = self.inner.execute(sql, None).await?;
let ticket = info.endpoint[0]
.ticket
.clone()
.with_context(|| "reading ticket from endpoint")?;
let flight_data_stream = self.inner.do_get(ticket).await?.into_inner();
let mut record_batch_stream = FlightRecordBatchStream::new(flight_data_stream);
let mut batches = vec![];
while let Some(batch_res) = record_batch_stream.next().await {
batches.push(batch_res?);
}
Ok(batches)
}
}