use std::collections::{HashMap, HashSet};
use futures::StreamExt;
use pancake_db_idl::ddl::{AlterTableRequest, CreateTableRequest, DropTableRequest, GetSchemaRequest};
use pancake_db_idl::ddl::create_table_request::SchemaMode;
use pancake_db_idl::dml::{DeleteFromSegmentRequest, FieldValue, ListSegmentsRequest, Row, WriteToPartitionRequest};
use pancake_db_idl::dml::field_value::Value;
use pancake_db_idl::dtype::DataType;
use pancake_db_idl::schema::{ColumnMeta, Schema};
use rand::Rng;
use structopt::StructOpt;
use tokio;
use tokio::time::Duration;
use tonic::Code;
use pancake_db_client::{Client, SegmentKey};
use pancake_db_client::errors::{ClientError, ClientErrorKind, ClientResult};
// Name of the table that every request in this fuzz test targets.
const TABLE_NAME: &str = "fuzz_test_table";
// Number of rows sent per write request when performing a "big" write;
// `Opt::validate` requires `big_n_rows` to be a multiple of this.
const BATCH_SIZE: usize = 250;
// Command-line options for the fuzz test.
//
// NOTE: plain `//` comments are used on purpose; `///` doc comments on a
// `StructOpt` struct are picked up by the derive and become help text.
#[derive(Clone, Debug, StructOpt)]
#[structopt(name = "Fuzz Test")]
pub struct Opt {
// Number of rows written by a "small" write (a single request).
#[structopt(long, default_value = "2")]
pub small_n_rows: usize,
// Number of rows written by a "big" write, sent as concurrent batches of
// `BATCH_SIZE` rows each.
#[structopt(long, default_value = "10000")]
pub big_n_rows: usize,
// Number of row ids sampled (with possible repeats) for each delete request.
#[structopt(long, default_value = "10")]
pub max_deletions_per_req: usize,
// Seconds to sleep after a big write when the test randomly decides to wait
// for compaction.
#[structopt(long, default_value = "12")]
pub compaction_wait_time: u64,
// Number of schema-evolution iterations to run.
#[structopt(long, default_value = "10")]
pub num_evolutions: usize,
}
impl Opt {
    /// Checks that the options are usable by the fuzz test.
    ///
    /// # Panics
    ///
    /// Panics when `big_n_rows` cannot be split into a whole number of
    /// `BATCH_SIZE`-row batches.
    pub fn validate(&self) {
        let leftover_rows = self.big_n_rows % BATCH_SIZE;
        assert!(
            leftover_rows == 0,
            "please make big-n-rows a multiple of {}",
            BATCH_SIZE
        );
    }
}
/// Entry point of the fuzz test.
///
/// Parses and validates the command-line options, connects to the server at
/// `http://localhost:3842`, resets `TABLE_NAME` to a fresh single-column
/// table, and then runs `num_evolutions` rounds of [`iterate`].
///
/// # Errors
///
/// Returns the first client error encountered, except for a `NotFound`
/// response to the initial drop-table request, which is expected when the
/// table does not exist yet.
#[tokio::main]
async fn main() -> ClientResult<()> {
    // Parse and validate the options before touching the database, so that
    // `--help` or invalid arguments never drop and recreate the table.
    let opt: Opt = Opt::from_args();
    opt.validate();

    let mut client = Client::connect("http://localhost:3842").await?;

    let drop_table_res = client.drop_table(DropTableRequest {
        table_name: TABLE_NAME.to_string(),
        ..Default::default()
    }).await;
    match drop_table_res {
        Ok(resp) => println!("Dropped existing table: {:?}", resp),
        // The table not existing yet is fine; there is simply nothing to drop.
        Err(err) if matches!(err.kind, ClientErrorKind::Grpc { code: Code::NotFound }) => {}
        Err(err) => return Err(err),
    }

    let mut initial_columns = HashMap::new();
    initial_columns.insert(
        "col_0".to_string(),
        ColumnMeta {
            dtype: DataType::Int64 as i32,
            ..Default::default()
        },
    );
    // `schema` is the locally tracked expectation; `evolve_schema` grows it by
    // one column per iteration and compares it against the server's answer.
    let mut schema = Schema {
        columns: initial_columns,
        ..Default::default()
    };
    let create_table_req = CreateTableRequest {
        table_name: TABLE_NAME.to_string(),
        schema: Some(schema.clone()),
        ..Default::default()
    };
    let create_resp = client.create_table(create_table_req).await?;
    println!("Created table for the first time: {:?}", create_resp);

    // `row_counts[k]` is read by `assert_reads` as the number of rows written
    // before `col_k` existed. `col_0` and `col_1` both exist before the first
    // write, hence the two leading zeros; `write_rows` appends one entry per
    // iteration.
    let mut row_counts = vec![0, 0];
    // Running upper bound on how many rows may have been deleted so far.
    let mut n_deletions_ub = 0;
    for iter in 1..=opt.num_evolutions {
        iterate(iter, &mut schema, &opt, client.clone(), &mut row_counts, &mut n_deletions_ub).await?;
    }
    Ok(())
}
/// Runs one round of the fuzz test for evolution number `i`.
///
/// The round evolves the schema, writes rows (concurrently with a delete
/// request on every round after the first), and finally checks what can be
/// read back against `row_counts` and `n_deletions_ub`.
///
/// # Errors
///
/// Returns the first error from any step; when writing and deleting both
/// fail, the write error is reported.
async fn iterate(i: usize, schema: &mut Schema, opt: &Opt, client: Client, row_counts: &mut Vec<usize>, n_deletions_ub: &mut usize) -> ClientResult<()> {
    evolve_schema(i, schema, client.clone()).await?;

    if i <= 1 {
        // First round: write only, no delete request is issued.
        write_rows(i, opt, client.clone(), row_counts).await?;
    } else {
        // Later rounds: let the delete race against the writes.
        let (write_outcome, delete_outcome) = tokio::join!(
            write_rows(i, opt, client.clone(), row_counts),
            delete(opt, client.clone(), n_deletions_ub),
        );
        write_outcome?;
        delete_outcome?;
    }

    assert_reads(i, client, row_counts, *n_deletions_ub).await
}
/// Adds an `Int64` column named `col_{i}` to the table and verifies the result.
///
/// The column is first inserted into the locally tracked `schema`. It is then
/// sent to the server in one of two randomly chosen ways: an explicit
/// alter-table request containing just the new column, or a create-table
/// request carrying the full schema with `SchemaMode::AddNewColumns`.
/// Afterwards the server's schema is fetched and compared with `schema`.
///
/// # Errors
///
/// Returns an error if any request fails, if the get-schema response carries
/// no schema, or if the server's schema differs from the expected one.
async fn evolve_schema(i: usize, schema: &mut Schema, mut client: Client) -> ClientResult<()> {
    let new_column_name = format!("col_{}", i);
    let new_column = ColumnMeta {
        dtype: DataType::Int64 as i32,
        ..Default::default()
    };
    schema.columns.insert(new_column_name.clone(), new_column.clone());

    let mut rng = rand::thread_rng();
    if rng.gen_bool(0.5) {
        // Imperative path: ask for exactly the one new column.
        let mut new_columns = HashMap::new();
        new_columns.insert(new_column_name, new_column);
        let alter_req = AlterTableRequest {
            table_name: TABLE_NAME.to_string(),
            new_columns,
            ..Default::default()
        };
        println!("altering table: {:?}", alter_req);
        client.alter_table(alter_req).await?;
    } else {
        // Declarative path: resend the whole schema and let the server add
        // whatever columns are new.
        let create_req = CreateTableRequest {
            table_name: TABLE_NAME.to_string(),
            schema: Some(schema.clone()),
            mode: SchemaMode::AddNewColumns as i32,
            ..Default::default()
        };
        println!("declaratively creating table: {:?}", create_req);
        client.create_table(create_req).await?;
    }

    let get_schema_req = GetSchemaRequest {
        table_name: TABLE_NAME.to_string(),
        ..Default::default()
    };
    // Propagate failures like every other request here instead of panicking.
    let resp = client.get_schema(get_schema_req).await?;
    let resp_schema = resp.schema.ok_or_else(|| {
        ClientError::other(format!(
            "get schema response for table {} contained no schema",
            TABLE_NAME,
        ))
    })?;
    if resp_schema != *schema {
        return Err(ClientError::other(format!(
            "schema mismatch; expected {:?} but DB responded {:?}",
            schema,
            resp_schema
        )));
    }
    Ok(())
}
/// Writes a randomly sized load of rows and records the new expected total.
///
/// With equal probability this performs either a "small" write (one request
/// of `opt.small_n_rows` rows) or a "big" write (`opt.big_n_rows / BATCH_SIZE`
/// requests of `BATCH_SIZE` rows each, at most 16 in flight at once). A single
/// batch of rows is generated and the same request is re-sent for every
/// batch. Each row sets each of `col_0..=col_i` to a random `Int64` with
/// probability 0.5 and leaves it out otherwise.
///
/// On success the cumulative row count is appended to `row_counts`. After a
/// big write the function sleeps for `opt.compaction_wait_time` seconds with
/// probability 0.5.
///
/// # Errors
///
/// Returns the first write error observed, after all batches have finished.
/// `row_counts` is left untouched in that case.
///
/// # Panics
///
/// Panics if `row_counts` is empty.
async fn write_rows(i: usize, opt: &Opt, client: Client, row_counts: &mut Vec<usize>) -> ClientResult<()> {
    // Upper bound on the number of write requests in flight at once.
    const MAX_CONCURRENCY: usize = 16;

    let mut rng = rand::thread_rng();
    let last_row_count = *row_counts
        .last()
        .expect("row_counts must be seeded with at least one entry");

    let small_write = rng.gen_bool(0.5);
    let (n_batches, n_rows_per_batch) = if small_write {
        (1, opt.small_n_rows)
    } else {
        (opt.big_n_rows / BATCH_SIZE, BATCH_SIZE)
    };

    let mut rows = Vec::with_capacity(n_rows_per_batch);
    for _ in 0..n_rows_per_batch {
        let mut row = Row::default();
        for col_idx in 0..=i {
            // Omitting a field leaves that column null for this row.
            if rng.gen_bool(0.5) {
                row.fields.insert(
                    format!("col_{}", col_idx),
                    FieldValue {
                        value: Some(Value::Int64Val(rng.gen())),
                        ..Default::default()
                    },
                );
            }
        }
        rows.push(row);
    }

    let write_to_partition_req = WriteToPartitionRequest {
        table_name: TABLE_NAME.to_string(),
        rows,
        ..Default::default()
    };
    if small_write {
        println!("writing {} rows: {:?}", opt.small_n_rows, write_to_partition_req.rows);
    } else {
        println!("writing {} rows in {} concurrent batches", opt.big_n_rows, n_batches);
    }

    // Collect every batch's outcome so a failed write surfaces as an `Err`
    // from this function rather than a panic inside the stream.
    let write_results = futures::stream::iter(0..n_batches)
        .map(|_| {
            let mut batch_client = client.clone();
            let batch_req = write_to_partition_req.clone();
            async move { batch_client.write_to_partition(batch_req).await }
        })
        .buffer_unordered(MAX_CONCURRENCY)
        .collect::<Vec<_>>()
        .await;
    for write_result in write_results {
        write_result?;
    }
    println!("done with writes");

    row_counts.push(last_row_count + n_batches * n_rows_per_batch);

    if !small_write && rng.gen_bool(0.5) {
        println!("waiting {} seconds for compaction to settle...", opt.compaction_wait_time);
        tokio::time::sleep(Duration::from_secs(opt.compaction_wait_time)).await;
    }
    Ok(())
}
async fn delete(opt: &Opt, mut client: Client, n_deletions_ub: &mut usize) -> ClientResult<()> {
println!("listing segments for deletion");
let list_segments_response = client.list_segments(ListSegmentsRequest {
table_name: TABLE_NAME.to_string(),
include_metadata: true,
..Default::default()
}).await?;
let mut rng = rand::thread_rng();
let segments = &list_segments_response.segments;
let segment = &segments[rng.gen_range(0..segments.len())];
let mut to_delete = Vec::new();
for _ in 0..opt.max_deletions_per_req {
let metadata = segment.metadata.clone().unwrap();
let row_id_range = metadata.row_count; to_delete.push(rng.gen_range(0..row_id_range) as u32);
}
let distinct_to_delete: HashSet<_> = to_delete.iter().cloned().collect();
println!("deleting {} distinct row ids", distinct_to_delete.len());
let req = DeleteFromSegmentRequest {
table_name: TABLE_NAME.to_string(),
segment_id: segment.segment_id.to_string(),
row_ids: to_delete,
..Default::default()
};
*n_deletions_ub += opt.max_deletions_per_req;
client.delete_from_segment(req).await?;
Ok(())
}
/// Reads back every column of every segment and sanity-checks the totals.
///
/// For each of `col_0..=col_i`, the decoded values are summed across all
/// listed segments and two checks are made:
///
/// * the number of values must lie between the latest entry of `row_counts`
///   minus `n_deletions_ub` and that latest entry itself;
/// * the number of null values must be at least `row_counts[col_idx]` minus
///   `n_deletions_ub` (floored at zero).
///
/// # Errors
///
/// Returns an error if any request fails or either check is violated.
///
/// # Panics
///
/// Panics if `row_counts` is empty or has `i` or fewer entries.
async fn assert_reads(i: usize, mut client: Client, row_counts: &[usize], n_deletions_ub: usize) -> ClientResult<()> {
    let list_req = ListSegmentsRequest {
        table_name: TABLE_NAME.to_string(),
        ..Default::default()
    };
    println!("listing segments: {:?}", list_req);
    let list_resp = client.list_segments(list_req).await?;

    let current_row_count = *row_counts.last().unwrap();
    let n_cols = i + 1;
    let mut col_row_counts = vec![0_usize; n_cols];
    let mut col_null_counts = vec![0_usize; n_cols];

    // Every column in this test is an Int64 column, so one meta serves all.
    let int64_meta = ColumnMeta {
        dtype: DataType::Int64 as i32,
        ..Default::default()
    };

    for segment in &list_resp.segments {
        println!("checking all columns for segment {}", segment.segment_id);
        let segment_key = SegmentKey {
            table_name: TABLE_NAME.to_string(),
            partition: HashMap::new(),
            segment_id: segment.segment_id.clone(),
        };
        // One correlation id is shared by all reads of this segment.
        let correlation_id = pancake_db_client::new_correlation_id();
        let is_deleted = client.decode_is_deleted(
            &segment_key,
            &correlation_id,
        ).await?;

        for col_idx in 0..n_cols {
            let col_name = format!("col_{}", col_idx);
            let fvs = client.decode_segment_column(
                &segment_key,
                &col_name,
                &int64_meta,
                &is_deleted,
                &correlation_id,
            ).await?;
            col_row_counts[col_idx] += fvs.len();
            col_null_counts[col_idx] += fvs.iter().filter(|fv| fv.value.is_none()).count();
        }
    }

    let per_column_totals = col_row_counts.iter().zip(col_null_counts.iter()).enumerate();
    for (col_idx, (&n_found, &n_nulls)) in per_column_totals {
        if n_found > current_row_count || n_found + n_deletions_ub < current_row_count {
            return Err(ClientError::other(format!(
                "expected {} to {} rows for col_{} at evolution {} but found {}",
                current_row_count as i64 - n_deletions_ub as i64,
                current_row_count,
                col_idx,
                i,
                n_found,
            )));
        }

        // Rows counted in `row_counts[col_idx]` must be null for this column
        // unless they were deleted.
        let n_nulls_lb = row_counts[col_idx].saturating_sub(n_deletions_ub);
        if n_nulls < n_nulls_lb {
            return Err(ClientError::other(format!(
                "expected the first {} rows for col_{} to be null, but only {} rows were null",
                n_nulls_lb,
                col_idx,
                n_nulls,
            )));
        }
    }
    Ok(())
}