use istziio_client::client_api::{DataRequest, StorageClient, StorageRequest, TableId};
use istziio_client::storage_client::StorageClientImpl;
use std::collections::HashMap;
use std::path::Path;
use std::time::Instant;
use std::{env, fs};
use tokio::sync::mpsc;
/// Benchmark driver: runs the sequential workload first, wipes the local
/// parquet cache so the second pass also starts cold, then runs the
/// parallel workload.
#[tokio::main]
async fn main() {
    // Sequential pass (single client, one request at a time).
    benchmark_sync().await;
    // Drop cached parquet files so the parallel pass is not served locally.
    clear_local_cache();
    // Parallel pass (one tokio task per client).
    benchmark_parallel().await;
}
/// Best-effort removal of the client's local parquet cache directory.
///
/// The result of `remove_dir_all` is deliberately ignored: if the directory
/// does not exist yet, the cache is already "cleared".
///
/// # Panics
/// Panics if the `HOME` environment variable is unset, since the cache
/// location cannot be derived without it.
fn clear_local_cache() {
    let home = std::env::var("HOME").expect("HOME environment variable must be set");
    // Build the path with Path::join rather than string formatting.
    let cache_path = Path::new(&home).join("15721-s24-cache2/client/parquet_files");
    let _ = fs::remove_dir_all(cache_path);
}
/// Runs the whole generated workload through a single client, sequentially.
///
/// # Panics
/// Panics if `HOME` or `SERVER_URL` is unset, or if the bench-files
/// directory cannot be read — all hard prerequisites for the benchmark.
async fn benchmark_sync() {
    let home = std::env::var("HOME").expect("HOME environment variable must be set");
    let bench_files_path = format!("{}/15721-s24-cache2/bench_files", home);
    let map = create_table_file_map(&bench_files_path).unwrap();
    let server_url = env::var("SERVER_URL").expect("SERVER_URL environment variable must be set");
    let mut client = setup_client(map.clone(), &server_url);
    // NOTE(review): HashMap iteration order is unspecified, so the request
    // order varies between runs — confirm that is acceptable for this benchmark.
    let table_ids: Vec<TableId> = map.keys().cloned().collect();
    // `table_ids` is not used again, so move it instead of cloning.
    let load = load_gen_allonce(table_ids);
    load_run(&mut client, load).await;
}
/// Runs the generated workload across 5 clients concurrently, one request
/// per client.
///
/// # Panics
/// Panics if `HOME` or `SERVER_URL` is unset, or if the bench-files
/// directory cannot be read. Note that `parallel_load_run` indexes the
/// request list by client id, so fewer than 5 bench files will panic there.
async fn benchmark_parallel() {
    let home = std::env::var("HOME").expect("HOME environment variable must be set");
    let bench_files_path = format!("{}/15721-s24-cache2/bench_files", home);
    let map = create_table_file_map(&bench_files_path).unwrap();
    let server_url = env::var("SERVER_URL").expect("SERVER_URL environment variable must be set");
    let clients = setup_clients(map.clone(), 5, &server_url);
    let table_ids: Vec<TableId> = map.keys().cloned().collect();
    // `table_ids` is not used again, so move it instead of cloning.
    let load = load_gen_allonce(table_ids);
    parallel_load_run(clients, load).await;
}
/// Runs one request per client concurrently — one tokio task per client —
/// and reports each client's latency plus the total wall-clock time.
///
/// Each spawned task sends its elapsed duration back over an mpsc channel;
/// the main task then collects exactly one message per client, so it never
/// hangs waiting for more sends than were spawned.
async fn parallel_load_run(clients: Vec<Box<dyn StorageClient>>, requests: Vec<StorageRequest>) {
    println!("------------Start running workload [IN PARALLEL]!------------");
    let bench_start = Instant::now();
    let total_clients = clients.len();
    let (done_tx, mut done_rx) = mpsc::channel(32);
    for (id, mut storage_client) in clients.into_iter().enumerate() {
        let done_tx = done_tx.clone();
        let reqs = requests.clone();
        tokio::spawn(async move {
            let task_start = Instant::now();
            // Client `id` handles the request at the same index.
            let request = &reqs[id];
            // assumes the generator only produces Table requests — anything
            // else is a workload bug, hence the panic.
            let table = match request.data_request() {
                DataRequest::Table(t) => t,
                _ => panic!("Invalid request type"),
            };
            println!(
                "Client {:?} requesting data for table {:?}",
                id, table
            );
            let outcome = storage_client.request_data_sync(request.clone()).await;
            assert!(outcome.is_ok());
            println!(
                "Client {:?} received data for table {:?}",
                id, table
            );
            let elapsed = task_start.elapsed();
            println!("Client {:?} time used: {:?}", id, elapsed);
            done_tx.send(elapsed).await.unwrap();
        });
    }
    // Drain exactly one completion per spawned task.
    for _ in 0..total_clients {
        let latency = done_rx.recv().await.unwrap();
        println!("Client latency: {:?}", latency);
    }
    println!("Total time used: {:?}", bench_start.elapsed());
}
/// Feeds every request to the given client one at a time and prints the
/// total time used for the whole sequential pass.
///
/// Panics (via `assert!`) if any request fails, and on a non-Table request,
/// which the generators never produce.
async fn load_run(client: &mut dyn StorageClient, requests: Vec<StorageRequest>) {
    println!("------------Start running workload [SEQUENTIALLY]!------------");
    let begin = Instant::now();
    for request in requests {
        // Extract the table id up front so it can be logged on both sides
        // of the call; the request itself is consumed by the client below.
        let table = match request.data_request() {
            DataRequest::Table(t) => t.to_owned(),
            _ => panic!("Invalid request type"),
        };
        println!("Requesting data for table {:?}", table);
        let outcome = client.request_data_sync(request).await;
        assert!(outcome.is_ok());
        println!("Received data for table {:?}", table);
    }
    println!("Time used: {:?}", begin.elapsed());
}
/// Builds one Table request per id, with request ids assigned sequentially
/// in the order the table ids are given.
fn load_gen_allonce(table_ids: Vec<TableId>) -> Vec<StorageRequest> {
    table_ids
        .into_iter()
        .enumerate()
        .map(|(req_id, table_id)| StorageRequest::new(req_id, DataRequest::Table(table_id)))
        .collect()
}
/// Builds one Table request per id, then skews the load by replacing the
/// final request with a second request for the *first* table.
///
/// Note: the replacement request is numbered `table_ids.len()`, so the id
/// `table_ids.len() - 1` is skipped — preserved from the original behavior.
#[allow(dead_code)]
fn load_gen_skewed(table_ids: Vec<TableId>) -> Vec<StorageRequest> {
    // Guard: with no tables there is nothing to skew, and `table_ids[0]`
    // below would panic.
    if table_ids.is_empty() {
        return Vec::new();
    }
    let mut requests = Vec::new();
    for (req_id, table_id) in table_ids.iter().enumerate() {
        requests.push(StorageRequest::new(req_id, DataRequest::Table(*table_id)));
    }
    // Swap the last request for a duplicate of the first table.
    requests.pop();
    requests.push(StorageRequest::new(
        table_ids.len(),
        DataRequest::Table(table_ids[0]),
    ));
    requests
}
/// Builds a single test client (fixed id 1) backed by the given
/// table-id → file-name map and storage-server URL.
fn setup_client(table_file_map: HashMap<TableId, String>, server_url: &str) -> StorageClientImpl {
    // Fixed id for the lone sequential-benchmark client.
    const CLIENT_ID: usize = 1;
    StorageClientImpl::new_for_test(CLIENT_ID, table_file_map, server_url, false)
}
/// Builds `num_clients` boxed test clients, ids `0..num_clients`, each with
/// its own copy of the table-file map and pointing at the same server URL.
fn setup_clients(
    table_file_map: HashMap<TableId, String>,
    num_clients: usize,
    server_url: &str,
) -> Vec<Box<dyn StorageClient>> {
    (0..num_clients)
        .map(|client_id| {
            // Each client gets an independent clone of the map.
            Box::new(StorageClientImpl::new_for_test(
                client_id,
                table_file_map.clone(),
                server_url,
                false,
            )) as Box<dyn StorageClient>
        })
        .collect()
}
/// Scans `directory` and assigns a sequential `TableId` to every regular
/// file found, producing an id → file-name map.
///
/// Fix vs. the original: the id counter only advances for regular files,
/// so subdirectories (or other non-file entries) no longer leave gaps in
/// the `TableId` sequence.
///
/// NOTE(review): `read_dir` returns entries in a platform-dependent order,
/// so the id ↔ file assignment may differ between runs — confirm the
/// server side builds its map the same way.
///
/// # Errors
/// Returns any I/O error from reading the directory or its entries.
///
/// # Panics
/// Panics if a bench file name is not valid UTF-8.
fn create_table_file_map(directory: &str) -> Result<HashMap<TableId, String>, std::io::Error> {
    let mut table_file_map: HashMap<TableId, String> = HashMap::new();
    let dir = Path::new(directory);
    let mut next_id: usize = 0;
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        // Skip non-files without consuming an id, keeping ids contiguous.
        if entry.path().is_file() {
            let filename = entry
                .file_name()
                .into_string()
                .expect("bench file name must be valid UTF-8");
            table_file_map.insert(next_id as TableId, filename);
            next_id += 1;
        }
    }
    Ok(table_file_map)
}