use std::vec;
use tokio::{runtime::Handle, task::block_in_place};
use crate::{error::Error, types::*};
use super::Client;
/// Buffered writer that collects rows column-by-column and inserts them into a
/// server-side table in batches.
///
/// Rows are appended with `append_row`; once `batch_size` rows have been
/// collected the buffered columns are sent to the server through `script`.
/// Any rows still buffered when the writer is dropped are flushed by its
/// `Drop` implementation.
pub struct TableWriter {
    /// Connection used for the schema queries and for sending batches.
    client: Client,
    /// Table expression exactly as passed to `new`; also used in panic messages.
    table_name: String,
    /// Function expression passed to `run_function` on flush
    /// (`tableInsert{'<table_name>'}`).
    script: String,
    /// One empty, correctly typed vector per table column; cloned to reset
    /// `buffer` after each flush.
    columns: Vec<VectorImpl>,
    /// Column data types, in the order the schema query returned them.
    column_types: Vec<DataType>,
    /// Column names, in the order the schema query returned them.
    column_names: Vec<String>,
    /// Column vectors holding the rows appended since the last flush.
    buffer: Vec<VectorImpl>,
    /// Number of rows appended since the last flush.
    size: u32,
    /// Number of buffered rows that triggers an automatic flush.
    batch_size: u32,
}
impl TableWriter {
pub async fn new(mut client: Client, table_name: &str, batch_size: u32) -> Self {
if batch_size == 0 {
panic!("TableWriter: batch_size must be positive.");
}
let mut columns = vec![];
let mut column_types = vec![];
let mut column_names: Vec<String> = vec![];
let schema = client
.run_script(format!("schema({}).colDefs.typeInt", table_name).as_str())
.await;
if schema.is_err() {
panic!("Failed to get schema for table {}", table_name);
}
if let VectorImpl::Int(columns_ddb) = schema.unwrap().unwrap().as_vector().unwrap() {
for i in columns_ddb.iter() {
let data_type = DataType::try_from(i.0.unwrap() as u8).unwrap();
column_types.push(data_type);
let vec = match data_type {
DataType::Bool => BoolVector::new().into(),
DataType::Char => CharVector::new().into(),
DataType::Short => ShortVector::new().into(),
DataType::Int
| DataType::Date
| DataType::Month
| DataType::Time
| DataType::Minute
| DataType::Second
| DataType::DateTime
| DataType::DateHour => IntVector::new().into(),
DataType::Long
| DataType::Timestamp
| DataType::NanoTime
| DataType::NanoTimestamp => LongVector::new().into(),
DataType::Float => FloatVector::new().into(),
DataType::Double => DoubleVector::new().into(),
DataType::String | DataType::Symbol => StringVector::new().into(),
DataType::Decimal32 => Decimal32Vector::new().into(),
DataType::Decimal64 => Decimal64Vector::new().into(),
DataType::Decimal128 => Decimal128Vector::new().into(),
DataType::CharArray => CharArrayVector::new().into(),
DataType::ShortArray => ShortArrayVector::new().into(),
DataType::IntArray => IntArrayVector::new().into(),
DataType::LongArray => LongArrayVector::new().into(),
DataType::FloatArray => FloatArrayVector::new().into(),
DataType::DoubleArray => DoubleArrayVector::new().into(),
_ => unimplemented!(),
};
columns.push(vec);
}
}
if let VectorImpl::String(column_names_ddb) = client
.run_script(format!("schema({}).colDefs.name", table_name).as_str())
.await
.unwrap()
.unwrap()
.as_vector()
.unwrap()
{
column_names = column_names_ddb
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>();
}
let buffer = columns.clone();
Self {
client,
table_name: table_name.to_string(),
script: format!("tableInsert{{'{}'}}", table_name),
columns,
column_types,
column_names,
buffer,
size: 0,
batch_size,
}
}
pub async fn append_row(
&mut self,
row: &mut Vec<PrimitiveType>,
) -> Result<Option<ConstantImpl>, Error> {
if self.buffer.len() != row.len() {
panic!(
"Table {} has {} columns, but {} provided.",
self.table_name,
self.buffer.len(),
row.len()
);
}
let mut res: Result<(), Error> = Ok(());
for i in (0..self.buffer.len()).rev() {
let data = row.pop().unwrap();
match &data {
PrimitiveType::NaiveDate(d) => match self.column_types[i] {
DataType::Date => {
_ = self.buffer[i].push_primitive_type(Date::new(*d).ddb_rep().into())
}
DataType::Month => {
_ = self.buffer[i].push_primitive_type(Month::new(*d).ddb_rep().into())
}
t => {
res = Err(Error::InvalidConvert {
from: "NaiveDate".to_string(),
to: t.to_string(),
})
}
},
PrimitiveType::NaiveTime(t) => match self.column_types[i] {
DataType::Time => {
_ = self.buffer[i].push_primitive_type(Time::new(*t).ddb_rep().into())
}
DataType::Minute => {
_ = self.buffer[i].push_primitive_type(Minute::new(*t).ddb_rep().into())
}
DataType::Second => {
_ = self.buffer[i].push_primitive_type(Second::new(*t).ddb_rep().into())
}
DataType::NanoTime => {
_ = self.buffer[i].push_primitive_type(NanoTime::new(*t).ddb_rep().into())
}
t => {
res = Err(Error::InvalidConvert {
from: "NaiveTime".to_string(),
to: t.to_string(),
})
}
},
PrimitiveType::NaiveDateTime(dt) => match self.column_types[i] {
DataType::Timestamp => {
_ = self.buffer[i].push_primitive_type(Timestamp::new(*dt).ddb_rep().into())
}
DataType::NanoTimestamp => {
_ = self.buffer[i]
.push_primitive_type(NanoTimestamp::new(*dt).ddb_rep().into())
}
DataType::DateTime => {
_ = self.buffer[i].push_primitive_type(DateTime::new(*dt).ddb_rep().into())
}
DataType::DateHour => {
_ = self.buffer[i].push_primitive_type(DateHour::new(*dt).ddb_rep().into())
}
t => {
res = Err(Error::InvalidConvert {
from: "NaiveDateTime".to_string(),
to: t.to_string(),
})
}
},
_ => res = self.buffer[i].push_primitive_type(data),
};
if let Err(err) = res {
panic!(
"Failed to insert into column `{}`: {}.",
self.column_names[i].clone(),
err.to_string()
);
}
}
self.size += 1;
if self.size == self.batch_size {
return self.flush().await;
}
Ok(None)
}
pub async fn flush(&mut self) -> Result<Option<ConstantImpl>, Error> {
let mut builder = TableBuilder::new();
let content = std::mem::take(&mut self.buffer);
self.buffer = self.columns.clone();
self.size = 0;
builder.with_contents(content, self.column_names.clone());
self.client
.run_function(self.script.as_str(), &[builder.build().unwrap().into()])
.await
}
pub fn size(&self) -> usize {
self.buffer[0].len()
}
}
impl Drop for TableWriter {
    /// Flushes any rows still buffered when the writer goes out of scope.
    ///
    /// The flush is best-effort: its result is discarded, and it is skipped
    /// when nothing is buffered or when no Tokio runtime is entered on the
    /// current thread.
    fn drop(&mut self) {
        if self.size == 0 {
            // Nothing to send; avoid blocking the thread for an empty insert.
            return;
        }
        // `Handle::current()` panics outside a runtime, and a panic inside
        // `drop` while already unwinding aborts the process, so use the
        // fallible lookup instead.
        // NOTE(review): `block_in_place` itself still panics on a
        // current-thread runtime -- confirm writers are only dropped on a
        // multi-threaded runtime.
        if let Ok(handle) = Handle::try_current() {
            block_in_place(|| {
                handle.block_on(async move {
                    let _ = self.flush().await;
                });
            });
        }
    }
}