use anyhow::{anyhow, Ok, Result};
use arrow::array::RecordBatch;
use futures::stream::StreamExt;
use log::info;
use tokio::sync::mpsc;
use crate::RequestParams;
use hyper::Uri;
use istziio_client::client_api::RequestId;
use lazy_static::lazy_static;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;
use reqwest::{Client, Response, Url};
use std::fs::File;
use std::io::Write;
use std::{collections::HashMap, time::Instant};
use tempfile::tempdir;
use tokio::sync::mpsc::channel;
use istziio_client::client_api::{DataRequest, StorageClient, StorageRequest, TableId};
const BATCH_SIZE: usize = 1024 * 8;
const CHANNEL_CAPACITY: usize = 32;
const PARAM_BUCKET_KEY: &str = "bucket";
const PARAM_KEYS_KEY: &str = "keys";
const PARAM_REQUEST_ID_KEY: &str = "request-id";
lazy_static! {
static ref TABLE_FILE_MAP: HashMap<TableId, String> = {
let mut m = HashMap::new();
m.insert(0, "userdata1.parquet".to_string());
for i in 1..=9 {
m.insert(i, format!("1m/random_data_1m_{}.parquet", i));
}
for i in 10..=19 {
m.insert(i, format!("100m/random_data_100m_{}.parquet", i - 10));
}
m
};
}
pub struct StorageClientImpl {
storage_server_endpoint: Uri,
_catalog_server_endpoint: Uri,
}
impl StorageClientImpl {
pub fn new(
storage_server_endpoint_str: &str,
catalog_server_endpoint_str: &str,
) -> Result<Self> {
let storage_server_endpoint = storage_server_endpoint_str.parse::<Uri>().map_err(|_| {
anyhow!(
"cannot resolve storage server endpoint: {}",
storage_server_endpoint_str
)
})?;
let catalog_server_endpoint = catalog_server_endpoint_str.parse::<Uri>().map_err(|_| {
anyhow!(
"cannot resolve catalog server endpoint: {}",
catalog_server_endpoint_str
)
})?;
Ok(Self {
storage_server_endpoint,
_catalog_server_endpoint: catalog_server_endpoint,
})
}
async fn get_info_from_catalog(&self, request: StorageRequest) -> Result<RequestParams> {
let bucket = "parpulse-test".to_string();
let table_id = match request.data_request() {
DataRequest::Table(id) => *id,
_ => {
return Err(anyhow!("Only table request is supported."));
}
};
let keys = vec![TABLE_FILE_MAP.get(&table_id).unwrap().to_string()];
Ok(RequestParams::S3((bucket, keys)))
}
async fn get_data_from_response(
response: Response,
request_id: RequestId,
) -> Result<mpsc::Receiver<RecordBatch>> {
if response.status().is_success() {
let poll_data_start = Instant::now();
let temp_dir = tempdir()?;
let file_path = temp_dir.path().join("tmp.parquet");
let mut file = File::create(&file_path)?;
let mut stream = response.bytes_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
file.write_all(&chunk)?;
}
let poll_data_time = poll_data_start.elapsed();
info!(
"[Parpulse Timer] client pull data time for request {}: {:?}",
request_id, poll_data_time
);
let file = File::open(file_path)?;
let builder =
ParquetRecordBatchReaderBuilder::try_new(file)?.with_batch_size(BATCH_SIZE);
let mask = ProjectionMask::all();
let mut reader = builder.with_projection(mask).build()?;
let (tx, rx) = channel(CHANNEL_CAPACITY);
tokio::spawn(async move {
let decode_start = Instant::now();
while let Some(core::result::Result::Ok(rb)) = reader.next() {
tx.send(rb).await.unwrap();
}
let decode_time = decode_start.elapsed();
info!(
"[Parpulse Timer] client decode time for request {}: {:?}",
request_id, decode_time
);
});
Ok(rx)
} else {
Err(anyhow::anyhow!(
"Failed to download file. Response: {:?}, Body: {}",
response.status(),
response
.text()
.await
.unwrap_or_else(|_| String::from("Failed to read response body"))
))
}
}
async fn get_info_from_catalog_test(&self, request: StorageRequest) -> Result<RequestParams> {
let bucket = "tests-parquet".to_string();
let table_id = match request.data_request() {
DataRequest::Table(id) => id,
_ => {
return Err(anyhow!("Only table request is supported."));
}
};
let keys = vec![TABLE_FILE_MAP.get(table_id).unwrap().to_string()];
Ok(RequestParams::MockS3((bucket, keys)))
}
fn get_request_url_and_params(
&self,
location: (String, Vec<String>),
request_id: RequestId,
) -> Result<(String, Vec<(&str, String)>)> {
let scheme = self
.storage_server_endpoint
.scheme()
.ok_or_else(|| anyhow!("Failed to get the scheme of the storage server endpoint."))?
.to_owned();
let authority = self
.storage_server_endpoint
.authority()
.ok_or_else(|| anyhow!("Failed to get the authority of the storage server endpoint."))?
.to_owned();
let path = "/file";
let url = Uri::builder()
.scheme(scheme)
.authority(authority)
.path_and_query(path)
.build()
.unwrap();
let params = vec![
(PARAM_BUCKET_KEY, location.0),
(PARAM_KEYS_KEY, location.1.join(",")),
(PARAM_REQUEST_ID_KEY, request_id.to_string()),
];
Ok((url.to_string(), params))
}
pub async fn request_data_test(
&self,
request: StorageRequest,
) -> Result<mpsc::Receiver<RecordBatch>> {
let request_id = request.request_id();
let location = match self.get_info_from_catalog_test(request).await? {
RequestParams::MockS3(location) => location,
_ => {
return Err(anyhow!(
"Failed to get location of the file from the catalog server."
));
}
};
let client = Client::new();
let (url, mut params) = self.get_request_url_and_params(location, request_id)?;
params.push(("is_test", "true".to_owned()));
let url = Url::parse_with_params(&url, params)?;
let response = client.get(url).send().await?;
Self::get_data_from_response(response, request_id).await
}
}
#[async_trait::async_trait]
impl StorageClient for StorageClientImpl {
async fn request_data(&self, request: StorageRequest) -> Result<mpsc::Receiver<RecordBatch>> {
let request_id = request.request_id();
let location = match self.get_info_from_catalog(request.clone()).await? {
RequestParams::S3(location) => location,
_ => {
return Err(anyhow!(
"Failed to get location of the file from the catalog server."
));
}
};
let client_init_start = Instant::now();
let client = Client::new();
let client_init_time = client_init_start.elapsed();
info!(
"[Parpulse Timer] client init time for request {}: {:?}",
request_id, client_init_time
);
let (url, params) = self.get_request_url_and_params(location, request_id)?;
let url = Url::parse_with_params(&url, params)?;
let get_response_start = Instant::now();
let response = client.get(url).send().await?;
let get_response_time = get_response_start.elapsed();
info!(
"[Parpulse Timer] client get response time for request {}: {:?}",
request_id, get_response_time
);
Self::get_data_from_response(response, request_id).await
}
async fn request_data_sync(&self, _request: StorageRequest) -> Result<Vec<RecordBatch>> {
todo!()
}
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::StringArray;
use mockito::Server;
#[tokio::test]
async fn test_storage_client_disk() {
let mut server = Server::new_async().await;
println!("server host: {}", server.host_with_port());
server
.mock(
"GET",
"/file?bucket=tests-parquet&keys=userdata1.parquet&is_test=true",
)
.with_body_from_file("../storage-node/tests/parquet/userdata1.parquet")
.create_async()
.await;
let server_endpoint = server.url() + "/";
let storage_client = StorageClientImpl::new(&server_endpoint, "localhost:3031")
.expect("Failed to create storage client.");
let request = StorageRequest::new(0, DataRequest::Table(0));
let mut receiver = storage_client
.request_data_test(request)
.await
.expect("Failed to get data from the server.");
let mut record_batches = vec![];
while let Some(record_batch) = receiver.recv().await {
record_batches.push(record_batch);
}
assert!(!record_batches.is_empty());
let first_batch = &record_batches[0];
assert_eq!(first_batch.num_columns(), 13);
let real_first_names = StringArray::from(vec!["Amanda", "Albert", "Evelyn"]);
let read_last_names = StringArray::from(vec!["Jordan", "Freeman", "Morgan"]);
let first_names = first_batch
.column(2)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let last_names = first_batch
.column(3)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..3 {
assert_eq!(first_names.value(i), real_first_names.value(i));
assert_eq!(last_names.value(i), read_last_names.value(i));
}
}
}