#![allow(clippy::size_of_in_element_count)]
#![allow(clippy::type_complexity)]
use std::{
    cell::{Cell, RefCell, UnsafeCell},
    ffi::c_void,
    fmt::{Debug, Display},
    ops::Deref,
    ptr::NonNull,
    rc::Rc,
};

use bytes::Bytes;
use itertools::Itertools;
use rayon::prelude::*;
use serde::Deserialize;
use taos_error::Error;

use crate::common::{BorrowedValue, Field, Precision, Ty, Value};
pub mod layout;
pub mod meta;
mod data;
use layout::Layout;
#[allow(clippy::missing_safety_doc)]
#[allow(clippy::should_implement_trait)]
pub mod views;
pub use views::ColumnView;
use views::*;
pub use data::*;
pub use meta::*;
mod de;
mod rows;
pub use rows::*;
use derive_builder::Builder;
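/// In-memory mirror of the fixed-size header that prefixes a v3 raw block:
/// version, total length, row/column counts, flag and group id.
/// `repr(C, packed(1))` keeps the layout byte-compatible with the wire format
/// so the header can be read directly from the block bytes.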
#[derive(Debug, Clone, Copy)]
#[repr(C, packed(1))]
struct Header {
version: u32,
length: u32,
nrows: u32,
ncols: u32,
flag: u32,
group_id: u64,
}
impl Default for Header {
fn default() -> Self {
Self {
version: 1,
length: Default::default(),
nrows: Default::default(),
ncols: Default::default(),
flag: u32::MAX,
group_id: Default::default(),
}
}
}
impl Header {
fn as_bytes(&self) -> &[u8] {
unsafe {
let ptr = self as *const Self;
let len = std::mem::size_of::<Self>();
std::slice::from_raw_parts(ptr as *const u8, len)
}
}
fn len(&self) -> usize {
self.length as _
}
fn nrows(&self) -> usize {
self.nrows as _
}
fn ncols(&self) -> usize {
self.ncols as _
}
}
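/// A block of rows as received from the server: the raw bytes plus the parsed
/// column schemas, per-column data lengths and zero-copy [`ColumnView`]s over
/// the same buffer, together with optional database/table/field-name metadata.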
pub struct RawBlock {
layout: Rc<RefCell<Layout>>,
version: Version,
data: Cell<Bytes>,
rows: usize,
cols: usize,
precision: Precision,
database: Option<String>,
table: Option<String>,
fields: Vec<String>,
group_id: u64,
schemas: Schemas,
lengths: Lengths,
columns: Vec<ColumnView>,
}
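// Safety: `RawBlock` owns all of its buffers; the `Rc`/`RefCell`/`Cell` fields
// are used only as block-local caches and are never shared outside a single
// block, which is what these impls assert.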
unsafe impl Send for RawBlock {}
unsafe impl Sync for RawBlock {}
impl Debug for RawBlock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Raw")
.field("layout", &self.layout)
.field("version", &self.version)
.field("data", &"...")
.field("rows", &self.rows)
.field("cols", &self.cols)
.field("precision", &self.precision)
.field("table", &self.table)
.field("fields", &self.fields)
.field("group_id", &self.group_id)
.field("schemas", &self.schemas)
.field("lengths", &self.lengths)
.field("columns", &self.columns)
.finish()
}
}
impl RawBlock {
pub fn from_views(views: &[ColumnView], precision: Precision) -> Self {
Self::parse_from_raw_block(views_to_raw_block(views), precision)
}
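    /// Copy and parse a v3 raw block from a raw pointer.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a readable, well-formed raw block whose header
    /// `length` field does not exceed the underlying allocation.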
pub unsafe fn parse_from_ptr(ptr: *mut c_void, precision: Precision) -> Self {
let header = &*(ptr as *const Header);
let len = header.length as usize;
let bytes = std::slice::from_raw_parts(ptr as *const u8, len);
let bytes = Bytes::from(bytes.to_vec());
Self::parse_from_raw_block(bytes, precision).with_layout(Layout::default())
}
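    /// Copy and parse a v2 (column-oriented) block: `ptr` is an array of one
    /// data pointer per field, each referencing `lengths[i] * rows` bytes.
    ///
    /// # Safety
    ///
    /// `ptr` must point to at least `fields.len()` valid column pointers, and
    /// every referenced buffer must be at least `lengths[i] * rows` bytes long.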
#[allow(clippy::needless_range_loop)]
pub unsafe fn parse_from_ptr_v2(
ptr: *const *const c_void,
fields: &[Field],
lengths: &[u32],
rows: usize,
precision: Precision,
) -> Self {
let mut bytes = Vec::new();
for i in 0..fields.len() {
let slice = ptr.add(i).read();
bytes.extend_from_slice(std::slice::from_raw_parts(
slice as *const u8,
lengths[i] as usize * rows,
));
}
Self::parse_from_raw_block_v2(bytes, fields, lengths, rows, precision)
}
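    /// Parse a v2 data block.
    ///
    /// The v2 layout is column-major and fixed-width, with no null bitmap:
    /// NULL is an in-band sentinel per type (e.g. `0x80` for `TINYINT`,
    /// `0x8000_0000` for `INT`), which the `*_is_null` helpers below detect.
    ///
    /// A minimal sketch mirroring the unit tests in this module:
    ///
    /// ```ignore
    /// // One TINYINT column, one row, millisecond precision.
    /// let block = RawBlock::parse_from_raw_block_v2(
    ///     [1u8].as_slice(),
    ///     &[Field::new("a", Ty::TinyInt, 1)],
    ///     &[1],
    ///     1,
    ///     Precision::Millisecond,
    /// );
    /// assert_eq!(block.nrows(), 1);
    /// ```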
pub fn parse_from_raw_block_v2(
bytes: impl Into<Bytes>,
fields: &[Field],
lengths: &[u32],
rows: usize,
precision: Precision,
) -> Self {
use bytes::BufMut;
debug_assert_eq!(fields.len(), lengths.len());
#[inline(always)]
fn bool_is_null(v: *const bool) -> bool {
unsafe { (v as *const u8).read_unaligned() == 0x02 }
}
#[inline(always)]
fn tiny_int_is_null(v: *const i8) -> bool {
unsafe { (v as *const u8).read_unaligned() == 0x80 }
}
#[inline(always)]
fn small_int_is_null(v: *const i16) -> bool {
unsafe { (v as *const u16).read_unaligned() == 0x8000 }
}
#[inline(always)]
fn int_is_null(v: *const i32) -> bool {
unsafe { (v as *const u32).read_unaligned() == 0x80000000 }
}
#[inline(always)]
fn big_int_is_null(v: *const i64) -> bool {
unsafe { (v as *const u64).read_unaligned() == 0x8000000000000000 }
}
#[inline(always)]
fn u_tiny_int_is_null(v: *const u8) -> bool {
unsafe { v.read_unaligned() == 0xFF }
}
#[inline(always)]
fn u_small_int_is_null(v: *const u16) -> bool {
unsafe { v.read_unaligned() == 0xFFFF }
}
#[inline(always)]
fn u_int_is_null(v: *const u32) -> bool {
unsafe { v.read_unaligned() == 0xFFFFFFFF }
}
#[inline(always)]
fn u_big_int_is_null(v: *const u64) -> bool {
unsafe { v.read_unaligned() == 0xFFFFFFFFFFFFFFFF }
}
#[inline(always)]
fn float_is_null(v: *const f32) -> bool {
unsafe { (v as *const u32).read_unaligned() == 0x7FF00000 }
}
#[inline(always)]
fn double_is_null(v: *const f64) -> bool {
unsafe { (v as *const u64).read_unaligned() == 0x7FFFFF0000000000 }
}
let layout = Rc::new(RefCell::new(Layout::INLINE_DEFAULT.with_schema_changed()));
let bytes = bytes.into();
let cols = fields.len();
        let mut schemas_bytes =
            bytes::BytesMut::with_capacity(cols * std::mem::size_of::<ColSchema>());
fields
.iter()
.for_each(|f| schemas_bytes.put(f.to_column_schema().as_bytes()));
let schemas = Schemas::from(schemas_bytes);
let mut data_lengths = LengthsMut::new(cols);
let mut columns = Vec::new();
let mut offset = 0;
for (i, (field, length)) in fields.iter().zip(lengths).enumerate() {
macro_rules! _primitive_view {
($ty:ident, $prim:ty) => {
{
debug_assert_eq!(field.bytes(), *length);
let start = offset;
offset += rows * std::mem::size_of::<$prim>() as usize;
let data = bytes.slice(start..offset);
let nulls = NullBits::from_iter((0..rows).map(|row| unsafe {
paste::paste!{ [<$ty:snake _is_null>] (
data
.as_ptr()
.offset(row as isize * std::mem::size_of::<$prim>() as isize)
as *const $prim,
) }
}));
data_lengths[i] = data.len() as u32;
let column = paste::paste! { ColumnView::$ty([<$ty View>] { nulls, data }) };
columns.push(column);
}};
}
match field.ty() {
Ty::Null => unreachable!(),
Ty::Bool => _primitive_view!(Bool, bool),
Ty::TinyInt => _primitive_view!(TinyInt, i8),
Ty::SmallInt => _primitive_view!(SmallInt, i16),
Ty::Int => _primitive_view!(Int, i32),
Ty::BigInt => _primitive_view!(BigInt, i64),
Ty::UTinyInt => _primitive_view!(UTinyInt, u8),
Ty::USmallInt => _primitive_view!(USmallInt, u16),
Ty::UInt => _primitive_view!(UInt, u32),
Ty::UBigInt => _primitive_view!(UBigInt, u64),
Ty::Float => _primitive_view!(Float, f32),
Ty::Double => _primitive_view!(Double, f64),
Ty::VarChar => {
let start = offset;
offset += *length as usize * rows;
let data = bytes.slice(start..offset);
let data_ptr = data.as_ptr();
let offsets = Offsets::from_offsets((0..rows).map(|row| unsafe {
let offset = row as i32 * *length as i32;
let ptr = data_ptr.offset(offset as isize);
let len = (ptr as *const u16).read_unaligned();
if len == 1 && *ptr.offset(2) == 0xFF {
-1
} else {
offset
}
}));
columns.push(ColumnView::VarChar(VarCharView { offsets, data }));
data_lengths[i] = *length * rows as u32;
}
Ty::Timestamp => {
let start = offset;
offset += rows * std::mem::size_of::<i64>();
let data = bytes.slice(start..offset);
                    let nulls = NullBits::from_iter((0..rows).map(|row| unsafe {
                        big_int_is_null(
                            data.as_ptr().add(row * std::mem::size_of::<i64>()) as *const i64,
                        )
                    }));
data_lengths[i] = data.len() as u32;
let column = ColumnView::Timestamp(TimestampView {
nulls,
data,
precision,
});
columns.push(column);
}
Ty::NChar => {
let start = offset;
offset += *length as usize * rows;
let data = bytes.slice(start..offset);
let data_ptr = data.as_ptr();
let offsets = Offsets::from_offsets((0..rows).map(|row| unsafe {
let offset = row as i32 * *length as i32;
let ptr = data_ptr.offset(offset as isize);
let len = (ptr as *const u16).read_unaligned();
if len == 4 && (ptr.offset(2) as *const u32).read_unaligned() == 0xFFFFFFFF
{
-1
} else {
offset
}
}));
columns.push(ColumnView::NChar(NCharView {
offsets,
data,
is_chars: UnsafeCell::new(false),
version: Version::V2,
layout: layout.clone(),
}));
data_lengths[i] = *length * rows as u32;
}
Ty::Json => {
let start = offset;
offset += *length as usize * rows;
let data = bytes.slice(start..offset);
let data_ptr = data.as_ptr();
let offsets = Offsets::from_offsets((0..rows).map(|row| unsafe {
let offset = row as i32 * *length as i32;
let ptr = data_ptr.offset(offset as isize);
let len = (ptr as *const u16).read_unaligned();
if len == 4 && (ptr.offset(2) as *const u32).read_unaligned() == 0xFFFFFFFF
{
-1
} else {
offset
}
}));
columns.push(ColumnView::Json(JsonView { offsets, data }));
data_lengths[i] = *length * rows as u32;
}
Ty::VarBinary => todo!(),
Ty::Decimal => todo!(),
Ty::Blob => todo!(),
Ty::MediumBlob => todo!(),
}
}
Self {
layout,
version: Version::V2,
data: Cell::new(bytes),
rows,
cols,
schemas,
lengths: data_lengths.into_lengths(),
precision,
database: None,
table: None,
fields: fields.iter().map(|s| s.name().to_string()).collect(),
columns,
group_id: 0,
}
}
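    /// Parse a v3 raw block laid out as
    /// `header | column schemas | column lengths | per-column data`,
    /// where fixed-width columns carry a null bitmap and variable-width
    /// columns an offsets array (an offset of `-1` marks NULL).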
pub fn parse_from_raw_block(bytes: impl Into<Bytes>, precision: Precision) -> Self {
let schema_start: usize = std::mem::size_of::<Header>();
let layout = Rc::new(RefCell::new(Layout::INLINE_DEFAULT));
let bytes = bytes.into();
let ptr = bytes.as_ptr();
let header = unsafe { &*(ptr as *const Header) };
let rows = header.nrows();
let cols = header.ncols();
let len = header.len();
debug_assert_eq!(bytes.len(), len);
let group_id = header.group_id;
let schema_end = schema_start + cols * std::mem::size_of::<ColSchema>();
let schemas = Schemas::from(bytes.slice(schema_start..schema_end));
let lengths_end = schema_end + std::mem::size_of::<u32>() * cols;
let lengths = Lengths::from(bytes.slice(schema_end..lengths_end));
let mut data_offset = lengths_end;
let mut columns = Vec::with_capacity(cols);
for col in 0..cols {
let length = unsafe { lengths.get_unchecked(col) } as usize;
let schema = unsafe { schemas.get_unchecked(col) };
macro_rules! _primitive_value {
($ty:ident, $prim:ty) => {{
                    // Fixed-width column: a null bitmap (one bit per row,
                    // rounded up to whole bytes) followed by the values.
                    let o1 = data_offset;
                    let o2 = data_offset + ((rows + 7) >> 3);
                    data_offset = o2 + rows * std::mem::size_of::<$prim>();
let nulls = bytes.slice(o1..o2);
let data = bytes.slice(o2..data_offset);
ColumnView::$ty(paste::paste! {[<$ty View>] {
nulls: NullBits(nulls),
data,
}})
}};
}
let column = match schema.ty {
Ty::Null => unreachable!("raw block does not contains type NULL"),
Ty::Bool => _primitive_value!(Bool, i8),
Ty::TinyInt => _primitive_value!(TinyInt, i8),
Ty::SmallInt => _primitive_value!(SmallInt, i16),
Ty::Int => _primitive_value!(Int, i32),
Ty::BigInt => _primitive_value!(BigInt, i64),
Ty::Float => _primitive_value!(Float, f32),
Ty::Double => _primitive_value!(Double, f64),
Ty::VarChar => {
let o1 = data_offset;
let o2 = data_offset + std::mem::size_of::<i32>() * rows;
data_offset = o2 + length;
let offsets = Offsets::from(bytes.slice(o1..o2));
let data = bytes.slice(o2..data_offset);
ColumnView::VarChar(VarCharView { offsets, data })
}
Ty::Timestamp => {
let o1 = data_offset;
let o2 = data_offset + ((rows + 7) >> 3);
data_offset = o2 + rows * std::mem::size_of::<i64>();
let nulls = bytes.slice(o1..o2);
let data = bytes.slice(o2..data_offset);
ColumnView::Timestamp(TimestampView {
nulls: NullBits(nulls),
data,
precision,
})
}
Ty::NChar => {
let o1 = data_offset;
let o2 = data_offset + std::mem::size_of::<i32>() * rows;
data_offset = o2 + length;
let offsets = Offsets::from(bytes.slice(o1..o2));
let data = bytes.slice(o2..data_offset);
ColumnView::NChar(NCharView {
offsets,
data,
is_chars: UnsafeCell::new(true),
version: Version::V3,
layout: layout.clone(),
})
}
Ty::UTinyInt => _primitive_value!(UTinyInt, u8),
Ty::USmallInt => _primitive_value!(USmallInt, u16),
Ty::UInt => _primitive_value!(UInt, u32),
Ty::UBigInt => _primitive_value!(UBigInt, u64),
Ty::Json => {
let o1 = data_offset;
let o2 = data_offset + std::mem::size_of::<i32>() * rows;
data_offset = o2 + length;
let offsets = Offsets::from(bytes.slice(o1..o2));
let data = bytes.slice(o2..data_offset);
ColumnView::Json(JsonView { offsets, data })
}
ty => {
unreachable!("unsupported type: {ty}")
}
};
columns.push(column);
debug_assert!(data_offset <= len);
}
RawBlock {
layout,
version: Version::V3,
data: Cell::new(bytes),
rows,
cols,
precision,
group_id,
schemas,
lengths,
database: None,
table: None,
fields: Vec::new(),
columns,
}
}
pub fn with_database_name(&mut self, name: impl Into<String>) -> &mut Self {
self.database = Some(name.into());
self
}
pub fn with_table_name(&mut self, name: impl Into<String>) -> &mut Self {
self.table = Some(name.into());
self.layout.borrow_mut().with_table_name();
self
}
pub fn with_field_names<S: Display, I: IntoIterator<Item = S>>(
&mut self,
names: I,
) -> &mut Self {
self.fields = names.into_iter().map(|name| name.to_string()).collect();
self.layout.borrow_mut().with_field_names();
self
}
fn with_layout(mut self, layout: Layout) -> Self {
self.layout = Rc::new(RefCell::new(layout));
self
}
#[inline]
pub fn ncols(&self) -> usize {
self.columns.len()
}
#[inline]
pub fn nrows(&self) -> usize {
self.rows
}
#[inline]
pub const fn precision(&self) -> Precision {
self.precision
}
#[inline]
pub const fn group_id(&self) -> u64 {
self.group_id
}
#[inline]
pub fn table_name(&self) -> Option<&str> {
self.table.as_deref()
}
#[inline]
pub fn tmq_db_name(&self) -> Option<&str> {
self.database.as_deref()
}
#[inline]
pub fn schemas(&self) -> &[ColSchema] {
&self.schemas
}
pub fn field_names(&self) -> &[String] {
&self.fields
}
#[inline]
pub fn columns(&self) -> std::slice::Iter<ColumnView> {
self.columns.iter()
}
pub fn column_views(&self) -> &[ColumnView] {
&self.columns
}
#[inline]
    pub fn rows(&self) -> RowsIter<'_> {
RowsIter {
raw: NonNull::new(self as *const Self as *mut Self).unwrap(),
row: 0,
_marker: std::marker::PhantomData,
}
}
#[inline]
pub fn into_rows<'a>(self) -> IntoRowsIter<'a>
where
Self: 'a,
{
IntoRowsIter {
raw: self,
row: 0,
_marker: std::marker::PhantomData,
}
}
#[inline]
pub fn deserialize<'de, 'a: 'de, T>(
&'a self,
) -> std::iter::Map<rows::RowsIter<'_>, fn(RowView<'a>) -> Result<T, DeError>>
where
T: Deserialize<'de>,
{
self.rows().map(|mut row| T::deserialize(&mut row))
}
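    /// The block as raw bytes in the v3 wire format.
    ///
    /// If the layout is flagged as schema-changed (e.g. the block was built
    /// from column views), the views are re-serialized into the cached buffer
    /// first; otherwise the existing buffer is returned as-is.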
pub fn as_raw_bytes(&self) -> &[u8] {
if self.layout.borrow().schema_changed() {
let bytes = views_to_raw_block(&self.columns);
let bytes = bytes.into();
self.data.replace(bytes);
self.layout.borrow_mut().set_schema_changed(false);
unsafe { &*self.data.as_ptr() }
} else {
unsafe { &(*self.data.as_ptr()) }
}
}
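    /// Whether the value at `(row, col)` is NULL; out-of-range coordinates are
    /// reported as NULL.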
pub fn is_null(&self, row: usize, col: usize) -> bool {
if row >= self.nrows() || col >= self.ncols() {
return true;
}
unsafe { self.columns.get_unchecked(col).is_null_unchecked(row) }
}
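    /// Raw `(type, length, pointer)` triple for a single cell.
    ///
    /// # Safety
    ///
    /// `row` and `col` must be in bounds; no check is performed.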
#[inline]
pub unsafe fn get_raw_value_unchecked(
&self,
row: usize,
col: usize,
) -> (Ty, u32, *const c_void) {
let view = self.columns.get_unchecked(col);
view.get_raw_value_unchecked(row)
}
pub fn get_ref(&self, row: usize, col: usize) -> Option<BorrowedValue> {
if row >= self.nrows() || col >= self.ncols() {
return None;
}
Some(unsafe { self.get_ref_unchecked(row, col) })
}
#[inline]
pub unsafe fn get_ref_unchecked(&self, row: usize, col: usize) -> BorrowedValue {
self.columns.get_unchecked(col).get_ref_unchecked(row)
}
pub fn to_values(&self) -> Vec<Vec<Value>> {
self.rows().map(|row| row.into_values()).collect_vec()
}
pub fn write<W: std::io::Write>(&self, _wtr: W) -> std::io::Result<usize> {
todo!()
}
fn layout(&self) -> Layout {
Layout::from_bits(self.layout.borrow().as_inner()).unwrap()
}
pub fn fields(&self) -> Vec<Field> {
self.schemas()
.iter()
.zip(self.field_names())
.map(|(schema, name)| Field::new(name, schema.ty, schema.len))
.collect_vec()
}
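    /// Wrap the block in a [`PrettyBlock`] whose `Display` impl renders an
    /// ASCII table, truncating long blocks to the first and last rows unless
    /// the alternate (`{:#}`) flag is used.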
pub fn pretty_format(&self) -> PrettyBlock {
PrettyBlock::new(self)
}
pub fn to_create(&self) -> Option<MetaCreate> {
self.table_name().map(|table_name| MetaCreate::Normal {
table_name: table_name.to_string(),
columns: self.fields(),
})
}
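    /// Concatenate two blocks with identical schemas column by column
    /// (the per-column work is parallelized with rayon).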
pub fn concat(&self, rhs: &Self) -> Self {
debug_assert_eq!(self.ncols(), rhs.ncols());
#[cfg(debug_assertions)]
{
for (l, r) in self.fields().into_iter().zip(rhs.fields()) {
debug_assert_eq!(l, r);
}
}
let fields = self.field_names();
        let views = self
            .column_views()
            .iter()
            .zip(rhs.column_views())
            .collect_vec()
            .into_par_iter()
            .map(|(l, r)| l.concat_strictly(r))
            .collect::<Vec<_>>();
let mut block = Self::from_views(&views, self.precision());
block.with_field_names(fields);
if let Some(table_name) = self.table_name().or(rhs.table_name()) {
block.with_table_name(table_name);
}
block
}
}
impl std::ops::Add for RawBlock {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
self.concat(&rhs)
}
}
impl std::ops::Add for &RawBlock {
type Output = RawBlock;
fn add(self, rhs: Self) -> Self::Output {
self.concat(rhs)
}
}
pub struct PrettyBlock<'a> {
raw: &'a RawBlock,
}
impl<'a> PrettyBlock<'a> {
fn new(raw: &'a RawBlock) -> Self {
Self { raw }
}
}
impl<'a> Deref for PrettyBlock<'a> {
type Target = RawBlock;
fn deref(&self) -> &Self::Target {
self.raw
}
}
impl<'a> Display for PrettyBlock<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use prettytable::{Row, Table};
let mut table = Table::new();
writeln!(
f,
"Table view with {} rows, {} columns",
self.nrows(),
self.ncols()
)?;
table.set_titles(Row::from_iter(self.field_names()));
let nrows = self.nrows();
const MAX_DISPLAY_ROWS: usize = 10;
let mut rows_iter = self.raw.rows();
if f.alternate() {
for row in rows_iter {
table.add_row(Row::from_iter(
row.map(|s| s.1.to_string().unwrap_or_default()),
));
}
} else if nrows > 2 * MAX_DISPLAY_ROWS {
for row in (&mut rows_iter).take(MAX_DISPLAY_ROWS) {
table.add_row(Row::from_iter(
row.map(|s| s.1.to_string().unwrap_or_default()),
));
}
table.add_row(Row::from_iter(std::iter::repeat("...").take(self.ncols())));
for row in rows_iter.skip(nrows - 2 * MAX_DISPLAY_ROWS) {
table.add_row(Row::from_iter(
row.map(|s| s.1.to_string().unwrap_or_default()),
));
}
} else {
for row in rows_iter {
table.add_row(Row::from_iter(
row.map(|s| s.1.to_string().unwrap_or_default()),
));
}
}
f.write_fmt(format_args!("{}", table))?;
Ok(())
}
}
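/// A raw block stored in its inlined form: the first two `u32`s of the payload
/// are the block version and total length, which `read_inlined` uses to size
/// the buffer before reading the remaining bytes.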
struct InlineBlock(Bytes);
impl From<InlineBlock> for Bytes {
fn from(raw: InlineBlock) -> Self {
raw.0
}
}
impl crate::prelude::sync::Inlinable for InlineBlock {
fn read_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self>
where
Self: Sized,
{
use crate::prelude::sync::InlinableRead;
let version = reader.read_u32()?;
let len = reader.read_u32()?;
let mut bytes = vec![0; len as usize];
unsafe {
std::ptr::copy_nonoverlapping(
version.to_le_bytes().as_ptr(),
bytes.as_mut_ptr(),
std::mem::size_of::<u32>(),
);
std::ptr::copy_nonoverlapping(
len.to_le_bytes().as_ptr(),
bytes.as_mut_ptr().offset(4),
std::mem::size_of::<u32>(),
);
}
let buf = &mut bytes[8..];
reader.read_exact(buf)?;
Ok(Self(bytes.into()))
}
fn write_inlined<W: std::io::Write>(&self, wtr: &mut W) -> std::io::Result<usize> {
wtr.write_all(self.0.as_ref())?;
Ok(self.0.len())
}
}
#[async_trait::async_trait]
impl crate::prelude::AsyncInlinable for InlineBlock {
async fn read_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
reader: &mut R,
) -> std::io::Result<Self>
where
Self: Sized,
{
use tokio::io::*;
let version = reader.read_u32_le().await?;
let len = reader.read_u32_le().await?;
let mut bytes = vec![0; len as usize];
unsafe {
std::ptr::copy_nonoverlapping(
version.to_le_bytes().as_ptr(),
bytes.as_mut_ptr(),
std::mem::size_of::<u32>(),
);
std::ptr::copy_nonoverlapping(
len.to_le_bytes().as_ptr(),
bytes.as_mut_ptr().offset(4),
std::mem::size_of::<u32>(),
);
}
let buf = &mut bytes[8..];
reader.read_exact(buf).await?;
Ok(Self(bytes.into()))
}
async fn write_inlined<W: tokio::io::AsyncWrite + Send + Unpin>(
&self,
wtr: &mut W,
) -> std::io::Result<usize> {
use tokio::io::*;
wtr.write_all(self.0.as_ref()).await?;
Ok(self.0.len())
}
}
impl crate::prelude::sync::Inlinable for RawBlock {
fn read_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
use crate::prelude::sync::InlinableRead;
let layout = reader.read_u32()?;
let layout = Layout::from_bits(layout).expect("should be layout");
let precision = layout.precision();
let raw: InlineBlock = reader.read_inlinable()?;
let mut raw = Self::parse_from_raw_block(raw.0, precision);
let cols = raw.ncols();
if layout.expect_table_name() {
let name = reader.read_inlined_str::<2>()?;
raw.with_table_name(name);
}
if layout.expect_field_names() {
let names: Vec<_> = (0..cols)
.map(|_| reader.read_inlined_str::<1>())
.try_collect()?;
raw.with_field_names(names);
}
Ok(raw)
}
fn read_optional_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Option<Self>> {
use crate::prelude::sync::InlinableRead;
let layout = reader.read_u32()?;
if layout == 0xFFFFFFFF {
return Ok(None);
}
let layout = Layout::from_bits(layout).expect("should be layout");
let precision = layout.precision();
let raw: InlineBlock = reader.read_inlinable()?;
let mut raw = Self::parse_from_raw_block(raw.0, precision);
if layout.expect_table_name() {
let name = reader.read_inlined_str::<2>()?;
raw.with_table_name(name);
}
if layout.expect_field_names() {
let names: Vec<_> = (0..raw.ncols())
.map(|_| reader.read_inlined_str::<1>())
.try_collect()?;
raw.with_field_names(names);
}
log::trace!(
"table name: {}, cols: {}, rows: {}",
&raw.table_name().unwrap_or("(?)"),
raw.ncols(),
raw.nrows()
);
Ok(Some(raw))
}
fn write_inlined<W: std::io::Write>(&self, wtr: &mut W) -> std::io::Result<usize> {
use crate::prelude::sync::InlinableWrite;
let layout = self.layout();
let mut l = wtr.write_u32_le(layout.as_inner())?;
let raw = self.as_raw_bytes();
wtr.write_all(raw)?;
l += raw.len();
if layout.expect_table_name() {
let name = self.table_name().expect("table name should be known");
l += wtr.write_inlined_bytes::<2>(name.as_bytes())?;
}
if layout.expect_field_names() {
debug_assert_eq!(self.field_names().len(), self.ncols());
for field in self.field_names() {
l += wtr.write_inlined_str::<1>(field)?;
}
}
Ok(l)
}
}
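/// Protocol used for schemaless ingestion; the discriminants mirror the
/// corresponding C enum values.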
#[repr(C)]
#[derive(Default, Debug, Copy, Clone)]
pub enum SchemalessProtocol {
Unknown = 0,
#[default]
Line,
Telnet,
Json,
}
#[repr(C)]
#[derive(Default, Debug, Copy, Clone)]
pub enum SchemalessPrecision {
NonConfigured = 0,
Hours,
Minutes,
Seconds,
#[default]
Millisecond,
Microsecond,
Nanosecond,
}
impl From<SchemalessPrecision> for String {
fn from(precision: SchemalessPrecision) -> Self {
match precision {
SchemalessPrecision::Millisecond => "ms".to_string(),
SchemalessPrecision::Microsecond => "us".to_string(),
SchemalessPrecision::Nanosecond => "ns".to_string(),
_ => todo!(),
}
}
}
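/// Payload of a schemaless write request, constructed via the generated
/// `SmlDataBuilder`.
///
/// A minimal sketch (the line-protocol string is illustrative only):
///
/// ```ignore
/// let sml = SmlDataBuilder::default()
///     .protocol(SchemalessProtocol::Line)
///     .precision(SchemalessPrecision::Millisecond)
///     .data(vec!["measurement,host=a field=1i 1658213993000".to_string()])
///     .build()
///     .unwrap();
/// assert!(sml.ttl().is_none());
/// ```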
#[derive(Default, Builder, Debug)]
#[builder(setter(into))]
pub struct SmlData {
protocol: SchemalessProtocol,
#[builder(setter(into, strip_option), default)]
precision: SchemalessPrecision,
data: Vec<String>,
#[builder(setter(into, strip_option), default)]
ttl: Option<i32>,
#[builder(setter(into, strip_option), default)]
req_id: Option<u64>,
}
impl SmlData {
#[inline]
pub fn protocol(&self) -> SchemalessProtocol {
self.protocol
}
#[inline]
pub fn precision(&self) -> SchemalessPrecision {
self.precision
}
#[inline]
pub fn data(&self) -> &Vec<String> {
&self.data
}
#[inline]
pub fn ttl(&self) -> Option<i32> {
self.ttl
}
#[inline]
pub fn req_id(&self) -> Option<u64> {
self.req_id
}
}
impl From<SmlDataBuilderError> for Error {
fn from(value: SmlDataBuilderError) -> Self {
Error::from_any(value)
}
}
#[async_trait::async_trait]
impl crate::prelude::AsyncInlinable for RawBlock {
async fn read_optional_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
reader: &mut R,
) -> std::io::Result<Option<Self>> {
use crate::util::AsyncInlinableRead;
use tokio::io::*;
let layout = reader.read_u32_le().await?;
if layout == 0xFFFFFFFF {
return Ok(None);
}
let layout = Layout::from_bits(layout).expect("invalid layout");
let precision = layout.precision();
let raw: InlineBlock =
<InlineBlock as crate::prelude::AsyncInlinable>::read_inlined(reader).await?;
let mut raw = Self::parse_from_raw_block(raw.0, precision);
if layout.expect_table_name() {
let name = reader.read_inlined_str::<2>().await?;
raw.with_table_name(name);
}
if layout.expect_field_names() {
let mut names = Vec::with_capacity(raw.ncols());
for _ in 0..raw.ncols() {
names.push(reader.read_inlined_str::<1>().await?);
}
raw.with_field_names(names);
}
Ok(Some(raw))
}
async fn read_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
reader: &mut R,
) -> std::io::Result<Self> {
<Self as crate::prelude::AsyncInlinable>::read_optional_inlined(reader)
.await?
.ok_or(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"invalid raw data format",
))
}
async fn write_inlined<W: tokio::io::AsyncWrite + Send + Unpin>(
&self,
wtr: &mut W,
) -> std::io::Result<usize> {
use crate::util::AsyncInlinableWrite;
use tokio::io::*;
let layout = self.layout();
wtr.write_u32_le(layout.as_inner()).await?;
let raw = self.as_raw_bytes();
wtr.write_all(raw).await?;
let mut l = std::mem::size_of::<u32>() + raw.len();
if layout.expect_table_name() {
let name = self.table_name().expect("table name should be known");
l += wtr.write_inlined_bytes::<2>(name.as_bytes()).await?;
}
if layout.expect_field_names() {
debug_assert_eq!(self.field_names().len(), self.ncols());
for field in self.field_names() {
l += wtr.write_inlined_str::<1>(field).await?;
}
}
Ok(l)
}
}
#[tokio::test]
async fn test_raw_from_v2() {
use crate::prelude::AsyncInlinable;
use std::ops::Deref;
let bytes = b"\x10\x86\x1aA \xcc)AB\xc2\x14AZ],A\xa2\x8d$A\x87\xb9%A\xf5~\x0fA\x96\xf7,AY\xee\x17A1|\x15As\x00\x00\x00q\x00\x00\x00s\x00\x00\x00t\x00\x00\x00u\x00\x00\x00t\x00\x00\x00n\x00\x00\x00n\x00\x00\x00n\x00\x00\x00r\x00\x00\x00";
let block = RawBlock::parse_from_raw_block_v2(
bytes.as_slice(),
&[Field::new("a", Ty::Float, 4), Field::new("b", Ty::Int, 4)],
&[4, 4],
10,
Precision::Millisecond,
);
assert!(block.lengths.deref() == &[40, 40]);
let bytes = include_bytes!("../../../tests/test.txt");
let block = RawBlock::parse_from_raw_block_v2(
bytes.as_slice(),
&[
Field::new("ts", Ty::Timestamp, 8),
Field::new("current", Ty::Float, 4),
Field::new("voltage", Ty::Int, 4),
Field::new("phase", Ty::Float, 4),
Field::new("group_id", Ty::Int, 4),
Field::new("location", Ty::VarChar, 16),
],
&[8, 4, 4, 4, 4, 18],
10,
Precision::Millisecond,
);
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
ts: String,
current: f32,
voltage: i32,
phase: f32,
group_id: i32,
location: String,
}
let rows: Vec<Record> = block.deserialize().try_collect().unwrap();
dbg!(rows);
let bytes = views_to_raw_block(&block.columns);
let raw2 = RawBlock::parse_from_raw_block(bytes, block.precision);
dbg!(&raw2);
let inlined = raw2.inlined().await;
dbg!(&inlined);
let mut raw3 = RawBlock::read_inlined(&mut inlined.as_slice())
.await
.unwrap();
raw3.with_field_names(["ts", "current", "voltage", "phase", "group_id", "location"])
.with_table_name("meters");
dbg!(&raw3);
let raw4 = RawBlock::read_inlined(&mut raw3.inlined().await.as_slice())
.await
.unwrap();
dbg!(&raw4);
assert_eq!(raw3.table_name(), raw4.table_name());
let raw5 = RawBlock::read_optional_inlined(&mut raw3.inlined().await.as_slice())
.await
.unwrap();
dbg!(raw5);
}
#[test]
fn test_v2_full() {
let bytes = include_bytes!("../../../tests/v2.block.gz");
use flate2::read::GzDecoder;
use std::io::prelude::*;
let mut buf = Vec::new();
let len = GzDecoder::new(&bytes[..]).read_to_end(&mut buf).unwrap();
assert_eq!(len, 66716);
let block = RawBlock::parse_from_raw_block_v2(
buf,
&[
Field::new("ts", Ty::Timestamp, 8),
Field::new("b1", Ty::Bool, 1),
Field::new("c8i1", Ty::TinyInt, 1),
Field::new("c16i1", Ty::SmallInt, 2),
Field::new("c32i1", Ty::Int, 4),
Field::new("c64i1", Ty::BigInt, 8),
Field::new("c8u1", Ty::UTinyInt, 1),
Field::new("c16u1", Ty::USmallInt, 2),
Field::new("c32u1", Ty::UInt, 4),
Field::new("c64u1", Ty::UBigInt, 8),
Field::new("cb1", Ty::VarChar, 100),
Field::new("cn1", Ty::NChar, 10),
Field::new("b2", Ty::Bool, 1),
Field::new("c8i2", Ty::TinyInt, 1),
Field::new("c16i2", Ty::SmallInt, 2),
Field::new("c32i2", Ty::Int, 4),
Field::new("c64i2", Ty::BigInt, 8),
Field::new("c8u2", Ty::UTinyInt, 1),
Field::new("c16u2", Ty::USmallInt, 2),
Field::new("c32u2", Ty::UInt, 4),
Field::new("c64u2", Ty::UBigInt, 8),
Field::new("cb2", Ty::VarChar, 100),
Field::new("cn2", Ty::NChar, 10),
Field::new("jt", Ty::Json, 4096),
],
&[
8, 1, 1, 2, 4, 8, 1, 2, 4, 8, 102, 42, 1, 1, 2, 4, 8, 1, 2, 4, 8, 12, 66, 16387,
],
4,
Precision::Millisecond,
);
let bytes = views_to_raw_block(&block.columns);
let raw2 = RawBlock::parse_from_raw_block(bytes, block.precision);
let added = &raw2 + &raw2;
dbg!(raw2, added);
}
#[test]
fn test_v2_null() {
let raw = RawBlock::parse_from_raw_block_v2(
[0x2, 0].as_slice(),
&[Field::new("b", Ty::Bool, 1)],
&[1],
2,
Precision::Millisecond,
);
dbg!(&raw);
let bytes = views_to_raw_block(&raw.columns);
let raw2 = RawBlock::parse_from_raw_block(bytes, raw.precision);
dbg!(raw2);
let (_ty, _len, null) = unsafe { raw.get_raw_value_unchecked(0, 0) };
assert!(null.is_null());
let (_ty, _len, null) = unsafe { raw.get_raw_value_unchecked(1, 0) };
assert!(!null.is_null());
let raw = RawBlock::parse_from_raw_block_v2(
[0x80].as_slice(),
&[Field::new("b", Ty::TinyInt, 1)],
&[1],
1,
Precision::Millisecond,
);
dbg!(&raw);
let (_ty, _len, null) = unsafe { raw.get_raw_value_unchecked(0, 0) };
assert!(null.is_null());
let raw = RawBlock::parse_from_raw_block_v2(
[0, 0, 0, 0x80].as_slice(),
&[Field::new("a", Ty::Int, 4)],
&[4],
1,
Precision::Millisecond,
);
dbg!(&raw);
let (_ty, _len, null) = unsafe { raw.get_raw_value_unchecked(0, 0) };
assert!(null.is_null());
let raw = RawBlock::parse_from_raw_block_v2(
[0, 0, 0xf0, 0x7f].as_slice(),
&[Field::new("a", Ty::Float, 4)],
&[4],
1,
Precision::Millisecond,
);
let (_ty, _len, null) = unsafe { raw.get_raw_value_unchecked(0, 0) };
dbg!(raw);
assert!(null.is_null());
}
#[test]
fn test_from_v2() {
let raw = RawBlock::parse_from_raw_block_v2(
[1].as_slice(),
&[Field::new("a", Ty::TinyInt, 1)],
&[1],
1,
Precision::Millisecond,
);
let bytes = raw.as_raw_bytes();
let bytes = Bytes::copy_from_slice(bytes);
let raw2 = RawBlock::parse_from_raw_block(bytes, raw.precision());
dbg!(&raw, raw2);
let raw = RawBlock::parse_from_raw_block_v2(
[1, 0, 0, 0].as_slice(),
&[Field::new("a", Ty::Int, 4)],
&[4],
1,
Precision::Millisecond,
);
dbg!(&raw);
let raw = RawBlock::parse_from_raw_block_v2(
[2, 0, b'a', b'b'].as_slice(),
&[Field::new("b", Ty::VarChar, 2)],
&[4],
1,
Precision::Millisecond,
);
dbg!(&raw);
let raw = RawBlock::parse_from_raw_block_v2(
&[1, 1, 1][..],
&[
Field::new("a", Ty::TinyInt, 1),
Field::new("b", Ty::SmallInt, 2),
],
&[1, 2],
1,
Precision::Millisecond,
);
dbg!(&raw);
println!("{}", raw.pretty_format());
}