use std::collections::HashMap;
use futures::StreamExt;
use pancake_db_client::{Client, make_partition, make_row, SegmentKey};
use pancake_db_client::errors::{ClientResult, ClientErrorKind};
use pancake_db_idl::ddl::{CreateTableRequest, DropTableRequest, GetSchemaRequest};
use pancake_db_idl::dml::{DeleteFromSegmentRequest, ListSegmentsRequest, Segment, WriteToPartitionRequest};
use pancake_db_idl::dtype::DataType;
use pancake_db_idl::partition_dtype::PartitionDataType;
use pancake_db_idl::schema::{ColumnMeta, PartitionMeta, Schema};
use rand::{Rng, thread_rng};
use tokio;
use tonic::Code;
/// Name of the table this runthrough drops, creates, writes to, and reads from;
/// sent as `table_name` in every request built in `main`.
const TABLE_NAME: &str = "client_runthrough_table";
/// Exclusive upper bound for the random `pk` partition value assigned to each
/// write request (values are drawn from `0..N_PARTITIONS`).
const N_PARTITIONS: i64 = 3;
/// End-to-end walkthrough of the PancakeDB client against a server expected at
/// `http://localhost:3842`.
///
/// Steps, in order:
/// 1. Drop `TABLE_NAME` if it already exists.
/// 2. Create the table with an `i` (Int64) column, an `s` (list of String)
///    column, and a single Int64 partition key `pk`.
/// 3. Fetch and print the schema.
/// 4. Issue 1000 write requests of 50 rows each, at most 16 in flight at a time.
/// 5. List segments, then send the same delete request twice against the first
///    listed segment.
/// 6. Decode every listed segment and print a short preview of its rows.
///
/// # Errors
/// Returns the first client error produced by any request awaited with `?`.
/// A gRPC `NotFound` from the initial drop is ignored, since the table simply
/// may not exist yet.
///
/// # Panics
/// Panics with `"write failed"` if any of the concurrent write requests fails.
#[tokio::main]
async fn main() -> ClientResult<()> {
    /// Total number of write requests issued.
    const N_WRITE_REQUESTS: usize = 1000;
    /// Rows per write request: all but the last are populated, the last is empty.
    const ROWS_PER_WRITE: usize = 50;
    /// Maximum number of rows printed per decoded segment.
    const PREVIEW_ROWS: usize = 5;

    let mut client = Client::connect("http://localhost:3842").await?;

    // Start from a clean slate: drop any table left over from a previous run.
    let drop_table_res = client
        .drop_table(DropTableRequest {
            table_name: TABLE_NAME.to_string(),
            ..Default::default()
        })
        .await;
    match drop_table_res {
        Ok(resp) => println!("Dropped existing table: {:?}", resp),
        Err(err) => match err.kind {
            // The table not existing is the expected case on a first run.
            ClientErrorKind::Grpc { code: Code::NotFound } => {}
            _ => return Err(err),
        },
    }

    // Schema: one scalar Int64 column and one column holding lists of strings.
    let i_meta = ColumnMeta {
        dtype: DataType::Int64 as i32,
        ..Default::default()
    };
    let s_meta = ColumnMeta {
        dtype: DataType::String as i32,
        nested_list_depth: 1,
        ..Default::default()
    };
    let mut columns = HashMap::new();
    columns.insert("i".to_string(), i_meta);
    columns.insert("s".to_string(), s_meta);

    let mut partitioning = HashMap::new();
    partitioning.insert(
        "pk".to_string(),
        PartitionMeta {
            dtype: PartitionDataType::Int64 as i32,
            ..Default::default()
        },
    );

    let create_table_req = CreateTableRequest {
        table_name: TABLE_NAME.to_string(),
        schema: Some(Schema {
            partitioning,
            // `columns` is reused further down as the set of columns to read.
            columns: columns.clone(),
            ..Default::default()
        }),
        ..Default::default()
    };
    let create_resp = client.create_table(create_table_req).await?;
    println!("Created table: {:?}", create_resp);

    let get_schema_req = GetSchemaRequest {
        table_name: TABLE_NAME.to_string(),
        ..Default::default()
    };
    let get_schema_resp = client.get_schema(get_schema_req).await?;
    println!("Got schema: {:?}", get_schema_resp);

    // Write phase: many requests, bounded concurrency.
    let max_concurrency = 16;
    futures::stream::repeat(0)
        .take(N_WRITE_REQUESTS)
        .for_each_concurrent(max_concurrency, |_| async {
            let mut rows = Vec::with_capacity(ROWS_PER_WRITE);
            let mut rng = thread_rng();
            for _ in 0..ROWS_PER_WRITE - 1 {
                rows.push(make_row! {
                    "i" => i64::MAX / rng.gen_range(1..i64::MAX),
                    "s" => vec!["item 0".to_string(), "item 1".to_string()],
                })
            }
            // One row with no fields set, to exercise the all-missing case.
            rows.push(make_row! {});

            let req = WriteToPartitionRequest {
                table_name: TABLE_NAME.to_string(),
                rows,
                partition: make_partition! {
                    "pk" => rng.gen_range(0..N_PARTITIONS)
                },
                ..Default::default()
            };
            // Each in-flight request works on its own clone of the client.
            client
                .clone()
                .write_to_partition(req)
                .await
                .expect("write failed");
        })
        .await;

    let list_segments_req = ListSegmentsRequest {
        table_name: TABLE_NAME.to_string(),
        ..Default::default()
    };
    let list_resp = client.list_segments(list_segments_req).await?;
    println!("Listed segments: {:?}", list_resp);

    // Delete a few rows from the first listed segment, then repeat the exact
    // same request. Use `first()` rather than `[0]` so an empty listing skips
    // this step instead of panicking.
    match list_resp.segments.first().cloned() {
        Some(Segment { segment_id, partition, .. }) => {
            let delete_req = DeleteFromSegmentRequest {
                table_name: TABLE_NAME.to_string(),
                segment_id: segment_id.clone(),
                partition,
                row_ids: vec![0, 4, 10],
                ..Default::default()
            };
            let delete_resp = client.delete_from_segment(delete_req.clone()).await?;
            println!("Deleted rows from segment {}: {:?}", segment_id, delete_resp);
            let delete_resp = client.delete_from_segment(delete_req).await?;
            println!("Idempotently deleted same rows again: {:?}", delete_resp);
        }
        None => println!("No segments were listed; skipping row deletion"),
    }

    // Read phase: request the user columns plus `_row_id` for every segment.
    let mut read_columns = columns;
    read_columns.insert(
        "_row_id".to_string(),
        ColumnMeta {
            dtype: DataType::Int64 as i32,
            ..Default::default()
        },
    );

    let mut total = 0;
    for segment in &list_resp.segments {
        println!("\npartition {:?} segment {:?}", segment.partition, segment.segment_id);
        let segment_key = SegmentKey {
            table_name: TABLE_NAME.to_string(),
            segment_id: segment.segment_id.clone(),
            partition: segment.partition.clone(),
            ..Default::default()
        };
        let rows = client.decode_segment(&segment_key, &read_columns).await?;
        let count = rows.len();
        total += count;
        println!("read segment {} with {} rows (total {})", segment.segment_id, count, total);
        // `take` caps the preview at the rows actually present, so a segment
        // with fewer than PREVIEW_ROWS rows no longer indexes out of bounds.
        for (i, row) in rows.iter().take(PREVIEW_ROWS).enumerate() {
            println!("\t{}th row: {:?}", i, row);
        }
        println!("\t...");
    }
    Ok(())
}