taos_query/common/raw/
data.rs

1use std::{borrow::Cow, ffi::c_void};
2
3use bytes::Bytes;
4
5use crate::util::{Inlinable, InlinableRead};
6
7const RAW_PTR_OFFSET: usize = std::mem::size_of::<u32>() + std::mem::size_of::<u16>();
8
9/// C-struct for raw data, just a data view from native library.
10///
11/// It can be copy/cloned, but should not use it outbound away a offset lifetime.
12#[repr(C)]
13#[derive(Debug, Clone)]
14pub struct raw_data_t {
15    pub raw: *const c_void,
16    pub raw_len: u32,
17    pub raw_type: u16,
18}
19
20unsafe impl Send for raw_data_t {}
21
22impl raw_data_t {
23    pub fn to_bytes(&self) -> Bytes {
24        let cap = // raw data len
25            self.raw_len as usize +
26            // self.raw_len
27            std::mem::size_of::<u32>() +
28            // self.raw_type
29            std::mem::size_of::<u16>();
30        let mut data = Vec::with_capacity(cap);
31
32        // first 4 bytes: raw_len
33        data.extend(self.raw_len.to_le_bytes());
34
35        // next 2 bytes: raw_type
36        data.extend(self.raw_type.to_le_bytes());
37
38        unsafe {
39            let ptr = data.as_mut_ptr().add(RAW_PTR_OFFSET);
40            std::ptr::copy_nonoverlapping(self.raw, ptr as _, self.raw_len as _);
41            data.set_len(cap);
42        }
43        Bytes::from(data)
44    }
45}
46
47#[derive(Debug, Clone)]
48pub struct RawData(Bytes);
49
50unsafe impl Send for RawData {}
51unsafe impl Sync for RawData {}
52
53impl From<&raw_data_t> for RawData {
54    fn from(raw: &raw_data_t) -> Self {
55        RawData(raw.to_bytes())
56    }
57}
58
59impl<T: Into<Bytes>> From<T> for RawData {
60    fn from(bytes: T) -> Self {
61        RawData(bytes.into())
62    }
63}
64
65impl RawData {
66    pub fn new(raw: Bytes) -> Self {
67        raw.into()
68    }
69    pub fn raw(&self) -> *const c_void {
70        unsafe { self.0.as_ptr().add(RAW_PTR_OFFSET) as _ }
71    }
72    pub fn raw_len(&self) -> u32 {
73        unsafe { *(self.0.as_ptr() as *const u32) }
74    }
75    pub fn raw_type(&self) -> u16 {
76        unsafe { *(self.0.as_ptr().add(std::mem::size_of::<u32>()) as *const u16) }
77    }
78
79    pub fn as_raw_data_t(&self) -> raw_data_t {
80        raw_data_t {
81            raw: self.raw(),
82            raw_len: self.raw_len(),
83            raw_type: self.raw_type(),
84        }
85    }
86
87    pub fn as_bytes(&self) -> Cow<Bytes> {
88        Cow::Borrowed(&self.0)
89    }
90}
91
92impl Inlinable for RawData {
93    fn read_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
94        let mut data = Vec::new();
95
96        let len = reader.read_u32()?;
97        data.extend(len.to_le_bytes());
98
99        let meta_type = reader.read_u16()?;
100        data.extend(meta_type.to_le_bytes());
101
102        data.resize(data.len() + len as usize, 0);
103
104        let buf = &mut data[RAW_PTR_OFFSET..];
105
106        reader.read_exact(buf)?;
107        Ok(data.into())
108    }
109
110    fn write_inlined<W: std::io::Write>(&self, wtr: &mut W) -> std::io::Result<usize> {
111        let bytes = self.as_bytes();
112        wtr.write_all(&bytes)?;
113        Ok(bytes.len())
114    }
115}
116
117#[async_trait::async_trait]
118impl crate::util::AsyncInlinable for RawData {
119    async fn read_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
120        reader: &mut R,
121    ) -> std::io::Result<Self> {
122        use tokio::io::*;
123        let mut data = Vec::new();
124
125        let len = reader.read_u32_le().await?;
126        data.extend(len.to_le_bytes());
127
128        let meta_type = reader.read_u16_le().await?;
129        data.extend(meta_type.to_le_bytes());
130
131        data.resize(data.len() + len as usize, 0);
132
133        let buf = &mut data[RAW_PTR_OFFSET..];
134
135        reader.read_exact(buf).await?;
136        Ok(data.into())
137    }
138
139    async fn write_inlined<W: tokio::io::AsyncWrite + Send + Unpin>(
140        &self,
141        wtr: &mut W,
142    ) -> std::io::Result<usize> {
143        use tokio::io::*;
144        let bytes = self.as_bytes();
145        wtr.write_all(&bytes).await?;
146        Ok(bytes.len())
147    }
148}