1#![allow(clippy::size_of_in_element_count)]
2#![allow(clippy::type_complexity)]
3
4use crate::common::{BorrowedValue, Field, Precision, Ty, Value};
5
6use bytes::Bytes;
7use itertools::Itertools;
8use rayon::prelude::*;
9
10use serde::Deserialize;
11use taos_error::Error;
12
13use std::{
14 cell::{Cell, RefCell, UnsafeCell},
15 ffi::c_void,
16 fmt::Debug,
17 fmt::Display,
18 ops::Deref,
19 ptr::NonNull,
20 rc::Rc,
21};
22
23pub mod layout;
24pub mod meta;
25
26mod data;
27
28use layout::Layout;
29
30#[allow(clippy::missing_safety_doc)]
31#[allow(clippy::should_implement_trait)]
32pub mod views;
33
34pub use views::ColumnView;
35
36use views::*;
37
38pub use data::*;
39pub use meta::*;
40
41mod de;
42mod rows;
43pub use rows::*;
44
45use derive_builder::Builder;
46
/// Fixed-size header found at the very start of a raw block buffer.
///
/// The struct is `repr(C, packed(1))`, so the fields sit in declaration order
/// without padding and the struct has an alignment of one byte.
#[derive(Debug, Clone, Copy)]
#[repr(C, packed(1))]
struct Header {
    version: u32,
    /// Length in bytes; the parsers use it as the size of the whole buffer,
    /// header included.
    length: u32,
    /// Number of rows stored in the block.
    nrows: u32,
    /// Number of columns stored in the block.
    ncols: u32,
    flag: u32,
    group_id: u64,
}
57
58impl Default for Header {
59 fn default() -> Self {
60 Self {
61 version: 1,
62 length: Default::default(),
63 nrows: Default::default(),
64 ncols: Default::default(),
65 flag: u32::MAX,
66 group_id: Default::default(),
67 }
68 }
69}
70
71impl Header {
72 fn as_bytes(&self) -> &[u8] {
73 unsafe {
74 let ptr = self as *const Self;
75 let len = std::mem::size_of::<Self>();
76 std::slice::from_raw_parts(ptr as *const u8, len)
77 }
78 }
79
80 fn len(&self) -> usize {
81 self.length as _
82 }
83
84 fn nrows(&self) -> usize {
85 self.nrows as _
86 }
87 fn ncols(&self) -> usize {
88 self.ncols as _
89 }
90}
91
/// A block of column-oriented data together with the metadata needed to read it.
pub struct RawBlock {
    /// Layout flags; the same `Rc` is handed to the `NCharView`s built while parsing.
    layout: Rc<RefCell<Layout>>,
    /// Format version the block was parsed as (`V2` or `V3`).
    version: Version,
    /// Backing buffer. It lives in a `Cell` so that `as_raw_bytes` can swap in
    /// a re-encoded buffer through `&self`.
    data: Cell<Bytes>,
    /// Number of rows.
    rows: usize,
    /// Number of columns as reported by the input.
    cols: usize,
    /// Precision applied to timestamp columns.
    precision: Precision,
    /// Database name, if one was attached with `with_database_name`.
    database: Option<String>,
    /// Table name, if one was attached with `with_table_name`.
    table: Option<String>,
    /// Field (column) names; empty until provided for blocks parsed from raw bytes.
    fields: Vec<String>,
    group_id: u64,
    /// Per-column schema entries.
    schemas: Schemas,
    /// Per-column data lengths in bytes.
    lengths: Lengths,
    /// Decoded column views, one per column.
    columns: Vec<ColumnView>,
}
132
// NOTE(review): `RawBlock` holds an `Rc<RefCell<Layout>>` and a `Cell<Bytes>`,
// neither of which is `Send`/`Sync` by itself, and `as_raw_bytes` mutates both
// through `&self`. These impls assert thread-safety by hand — confirm that a
// block is never accessed from several threads at once.
unsafe impl Send for RawBlock {}
unsafe impl Sync for RawBlock {}
135
136impl Debug for RawBlock {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct("Raw")
140 .field("layout", &self.layout)
141 .field("version", &self.version)
142 .field("data", &"...")
143 .field("rows", &self.rows)
144 .field("cols", &self.cols)
145 .field("precision", &self.precision)
146 .field("table", &self.table)
147 .field("fields", &self.fields)
148 .field("group_id", &self.group_id)
150 .field("schemas", &self.schemas)
151 .field("lengths", &self.lengths)
152 .field("columns", &self.columns)
153 .finish()
154 }
155}
156
157impl RawBlock {
158 pub fn from_views(views: &[ColumnView], precision: Precision) -> Self {
159 Self::parse_from_raw_block(views_to_raw_block(views), precision)
160 }
161
162 pub unsafe fn parse_from_ptr(ptr: *mut c_void, precision: Precision) -> Self {
166 let header = &*(ptr as *const Header);
167 let len = header.length as usize;
168 let bytes = std::slice::from_raw_parts(ptr as *const u8, len);
169 let bytes = Bytes::from(bytes.to_vec());
170 Self::parse_from_raw_block(bytes, precision).with_layout(Layout::default())
171 }
172
173 #[allow(clippy::needless_range_loop)]
177 pub unsafe fn parse_from_ptr_v2(
178 ptr: *const *const c_void,
179 fields: &[Field],
180 lengths: &[u32],
181 rows: usize,
182 precision: Precision,
183 ) -> Self {
184 let mut bytes = Vec::new();
185 for i in 0..fields.len() {
186 let slice = ptr.add(i).read();
187 bytes.extend_from_slice(std::slice::from_raw_parts(
188 slice as *const u8,
189 lengths[i] as usize * rows,
190 ));
191 }
192 Self::parse_from_raw_block_v2(bytes, fields, lengths, rows, precision)
193 }
194
195 pub fn parse_from_raw_block_v2(
196 bytes: impl Into<Bytes>,
197 fields: &[Field],
198 lengths: &[u32],
199 rows: usize,
200 precision: Precision,
201 ) -> Self {
202 use bytes::BufMut;
203 debug_assert_eq!(fields.len(), lengths.len());
204
205 #[inline(always)]
206 fn bool_is_null(v: *const bool) -> bool {
207 unsafe { (v as *const u8).read_unaligned() == 0x02 }
208 }
209 #[inline(always)]
210 fn tiny_int_is_null(v: *const i8) -> bool {
211 unsafe { (v as *const u8).read_unaligned() == 0x80 }
212 }
213 #[inline(always)]
214 fn small_int_is_null(v: *const i16) -> bool {
215 unsafe { (v as *const u16).read_unaligned() == 0x8000 }
216 }
217 #[inline(always)]
218 fn int_is_null(v: *const i32) -> bool {
219 unsafe { (v as *const u32).read_unaligned() == 0x80000000 }
220 }
221 #[inline(always)]
222 fn big_int_is_null(v: *const i64) -> bool {
223 unsafe { (v as *const u64).read_unaligned() == 0x8000000000000000 }
224 }
225 #[inline(always)]
226 fn u_tiny_int_is_null(v: *const u8) -> bool {
227 unsafe { v.read_unaligned() == 0xFF }
228 }
229 #[inline(always)]
230 fn u_small_int_is_null(v: *const u16) -> bool {
231 unsafe { v.read_unaligned() == 0xFFFF }
232 }
233 #[inline(always)]
234 fn u_int_is_null(v: *const u32) -> bool {
235 unsafe { v.read_unaligned() == 0xFFFFFFFF }
236 }
237 #[inline(always)]
238 fn u_big_int_is_null(v: *const u64) -> bool {
239 unsafe { v.read_unaligned() == 0xFFFFFFFFFFFFFFFF }
240 }
241 #[inline(always)]
242 fn float_is_null(v: *const f32) -> bool {
243 unsafe { (v as *const u32).read_unaligned() == 0x7FF00000 }
244 }
245 #[inline(always)]
246 fn double_is_null(v: *const f64) -> bool {
247 unsafe { (v as *const u64).read_unaligned() == 0x7FFFFF0000000000 }
248 }
249
250 let layout = Rc::new(RefCell::new(Layout::INLINE_DEFAULT.with_schema_changed()));
263
264 let bytes = bytes.into();
265 let cols = fields.len();
266
267 let mut schemas_bytes =
268 bytes::BytesMut::with_capacity(rows * std::mem::size_of::<ColSchema>());
269 fields
270 .iter()
271 .for_each(|f| schemas_bytes.put(f.to_column_schema().as_bytes()));
272 let schemas = Schemas::from(schemas_bytes);
273
274 let mut data_lengths = LengthsMut::new(cols);
275
276 let mut columns = Vec::new();
277
278 let mut offset = 0;
279
280 for (i, (field, length)) in fields.iter().zip(lengths).enumerate() {
281 macro_rules! _primitive_view {
282 ($ty:ident, $prim:ty) => {
283 {
284 debug_assert_eq!(field.bytes(), *length);
285 let start = offset;
287 offset += rows * std::mem::size_of::<$prim>() as usize;
289 let data = bytes.slice(start..offset);
291 let nulls = NullBits::from_iter((0..rows).map(|row| unsafe {
292 paste::paste!{ [<$ty:snake _is_null>] (
293 data
294 .as_ptr()
295 .offset(row as isize * std::mem::size_of::<$prim>() as isize)
296 as *const $prim,
297 ) }
298 }));
299 data_lengths[i] = data.len() as u32;
308
309 let column = paste::paste! { ColumnView::$ty([<$ty View>] { nulls, data }) };
319 columns.push(column);
320 }};
321 }
322
323 match field.ty() {
324 Ty::Null => unreachable!(),
325
326 Ty::Bool => _primitive_view!(Bool, bool),
328 Ty::TinyInt => _primitive_view!(TinyInt, i8),
329 Ty::SmallInt => _primitive_view!(SmallInt, i16),
330 Ty::Int => _primitive_view!(Int, i32),
331 Ty::BigInt => _primitive_view!(BigInt, i64),
332 Ty::UTinyInt => _primitive_view!(UTinyInt, u8),
334 Ty::USmallInt => _primitive_view!(USmallInt, u16),
335 Ty::UInt => _primitive_view!(UInt, u32),
336 Ty::UBigInt => _primitive_view!(UBigInt, u64),
337 Ty::Float => _primitive_view!(Float, f32),
339 Ty::Double => _primitive_view!(Double, f64),
340 Ty::VarChar => {
341 let start = offset;
342 offset += *length as usize * rows;
343 let data = bytes.slice(start..offset);
344 let data_ptr = data.as_ptr();
345
346 let offsets = Offsets::from_offsets((0..rows).map(|row| unsafe {
347 let offset = row as i32 * *length as i32;
348 let ptr = data_ptr.offset(offset as isize);
349 let len = (ptr as *const u16).read_unaligned();
350 if len == 1 && *ptr.offset(2) == 0xFF {
351 -1
352 } else {
353 offset
354 }
355 }));
356
357 columns.push(ColumnView::VarChar(VarCharView { offsets, data }));
358
359 data_lengths[i] = *length * rows as u32;
360 }
361 Ty::Timestamp => {
362 let start = offset;
364 offset += rows * std::mem::size_of::<i64>();
366 let data = bytes.slice(start..offset);
368 let nulls = NullBits::from_iter((0..rows).map(|row| unsafe {
369 big_int_is_null(
370 &(data
371 .as_ptr()
372 .offset(row as isize * std::mem::size_of::<i64>() as isize)
373 as *const i64)
374 .read_unaligned() as _,
375 )
376 }));
377 data_lengths[i] = data.len() as u32;
379
380 let column = ColumnView::Timestamp(TimestampView {
382 nulls,
383 data,
384 precision,
385 });
386 columns.push(column);
387 }
388 Ty::NChar => {
389 let start = offset;
390 offset += *length as usize * rows;
391 let data = bytes.slice(start..offset);
392 let data_ptr = data.as_ptr();
393
394 let offsets = Offsets::from_offsets((0..rows).map(|row| unsafe {
395 let offset = row as i32 * *length as i32;
396 let ptr = data_ptr.offset(offset as isize);
397 let len = (ptr as *const u16).read_unaligned();
398 if len == 4 && (ptr.offset(2) as *const u32).read_unaligned() == 0xFFFFFFFF
399 {
400 -1
401 } else {
402 offset
403 }
404 }));
405
406 columns.push(ColumnView::NChar(NCharView {
407 offsets,
408 data,
409 is_chars: UnsafeCell::new(false),
410 version: Version::V2,
411 layout: layout.clone(),
412 }));
413
414 data_lengths[i] = *length * rows as u32;
415 }
416 Ty::Json => {
417 let start = offset;
418 offset += *length as usize * rows;
419 let data = bytes.slice(start..offset);
420 let data_ptr = data.as_ptr();
421
422 let offsets = Offsets::from_offsets((0..rows).map(|row| unsafe {
423 let offset = row as i32 * *length as i32;
424 let ptr = data_ptr.offset(offset as isize);
425 let len = (ptr as *const u16).read_unaligned();
426 if len == 4 && (ptr.offset(2) as *const u32).read_unaligned() == 0xFFFFFFFF
427 {
428 -1
429 } else {
430 offset
431 }
432 }));
433
434 columns.push(ColumnView::Json(JsonView { offsets, data }));
435
436 data_lengths[i] = *length * rows as u32;
437 }
438 Ty::VarBinary => todo!(),
439 Ty::Decimal => todo!(),
440 Ty::Blob => todo!(),
441 Ty::MediumBlob => todo!(),
442 Ty::Geometry => todo!(),
443 }
444 }
445
446 Self {
447 layout,
448 version: Version::V2,
449 data: Cell::new(bytes),
450 rows,
451 cols,
452 schemas,
453 lengths: data_lengths.into_lengths(),
454 precision,
455 database: None,
456 table: None,
457 fields: fields.iter().map(|s| s.name().to_string()).collect(),
458 columns,
459 group_id: 0,
460 }
462 }
463
464 pub fn parse_from_raw_block(bytes: impl Into<Bytes>, precision: Precision) -> Self {
465 let schema_start: usize = std::mem::size_of::<Header>();
466
467 let layout = Rc::new(RefCell::new(Layout::INLINE_DEFAULT));
468
469 let bytes = bytes.into();
470 let ptr = bytes.as_ptr();
471
472 let header = unsafe { &*(ptr as *const Header) };
473
474 let rows = header.nrows();
475 let cols = header.ncols();
476 let len = header.len();
477 debug_assert_eq!(bytes.len(), len);
478 let group_id = header.group_id;
479
480 let schema_end = schema_start + cols * std::mem::size_of::<ColSchema>();
481 let schemas = Schemas::from(bytes.slice(schema_start..schema_end));
482 let lengths_end = schema_end + std::mem::size_of::<u32>() * cols;
484 let lengths = Lengths::from(bytes.slice(schema_end..lengths_end));
485 let mut data_offset = lengths_end;
487 let mut columns = Vec::with_capacity(cols);
488 for col in 0..cols {
489 let length = unsafe { lengths.get_unchecked(col) } as usize;
491 let schema = unsafe { schemas.get_unchecked(col) };
492
493 macro_rules! _primitive_value {
494 ($ty:ident, $prim:ty) => {{
495 let o1 = data_offset;
496 let o2 = data_offset + ((rows + 7) >> 3); data_offset = o2 + rows * std::mem::size_of::<$prim>();
498 let nulls = bytes.slice(o1..o2);
499 let data = bytes.slice(o2..data_offset);
500 ColumnView::$ty(paste::paste! {[<$ty View>] {
501 nulls: NullBits(nulls),
502 data,
503 }})
504 }};
505 }
506
507 let column = match schema.ty {
508 Ty::Null => unreachable!("raw block does not contains type NULL"),
509 Ty::Bool => _primitive_value!(Bool, i8),
510 Ty::TinyInt => _primitive_value!(TinyInt, i8),
511 Ty::SmallInt => _primitive_value!(SmallInt, i16),
512 Ty::Int => _primitive_value!(Int, i32),
513 Ty::BigInt => _primitive_value!(BigInt, i64),
514 Ty::Float => _primitive_value!(Float, f32),
515 Ty::Double => _primitive_value!(Double, f64),
516 Ty::VarChar => {
517 let o1 = data_offset;
518 let o2 = data_offset + std::mem::size_of::<i32>() * rows;
519 data_offset = o2 + length;
520
521 let offsets = Offsets::from(bytes.slice(o1..o2));
522 let data = bytes.slice(o2..data_offset);
523
524 ColumnView::VarChar(VarCharView { offsets, data })
525 }
526 Ty::Timestamp => {
527 let o1 = data_offset;
528 let o2 = data_offset + ((rows + 7) >> 3);
529 data_offset = o2 + rows * std::mem::size_of::<i64>();
530 let nulls = bytes.slice(o1..o2);
531 let data = bytes.slice(o2..data_offset);
532 ColumnView::Timestamp(TimestampView {
533 nulls: NullBits(nulls),
534 data,
535 precision,
536 })
537 }
538 Ty::NChar => {
539 let o1 = data_offset;
540 let o2 = data_offset + std::mem::size_of::<i32>() * rows;
541 data_offset = o2 + length;
542
543 let offsets = Offsets::from(bytes.slice(o1..o2));
544 let data = bytes.slice(o2..data_offset);
545
546 ColumnView::NChar(NCharView {
547 offsets,
548 data,
549 is_chars: UnsafeCell::new(true),
550 version: Version::V3,
551 layout: layout.clone(),
552 })
553 }
554 Ty::UTinyInt => _primitive_value!(UTinyInt, u8),
555 Ty::USmallInt => _primitive_value!(USmallInt, u16),
556 Ty::UInt => _primitive_value!(UInt, u32),
557 Ty::UBigInt => _primitive_value!(UBigInt, u64),
558 Ty::Json => {
559 let o1 = data_offset;
560 let o2 = data_offset + std::mem::size_of::<i32>() * rows;
561 data_offset = o2 + length;
562
563 let offsets = Offsets::from(bytes.slice(o1..o2));
564 let data = bytes.slice(o2..data_offset);
565
566 ColumnView::Json(JsonView { offsets, data })
567 }
568 Ty::VarBinary => {
569 let o1 = data_offset;
570 let o2 = data_offset + std::mem::size_of::<i32>() * rows;
571 data_offset = o2 + length;
572
573 let offsets = Offsets::from(bytes.slice(o1..o2));
574 let data: Bytes = bytes.slice(o2..data_offset);
575
576 ColumnView::VarBinary(VarBinaryView { offsets, data })
577 }
578 Ty::Geometry => {
579 let o1 = data_offset;
580 let o2 = data_offset + std::mem::size_of::<i32>() * rows;
581 data_offset = o2 + length;
582
583 let offsets = Offsets::from(bytes.slice(o1..o2));
584 let data = bytes.slice(o2..data_offset);
585
586 ColumnView::Geometry(GeometryView { offsets, data })
587 }
588
589 ty => {
590 unreachable!("unsupported type: {ty}")
591 }
592 };
593 columns.push(column);
594 debug_assert!(data_offset <= len);
595 }
596 RawBlock {
597 layout,
598 version: Version::V3,
599 data: Cell::new(bytes),
600 rows,
601 cols,
602 precision,
603 group_id,
604 schemas,
605 lengths,
606 database: None,
607 table: None,
608 fields: Vec::new(),
609 columns,
610 }
611 }
612
613 pub fn with_database_name(&mut self, name: impl Into<String>) -> &mut Self {
615 self.database = Some(name.into());
616 self
617 }
618 pub fn with_table_name(&mut self, name: impl Into<String>) -> &mut Self {
620 self.table = Some(name.into());
621 self.layout.borrow_mut().with_table_name();
622
623 self
624 }
625
626 pub fn with_field_names<S: Display, I: IntoIterator<Item = S>>(
628 &mut self,
629 names: I,
630 ) -> &mut Self {
631 self.fields = names.into_iter().map(|name| name.to_string()).collect();
632 self.layout.borrow_mut().with_field_names();
633 self
634 }
635
636 fn with_layout(mut self, layout: Layout) -> Self {
637 self.layout = Rc::new(RefCell::new(layout));
638 self
639 }
640
641 #[inline]
643 pub fn ncols(&self) -> usize {
644 self.columns.len()
645 }
646
647 #[inline]
649 pub fn nrows(&self) -> usize {
650 self.rows
651 }
652
653 #[inline]
655 pub const fn precision(&self) -> Precision {
656 self.precision
657 }
658
659 #[inline]
660 pub const fn group_id(&self) -> u64 {
661 self.group_id
662 }
663
664 #[inline]
665 pub fn table_name(&self) -> Option<&str> {
666 self.table.as_deref()
667 }
668
669 #[inline]
671 pub fn tmq_db_name(&self) -> Option<&str> {
672 self.database.as_deref()
673 }
674
675 #[inline]
676 pub fn schemas(&self) -> &[ColSchema] {
677 &self.schemas
678 }
679
680 pub fn field_names(&self) -> &[String] {
682 &self.fields
683 }
684
685 #[inline]
687 pub fn columns(&self) -> std::slice::Iter<ColumnView> {
688 self.columns.iter()
689 }
690
691 pub fn column_views(&self) -> &[ColumnView] {
692 &self.columns
693 }
694
695 #[inline]
697 pub fn rows<'a>(&self) -> RowsIter<'a> {
698 RowsIter {
699 raw: NonNull::new(self as *const Self as *mut Self).unwrap(),
700 row: 0,
701 _marker: std::marker::PhantomData,
702 }
703 }
704
705 #[inline]
706 pub fn into_rows<'a>(self) -> IntoRowsIter<'a>
707 where
708 Self: 'a,
709 {
710 IntoRowsIter {
711 raw: self,
712 row: 0,
713 _marker: std::marker::PhantomData,
714 }
715 }
716
717 #[inline]
718 pub fn deserialize<'de, 'a: 'de, T>(
719 &'a self,
720 ) -> std::iter::Map<rows::RowsIter<'_>, fn(RowView<'a>) -> Result<T, DeError>>
721 where
722 T: Deserialize<'de>,
723 {
724 self.rows().map(|mut row| T::deserialize(&mut row))
725 }
726
727 pub fn as_raw_bytes(&self) -> &[u8] {
728 if self.layout.borrow().schema_changed() {
729 let bytes = views_to_raw_block(&self.columns);
730 let bytes = bytes.into();
731 self.data.replace(bytes);
732 self.layout.borrow_mut().set_schema_changed(false);
733 unsafe { &*self.data.as_ptr() }
734 } else {
735 unsafe { &(*self.data.as_ptr()) }
736 }
737 }
738
739 pub fn is_null(&self, row: usize, col: usize) -> bool {
740 if row >= self.nrows() || col >= self.ncols() {
741 return true;
742 }
743 unsafe { self.columns.get_unchecked(col).is_null_unchecked(row) }
744 }
745
746 #[inline]
747 pub unsafe fn get_raw_value_unchecked(
753 &self,
754 row: usize,
755 col: usize,
756 ) -> (Ty, u32, *const c_void) {
757 let view = self.columns.get_unchecked(col);
758 view.get_raw_value_unchecked(row)
759 }
760
761 pub fn get_ref(&self, row: usize, col: usize) -> Option<BorrowedValue> {
762 if row >= self.nrows() || col >= self.ncols() {
763 return None;
764 }
765 Some(unsafe { self.get_ref_unchecked(row, col) })
766 }
767
768 #[inline]
769 pub unsafe fn get_ref_unchecked(&self, row: usize, col: usize) -> BorrowedValue {
775 self.columns.get_unchecked(col).get_ref_unchecked(row)
776 }
777
778 pub fn to_values(&self) -> Vec<Vec<Value>> {
783 self.rows().map(|row| row.into_values()).collect_vec()
784 }
785
    /// Not implemented yet: calling this always panics through `todo!()`.
    pub fn write<W: std::io::Write>(&self, _wtr: W) -> std::io::Result<usize> {
        todo!()
    }
