taos_query/common/raw/
mod.rs

1#![allow(clippy::size_of_in_element_count)]
2#![allow(clippy::type_complexity)]
3
4use crate::common::{BorrowedValue, Field, Precision, Ty, Value};
5
6use bytes::Bytes;
7use itertools::Itertools;
8use rayon::prelude::*;
9
10use serde::Deserialize;
11use taos_error::Error;
12
13use std::{
14    cell::{Cell, RefCell, UnsafeCell},
15    ffi::c_void,
16    fmt::Debug,
17    fmt::Display,
18    ops::Deref,
19    ptr::NonNull,
20    rc::Rc,
21};
22
23pub mod layout;
24pub mod meta;
25
26mod data;
27
28use layout::Layout;
29
30#[allow(clippy::missing_safety_doc)]
31#[allow(clippy::should_implement_trait)]
32pub mod views;
33
34pub use views::ColumnView;
35
36use views::*;
37
38pub use data::*;
39pub use meta::*;
40
41mod de;
42mod rows;
43pub use rows::*;
44
45use derive_builder::Builder;
46
/// Fixed-size header that opens a v3 raw block.
///
/// The layout is C-ordered and byte-packed: five `u32` fields followed by one
/// `u64`, 28 bytes in total with no padding and an alignment of 1.
#[derive(Clone, Copy, Debug)]
#[repr(C, packed)]
struct Header {
    /// Format version of the block.
    version: u32,
    /// Total length of the block in bytes, as recorded by the producer.
    length: u32,
    /// Number of rows stored in the block.
    nrows: u32,
    /// Number of columns stored in the block.
    ncols: u32,
    /// Flag bits of the block.
    flag: u32,
    /// Group id of the block.
    group_id: u64,
}
57
58impl Default for Header {
59    fn default() -> Self {
60        Self {
61            version: 1,
62            length: Default::default(),
63            nrows: Default::default(),
64            ncols: Default::default(),
65            flag: u32::MAX,
66            group_id: Default::default(),
67        }
68    }
69}
70
71impl Header {
72    fn as_bytes(&self) -> &[u8] {
73        unsafe {
74            let ptr = self as *const Self;
75            let len = std::mem::size_of::<Self>();
76            std::slice::from_raw_parts(ptr as *const u8, len)
77        }
78    }
79
80    fn len(&self) -> usize {
81        self.length as _
82    }
83
84    fn nrows(&self) -> usize {
85        self.nrows as _
86    }
87    fn ncols(&self) -> usize {
88        self.ncols as _
89    }
90}
91
92/// Raw data block format (B for bytes):
93///
94/// ```text,ignore
95/// +-----+----------+---------------+-----------+-----------------------+-----------------+
96/// | header | col_schema... | length... | (bitmap or offsets    | col data)   ... |
97/// | 28 B   | (1+4)B * cols | 4B * cols | (row+7)/8 or 4 * rows | length[col] ... |
98/// +-----+----------+---------------+-----------+-----------------------+-----------------+
99/// ```
100///
101/// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
102/// recorded in the first segment, next to the struct header
103// #[derive(Debug)]
104pub struct RawBlock {
105    /// Layout is auto detected.
106    layout: Rc<RefCell<Layout>>,
107    /// Raw bytes version, may be v2 or v3.
108    version: Version,
109    /// Data is required, which could be v2 websocket block or a v3 raw block.
110    data: Cell<Bytes>,
111    /// Number of rows in current data block.
112    rows: usize,
113    /// Number of columns (or fields) in current data block.
114    cols: usize,
115    /// Timestamp precision in current data block.
116    precision: Precision,
117    /// Database name, if is tmq message.
118    database: Option<String>,
119    /// Table name of current data block.
120    table: Option<String>,
121    /// Field names of current data block.
122    fields: Vec<String>,
123    /// Group id in current data block, it always be 0 in v2 block, and be meaningful in v3.
124    group_id: u64,
125    /// Column schemas of current data block, contains only data type and the length defined in `create table`.
126    schemas: Schemas,
127    /// Data lengths collection for all columns.
128    lengths: Lengths,
129    /// A vector of [ColumnView] that represent column of values efficiently.
130    columns: Vec<ColumnView>,
131}
132
133unsafe impl Send for RawBlock {}
134unsafe impl Sync for RawBlock {}
135
136impl Debug for RawBlock {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        // todo: more helpful debug impl.
139        f.debug_struct("Raw")
140            .field("layout", &self.layout)
141            .field("version", &self.version)
142            .field("data", &"...")
143            .field("rows", &self.rows)
144            .field("cols", &self.cols)
145            .field("precision", &self.precision)
146            .field("table", &self.table)
147            .field("fields", &self.fields)
148            // .field("raw_fields", &self.raw_fields)
149            .field("group_id", &self.group_id)
150            .field("schemas", &self.schemas)
151            .field("lengths", &self.lengths)
152            .field("columns", &self.columns)
153            .finish()
154    }
155}
156
157impl RawBlock {
158    pub fn from_views(views: &[ColumnView], precision: Precision) -> Self {
159        Self::parse_from_raw_block(views_to_raw_block(views), precision)
160    }
161
162    /// # Safety
163    ///
164    /// When using this with correct TDengine 3.x raw block pointer, it's safe.
165    pub unsafe fn parse_from_ptr(ptr: *mut c_void, precision: Precision) -> Self {
166        let header = &*(ptr as *const Header);
167        let len = header.length as usize;
168        let bytes = std::slice::from_raw_parts(ptr as *const u8, len);
169        let bytes = Bytes::from(bytes.to_vec());
170        Self::parse_from_raw_block(bytes, precision).with_layout(Layout::default())
171    }
172
173    /// # Safety
174    ///
175    /// When using this with correct TDengine 2.x block pointer, it's safe.
176    #[allow(clippy::needless_range_loop)]
177    pub unsafe fn parse_from_ptr_v2(
178        ptr: *const *const c_void,
179        fields: &[Field],
180        lengths: &[u32],
181        rows: usize,
182        precision: Precision,
183    ) -> Self {
184        let mut bytes = Vec::new();
185        for i in 0..fields.len() {
186            let slice = ptr.add(i).read();
187            bytes.extend_from_slice(std::slice::from_raw_parts(
188                slice as *const u8,
189                lengths[i] as usize * rows,
190            ));
191        }
192        Self::parse_from_raw_block_v2(bytes, fields, lengths, rows, precision)
193    }
194
195    pub fn parse_from_raw_block_v2(
196        bytes: impl Into<Bytes>,
197        fields: &[Field],
198        lengths: &[u32],
199        rows: usize,
200        precision: Precision,
201    ) -> Self {
202        use bytes::BufMut;
203        debug_assert_eq!(fields.len(), lengths.len());
204
205        #[inline(always)]
206        fn bool_is_null(v: *const bool) -> bool {
207            unsafe { (v as *const u8).read_unaligned() == 0x02 }
208        }
209        #[inline(always)]
210        fn tiny_int_is_null(v: *const i8) -> bool {
211            unsafe { (v as *const u8).read_unaligned() == 0x80 }
212        }
213        #[inline(always)]
214        fn small_int_is_null(v: *const i16) -> bool {
215            unsafe { (v as *const u16).read_unaligned() == 0x8000 }
216        }
217        #[inline(always)]
218        fn int_is_null(v: *const i32) -> bool {
219            unsafe { (v as *const u32).read_unaligned() == 0x80000000 }
220        }
221        #[inline(always)]
222        fn big_int_is_null(v: *const i64) -> bool {
223            unsafe { (v as *const u64).read_unaligned() == 0x8000000000000000 }
224        }
225        #[inline(always)]
226        fn u_tiny_int_is_null(v: *const u8) -> bool {
227            unsafe { v.read_unaligned() == 0xFF }
228        }
229        #[inline(always)]
230        fn u_small_int_is_null(v: *const u16) -> bool {
231            unsafe { v.read_unaligned() == 0xFFFF }
232        }
233        #[inline(always)]
234        fn u_int_is_null(v: *const u32) -> bool {
235            unsafe { v.read_unaligned() == 0xFFFFFFFF }
236        }
237        #[inline(always)]
238        fn u_big_int_is_null(v: *const u64) -> bool {
239            unsafe { v.read_unaligned() == 0xFFFFFFFFFFFFFFFF }
240        }
241        #[inline(always)]
242        fn float_is_null(v: *const f32) -> bool {
243            unsafe { (v as *const u32).read_unaligned() == 0x7FF00000 }
244        }
245        #[inline(always)]
246        fn double_is_null(v: *const f64) -> bool {
247            unsafe { (v as *const u64).read_unaligned() == 0x7FFFFF0000000000 }
248        }
249
250        // const BOOL_NULL: u8 = 0x2;
251        // const TINY_INT_NULL: i8 = i8::MIN;
252        // const SMALL_INT_NULL: i16 = i16::MIN;
253        // const INT_NULL: i32 = i32::MIN;
254        // const BIG_INT_NULL: i64 = i64::MIN;
255        // const FLOAT_NULL: f32 = 0x7FF00000i32 as f32;
256        // const DOUBLE_NULL: f64 = 0x7FFFFF0000000000i64 as f64;
257        // const U_TINY_INT_NULL: u8 = u8::MAX;
258        // const U_SMALL_INT_NULL: u16 = u16::MAX;
259        // const U_INT_NULL: u32 = u32::MAX;
260        // const U_BIG_INT_NULL: u64 = u64::MAX;
261
262        let layout = Rc::new(RefCell::new(Layout::INLINE_DEFAULT.with_schema_changed()));
263
264        let bytes = bytes.into();
265        let cols = fields.len();
266
267        let mut schemas_bytes =
268            bytes::BytesMut::with_capacity(rows * std::mem::size_of::<ColSchema>());
269        fields
270            .iter()
271            .for_each(|f| schemas_bytes.put(f.to_column_schema().as_bytes()));
272        let schemas = Schemas::from(schemas_bytes);
273
274        let mut data_lengths = LengthsMut::new(cols);
275
276        let mut columns = Vec::new();
277
278        let mut offset = 0;
279
280        for (i, (field, length)) in fields.iter().zip(lengths).enumerate() {
281            macro_rules! _primitive_view {
282                ($ty:ident, $prim:ty) => {
283                    {
284                    debug_assert_eq!(field.bytes(), *length);
285                    // column start
286                    let start = offset;
287                    // column end
288                    offset += rows * std::mem::size_of::<$prim>() as usize;
289                    // byte slice from start to end: `[start, end)`.
290                    let data = bytes.slice(start..offset);
291                    let nulls = NullBits::from_iter((0..rows).map(|row| unsafe {
292                        paste::paste!{ [<$ty:snake _is_null>] (
293                            data
294                                .as_ptr()
295                                .offset(row as isize * std::mem::size_of::<$prim>() as isize)
296                                as *const $prim,
297                        ) }
298                    }));
299                    // value as target type
300                    // let value_slice = unsafe {
301                    //     std::slice::from_raw_parts(
302                    //         transmute::<*const u8, *const $prim>(data.as_ptr()),
303                    //         rows,
304                    //     )
305                    // };
306                    // Set data lengths for v3-compatible block.
307                    data_lengths[i] = data.len() as u32;
308
309                    // generate nulls bitmap.
310                    // let nulls = NullsMut::from_bools(
311                    //     value_slice
312                    //         .iter()
313                    //         .map(|v| paste::paste!{ [<$ty:snake _is_null>](v as _) })
314                    //         // .map(|b| *b as u64 == paste::paste! { [<$ty:snake:upper _NULL>] }),
315                    // )
316                    // .into_nulls();
317                    // build column view
318                    let column = paste::paste! { ColumnView::$ty([<$ty View>] { nulls, data }) };
319                    columns.push(column);
320                }};
321            }
322
323            match field.ty() {
324                Ty::Null => unreachable!(),
325
326                // Signed integers columns.
327                Ty::Bool => _primitive_view!(Bool, bool),
328                Ty::TinyInt => _primitive_view!(TinyInt, i8),
329                Ty::SmallInt => _primitive_view!(SmallInt, i16),
330                Ty::Int => _primitive_view!(Int, i32),
331                Ty::BigInt => _primitive_view!(BigInt, i64),
332                // Unsigned integers columns.
333                Ty::UTinyInt => _primitive_view!(UTinyInt, u8),
334                Ty::USmallInt => _primitive_view!(USmallInt, u16),
335                Ty::UInt => _primitive_view!(UInt, u32),
336                Ty::UBigInt => _primitive_view!(UBigInt, u64),
337                // Float columns.
338                Ty::Float => _primitive_view!(Float, f32),
339                Ty::Double => _primitive_view!(Double, f64),
340                Ty::VarChar => {
341                    let start = offset;
342                    offset += *length as usize * rows;
343                    let data = bytes.slice(start..offset);
344                    let data_ptr = data.as_ptr();
345
346                    let offsets = Offsets::from_offsets((0..rows).map(|row| unsafe {
347                        let offset = row as i32 * *length as i32;
348                        let ptr = data_ptr.offset(offset as isize);
349                        let len = (ptr as *const u16).read_unaligned();
350                        if len == 1 && *ptr.offset(2) == 0xFF {
351                            -1
352                        } else {
353                            offset
354                        }
355                    }));
356
357                    columns.push(ColumnView::VarChar(VarCharView { offsets, data }));
358
359                    data_lengths[i] = *length * rows as u32;
360                }
361                Ty::Timestamp => {
362                    // column start
363                    let start = offset;
364                    // column end
365                    offset += rows * std::mem::size_of::<i64>();
366                    // byte slice from start to end: `[start, end)`.
367                    let data = bytes.slice(start..offset);
368                    let nulls = NullBits::from_iter((0..rows).map(|row| unsafe {
369                        big_int_is_null(
370                            &(data
371                                .as_ptr()
372                                .offset(row as isize * std::mem::size_of::<i64>() as isize)
373                                as *const i64)
374                                .read_unaligned() as _,
375                        )
376                    }));
377                    // Set data lengths for v3-compatible block.
378                    data_lengths[i] = data.len() as u32;
379
380                    // build column view
381                    let column = ColumnView::Timestamp(TimestampView {
382                        nulls,
383                        data,
384                        precision,
385                    });
386                    columns.push(column);
387                }
388                Ty::NChar => {
389                    let start = offset;
390                    offset += *length as usize * rows;
391                    let data = bytes.slice(start..offset);
392                    let data_ptr = data.as_ptr();
393
394                    let offsets = Offsets::from_offsets((0..rows).map(|row| unsafe {
395                        let offset = row as i32 * *length as i32;
396                        let ptr = data_ptr.offset(offset as isize);
397                        let len = (ptr as *const u16).read_unaligned();
398                        if len == 4 && (ptr.offset(2) as *const u32).read_unaligned() == 0xFFFFFFFF
399                        {
400                            -1
401                        } else {
402                            offset
403                        }
404                    }));
405
406                    columns.push(ColumnView::NChar(NCharView {
407                        offsets,
408                        data,
409                        is_chars: UnsafeCell::new(false),
410                        version: Version::V2,
411                        layout: layout.clone(),
412                    }));
413
414                    data_lengths[i] = *length * rows as u32;
415                }
416                Ty::Json => {
417                    let start = offset;
418                    offset += *length as usize * rows;
419                    let data = bytes.slice(start..offset);
420                    let data_ptr = data.as_ptr();
421
422                    let offsets = Offsets::from_offsets((0..rows).map(|row| unsafe {
423                        let offset = row as i32 * *length as i32;
424                        let ptr = data_ptr.offset(offset as isize);
425                        let len = (ptr as *const u16).read_unaligned();
426                        if len == 4 && (ptr.offset(2) as *const u32).read_unaligned() == 0xFFFFFFFF
427                        {
428                            -1
429                        } else {
430                            offset
431                        }
432                    }));
433
434                    columns.push(ColumnView::Json(JsonView { offsets, data }));
435
436                    data_lengths[i] = *length * rows as u32;
437                }
438                Ty::VarBinary => todo!(),
439                Ty::Decimal => todo!(),
440                Ty::Blob => todo!(),
441                Ty::MediumBlob => todo!(),
442                Ty::Geometry => todo!(),
443            }
444        }
445
446        Self {
447            layout,
448            version: Version::V2,
449            data: Cell::new(bytes),
450            rows,
451            cols,
452            schemas,
453            lengths: data_lengths.into_lengths(),
454            precision,
455            database: None,
456            table: None,
457            fields: fields.iter().map(|s| s.name().to_string()).collect(),
458            columns,
459            group_id: 0,
460            // raw_fields: Vec::new(),
461        }
462    }
463
464    pub fn parse_from_raw_block(bytes: impl Into<Bytes>, precision: Precision) -> Self {
465        let schema_start: usize = std::mem::size_of::<Header>();
466
467        let layout = Rc::new(RefCell::new(Layout::INLINE_DEFAULT));
468
469        let bytes = bytes.into();
470        let ptr = bytes.as_ptr();
471
472        let header = unsafe { &*(ptr as *const Header) };
473
474        let rows = header.nrows();
475        let cols = header.ncols();
476        let len = header.len();
477        debug_assert_eq!(bytes.len(), len);
478        let group_id = header.group_id;
479
480        let schema_end = schema_start + cols * std::mem::size_of::<ColSchema>();
481        let schemas = Schemas::from(bytes.slice(schema_start..schema_end));
482        // dbg!(&schemas);
483        let lengths_end = schema_end + std::mem::size_of::<u32>() * cols;
484        let lengths = Lengths::from(bytes.slice(schema_end..lengths_end));
485        // dbg!(&lengths);
486        let mut data_offset = lengths_end;
487        let mut columns = Vec::with_capacity(cols);
488        for col in 0..cols {
489            // go for each column
490            let length = unsafe { lengths.get_unchecked(col) } as usize;
491            let schema = unsafe { schemas.get_unchecked(col) };
492
493            macro_rules! _primitive_value {
494                ($ty:ident, $prim:ty) => {{
495                    let o1 = data_offset;
496                    let o2 = data_offset + ((rows + 7) >> 3); // null bitmap len.
497                    data_offset = o2 + rows * std::mem::size_of::<$prim>();
498                    let nulls = bytes.slice(o1..o2);
499                    let data = bytes.slice(o2..data_offset);
500                    ColumnView::$ty(paste::paste! {[<$ty View>] {
501                        nulls: NullBits(nulls),
502                        data,
503                    }})
504                }};
505            }
506
507            let column = match schema.ty {
508                Ty::Null => unreachable!("raw block does not contains type NULL"),
509                Ty::Bool => _primitive_value!(Bool, i8),
510                Ty::TinyInt => _primitive_value!(TinyInt, i8),
511                Ty::SmallInt => _primitive_value!(SmallInt, i16),
512                Ty::Int => _primitive_value!(Int, i32),
513                Ty::BigInt => _primitive_value!(BigInt, i64),
514                Ty::Float => _primitive_value!(Float, f32),
515                Ty::Double => _primitive_value!(Double, f64),
516                Ty::VarChar => {
517                    let o1 = data_offset;
518                    let o2 = data_offset + std::mem::size_of::<i32>() * rows;
519                    data_offset = o2 + length;
520
521                    let offsets = Offsets::from(bytes.slice(o1..o2));
522                    let data = bytes.slice(o2..data_offset);
523
524                    ColumnView::VarChar(VarCharView { offsets, data })
525                }
526                Ty::Timestamp => {
527                    let o1 = data_offset;
528                    let o2 = data_offset + ((rows + 7) >> 3);
529                    data_offset = o2 + rows * std::mem::size_of::<i64>();
530                    let nulls = bytes.slice(o1..o2);
531                    let data = bytes.slice(o2..data_offset);
532                    ColumnView::Timestamp(TimestampView {
533                        nulls: NullBits(nulls),
534                        data,
535                        precision,
536                    })
537                }
538                Ty::NChar => {
539                    let o1 = data_offset;
540                    let o2 = data_offset + std::mem::size_of::<i32>() * rows;
541                    data_offset = o2 + length;
542
543                    let offsets = Offsets::from(bytes.slice(o1..o2));
544                    let data = bytes.slice(o2..data_offset);
545
546                    ColumnView::NChar(NCharView {
547                        offsets,
548                        data,
549                        is_chars: UnsafeCell::new(true),
550                        version: Version::V3,
551                        layout: layout.clone(),
552                    })
553                }
554                Ty::UTinyInt => _primitive_value!(UTinyInt, u8),
555                Ty::USmallInt => _primitive_value!(USmallInt, u16),
556                Ty::UInt => _primitive_value!(UInt, u32),
557                Ty::UBigInt => _primitive_value!(UBigInt, u64),
558                Ty::Json => {
559                    let o1 = data_offset;
560                    let o2 = data_offset + std::mem::size_of::<i32>() * rows;
561                    data_offset = o2 + length;
562
563                    let offsets = Offsets::from(bytes.slice(o1..o2));
564                    let data = bytes.slice(o2..data_offset);
565
566                    ColumnView::Json(JsonView { offsets, data })
567                }
568                Ty::VarBinary => {
569                    let o1 = data_offset;
570                    let o2 = data_offset + std::mem::size_of::<i32>() * rows;
571                    data_offset = o2 + length;
572
573                    let offsets = Offsets::from(bytes.slice(o1..o2));
574                    let data: Bytes = bytes.slice(o2..data_offset);
575
576                    ColumnView::VarBinary(VarBinaryView { offsets, data })
577                }
578                Ty::Geometry => {
579                    let o1 = data_offset;
580                    let o2 = data_offset + std::mem::size_of::<i32>() * rows;
581                    data_offset = o2 + length;
582
583                    let offsets = Offsets::from(bytes.slice(o1..o2));
584                    let data = bytes.slice(o2..data_offset);
585
586                    ColumnView::Geometry(GeometryView { offsets, data })
587                }
588
589                ty => {
590                    unreachable!("unsupported type: {ty}")
591                }
592            };
593            columns.push(column);
594            debug_assert!(data_offset <= len);
595        }
596        RawBlock {
597            layout,
598            version: Version::V3,
599            data: Cell::new(bytes),
600            rows,
601            cols,
602            precision,
603            group_id,
604            schemas,
605            lengths,
606            database: None,
607            table: None,
608            fields: Vec::new(),
609            columns,
610        }
611    }
612
613    /// Set table name of the block
614    pub fn with_database_name(&mut self, name: impl Into<String>) -> &mut Self {
615        self.database = Some(name.into());
616        self
617    }
618    /// Set table name of the block
619    pub fn with_table_name(&mut self, name: impl Into<String>) -> &mut Self {
620        self.table = Some(name.into());
621        self.layout.borrow_mut().with_table_name();
622
623        self
624    }
625
626    /// Set field names of the block
627    pub fn with_field_names<S: Display, I: IntoIterator<Item = S>>(
628        &mut self,
629        names: I,
630    ) -> &mut Self {
631        self.fields = names.into_iter().map(|name| name.to_string()).collect();
632        self.layout.borrow_mut().with_field_names();
633        self
634    }
635
636    fn with_layout(mut self, layout: Layout) -> Self {
637        self.layout = Rc::new(RefCell::new(layout));
638        self
639    }
640
641    /// Number of columns
642    #[inline]
643    pub fn ncols(&self) -> usize {
644        self.columns.len()
645    }
646
647    /// Number of rows
648    #[inline]
649    pub fn nrows(&self) -> usize {
650        self.rows
651    }
652
653    /// Precision for current block.
654    #[inline]
655    pub const fn precision(&self) -> Precision {
656        self.precision
657    }
658
659    #[inline]
660    pub const fn group_id(&self) -> u64 {
661        self.group_id
662    }
663
664    #[inline]
665    pub fn table_name(&self) -> Option<&str> {
666        self.table.as_deref()
667    }
668
669    // todo: db name?
670    #[inline]
671    pub fn tmq_db_name(&self) -> Option<&str> {
672        self.database.as_deref()
673    }
674
675    #[inline]
676    pub fn schemas(&self) -> &[ColSchema] {
677        &self.schemas
678    }
679
680    /// Get field names.
681    pub fn field_names(&self) -> &[String] {
682        &self.fields
683    }
684
685    /// Data view in columns.
686    #[inline]
687    pub fn columns(&self) -> std::slice::Iter<ColumnView> {
688        self.columns.iter()
689    }
690
691    pub fn column_views(&self) -> &[ColumnView] {
692        &self.columns
693    }
694
695    /// Data view in rows.
696    #[inline]
697    pub fn rows<'a>(&self) -> RowsIter<'a> {
698        RowsIter {
699            raw: NonNull::new(self as *const Self as *mut Self).unwrap(),
700            row: 0,
701            _marker: std::marker::PhantomData,
702        }
703    }
704
705    #[inline]
706    pub fn into_rows<'a>(self) -> IntoRowsIter<'a>
707    where
708        Self: 'a,
709    {
710        IntoRowsIter {
711            raw: self,
712            row: 0,
713            _marker: std::marker::PhantomData,
714        }
715    }
716
717    #[inline]
718    pub fn deserialize<'de, 'a: 'de, T>(
719        &'a self,
720    ) -> std::iter::Map<rows::RowsIter<'_>, fn(RowView<'a>) -> Result<T, DeError>>
721    where
722        T: Deserialize<'de>,
723    {
724        self.rows().map(|mut row| T::deserialize(&mut row))
725    }
726
727    pub fn as_raw_bytes(&self) -> &[u8] {
728        if self.layout.borrow().schema_changed() {
729            let bytes = views_to_raw_block(&self.columns);
730            let bytes = bytes.into();
731            self.data.replace(bytes);
732            self.layout.borrow_mut().set_schema_changed(false);
733            unsafe { &*self.data.as_ptr() }
734        } else {
735            unsafe { &(*self.data.as_ptr()) }
736        }
737    }
738
739    pub fn is_null(&self, row: usize, col: usize) -> bool {
740        if row >= self.nrows() || col >= self.ncols() {
741            return true;
742        }
743        unsafe { self.columns.get_unchecked(col).is_null_unchecked(row) }
744    }
745
746    #[inline]
747    /// Get one value at `(row, col)` of the block.
748    ///
749    /// # Safety
750    ///
751    /// `(row, col)` should not exceed the block limit.
752    pub unsafe fn get_raw_value_unchecked(
753        &self,
754        row: usize,
755        col: usize,
756    ) -> (Ty, u32, *const c_void) {
757        let view = self.columns.get_unchecked(col);
758        view.get_raw_value_unchecked(row)
759    }
760
761    pub fn get_ref(&self, row: usize, col: usize) -> Option<BorrowedValue> {
762        if row >= self.nrows() || col >= self.ncols() {
763            return None;
764        }
765        Some(unsafe { self.get_ref_unchecked(row, col) })
766    }
767
768    #[inline]
769    /// Get one value at `(row, col)` of the block.
770    ///
771    /// # Safety
772    ///
773    /// Ensure that `row` and `col` not exceed the limit of the block.
774    pub unsafe fn get_ref_unchecked(&self, row: usize, col: usize) -> BorrowedValue {
775        self.columns.get_unchecked(col).get_ref_unchecked(row)
776    }
777
778    // unsafe fn get_col_unchecked(&self, col: usize) -> &ColumnView {
779    //     self.columns.get_unchecked(col)
780    // }
781
782    pub fn to_values(&self) -> Vec<Vec<Value>> {
783        self.rows().map(|row| row.into_values()).collect_vec()
784    }
785
786    pub fn write<W: std::io::Write>(&self, _wtr: W) -> std::io::Result<usize> {
787        todo!()
788    }
789
790    fn layout(&self) -> Layout {
791        Layout::from_bits(self.layout.borrow().as_inner()).unwrap()
792    }
793
794    pub fn fields(&self) -> Vec<Field> {
795        self.schemas()
796            .iter()
797            .zip(self.field_names())
798            .map(|(schema, name)| Field::new(name, schema.ty, schema.len))
799            .collect_vec()
800    }
801
802    pub fn pretty_format(&self) -> PrettyBlock {
803        PrettyBlock::new(self)
804    }
805
806    // pub fn fields_iter(&self) -> impl Iterator<Item = Field> + '_ {
807    //     self.schemas()
808    //         .iter()
809    //         .zip(self.field_names())
810    //         .map(|(schema, name)| Field::new(name, schema.ty, schema.len))
811    // }
812
813    pub fn to_create(&self) -> Option<MetaCreate> {
814        self.table_name().map(|table_name| MetaCreate::Normal {
815            table_name: table_name.to_string(),
816            columns: self.fields().into_iter().map(Into::into).collect(),
817        })
818    }
819
820    /// Concatenate two blocks into one by rows.
821    ///
822    /// # Panics
823    ///
824    /// If the two blocks have different schemas.
825    pub fn concat(&self, rhs: &Self) -> Self {
826        debug_assert_eq!(self.ncols(), rhs.ncols());
827        #[cfg(debug_assertions)]
828        {
829            for (l, r) in self.fields().into_iter().zip(rhs.fields()) {
830                debug_assert_eq!(l, r);
831            }
832        }
833        let fields = self.field_names();
834        let views = rhs
835            .column_views()
836            .iter()
837            .zip(self.column_views())
838            .collect_vec()
839            .into_par_iter()
840            .map(|(r, l)| r.concat_strictly(l))
841            .collect::<Vec<_>>();
842        let mut block = Self::from_views(&views, self.precision());
843        block.with_field_names(fields);
844        if let Some(table_name) = self.table_name().or(rhs.table_name()) {
845            block.with_table_name(table_name);
846        }
847        block
848    }
849
850    /// Cast the block into a new block with new precision.
851    pub fn cast_precision(&self, precision: Precision) -> Self {
852        let views = self
853            .column_views()
854            .iter()
855            .map(|view| view.cast_precision(precision).to_owned())
856            .collect::<Vec<ColumnView>>();
857        let mut block = Self::from_views(&views, precision);
858        block.with_field_names(self.field_names());
859        if let Some(table_name) = self.table_name() {
860            block.with_table_name(table_name);
861        }
862        block
863    }
864}
865
866impl std::ops::Add for RawBlock {
867    type Output = Self;
868
869    fn add(self, rhs: Self) -> Self::Output {
870        self.concat(&rhs)
871    }
872}
873
874impl std::ops::Add for &RawBlock {
875    type Output = RawBlock;
876
877    fn add(self, rhs: Self) -> Self::Output {
878        self.concat(rhs)
879    }
880}
881
/// Borrowed wrapper around a [`RawBlock`] whose [`Display`] implementation
/// renders the block as a text table.
pub struct PrettyBlock<'a> {
    /// The block being rendered.
    raw: &'a RawBlock,
}
885
886impl<'a> PrettyBlock<'a> {
887    fn new(raw: &'a RawBlock) -> Self {
888        Self { raw }
889    }
890}
891
892impl<'a> Deref for PrettyBlock<'a> {
893    type Target = RawBlock;
894    fn deref(&self) -> &Self::Target {
895        self.raw
896    }
897}
898
899impl<'a> Display for PrettyBlock<'a> {
900    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
901        use prettytable::{Row, Table};
902        let mut table = Table::new();
903        writeln!(
904            f,
905            "Table view with {} rows, {} columns, table name \"{}\"",
906            self.nrows(),
907            self.ncols(),
908            self.table_name().unwrap_or_default(),
909        )?;
910        table.set_titles(Row::from_iter(self.field_names()));
911        let nrows = self.nrows();
912        const MAX_DISPLAY_ROWS: usize = 10;
913        let mut rows_iter = self.raw.rows();
914        if f.alternate() {
915            for row in rows_iter {
916                table.add_row(Row::from_iter(
917                    row.map(|s| s.1.to_string().unwrap_or_default()),
918                ));
919            }
920        } else if nrows > 2 * MAX_DISPLAY_ROWS {
921            for row in (&mut rows_iter).take(MAX_DISPLAY_ROWS) {
922                table.add_row(Row::from_iter(
923                    row.map(|s| s.1.to_string().unwrap_or_default()),
924                ));
925            }
926            table.add_row(Row::from_iter(std::iter::repeat("...").take(self.ncols())));
927            for row in rows_iter.skip(nrows - 2 * MAX_DISPLAY_ROWS) {
928                table.add_row(Row::from_iter(
929                    row.map(|s| s.1.to_string().unwrap_or_default()),
930                ));
931            }
932        } else {
933            for row in rows_iter {
934                table.add_row(Row::from_iter(
935                    row.map(|s| s.1.to_string().unwrap_or_default()),
936                ));
937            }
938        }
939
940        f.write_fmt(format_args!("{}", table))?;
941        Ok(())
942    }
943}
944
/// Wrapper around the bytes of one raw block, read and written verbatim by
/// its `Inlinable` / `AsyncInlinable` implementations.
struct InlineBlock(Bytes);
946
947impl From<InlineBlock> for Bytes {
948    fn from(raw: InlineBlock) -> Self {
949        raw.0
950    }
951}
952
953impl crate::prelude::sync::Inlinable for InlineBlock {
954    fn read_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self>
955    where
956        Self: Sized,
957    {
958        use crate::prelude::sync::InlinableRead;
959        let version = reader.read_u32()?;
960        let len = reader.read_u32()?;
961        let mut bytes = vec![0; len as usize];
962        unsafe {
963            std::ptr::copy_nonoverlapping(
964                version.to_le_bytes().as_ptr(),
965                bytes.as_mut_ptr(),
966                std::mem::size_of::<u32>(),
967            );
968            std::ptr::copy_nonoverlapping(
969                len.to_le_bytes().as_ptr(),
970                bytes.as_mut_ptr().offset(4),
971                std::mem::size_of::<u32>(),
972            );
973        }
974        let buf = &mut bytes[8..];
975        reader.read_exact(buf)?;
976        Ok(Self(bytes.into()))
977    }
978
979    fn write_inlined<W: std::io::Write>(&self, wtr: &mut W) -> std::io::Result<usize> {
980        wtr.write_all(self.0.as_ref())?;
981        Ok(self.0.len())
982    }
983}
984#[async_trait::async_trait]
985impl crate::prelude::AsyncInlinable for InlineBlock {
986    async fn read_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
987        reader: &mut R,
988    ) -> std::io::Result<Self>
989    where
990        Self: Sized,
991    {
992        use tokio::io::*;
993
994        let version = reader.read_u32_le().await?;
995        let len = reader.read_u32_le().await?;
996        let mut bytes = vec![0; len as usize];
997        unsafe {
998            std::ptr::copy_nonoverlapping(
999                version.to_le_bytes().as_ptr(),
1000                bytes.as_mut_ptr(),
1001                std::mem::size_of::<u32>(),
1002            );
1003            std::ptr::copy_nonoverlapping(
1004                len.to_le_bytes().as_ptr(),
1005                bytes.as_mut_ptr().offset(4),
1006                std::mem::size_of::<u32>(),
1007            );
1008        }
1009        let buf = &mut bytes[8..];
1010        reader.read_exact(buf).await?;
1011        Ok(Self(bytes.into()))
1012    }
1013
1014    async fn write_inlined<W: tokio::io::AsyncWrite + Send + Unpin>(
1015        &self,
1016        wtr: &mut W,
1017    ) -> std::io::Result<usize> {
1018        use tokio::io::*;
1019        wtr.write_all(self.0.as_ref()).await?;
1020        Ok(self.0.len())
1021    }
1022}
1023
1024impl crate::prelude::sync::Inlinable for RawBlock {
1025    fn read_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
1026        use crate::prelude::sync::InlinableRead;
1027        let layout = reader.read_u32()?;
1028        let layout = Layout::from_bits(layout).expect("should be layout");
1029
1030        let precision = layout.precision();
1031        let raw: InlineBlock = reader.read_inlinable()?;
1032        let mut raw = Self::parse_from_raw_block(raw.0, precision);
1033        let cols = raw.ncols();
1034
1035        if layout.expect_table_name() {
1036            let name = reader.read_inlined_str::<2>()?;
1037            raw.with_table_name(name);
1038        }
1039        if layout.expect_field_names() {
1040            let names: Vec<_> = (0..cols)
1041                .map(|_| reader.read_inlined_str::<1>())
1042                .try_collect()?;
1043            raw.with_field_names(names);
1044        }
1045
1046        Ok(raw)
1047    }
1048
1049    fn read_optional_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Option<Self>> {
1050        use crate::prelude::sync::InlinableRead;
1051        let layout = reader.read_u32()?;
1052        if layout == 0xFFFFFFFF {
1053            return Ok(None);
1054        }
1055        let layout = Layout::from_bits(layout).expect("should be layout");
1056
1057        let precision = layout.precision();
1058        let raw: InlineBlock = reader.read_inlinable()?;
1059        let mut raw = Self::parse_from_raw_block(raw.0, precision);
1060
1061        if layout.expect_table_name() {
1062            let name = reader.read_inlined_str::<2>()?;
1063            raw.with_table_name(name);
1064        }
1065        if layout.expect_field_names() {
1066            let names: Vec<_> = (0..raw.ncols())
1067                .map(|_| reader.read_inlined_str::<1>())
1068                .try_collect()?;
1069            raw.with_field_names(names);
1070        }
1071        log::trace!(
1072            "table name: {}, cols: {}, rows: {}",
1073            &raw.table_name().unwrap_or("(?)"),
1074            raw.ncols(),
1075            raw.nrows()
1076        );
1077
1078        Ok(Some(raw))
1079    }
1080
1081    fn write_inlined<W: std::io::Write>(&self, wtr: &mut W) -> std::io::Result<usize> {
1082        use crate::prelude::sync::InlinableWrite;
1083        let layout = self.layout();
1084        let mut l = wtr.write_u32_le(layout.as_inner())?;
1085
1086        let raw = self.as_raw_bytes();
1087        wtr.write_all(raw)?;
1088        l += raw.len();
1089
1090        if layout.expect_table_name() {
1091            let name = self.table_name().expect("table name should be known");
1092            l += wtr.write_inlined_bytes::<2>(name.as_bytes())?;
1093        }
1094        if layout.expect_field_names() {
1095            debug_assert_eq!(self.field_names().len(), self.ncols());
1096            for field in self.field_names() {
1097                l += wtr.write_inlined_str::<1>(field)?;
1098            }
1099        }
1100        Ok(l)
1101    }
1102}
1103
/// Protocol selector for schemaless data, laid out as a C enum.
///
/// Discriminants run from 0 (`Unknown`) to 3 (`Json`); the default is `Line`.
#[derive(Debug, Clone, Copy, Default)]
#[repr(C)]
pub enum SchemalessProtocol {
    /// Discriminant 0.
    Unknown = 0,
    /// Discriminant 1; the default protocol.
    #[default]
    Line = 1,
    /// Discriminant 2.
    Telnet = 2,
    /// Discriminant 3.
    Json = 3,
}
1113
/// Timestamp precision selector for schemaless data, laid out as a C enum.
///
/// Discriminants run from 0 (`NonConfigured`) to 6 (`Nanosecond`); the default
/// is `Millisecond`.
#[derive(Debug, Clone, Copy, Default)]
#[repr(C)]
pub enum SchemalessPrecision {
    /// Discriminant 0.
    NonConfigured = 0,
    /// Discriminant 1.
    Hours = 1,
    /// Discriminant 2.
    Minutes = 2,
    /// Discriminant 3.
    Seconds = 3,
    /// Discriminant 4; the default precision.
    #[default]
    Millisecond = 4,
    /// Discriminant 5.
    Microsecond = 5,
    /// Discriminant 6.
    Nanosecond = 6,
}

