use arrow_ipc::CompressionType;
use futures::{Stream, StreamExt};
use reqwest::Response;
use crate::{Result, arrow::SendableRecordBatchStream};
use super::db::ServerVersion;
/// Converts a stream of record batches into a stream of Arrow IPC byte chunks.
///
/// Batches are written through an `arrow_ipc` `StreamWriter` configured with
/// LZ4 frame compression. After every write the writer's in-memory buffer is
/// drained and yielded as one [`bytes::Bytes`] chunk, so the output holds one
/// chunk per input batch plus a last chunk produced by `finish()`. The buffer
/// is not drained between creating the writer and the first write, so anything
/// the writer emits on construction is carried in the first chunk.
///
/// # Errors
///
/// Returns an error up front if the compression option cannot be applied or
/// the writer cannot be created. The returned stream yields an error when the
/// input stream yields one, or when writing/finishing the IPC stream fails.
pub fn stream_as_ipc(
    data: SendableRecordBatchStream,
) -> Result<impl Stream<Item = Result<bytes::Bytes>>> {
    // Initial capacity of the in-memory buffer the IPC writer appends to.
    const WRITE_BUF_SIZE: usize = 4096;

    let write_options = arrow_ipc::writer::IpcWriteOptions::default()
        .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
    let ipc_writer = arrow_ipc::writer::StreamWriter::try_new_with_options(
        Vec::with_capacity(WRITE_BUF_SIZE),
        &data.schema(),
        write_options,
    )?;

    // State: (input batches, IPC writer, whether the final chunk was already emitted).
    Ok(futures::stream::try_unfold(
        (data, ipc_writer, false),
        move |(mut batches, mut ipc_writer, done)| async move {
            if done {
                return Ok(None);
            }

            // Feed the writer and note whether the input has run dry.
            let exhausted = match batches.next().await {
                Some(Err(err)) => return Err(err),
                Some(Ok(batch)) => {
                    ipc_writer.write(&batch)?;
                    false
                }
                None => {
                    ipc_writer.finish()?;
                    true
                }
            };

            // Hand the bytes written so far to the consumer, leaving an empty
            // buffer behind for the next round.
            let chunk = bytes::Bytes::from(std::mem::take(ipc_writer.get_mut()));
            Ok(Some((chunk, (batches, ipc_writer, exhausted))))
        },
    ))
}
/// Wraps a record batch stream as a streaming `reqwest` request body.
///
/// The batches are encoded by [`stream_as_ipc`] and the resulting byte stream
/// is passed to `reqwest::Body::wrap_stream`.
///
/// # Errors
///
/// Returns whatever error [`stream_as_ipc`] returns while setting up the
/// encoder.
pub fn stream_as_body(data: SendableRecordBatchStream) -> Result<reqwest::Body> {
    stream_as_ipc(data).map(reqwest::Body::wrap_stream)
}
pub fn parse_server_version(req_id: &str, rsp: &Response) -> Result<ServerVersion> {
let version = rsp
.headers()
.get("phalanx-version")
.map(|v| {
let v = v.to_str().map_err(|e| crate::Error::Http {
source: e.into(),
request_id: req_id.to_string(),
status_code: Some(rsp.status()),
})?;
ServerVersion::parse(v).map_err(|e| crate::Error::Http {
source: e.into(),
request_id: req_id.to_string(),
status_code: Some(rsp.status()),
})
})
.transpose()?
.unwrap_or_default();
Ok(version)
}