789
790 fn layout(&self) -> Layout {
791 Layout::from_bits(self.layout.borrow().as_inner()).unwrap()
792 }
793
794 pub fn fields(&self) -> Vec<Field> {
795 self.schemas()
796 .iter()
797 .zip(self.field_names())
798 .map(|(schema, name)| Field::new(name, schema.ty, schema.len))
799 .collect_vec()
800 }
801
802 pub fn pretty_format(&self) -> PrettyBlock {
803 PrettyBlock::new(self)
804 }
805
806 pub fn to_create(&self) -> Option<MetaCreate> {
814 self.table_name().map(|table_name| MetaCreate::Normal {
815 table_name: table_name.to_string(),
816 columns: self.fields().into_iter().map(Into::into).collect(),
817 })
818 }
819
820 pub fn concat(&self, rhs: &Self) -> Self {
826 debug_assert_eq!(self.ncols(), rhs.ncols());
827 #[cfg(debug_assertions)]
828 {
829 for (l, r) in self.fields().into_iter().zip(rhs.fields()) {
830 debug_assert_eq!(l, r);
831 }
832 }
833 let fields = self.field_names();
834 let views = rhs
835 .column_views()
836 .iter()
837 .zip(self.column_views())
838 .collect_vec()
839 .into_par_iter()
840 .map(|(r, l)| r.concat_strictly(l))
841 .collect::<Vec<_>>();
842 let mut block = Self::from_views(&views, self.precision());
843 block.with_field_names(fields);
844 if let Some(table_name) = self.table_name().or(rhs.table_name()) {
845 block.with_table_name(table_name);
846 }
847 block
848 }
849
850 pub fn cast_precision(&self, precision: Precision) -> Self {
852 let views = self
853 .column_views()
854 .iter()
855 .map(|view| view.cast_precision(precision).to_owned())
856 .collect::<Vec<ColumnView>>();
857 let mut block = Self::from_views(&views, precision);
858 block.with_field_names(self.field_names());
859 if let Some(table_name) = self.table_name() {
860 block.with_table_name(table_name);
861 }
862 block
863 }
864}
865
866impl std::ops::Add for RawBlock {
867 type Output = Self;
868
869 fn add(self, rhs: Self) -> Self::Output {
870 self.concat(&rhs)
871 }
872}
873
874impl std::ops::Add for &RawBlock {
875 type Output = RawBlock;
876
877 fn add(self, rhs: Self) -> Self::Output {
878 self.concat(rhs)
879 }
880}
881
/// Borrowing wrapper around a [`RawBlock`] whose `Display` impl renders the
/// block as a text table.
pub struct PrettyBlock<'a> {
    /// The block being displayed.
    raw: &'a RawBlock,
}
885
886impl<'a> PrettyBlock<'a> {
887 fn new(raw: &'a RawBlock) -> Self {
888 Self { raw }
889 }
890}
891
892impl<'a> Deref for PrettyBlock<'a> {
893 type Target = RawBlock;
894 fn deref(&self) -> &Self::Target {
895 self.raw
896 }
897}
898
899impl<'a> Display for PrettyBlock<'a> {
900 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
901 use prettytable::{Row, Table};
902 let mut table = Table::new();
903 writeln!(
904 f,
905 "Table view with {} rows, {} columns, table name \"{}\"",
906 self.nrows(),
907 self.ncols(),
908 self.table_name().unwrap_or_default(),
909 )?;
910 table.set_titles(Row::from_iter(self.field_names()));
911 let nrows = self.nrows();
912 const MAX_DISPLAY_ROWS: usize = 10;
913 let mut rows_iter = self.raw.rows();
914 if f.alternate() {
915 for row in rows_iter {
916 table.add_row(Row::from_iter(
917 row.map(|s| s.1.to_string().unwrap_or_default()),
918 ));
919 }
920 } else if nrows > 2 * MAX_DISPLAY_ROWS {
921 for row in (&mut rows_iter).take(MAX_DISPLAY_ROWS) {
922 table.add_row(Row::from_iter(
923 row.map(|s| s.1.to_string().unwrap_or_default()),
924 ));
925 }
926 table.add_row(Row::from_iter(std::iter::repeat("...").take(self.ncols())));
927 for row in rows_iter.skip(nrows - 2 * MAX_DISPLAY_ROWS) {
928 table.add_row(Row::from_iter(
929 row.map(|s| s.1.to_string().unwrap_or_default()),
930 ));
931 }
932 } else {
933 for row in rows_iter {
934 table.add_row(Row::from_iter(
935 row.map(|s| s.1.to_string().unwrap_or_default()),
936 ));
937 }
938 }
939
940 f.write_fmt(format_args!("{}", table))?;
941 Ok(())
942 }
943}
944
/// The bytes of one raw block as they appear in an inlined stream, starting
/// with the `u32` version and `u32` length words.
struct InlineBlock(Bytes);
946
947impl From<InlineBlock> for Bytes {
948 fn from(raw: InlineBlock) -> Self {
949 raw.0
950 }
951}
952
953impl crate::prelude::sync::Inlinable for InlineBlock {
954 fn read_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self>
955 where
956 Self: Sized,
957 {
958 use crate::prelude::sync::InlinableRead;
959 let version = reader.read_u32()?;
960 let len = reader.read_u32()?;
961 let mut bytes = vec![0; len as usize];
962 unsafe {
963 std::ptr::copy_nonoverlapping(
964 version.to_le_bytes().as_ptr(),
965 bytes.as_mut_ptr(),
966 std::mem::size_of::<u32>(),
967 );
968 std::ptr::copy_nonoverlapping(
969 len.to_le_bytes().as_ptr(),
970 bytes.as_mut_ptr().offset(4),
971 std::mem::size_of::<u32>(),
972 );
973 }
974 let buf = &mut bytes[8..];
975 reader.read_exact(buf)?;
976 Ok(Self(bytes.into()))
977 }
978
979 fn write_inlined<W: std::io::Write>(&self, wtr: &mut W) -> std::io::Result<usize> {
980 wtr.write_all(self.0.as_ref())?;
981 Ok(self.0.len())
982 }
983}
984#[async_trait::async_trait]
985impl crate::prelude::AsyncInlinable for InlineBlock {
986 async fn read_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
987 reader: &mut R,
988 ) -> std::io::Result<Self>
989 where
990 Self: Sized,
991 {
992 use tokio::io::*;
993
994 let version = reader.read_u32_le().await?;
995 let len = reader.read_u32_le().await?;
996 let mut bytes = vec![0; len as usize];
997 unsafe {
998 std::ptr::copy_nonoverlapping(
999 version.to_le_bytes().as_ptr(),
1000 bytes.as_mut_ptr(),
1001 std::mem::size_of::<u32>(),
1002 );
1003 std::ptr::copy_nonoverlapping(
1004 len.to_le_bytes().as_ptr(),
1005 bytes.as_mut_ptr().offset(4),
1006 std::mem::size_of::<u32>(),
1007 );
1008 }
1009 let buf = &mut bytes[8..];
1010 reader.read_exact(buf).await?;
1011 Ok(Self(bytes.into()))
1012 }
1013
1014 async fn write_inlined<W: tokio::io::AsyncWrite + Send + Unpin>(
1015 &self,
1016 wtr: &mut W,
1017 ) -> std::io::Result<usize> {
1018 use tokio::io::*;
1019 wtr.write_all(self.0.as_ref()).await?;
1020 Ok(self.0.len())
1021 }
1022}
1023
1024impl crate::prelude::sync::Inlinable for RawBlock {
1025 fn read_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
1026 use crate::prelude::sync::InlinableRead;
1027 let layout = reader.read_u32()?;
1028 let layout = Layout::from_bits(layout).expect("should be layout");
1029
1030 let precision = layout.precision();
1031 let raw: InlineBlock = reader.read_inlinable()?;
1032 let mut raw = Self::parse_from_raw_block(raw.0, precision);
1033 let cols = raw.ncols();
1034
1035 if layout.expect_table_name() {
1036 let name = reader.read_inlined_str::<2>()?;
1037 raw.with_table_name(name);
1038 }
1039 if layout.expect_field_names() {
1040 let names: Vec<_> = (0..cols)
1041 .map(|_| reader.read_inlined_str::<1>())
1042 .try_collect()?;
1043 raw.with_field_names(names);
1044 }
1045
1046 Ok(raw)
1047 }
1048
1049 fn read_optional_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Option<Self>> {
1050 use crate::prelude::sync::InlinableRead;
1051 let layout = reader.read_u32()?;
1052 if layout == 0xFFFFFFFF {
1053 return Ok(None);
1054 }
1055 let layout = Layout::from_bits(layout).expect("should be layout");
1056
1057 let precision = layout.precision();
1058 let raw: InlineBlock = reader.read_inlinable()?;
1059 let mut raw = Self::parse_from_raw_block(raw.0, precision);
1060
1061 if layout.expect_table_name() {
1062 let name = reader.read_inlined_str::<2>()?;
1063 raw.with_table_name(name);
1064 }
1065 if layout.expect_field_names() {
1066 let names: Vec<_> = (0..raw.ncols())
1067 .map(|_| reader.read_inlined_str::<1>())
1068 .try_collect()?;
1069 raw.with_field_names(names);
1070 }
1071 log::trace!(
1072 "table name: {}, cols: {}, rows: {}",
1073 &raw.table_name().unwrap_or("(?)"),
1074 raw.ncols(),
1075 raw.nrows()
1076 );
1077
1078 Ok(Some(raw))
1079 }
1080
1081 fn write_inlined<W: std::io::Write>(&self, wtr: &mut W) -> std::io::Result<usize> {
1082 use crate::prelude::sync::InlinableWrite;
1083 let layout = self.layout();
1084 let mut l = wtr.write_u32_le(layout.as_inner())?;
1085
1086 let raw = self.as_raw_bytes();
1087 wtr.write_all(raw)?;
1088 l += raw.len();
1089
1090 if layout.expect_table_name() {
1091 let name = self.table_name().expect("table name should be known");
1092 l += wtr.write_inlined_bytes::<2>(name.as_bytes())?;
1093 }
1094 if layout.expect_field_names() {
1095 debug_assert_eq!(self.field_names().len(), self.ncols());
1096 for field in self.field_names() {
1097 l += wtr.write_inlined_str::<1>(field)?;
1098 }
1099 }
1100 Ok(l)
1101 }
1102}
1103
/// Protocol of schemaless input data. The default is [`SchemalessProtocol::Line`].
#[repr(C)]
#[derive(Default, Debug, Copy, Clone)]
pub enum SchemalessProtocol {
    Unknown = 0,
    #[default]
    Line,
    Telnet,
    Json,
}
1113
/// Timestamp precision of schemaless input data. The default is
/// [`SchemalessPrecision::Millisecond`].
#[repr(C)]
#[derive(Default, Debug, Copy, Clone)]
pub enum SchemalessPrecision {
    NonConfigured = 0,
    Hours,
    Minutes,
    Seconds,
    #[default]
    Millisecond,
    Microsecond,
    Nanosecond,
}
1126
1127impl From<SchemalessPrecision> for String {
1128 fn from(precision: SchemalessPrecision) -> Self {
1129 match precision {
1130 SchemalessPrecision::Millisecond => "ms".to_string(),
1131 SchemalessPrecision::Microsecond => "us".to_string(),
1132 SchemalessPrecision::Nanosecond => "ns".to_string(),
1133 _ => todo!(),
1134 }
1135 }
1136}
1137
/// A schemaless write request. `derive_builder` generates `SmlDataBuilder`
/// for constructing it.
#[derive(Default, Builder, Debug)]
#[builder(setter(into))]
pub struct SmlData {
    /// Protocol the entries in `data` are written in.
    protocol: SchemalessProtocol,
    /// Timestamp precision; optional in the builder.
    #[builder(setter(into, strip_option), default)]
    precision: SchemalessPrecision,
    /// The payload entries.
    data: Vec<String>,
    /// Optional in the builder.
    #[builder(setter(into, strip_option), default)]
    ttl: Option<i32>,
    /// Optional in the builder.
    #[builder(setter(into, strip_option), default)]
    req_id: Option<u64>,
}
1150
1151impl SmlData {
1152 #[inline]
1153 pub fn protocol(&self) -> SchemalessProtocol {
1154 self.protocol
1155 }
1156
1157 #[inline]
1158 pub fn precision(&self) -> SchemalessPrecision {
1159 self.precision
1160 }
1161
1162 #[inline]
1163 pub fn data(&self) -> &Vec<String> {
1164 &self.data
1165 }
1166
1167 #[inline]
1168 pub fn ttl(&self) -> Option<i32> {
1169 self.ttl
1170 }
1171
1172 #[inline]
1173 pub fn req_id(&self) -> Option<u64> {
1174 self.req_id
1175 }
1176}
1177
1178impl From<SmlDataBuilderError> for Error {
1179 fn from(value: SmlDataBuilderError) -> Self {
1180 Error::from_any(value)
1181 }
1182}
1183#[async_trait::async_trait]
1184impl crate::prelude::AsyncInlinable for RawBlock {
1185 async fn read_optional_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
1186 reader: &mut R,
1187 ) -> std::io::Result<Option<Self>> {
1188 use crate::util::AsyncInlinableRead;
1189 use tokio::io::*;
1190 let layout = reader.read_u32_le().await?;
1191 if layout == 0xFFFFFFFF {
1192 return Ok(None);
1193 }
1194 let layout = Layout::from_bits(layout).expect("invalid layout");
1195
1196 let precision = layout.precision();
1197
1198 let raw: InlineBlock =
1199 <InlineBlock as crate::prelude::AsyncInlinable>::read_inlined(reader).await?;
1200 let mut raw = Self::parse_from_raw_block(raw.0, precision);
1201
1202 if layout.expect_table_name() {
1203 let name = reader.read_inlined_str::<2>().await?;
1204 raw.with_table_name(name);
1205 }
1206 if layout.expect_field_names() {
1207 let mut names = Vec::with_capacity(raw.ncols());
1208 for _ in 0..raw.ncols() {
1209 names.push(reader.read_inlined_str::<1>().await?);
1210 }
1211 raw.with_field_names(names);
1212 }
1213
1214 Ok(Some(raw))
1215 }
1216
1217 async fn read_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
1218 reader: &mut R,
1219 ) -> std::io::Result<Self> {
1220 <Self as crate::prelude::AsyncInlinable>::read_optional_inlined(reader)
1221 .await?
1222 .ok_or(std::io::Error::new(
1223 std::io::ErrorKind::InvalidData,
1224 "invalid raw data format",
1225 ))
1226 }
1227
1228 async fn write_inlined<W: tokio::io::AsyncWrite + Send + Unpin>(
1229 &self,
1230 wtr: &mut W,
1231 ) -> std::io::Result<usize> {
1232 use crate::util::AsyncInlinableWrite;
1233 use tokio::io::*;
1234
1235 let layout = self.layout();
1236 wtr.write_u32_le(layout.as_inner()).await?;
1237
1238 let raw = self.as_raw_bytes();
1239 wtr.write_all(raw).await?;
1240
1241 let mut l = std::mem::size_of::<u32>() + raw.len();
1242
1243 if layout.expect_table_name() {
1244 let name = self.table_name().expect("table name should be known");
1245 l += wtr.write_inlined_bytes::<2>(name.as_bytes()).await?;
1246 }
1247 if layout.expect_field_names() {
1248 debug_assert_eq!(self.field_names().len(), self.ncols());
1249 for field in self.field_names() {
1250 l += wtr.write_inlined_str::<1>(field).await?;
1251 }
1252 }
1253
1254 Ok(l)
1255 }
1256}
1257
/// Parses V2 buffers, deserializes the rows, and round-trips the block
/// through the V3 encoding and the async inlined format.
#[tokio::test]
async fn test_raw_from_v2() {
    use crate::prelude::AsyncInlinable;
    use std::ops::Deref;
    // Two columns of ten 4-byte values each: a float column, then an int column.
    let bytes = b"\x10\x86\x1aA \xcc)AB\xc2\x14AZ],A\xa2\x8d$A\x87\xb9%A\xf5~\x0fA\x96\xf7,AY\xee\x17A1|\x15As\x00\x00\x00q\x00\x00\x00s\x00\x00\x00t\x00\x00\x00u\x00\x00\x00t\x00\x00\x00n\x00\x00\x00n\x00\x00\x00n\x00\x00\x00r\x00\x00\x00";

    let block = RawBlock::parse_from_raw_block_v2(
        bytes.as_slice(),
        &[Field::new("a", Ty::Float, 4), Field::new("b", Ty::Int, 4)],
        &[4, 4],
        10,
        Precision::Millisecond,
    );
    // Each column holds 10 rows * 4 bytes.
    assert!(block.lengths.deref() == &[40, 40]);

    let bytes = include_bytes!("../../../tests/test.txt");

    let block = RawBlock::parse_from_raw_block_v2(
        bytes.as_slice(),
        &[
            Field::new("ts", Ty::Timestamp, 8),
            Field::new("current", Ty::Float, 4),
            Field::new("voltage", Ty::Int, 4),
            Field::new("phase", Ty::Float, 4),
            Field::new("group_id", Ty::Int, 4),
            Field::new("location", Ty::VarChar, 16),
        ],
        &[8, 4, 4, 4, 4, 18],
        10,
        Precision::Millisecond,
    );

    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        ts: String,
        current: f32,
        voltage: i32,
        phase: f32,
        group_id: i32,
        location: String,
    }
    let rows: Vec<Record> = block.deserialize().try_collect().unwrap();
    dbg!(rows);
    // Re-encode the V2 columns as a V3 raw block and parse that again.
    let bytes = views_to_raw_block(&block.columns);
    let raw2 = RawBlock::parse_from_raw_block(bytes, block.precision);
    dbg!(&raw2);
    // Round-trip through the inlined format.
    let inlined = raw2.inlined().await;
    dbg!(&inlined);
    let mut raw3 = RawBlock::read_inlined(&mut inlined.as_slice())
        .await
        .unwrap();
    raw3.with_field_names(["ts", "current", "voltage", "phase", "group_id", "location"])
        .with_table_name("meters");
    dbg!(&raw3);

    // With names attached, the inlined form must carry the table name along.
    let raw4 = RawBlock::read_inlined(&mut raw3.inlined().await.as_slice())
        .await
        .unwrap();
    dbg!(&raw4);
    assert_eq!(raw3.table_name(), raw4.table_name());

    let raw5 = RawBlock::read_optional_inlined(&mut raw3.inlined().await.as_slice())
        .await
        .unwrap();
    dbg!(raw5);
}
1329
/// Parses the gzip-compressed `tests/v2.block.gz` fixture as a 24-column, 4-row v2 block,
/// re-encodes its columns with `views_to_raw_block`, parses the result back and finally
/// exercises `&RawBlock + &RawBlock`.
#[test]
fn test_v2_full() {
    use flate2::read::GzDecoder;
    use std::io::prelude::*;

    let compressed = include_bytes!("../../../tests/v2.block.gz");

    // Inflate the fixture in memory and pin its decompressed size.
    let mut decoded = Vec::new();
    let decoded_len = GzDecoder::new(&compressed[..])
        .read_to_end(&mut decoded)
        .unwrap();
    assert_eq!(decoded_len, 66716);

    // Timestamp, then two identical groups of scalar/string columns, then a JSON column.
    let schema = [
        Field::new("ts", Ty::Timestamp, 8),
        Field::new("b1", Ty::Bool, 1),
        Field::new("c8i1", Ty::TinyInt, 1),
        Field::new("c16i1", Ty::SmallInt, 2),
        Field::new("c32i1", Ty::Int, 4),
        Field::new("c64i1", Ty::BigInt, 8),
        Field::new("c8u1", Ty::UTinyInt, 1),
        Field::new("c16u1", Ty::USmallInt, 2),
        Field::new("c32u1", Ty::UInt, 4),
        Field::new("c64u1", Ty::UBigInt, 8),
        Field::new("cb1", Ty::VarChar, 100),
        Field::new("cn1", Ty::NChar, 10),
        Field::new("b2", Ty::Bool, 1),
        Field::new("c8i2", Ty::TinyInt, 1),
        Field::new("c16i2", Ty::SmallInt, 2),
        Field::new("c32i2", Ty::Int, 4),
        Field::new("c64i2", Ty::BigInt, 8),
        Field::new("c8u2", Ty::UTinyInt, 1),
        Field::new("c16u2", Ty::USmallInt, 2),
        Field::new("c32u2", Ty::UInt, 4),
        Field::new("c64u2", Ty::UBigInt, 8),
        Field::new("cb2", Ty::VarChar, 100),
        Field::new("cn2", Ty::NChar, 10),
        Field::new("jt", Ty::Json, 4096),
    ];

    let block = RawBlock::parse_from_raw_block_v2(
        decoded,
        &schema,
        // Per-column byte lengths, one entry per schema field.
        &[
            8, 1, 1, 2, 4, 8, 1, 2, 4, 8, 102, 42, 1, 1, 2, 4, 8, 1, 2, 4, 8, 12, 66, 16387,
        ],
        4,
        Precision::Millisecond,
    );

    // Encode the column views again and feed them to the non-v2 parser.
    let reencoded = views_to_raw_block(&block.columns);
    let raw2 = RawBlock::parse_from_raw_block(reencoded, block.precision);

    // Adding a block to itself must not panic; both values are dumped for inspection.
    let added = &raw2 + &raw2;
    dbg!(raw2, added);
}
1379
/// Checks which v2 byte patterns are reported as NULL by `get_raw_value_unchecked`.
///
/// Patterns asserted to be NULL here: `0x02` for Bool, `0x80` for TinyInt,
/// `00 00 00 80` for Int and `00 00 f0 7f` for Float. A Bool byte of `0` is
/// asserted to be non-NULL.
#[test]
fn test_v2_null() {
    // Reports whether the value pointer for (`row`, column 0) is null.
    let cell_is_null = |block: &RawBlock, row| {
        // SAFETY: every call below uses a row below the row count passed to the parser and
        // column 0 of a single-column block — presumably the contract of the unchecked getter.
        let (_ty, _len, ptr) = unsafe { block.get_raw_value_unchecked(row, 0) };
        ptr.is_null()
    };

    // Bool column, two rows: first NULL, second a real value.
    let raw = RawBlock::parse_from_raw_block_v2(
        [0x2, 0].as_slice(),
        &[Field::new("b", Ty::Bool, 1)],
        &[1],
        2,
        Precision::Millisecond,
    );
    dbg!(&raw);
    // The re-encoded form must at least parse; its contents are only dumped.
    let bytes = views_to_raw_block(&raw.columns);
    let raw2 = RawBlock::parse_from_raw_block(bytes, raw.precision);
    dbg!(raw2);
    assert!(cell_is_null(&raw, 0));
    assert!(!cell_is_null(&raw, 1));

    // TinyInt column, one NULL row.
    let raw = RawBlock::parse_from_raw_block_v2(
        [0x80].as_slice(),
        &[Field::new("b", Ty::TinyInt, 1)],
        &[1],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);
    assert!(cell_is_null(&raw, 0));

    // Int column, one NULL row.
    let raw = RawBlock::parse_from_raw_block_v2(
        [0, 0, 0, 0x80].as_slice(),
        &[Field::new("a", Ty::Int, 4)],
        &[4],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);
    assert!(cell_is_null(&raw, 0));

    // Float column, one NULL row. The cell is inspected before `dbg!` consumes the block.
    let raw = RawBlock::parse_from_raw_block_v2(
        [0, 0, 0xf0, 0x7f].as_slice(),
        &[Field::new("a", Ty::Float, 4)],
        &[4],
        1,
        Precision::Millisecond,
    );
    let float_is_null = cell_is_null(&raw, 0);
    dbg!(raw);
    assert!(float_is_null);
}
/// Smoke-tests `RawBlock::parse_from_raw_block_v2` on minimal single-row payloads.
///
/// Each case only has to parse without panicking; the resulting blocks are dumped with
/// `dbg!` and the final one is rendered through `pretty_format`.
#[test]
fn test_from_v2() {
    // One TinyInt cell, then re-encode it and parse the bytes back with the non-v2 parser.
    let raw = RawBlock::parse_from_raw_block_v2(
        [1].as_slice(),
        &[Field::new("a", Ty::TinyInt, 1)],
        &[1],
        1,
        Precision::Millisecond,
    );
    let bytes = raw.as_raw_bytes();
    let bytes = Bytes::copy_from_slice(bytes);
    let raw2 = RawBlock::parse_from_raw_block(bytes, raw.precision());
    dbg!(&raw, raw2);

    // One 4-byte Int cell.
    let raw = RawBlock::parse_from_raw_block_v2(
        [1, 0, 0, 0].as_slice(),
        &[Field::new("a", Ty::Int, 4)],
        &[4],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);

    // One VarChar cell: two leading bytes `02 00` followed by the payload "ab".
    let raw = RawBlock::parse_from_raw_block_v2(
        [2, 0, b'a', b'b'].as_slice(),
        &[Field::new("b", Ty::VarChar, 2)],
        &[4],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);

    // Two columns of different widths (1-byte TinyInt + 2-byte SmallInt) in one row.
    let raw = RawBlock::parse_from_raw_block_v2(
        [1, 1, 1].as_slice(),
        &[
            Field::new("a", Ty::TinyInt, 1),
            Field::new("b", Ty::SmallInt, 2),
        ],
        &[1, 2],
        1,
        Precision::Millisecond,
    );
    dbg!(&raw);

    println!("{}", raw.pretty_format());
}