taos_query/common/raw/views/
timestamp_view.rs

1use std::ffi::c_void;
2
3use crate::common::{BorrowedValue, Precision, Timestamp, Ty};
4
5use super::{IsColumnView, NullBits, NullsIter};
6
7use bytes::Bytes;
8use itertools::Itertools;
9
10type Item = i64;
11type View = TimestampView;
12const ITEM_SIZE: usize = std::mem::size_of::<Item>();
13
14#[derive(Debug, Clone)]
15pub struct TimestampView {
16    pub(crate) nulls: NullBits,
17    pub(crate) data: Bytes,
18    pub(crate) precision: Precision,
19}
20
21impl IsColumnView for View {
22    fn ty(&self) -> Ty {
23        Ty::USmallInt
24    }
25    fn from_borrowed_value_iter<'b>(iter: impl Iterator<Item = BorrowedValue<'b>>) -> Self {
26        Self::from_nullable_timestamp(iter.map(|v| v.to_timestamp()).collect_vec())
27    }
28}
29impl TimestampView {
30    pub fn from_millis(values: Vec<impl Into<Option<i64>>>) -> Self {
31        TimestampMillisecondView::from_iter(values).into_inner()
32    }
33
34    pub fn from_micros(values: Vec<impl Into<Option<i64>>>) -> Self {
35        TimestampMicrosecondView::from_iter(values).into_inner()
36    }
37
38    pub fn from_nanos(values: Vec<impl Into<Option<i64>>>) -> Self {
39        TimestampNanosecondView::from_iter(values).into_inner()
40    }
41
42    pub fn from_timestamp(values: Vec<Timestamp>) -> Self {
43        let precision = values.first().map(|ts| ts.precision()).unwrap_or_default();
44        let values = values.into_iter().map(|ts| ts.as_raw_i64()).collect_vec();
45        match precision {
46            Precision::Millisecond => Self::from_millis(values),
47            Precision::Microsecond => Self::from_micros(values),
48            Precision::Nanosecond => Self::from_nanos(values),
49        }
50    }
51    pub fn from_nullable_timestamp(values: Vec<Option<Timestamp>>) -> Self {
52        let precision = values
53            .iter()
54            .find(|ts| ts.is_some())
55            .map(|v| v.as_ref().unwrap().precision());
56        if let Some(precision) = precision {
57            let values = values
58                .into_iter()
59                .map(|ts| ts.map(|v| v.as_raw_i64()))
60                .collect_vec();
61            match precision {
62                Precision::Millisecond => Self::from_millis(values),
63                Precision::Microsecond => Self::from_micros(values),
64                Precision::Nanosecond => Self::from_nanos(values),
65            }
66        } else {
67            Self::from_millis(vec![None; values.len()])
68        }
69    }
70
71    /// Precision for current view
72    pub fn precision(&self) -> Precision {
73        self.precision
74    }
75
76    /// Rows
77    pub fn len(&self) -> usize {
78        self.data.len() / std::mem::size_of::<Item>()
79    }
80
81    /// Raw slice of target type.
82    pub fn as_raw_slice(&self) -> &[Item] {
83        unsafe { std::slice::from_raw_parts(self.data.as_ptr() as *const Item, self.len()) }
84    }
85
86    /// Raw pointer of the slice.
87    pub fn as_raw_ptr(&self) -> *const Item {
88        self.data.as_ptr() as *const Item
89    }
90
91    /// Build a nulls vector.
92    pub fn to_nulls_vec(&self) -> Vec<bool> {
93        self.is_null_iter().collect()
94    }
95
96    /// A iterator only decide if the value at some row index is NULL or not.
97    pub fn is_null_iter(&self) -> NullsIter {
98        NullsIter {
99            nulls: &self.nulls,
100            row: 0,
101            len: self.len(),
102        }
103    }
104
105    /// Check if the value at `row` index is NULL or not.
106    pub fn is_null(&self, row: usize) -> bool {
107        if row < self.len() {
108            unsafe { self.is_null_unchecked(row) }
109        } else {
110            false
111        }
112    }
113
114    /// Unsafe version for [methods.is_null]
115    pub unsafe fn is_null_unchecked(&self, row: usize) -> bool {
116        self.nulls.is_null_unchecked(row)
117    }
118
119    /// Get nullable value at `row` index.
120    pub fn get(&self, row: usize) -> Option<Option<Timestamp>> {
121        if row < self.len() {
122            Some(unsafe { self.get_unchecked(row) })
123        } else {
124            None
125        }
126    }
127
128    #[inline(always)]
129    unsafe fn get_raw_at(&self, index: usize) -> *const Item {
130        self.data.as_ptr().add(index * ITEM_SIZE) as _
131    }
132
133    /// Get nullable value at `row` index.
134    pub unsafe fn get_unchecked(&self, row: usize) -> Option<Timestamp> {
135        if self.nulls.is_null_unchecked(row) {
136            None
137        } else {
138            Some(Timestamp::new(
139                std::ptr::read_unaligned(
140                    self.data.as_ptr().add(row * std::mem::size_of::<Item>()) as _
141                ),
142                // *self.get_raw_at(row),
143                self.precision,
144            ))
145        }
146    }
147
148    pub unsafe fn get_ref_unchecked(&self, row: usize) -> Option<*const Item> {
149        if self.nulls.is_null_unchecked(row) {
150            None
151        } else {
152            Some(self.get_raw_at(row))
153        }
154    }
155
156    pub unsafe fn get_value_unchecked(&self, row: usize) -> BorrowedValue {
157        self.get_unchecked(row)
158            .map(BorrowedValue::Timestamp)
159            .unwrap_or(BorrowedValue::Null(Ty::Timestamp))
160    }
161
162    pub unsafe fn get_raw_value_unchecked(&self, row: usize) -> (Ty, u32, *const c_void) {
163        if self.nulls.is_null_unchecked(row) {
164            (
165                Ty::Timestamp,
166                std::mem::size_of::<Item>() as _,
167                std::ptr::null(),
168            )
169        } else {
170            (
171                Ty::Timestamp,
172                std::mem::size_of::<Item>() as _,
173                self.get_raw_at(row) as *const Item as _,
174            )
175        }
176    }
177
178    /// Create a slice of view.
179    pub fn slice(&self, mut range: std::ops::Range<usize>) -> Option<Self> {
180        if range.start >= self.len() {
181            return None;
182        }
183        if range.end > self.len() {
184            range.end = self.len();
185        }
186        if range.is_empty() {
187            return None;
188        }
189
190        let nulls = unsafe { self.nulls.slice(range.clone()) };
191        let data = self
192            .data
193            .slice(range.start * ITEM_SIZE..range.end * ITEM_SIZE);
194        Some(Self {
195            nulls,
196            data,
197            precision: self.precision,
198        })
199    }
200
201    /// A iterator to nullable values of current row.
202    pub fn iter(&self) -> TimestampViewIter {
203        TimestampViewIter { view: self, row: 0 }
204    }
205
206    /// Convert data to a vector of all nullable values.
207    pub fn to_vec(&self) -> Vec<Option<Timestamp>> {
208        self.iter().collect()
209    }
210
211    /// Write column data as raw bytes.
212    pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
213        let nulls = self.nulls.0.as_ref();
214        wtr.write_all(nulls)?;
215        wtr.write_all(&self.data)?;
216        Ok(nulls.len() + self.data.len())
217    }
218
219    pub fn concat(&self, rhs: &View) -> View {
220        let nulls = NullBits::from_iter(
221            self.nulls
222                .iter()
223                .take(self.len())
224                .chain(rhs.nulls.iter().take(rhs.len())),
225        );
226        let data: Bytes = self
227            .data
228            .as_ref()
229            .iter()
230            .chain(rhs.data.as_ref().iter())
231            .copied()
232            .collect();
233
234        View {
235            nulls,
236            data,
237            precision: self.precision,
238        }
239    }
240
241    pub fn cast_precision(&self, precision: Precision) -> TimestampView {
242        if self.precision == precision {
243            self.clone()
244        } else {
245            let data = self.iter().map(|v| v.map(|v| v.cast_precision(precision)));
246            Self::from_nullable_timestamp(data.collect())
247        }
248    }
249}
250
251pub struct TimestampViewIter<'a> {
252    view: &'a TimestampView,
253    row: usize,
254}
255
256impl<'a> Iterator for TimestampViewIter<'a> {
257    type Item = Option<Timestamp>;
258
259    fn next(&mut self) -> Option<Self::Item> {
260        if self.row < self.view.len() {
261            let row = self.row;
262            self.row += 1;
263            Some(unsafe { self.view.get_unchecked(row) })
264        } else {
265            None
266        }
267    }
268
269    fn size_hint(&self) -> (usize, Option<usize>) {
270        if self.row < self.view.len() {
271            let len = self.view.len() - self.row;
272            (len, Some(len))
273        } else {
274            (0, Some(0))
275        }
276    }
277
278    fn last(self) -> Option<Self::Item>
279    where
280        Self: Sized,
281    {
282        self.view.get(self.view.len() - 1)
283    }
284}
285
286impl<'a> ExactSizeIterator for TimestampViewIter<'a> {}
287
288pub struct TimestampMillisecondView(View);
289
290impl TimestampMillisecondView {
291    pub fn into_inner(self) -> View {
292        self.0
293    }
294}
295
296impl<A: Into<Option<Item>>> FromIterator<A> for TimestampMillisecondView {
297    fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
298        let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
299            .into_iter()
300            .map(|v| match v.into() {
301                Some(v) => (false, v),
302                None => (true, Item::default()),
303            })
304            .unzip();
305        Self(View {
306            nulls: NullBits::from_iter(nulls),
307            data: Bytes::from({
308                let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
309                std::mem::forget(values);
310                unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
311            }),
312            precision: Precision::Millisecond,
313        })
314    }
315}
316pub struct TimestampMicrosecondView(View);
317impl TimestampMicrosecondView {
318    pub fn into_inner(self) -> View {
319        self.0
320    }
321}
322
323impl<A: Into<Option<Item>>> FromIterator<A> for TimestampMicrosecondView {
324    fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
325        let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
326            .into_iter()
327            .map(|v| match v.into() {
328                Some(v) => (false, v),
329                None => (true, Item::default()),
330            })
331            .unzip();
332        Self(View {
333            nulls: NullBits::from_iter(nulls),
334            data: Bytes::from({
335                let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
336                std::mem::forget(values);
337                unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
338            }),
339            precision: Precision::Microsecond,
340        })
341    }
342}
343pub struct TimestampNanosecondView(View);
344impl TimestampNanosecondView {
345    pub fn into_inner(self) -> View {
346        self.0
347    }
348}
349
350impl<A: Into<Option<Item>>> FromIterator<A> for TimestampNanosecondView {
351    fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
352        let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
353            .into_iter()
354            .map(|v| match v.into() {
355                Some(v) => (false, v),
356                None => (true, Item::default()),
357            })
358            .unzip();
359        Self(View {
360            nulls: NullBits::from_iter(nulls),
361            data: Bytes::from({
362                let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
363                std::mem::forget(values);
364                unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
365            }),
366            precision: Precision::Nanosecond,
367        })
368    }
369}
370
371#[test]
372fn test_slice() {
373    let data = [0, 1, Item::MIN, Item::MAX];
374    let view = TimestampMillisecondView::from_iter(data).into_inner();
375    dbg!(&view);
376    let slice = view.slice(1..3);
377    dbg!(&slice);
378
379    let ts_min = chrono::NaiveDateTime::MIN.and_utc().timestamp_millis();
380    let ts_max = chrono::NaiveDateTime::MAX.and_utc().timestamp_millis();
381    let data = [None, Some(ts_min), Some(ts_max), Some(0)];
382    let view = TimestampMillisecondView::from_iter(data).into_inner();
383    dbg!(&view);
384    let range = 1..4;
385    let slice = view.slice(range.clone()).unwrap();
386    for (v, i) in slice.iter().zip(range) {
387        assert_eq!(v.map(|ts| ts.as_raw_i64()), data[i]);
388        let inner = v.unwrap();
389        dbg!(inner.to_naive_datetime());
390    }
391}