taos_query/common/raw/views/
timestamp_view.rs1use std::ffi::c_void;
2
3use crate::common::{BorrowedValue, Precision, Timestamp, Ty};
4
5use super::{IsColumnView, NullBits, NullsIter};
6
7use bytes::Bytes;
8use itertools::Itertools;
9
10type Item = i64;
11type View = TimestampView;
12const ITEM_SIZE: usize = std::mem::size_of::<Item>();
13
14#[derive(Debug, Clone)]
15pub struct TimestampView {
16 pub(crate) nulls: NullBits,
17 pub(crate) data: Bytes,
18 pub(crate) precision: Precision,
19}
20
21impl IsColumnView for View {
22 fn ty(&self) -> Ty {
23 Ty::USmallInt
24 }
25 fn from_borrowed_value_iter<'b>(iter: impl Iterator<Item = BorrowedValue<'b>>) -> Self {
26 Self::from_nullable_timestamp(iter.map(|v| v.to_timestamp()).collect_vec())
27 }
28}
29impl TimestampView {
30 pub fn from_millis(values: Vec<impl Into<Option<i64>>>) -> Self {
31 TimestampMillisecondView::from_iter(values).into_inner()
32 }
33
34 pub fn from_micros(values: Vec<impl Into<Option<i64>>>) -> Self {
35 TimestampMicrosecondView::from_iter(values).into_inner()
36 }
37
38 pub fn from_nanos(values: Vec<impl Into<Option<i64>>>) -> Self {
39 TimestampNanosecondView::from_iter(values).into_inner()
40 }
41
42 pub fn from_timestamp(values: Vec<Timestamp>) -> Self {
43 let precision = values.first().map(|ts| ts.precision()).unwrap_or_default();
44 let values = values.into_iter().map(|ts| ts.as_raw_i64()).collect_vec();
45 match precision {
46 Precision::Millisecond => Self::from_millis(values),
47 Precision::Microsecond => Self::from_micros(values),
48 Precision::Nanosecond => Self::from_nanos(values),
49 }
50 }
51 pub fn from_nullable_timestamp(values: Vec<Option<Timestamp>>) -> Self {
52 let precision = values
53 .iter()
54 .find(|ts| ts.is_some())
55 .map(|v| v.as_ref().unwrap().precision());
56 if let Some(precision) = precision {
57 let values = values
58 .into_iter()
59 .map(|ts| ts.map(|v| v.as_raw_i64()))
60 .collect_vec();
61 match precision {
62 Precision::Millisecond => Self::from_millis(values),
63 Precision::Microsecond => Self::from_micros(values),
64 Precision::Nanosecond => Self::from_nanos(values),
65 }
66 } else {
67 Self::from_millis(vec![None; values.len()])
68 }
69 }
70
71 pub fn precision(&self) -> Precision {
73 self.precision
74 }
75
76 pub fn len(&self) -> usize {
78 self.data.len() / std::mem::size_of::<Item>()
79 }
80
81 pub fn as_raw_slice(&self) -> &[Item] {
83 unsafe { std::slice::from_raw_parts(self.data.as_ptr() as *const Item, self.len()) }
84 }
85
86 pub fn as_raw_ptr(&self) -> *const Item {
88 self.data.as_ptr() as *const Item
89 }
90
91 pub fn to_nulls_vec(&self) -> Vec<bool> {
93 self.is_null_iter().collect()
94 }
95
96 pub fn is_null_iter(&self) -> NullsIter {
98 NullsIter {
99 nulls: &self.nulls,
100 row: 0,
101 len: self.len(),
102 }
103 }
104
105 pub fn is_null(&self, row: usize) -> bool {
107 if row < self.len() {
108 unsafe { self.is_null_unchecked(row) }
109 } else {
110 false
111 }
112 }
113
114 pub unsafe fn is_null_unchecked(&self, row: usize) -> bool {
116 self.nulls.is_null_unchecked(row)
117 }
118
119 pub fn get(&self, row: usize) -> Option<Option<Timestamp>> {
121 if row < self.len() {
122 Some(unsafe { self.get_unchecked(row) })
123 } else {
124 None
125 }
126 }
127
128 #[inline(always)]
129 unsafe fn get_raw_at(&self, index: usize) -> *const Item {
130 self.data.as_ptr().add(index * ITEM_SIZE) as _
131 }
132
133 pub unsafe fn get_unchecked(&self, row: usize) -> Option<Timestamp> {
135 if self.nulls.is_null_unchecked(row) {
136 None
137 } else {
138 Some(Timestamp::new(
139 std::ptr::read_unaligned(
140 self.data.as_ptr().add(row * std::mem::size_of::<Item>()) as _
141 ),
142 self.precision,
144 ))
145 }
146 }
147
148 pub unsafe fn get_ref_unchecked(&self, row: usize) -> Option<*const Item> {
149 if self.nulls.is_null_unchecked(row) {
150 None
151 } else {
152 Some(self.get_raw_at(row))
153 }
154 }
155
156 pub unsafe fn get_value_unchecked(&self, row: usize) -> BorrowedValue {
157 self.get_unchecked(row)
158 .map(BorrowedValue::Timestamp)
159 .unwrap_or(BorrowedValue::Null(Ty::Timestamp))
160 }
161
162 pub unsafe fn get_raw_value_unchecked(&self, row: usize) -> (Ty, u32, *const c_void) {
163 if self.nulls.is_null_unchecked(row) {
164 (
165 Ty::Timestamp,
166 std::mem::size_of::<Item>() as _,
167 std::ptr::null(),
168 )
169 } else {
170 (
171 Ty::Timestamp,
172 std::mem::size_of::<Item>() as _,
173 self.get_raw_at(row) as *const Item as _,
174 )
175 }
176 }
177
178 pub fn slice(&self, mut range: std::ops::Range<usize>) -> Option<Self> {
180 if range.start >= self.len() {
181 return None;
182 }
183 if range.end > self.len() {
184 range.end = self.len();
185 }
186 if range.is_empty() {
187 return None;
188 }
189
190 let nulls = unsafe { self.nulls.slice(range.clone()) };
191 let data = self
192 .data
193 .slice(range.start * ITEM_SIZE..range.end * ITEM_SIZE);
194 Some(Self {
195 nulls,
196 data,
197 precision: self.precision,
198 })
199 }
200
201 pub fn iter(&self) -> TimestampViewIter {
203 TimestampViewIter { view: self, row: 0 }
204 }
205
206 pub fn to_vec(&self) -> Vec<Option<Timestamp>> {
208 self.iter().collect()
209 }
210
211 pub(crate) fn write_raw_into<W: std::io::Write>(&self, mut wtr: W) -> std::io::Result<usize> {
213 let nulls = self.nulls.0.as_ref();
214 wtr.write_all(nulls)?;
215 wtr.write_all(&self.data)?;
216 Ok(nulls.len() + self.data.len())
217 }
218
219 pub fn concat(&self, rhs: &View) -> View {
220 let nulls = NullBits::from_iter(
221 self.nulls
222 .iter()
223 .take(self.len())
224 .chain(rhs.nulls.iter().take(rhs.len())),
225 );
226 let data: Bytes = self
227 .data
228 .as_ref()
229 .iter()
230 .chain(rhs.data.as_ref().iter())
231 .copied()
232 .collect();
233
234 View {
235 nulls,
236 data,
237 precision: self.precision,
238 }
239 }
240
241 pub fn cast_precision(&self, precision: Precision) -> TimestampView {
242 if self.precision == precision {
243 self.clone()
244 } else {
245 let data = self.iter().map(|v| v.map(|v| v.cast_precision(precision)));
246 Self::from_nullable_timestamp(data.collect())
247 }
248 }
249}
250
251pub struct TimestampViewIter<'a> {
252 view: &'a TimestampView,
253 row: usize,
254}
255
256impl<'a> Iterator for TimestampViewIter<'a> {
257 type Item = Option<Timestamp>;
258
259 fn next(&mut self) -> Option<Self::Item> {
260 if self.row < self.view.len() {
261 let row = self.row;
262 self.row += 1;
263 Some(unsafe { self.view.get_unchecked(row) })
264 } else {
265 None
266 }
267 }
268
269 fn size_hint(&self) -> (usize, Option<usize>) {
270 if self.row < self.view.len() {
271 let len = self.view.len() - self.row;
272 (len, Some(len))
273 } else {
274 (0, Some(0))
275 }
276 }
277
278 fn last(self) -> Option<Self::Item>
279 where
280 Self: Sized,
281 {
282 self.view.get(self.view.len() - 1)
283 }
284}
285
286impl<'a> ExactSizeIterator for TimestampViewIter<'a> {}
287
288pub struct TimestampMillisecondView(View);
289
290impl TimestampMillisecondView {
291 pub fn into_inner(self) -> View {
292 self.0
293 }
294}
295
296impl<A: Into<Option<Item>>> FromIterator<A> for TimestampMillisecondView {
297 fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
298 let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
299 .into_iter()
300 .map(|v| match v.into() {
301 Some(v) => (false, v),
302 None => (true, Item::default()),
303 })
304 .unzip();
305 Self(View {
306 nulls: NullBits::from_iter(nulls),
307 data: Bytes::from({
308 let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
309 std::mem::forget(values);
310 unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
311 }),
312 precision: Precision::Millisecond,
313 })
314 }
315}
316pub struct TimestampMicrosecondView(View);
317impl TimestampMicrosecondView {
318 pub fn into_inner(self) -> View {
319 self.0
320 }
321}
322
323impl<A: Into<Option<Item>>> FromIterator<A> for TimestampMicrosecondView {
324 fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
325 let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
326 .into_iter()
327 .map(|v| match v.into() {
328 Some(v) => (false, v),
329 None => (true, Item::default()),
330 })
331 .unzip();
332 Self(View {
333 nulls: NullBits::from_iter(nulls),
334 data: Bytes::from({
335 let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
336 std::mem::forget(values);
337 unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
338 }),
339 precision: Precision::Microsecond,
340 })
341 }
342}
343pub struct TimestampNanosecondView(View);
344impl TimestampNanosecondView {
345 pub fn into_inner(self) -> View {
346 self.0
347 }
348}
349
350impl<A: Into<Option<Item>>> FromIterator<A> for TimestampNanosecondView {
351 fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
352 let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
353 .into_iter()
354 .map(|v| match v.into() {
355 Some(v) => (false, v),
356 None => (true, Item::default()),
357 })
358 .unzip();
359 Self(View {
360 nulls: NullBits::from_iter(nulls),
361 data: Bytes::from({
362 let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
363 std::mem::forget(values);
364 unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
365 }),
366 precision: Precision::Nanosecond,
367 })
368 }
369}
370
371#[test]
372fn test_slice() {
373 let data = [0, 1, Item::MIN, Item::MAX];
374 let view = TimestampMillisecondView::from_iter(data).into_inner();
375 dbg!(&view);
376 let slice = view.slice(1..3);
377 dbg!(&slice);
378
379 let ts_min = chrono::NaiveDateTime::MIN.and_utc().timestamp_millis();
380 let ts_max = chrono::NaiveDateTime::MAX.and_utc().timestamp_millis();
381 let data = [None, Some(ts_min), Some(ts_max), Some(0)];
382 let view = TimestampMillisecondView::from_iter(data).into_inner();
383 dbg!(&view);
384 let range = 1..4;
385 let slice = view.slice(range.clone()).unwrap();
386 for (v, i) in slice.iter().zip(range) {
387 assert_eq!(v.map(|ts| ts.as_raw_i64()), data[i]);
388 let inner = v.unwrap();
389 dbg!(inner.to_naive_datetime());
390 }
391}