/// Converts a precision into its short textual form: `"ms"`, `"us"` or `"ns"`.
///
/// # Panics
///
/// The conversion is not implemented for `NonConfigured`, `Hours`, `Minutes`
/// and `Seconds`; converting any of them panics via `todo!()`.
impl From<SchemalessPrecision> for String {
    fn from(precision: SchemalessPrecision) -> Self {
        let unit: &str = match precision {
            SchemalessPrecision::Millisecond => "ms",
            SchemalessPrecision::Microsecond => "us",
            SchemalessPrecision::Nanosecond => "ns",
            // Listed explicitly so that adding a variant forces a decision here.
            SchemalessPrecision::NonConfigured
            | SchemalessPrecision::Hours
            | SchemalessPrecision::Minutes
            | SchemalessPrecision::Seconds => todo!(),
        };
        unit.to_string()
    }
}
1137
/// Payload and options for one schemaless request.
///
/// Instances are assembled through the `derive_builder`-generated
/// `SmlDataBuilder`; all setters accept `impl Into<..>` values.
#[derive(Default, Builder, Debug)]
#[builder(setter(into))]
pub struct SmlData {
    /// Protocol of the entries in `data`.
    protocol: SchemalessProtocol,
    /// Timestamp precision; the builder falls back to the type's default when
    /// it is not set.
    #[builder(setter(into, strip_option), default)]
    precision: SchemalessPrecision,
    /// The schemaless payload, one string per entry.
    data: Vec<String>,
    /// Optional TTL; `None` unless set on the builder.
    /// NOTE(review): unit and meaning are defined by the consumer, confirm there.
    #[builder(setter(into, strip_option), default)]
    ttl: Option<i32>,
    /// Optional request id; `None` unless set on the builder.
    #[builder(setter(into, strip_option), default)]
    req_id: Option<u64>,
}
1150
/// Read-only accessors for the fields of [`SmlData`].
impl SmlData {
    /// Returns the protocol of the payload.
    #[inline]
    pub fn protocol(&self) -> SchemalessProtocol {
        self.protocol
    }

    /// Returns the timestamp precision.
    #[inline]
    pub fn precision(&self) -> SchemalessPrecision {
        self.precision
    }

    /// Returns the payload entries.
    #[inline]
    pub fn data(&self) -> &Vec<String> {
        &self.data
    }

    /// Returns the TTL, if one was set.
    #[inline]
    pub fn ttl(&self) -> Option<i32> {
        self.ttl
    }

    /// Returns the request id, if one was set.
    #[inline]
    pub fn req_id(&self) -> Option<u64> {
        self.req_id
    }
}
1177
1178impl From<SmlDataBuilderError> for Error {
1179    fn from(value: SmlDataBuilderError) -> Self {
1180        Error::from_any(value)
1181    }
1182}
1183#[async_trait::async_trait]
1184impl crate::prelude::AsyncInlinable for RawBlock {
1185    async fn read_optional_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
1186        reader: &mut R,
1187    ) -> std::io::Result<Option<Self>> {
1188        use crate::util::AsyncInlinableRead;
1189        use tokio::io::*;
1190        let layout = reader.read_u32_le().await?;
1191        if layout == 0xFFFFFFFF {
1192            return Ok(None);
1193        }
1194        let layout = Layout::from_bits(layout).expect("invalid layout");
1195
1196        let precision = layout.precision();
1197
1198        let raw: InlineBlock =
1199            <InlineBlock as crate::prelude::AsyncInlinable>::read_inlined(reader).await?;
1200        let mut raw = Self::parse_from_raw_block(raw.0, precision);
1201
1202        if layout.expect_table_name() {
1203            let name = reader.read_inlined_str::<2>().await?;
1204            raw.with_table_name(name);
1205        }
1206        if layout.expect_field_names() {
1207            let mut names = Vec::with_capacity(raw.ncols());
1208            for _ in 0..raw.ncols() {
1209                names.push(reader.read_inlined_str::<1>().await?);
1210            }
1211            raw.with_field_names(names);
1212        }
1213
1214        Ok(Some(raw))
1215    }
1216
1217    async fn read_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
1218        reader: &mut R,
1219    ) -> std::io::Result<Self> {
1220        <Self as crate::prelude::AsyncInlinable>::read_optional_inlined(reader)
1221            .await?
1222            .ok_or(std::io::Error::new(
1223                std::io::ErrorKind::InvalidData,
1224                "invalid raw data format",
1225            ))
1226    }
1227
1228    async fn write_inlined<W: tokio::io::AsyncWrite + Send + Unpin>(
1229        &self,
1230        wtr: &mut W,
1231    ) -> std::io::Result<usize> {
1232        use crate::util::AsyncInlinableWrite;
1233        use tokio::io::*;
1234
1235        let layout = self.layout();
1236        wtr.write_u32_le(layout.as_inner()).await?;
1237
1238        let raw = self.as_raw_bytes();
1239        wtr.write_all(raw).await?;
1240
1241        let mut l = std::mem::size_of::<u32>() + raw.len();
1242
1243        if layout.expect_table_name() {
1244            let name = self.table_name().expect("table name should be known");
1245            l += wtr.write_inlined_bytes::<2>(name.as_bytes()).await?;
1246        }
1247        if layout.expect_field_names() {
1248            debug_assert_eq!(self.field_names().len(), self.ncols());
1249            for field in self.field_names() {
1250                l += wtr.write_inlined_str::<1>(field).await?;
1251            }
1252        }
1253
1254        Ok(l)
1255    }
1256}
1257
// Parses v2-format blocks, converts one to the current raw format and checks
// that it survives the async inlined write/read round trip.
#[tokio::test]
async fn test_raw_from_v2() {
    use crate::prelude::AsyncInlinable;
    use std::ops::Deref;
    // pretty_env_logger::formatted_builder()
    //     .filter_level(log::LevelFilter::Trace)
    //     .init();
    // Hand-written v2 payload for a Float column and an Int column.
    let bytes = b"\x10\x86\x1aA \xcc)AB\xc2\x14AZ],A\xa2\x8d$A\x87\xb9%A\xf5~\x0fA\x96\xf7,AY\xee\x17A1|\x15As\x00\x00\x00q\x00\x00\x00s\x00\x00\x00t\x00\x00\x00u\x00\x00\x00t\x00\x00\x00n\x00\x00\x00n\x00\x00\x00n\x00\x00\x00r\x00\x00\x00";

    let block = RawBlock::parse_from_raw_block_v2(
        bytes.as_slice(),
        &[Field::new("a", Ty::Float, 4), Field::new("b", Ty::Int, 4)],
        &[4, 4],
        10,
        Precision::Millisecond,
    );
    // Each of the two columns is expected to occupy 40 bytes.
    assert!(block.lengths.deref() == &[40, 40]);

    // Second payload: a six-column block loaded from a fixture file.
    let bytes = include_bytes!("../../../tests/test.txt");

    let block = RawBlock::parse_from_raw_block_v2(
        bytes.as_slice(),
        &[
            Field::new("ts", Ty::Timestamp, 8),
            Field::new("current", Ty::Float, 4),
            Field::new("voltage", Ty::Int, 4),
            Field::new("phase", Ty::Float, 4),
            Field::new("group_id", Ty::Int, 4),
            Field::new("location", Ty::VarChar, 16),
        ],
        &[8, 4, 4, 4, 4, 18],
        10,
        Precision::Millisecond,
    );

    // Row type used to check that the block deserializes through serde.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        ts: String,
        current: f32,
        voltage: i32,
        phase: f32,
        group_id: i32,
        location: String,
    }
    let rows: Vec<Record> = block.deserialize().try_collect().unwrap();
    dbg!(rows);
    // dbg!(block);
    // Re-encode the parsed columns as a raw block and parse that again.
    let bytes = views_to_raw_block(&block.columns);
    let raw2 = RawBlock::parse_from_raw_block(bytes, block.precision);
    dbg!(&raw2);
    // Round trip through the inlined representation.
    let inlined = raw2.inlined().await;
    dbg!(&inlined);
    let mut raw3 = RawBlock::read_inlined(&mut inlined.as_slice())
        .await
        .unwrap();
    raw3.with_field_names(["ts", "current", "voltage", "phase", "group_id", "location"])
        .with_table_name("meters");
    dbg!(&raw3);

    // With names attached, the table name must survive another round trip.
    let raw4 = RawBlock::read_inlined(&mut raw3.inlined().await.as_slice())
        .await
        .unwrap();
    dbg!(&raw4);
    assert_eq!(raw3.table_name(), raw4.table_name());

    // The optional reader must accept the same bytes.
    let raw5 = RawBlock::read_optional_inlined(&mut raw3.inlined().await.as_slice())
        .await
        .unwrap();
    dbg!(raw5);
}
1329
// Parses a gzip-compressed v2 fixture with 24 columns of mixed types,
// converts it to the current raw format and concatenates it with itself.
#[test]
fn test_v2_full() {
    let bytes = include_bytes!("../../../tests/v2.block.gz");

    use flate2::read::GzDecoder;
    use std::io::prelude::*;
    let mut buf = Vec::new();
    let len = GzDecoder::new(&bytes[..]).read_to_end(&mut buf).unwrap();
    // Guards against a changed or truncated fixture.
    assert_eq!(len, 66716);
    let block = RawBlock::parse_from_raw_block_v2(
        buf,
        &[
            Field::new("ts", Ty::Timestamp, 8),
            Field::new("b1", Ty::Bool, 1),
            Field::new("c8i1", Ty::TinyInt, 1),
            Field::new("c16i1", Ty::SmallInt, 2),
            Field::new("c32i1", Ty::Int, 4),
            Field::new("c64i1", Ty::BigInt, 8),
            Field::new("c8u1", Ty::UTinyInt, 1),
            Field::new("c16u1", Ty::USmallInt, 2),
            Field::new("c32u1", Ty::UInt, 4),
            Field::new("c64u1", Ty::UBigInt, 8),
            Field::new("cb1", Ty::VarChar, 100),
            Field::new("cn1", Ty::NChar, 10),
            Field::new("b2", Ty::Bool, 1),
            Field::new("c8i2", Ty::TinyInt, 1),
            Field::new("c16i2", Ty::SmallInt, 2),
            Field::new("c32i2", Ty::Int, 4),
            Field::new("c64i2", Ty::BigInt, 8),
            Field::new("c8u2", Ty::UTinyInt, 1),
            Field::new("c16u2", Ty::USmallInt, 2),
            Field::new("c32u2", Ty::UInt, 4),
            Field::new("c64u2", Ty::UBigInt, 8),
            Field::new("cb2", Ty::VarChar, 100),
            Field::new("cn2", Ty::NChar, 10),
            Field::new("jt", Ty::Json, 4096),
        ],
        &[
            8, 1, 1, 2, 4, 8, 1, 2, 4, 8, 102, 42, 1, 1, 2, 4, 8, 1, 2, 4, 8, 12, 66, 16387,
        ],
        4,
        Precision::Millisecond,
    );
    // Re-encode the parsed columns as a raw block and parse that again.
    let bytes = views_to_raw_block(&block.columns);
    let raw2 = RawBlock::parse_from_raw_block(bytes, block.precision);

    // Exercises `Add for &RawBlock`, i.e. `RawBlock::concat`.
    let added = &raw2 + &raw2;
    dbg!(raw2, added);
}
1379
// Checks which v2 byte patterns are reported as NULL after parsing, one
// column type at a time.
#[test]
fn test_v2_null() {
    // Bool column, two rows: 0x2 is expected to be NULL, 0 is not.
    let raw = RawBlock::parse_from_raw_block_v2(
        [0x2, 0].as_slice(),
        &[Field::new("b", Ty::Bool, 1)],
        &[1],
        2,
        Precision::Millisecond,
    );
    dbg!(&raw);
    let bytes = views_to_raw_block(&raw.columns);
    let raw2 = RawBlock::parse_from_raw_block(bytes, raw.precision);
    dbg!(raw2);
    let (_ty, _len, null) = unsafe { raw.get_raw_value_unchecked(0, 0) };
    assert!(null.is_null());
    let (_ty, _len, null) = unsafe { raw.get_raw_value_unchecked(1, 0) };
    assert!(!null.is_null());

    // TinyInt column: 0x80 is expected to be NULL.
    let raw = RawBlock::parse_from_raw_block_v2(
        [0x80].as_slice(),
        &[Field::new("b", Ty::TinyInt, 1)],
        &[1],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);
    let (_ty, _len, null) = unsafe { raw.get_raw_value_unchecked(0, 0) };
    assert!(null.is_null());

    // Int column: bytes [0, 0, 0, 0x80] are expected to be NULL.
    let raw = RawBlock::parse_from_raw_block_v2(
        [0, 0, 0, 0x80].as_slice(),
        &[Field::new("a", Ty::Int, 4)],
        &[4],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);
    let (_ty, _len, null) = unsafe { raw.get_raw_value_unchecked(0, 0) };
    assert!(null.is_null());

    // Float column: bytes [0, 0, 0xf0, 0x7f] are expected to be NULL.
    let raw = RawBlock::parse_from_raw_block_v2(
        [0, 0, 0xf0, 0x7f].as_slice(),
        &[Field::new("a", Ty::Float, 4)],
        &[4],
        1,
        Precision::Millisecond,
    );
    let (_ty, _len, null) = unsafe { raw.get_raw_value_unchecked(0, 0) };
    dbg!(raw);
    assert!(null.is_null());
}
// Smoke test: parses several tiny v2 blocks and prints them. There are no
// assertions, so it only fails if parsing or formatting panics.
#[test]
fn test_from_v2() {
    // Single TinyInt value, then a round trip through the raw byte form.
    let raw = RawBlock::parse_from_raw_block_v2(
        [1].as_slice(),
        &[Field::new("a", Ty::TinyInt, 1)],
        &[1],
        1,
        Precision::Millisecond,
    );
    let bytes = raw.as_raw_bytes();
    let bytes = Bytes::copy_from_slice(bytes);
    let raw2 = RawBlock::parse_from_raw_block(bytes, raw.precision());
    dbg!(&raw, raw2);
    // dbg!(raw.as_bytes());
    // let v = unsafe { raw.get_ref_unchecked(0, 0) };
    // dbg!(v);

    // Single Int value.
    let raw = RawBlock::parse_from_raw_block_v2(
        [1, 0, 0, 0].as_slice(),
        &[Field::new("a", Ty::Int, 4)],
        &[4],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);
    // dbg!(raw.as_bytes());
    // let v = unsafe { raw.get_ref_unchecked(0, 0) };
    // dbg!(v);

    // Single VarChar value: bytes [2, 0] followed by "ab".
    let raw = RawBlock::parse_from_raw_block_v2(
        [2, 0, b'a', b'b'].as_slice(),
        &[Field::new("b", Ty::VarChar, 2)],
        &[4],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);
    // dbg!(raw.as_bytes());
    // let v = unsafe { raw.get_ref_unchecked(0, 0) };
    // dbg!(v);

    // Same VarChar input parsed a second time.
    let raw = RawBlock::parse_from_raw_block_v2(
        [2, 0, b'a', b'b'].as_slice(),
        &[Field::new("b", Ty::VarChar, 2)],
        &[4],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);
    // Two columns (TinyInt, SmallInt) in one row.
    let raw = RawBlock::parse_from_raw_block_v2(
        &[1, 1, 1][..],
        &[
            Field::new("a", Ty::TinyInt, 1),
            Field::new("b", Ty::SmallInt, 2),
        ],
        &[1, 2],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);

    // Exercises the pretty table formatter.
    println!("{}", raw.pretty_format());
}
1491
1492    println!("{}", raw.pretty_format());
1493}