1use super::column::as_bytes_bufer;
2use super::column::AsOutColumn;
3use super::encoder::Encoder;
4use crate::errors::{ConversionError, Result};
5#[cfg(feature = "int128")]
6use crate::types::Decimal128;
7use crate::types::{Decimal, Decimal32, Decimal64, DecimalBits, Field, FieldMeta, SqlType, SCALE};
8use byteorder::{LittleEndian, WriteBytesExt};
9use chrono::{Date, DateTime, NaiveDate, NaiveDateTime, Utc};
10use std::io;
11use std::io::Write;
12use std::net::{Ipv4Addr, Ipv6Addr};
13use uuid::Uuid;
14
15pub trait IntoColumn<'b>: Sized {
16 fn to_column(this: Vec<Self>) -> Box<dyn AsOutColumn + 'b>;
17}
18
19lazy_static! {
20 static ref EPOCH: chrono::NaiveDate = {
22 chrono::NaiveDate::from_ymd(1970,1,1)
23 };
24}
25
26pub(crate) trait WriteColumn {
27 fn write_column(&self, field: &Field, writer: &mut dyn Write) -> Result<()>;
28}
29
30struct SimpleOutputColumn<T, F: Fn(&Field) -> bool> {
31 data: Vec<T>,
32 f: F,
33}
34
35impl<T, F> AsOutColumn for SimpleOutputColumn<T, F>
36where
37 T: WriteColumn,
38 F: Fn(&Field) -> bool,
39{
40 fn len(&self) -> usize {
41 self.data.len()
42 }
43
44 fn encode(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
45 for item in self.data.iter() {
46 <T as WriteColumn>::write_column(item, field, writer)?;
47 }
48 Ok(())
49 }
50
51 fn is_compatible(&self, field: &Field) -> bool {
52 (&self.f)(field)
53 }
54}
55pub trait NullValue {
57 fn null() -> Self;
58}
59
60macro_rules! impl_null {
61 ($t: ty, $v: expr) => {
62 impl NullValue for $t {
63 #[inline]
64 fn null() -> Self {
65 $v
66 }
67 }
68 };
69}
70
71impl_null!(u8, 0u8);
73impl_null!(i8, 0i8);
74impl_null!(u16, 0u16);
75impl_null!(i16, 0i16);
76impl_null!(u32, 0u32);
77impl_null!(i32, 0i32);
78impl_null!(u64, 0u64);
79impl_null!(i64, 0i64);
80impl_null!(f32, 0f32);
81impl_null!(f64, 0f64);
82
83impl_null!(Ipv4Addr, Ipv4Addr::UNSPECIFIED);
84impl_null!(Ipv6Addr, Ipv6Addr::UNSPECIFIED);
85impl_null!(Uuid, Default::default());
86impl_null!(Decimal32, Default::default());
87impl_null!(Decimal64, Default::default());
88#[cfg(feature = "int128")]
89impl_null!(Decimal128, Default::default());
90impl_null!(Date<Utc>, chrono::MIN_DATE);
91impl_null!(DateTime<Utc>, chrono::MIN_DATE.and_hms(0, 0, 0));
92impl_null!(&str, Default::default());
93impl_null!(String, Default::default());
94
95impl<T, F> AsOutColumn for SimpleOutputColumn<Option<T>, F>
96where
97 T: WriteColumn + NullValue,
98 F: Fn(&Field) -> bool,
99{
100 fn len(&self) -> usize {
101 self.data.len()
102 }
103 fn encode(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
106 for item in self
110 .data
111 .iter()
112 .map(|item| if item.is_some() { 0u8 } else { 1u8 })
113 {
114 writer.write_u8(item)?;
115 }
116 let def: T = NullValue::null();
117 for item in self.data.iter() {
118 <T as WriteColumn>::write_column(item.as_ref().unwrap_or(&def), field, writer)?;
119 }
120 Ok(())
121 }
122
123 fn is_compatible(&self, field: &Field) -> bool {
124 (&self.f)(field)
125 }
126}
127
128impl WriteColumn for &str {
131 #[inline]
132 fn write_column(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
133 let slice = std::slice::from_ref(self);
134 match field.sql_type {
135 SqlType::String => encode_string(slice, writer),
136 SqlType::FixedString(val) => encode_fixedstring(slice, val, writer),
137 _ => unreachable!(),
138 }
139 }
140}
141impl WriteColumn for String {
142 #[inline]
143 fn write_column(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
144 let slice = std::slice::from_ref(self);
145 match field.sql_type {
146 SqlType::String => encode_string(slice, writer),
147 SqlType::FixedString(val) => encode_fixedstring(slice, val, writer),
148 _ => unreachable!(),
149 }
150 }
151}
152
153fn encode_string<T: AsRef<[u8]>>(data: &[T], writer: &mut dyn Write) -> Result<()> {
156 for s in data {
157 let s = s.as_ref();
158 (s.len() as u64).encode(writer)?;
159 s.encode(writer)?;
160 }
161 Ok(())
162}
163fn encode_fixedstring<T: AsRef<[u8]>>(data: &[T], size: u32, writer: &mut dyn Write) -> Result<()> {
166 for s in data {
167 let s = s.as_ref();
168 if s.is_empty() {
170 for _ in 0..size {
171 writer.write_u8(0)?;
172 }
173 } else if s.len() != size as usize {
174 return Err(ConversionError::FixedStringLengthNotMatch(size).into());
175 } else {
176 writer.write_all(s)?;
177 }
178 }
179
180 Ok(())
181}
182
183fn encode_enum8<T: AsRef<[u8]>>(
184 data: &[T],
185 meta: &FieldMeta,
186 writer: &mut dyn Write,
187) -> Result<()> {
188 for s in data {
189 let val = meta.str2val(s.as_ref())?;
190 writer.write_i8(val as i8)?;
191 }
192 Ok(())
193}
194
195fn encode_enum16<T: AsRef<[u8]>>(
196 data: &[T],
197 meta: &FieldMeta,
198 writer: &mut dyn Write,
199) -> Result<()> {
200 for s in data {
201 let val: i16 = meta.str2val(s.as_ref())?;
202 writer.write_i16::<LittleEndian>(val)?;
203 }
204 Ok(())
205}
206struct StringOutputColumn<T> {
208 data: Vec<T>,
209}
210
211impl<'a, T> AsOutColumn for StringOutputColumn<T>
212where
213 T: AsRef<[u8]>,
214{
215 fn len(&self) -> usize {
216 self.data.len()
217 }
218
219 fn encode(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
220 match field.sql_type {
221 SqlType::String => encode_string(self.data.as_ref(), writer),
222 SqlType::FixedString(v) => encode_fixedstring(self.data.as_ref(), v, writer),
223 SqlType::Enum8 => encode_enum8(
224 self.data.as_ref(),
225 field.get_meta().expect("enum index corrupted"),
226 writer,
227 ),
228 SqlType::Enum16 => encode_enum16(
229 self.data.as_ref(),
230 field.get_meta().expect("enum index corrupted"),
231 writer,
232 ),
233 _ => unreachable!(),
234 }
235 }
236
237 fn is_compatible(&self, field: &Field) -> bool {
238 matches!(field.sql_type, SqlType::String | SqlType::FixedString(_) | SqlType::Enum8 | SqlType::Enum16)
239 }
240}
241impl WriteColumn for Ipv4Addr {
243 fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
244 let mut b = self.octets();
245 b.reverse();
246
247 writer.write_all(&b[..]).map_err(Into::into)
248 }
249}
250impl WriteColumn for Ipv6Addr {
252 fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
253 let b = self.octets();
254 writer.write_all(&b[..]).map_err(Into::into)
255 }
256}
257impl WriteColumn for Uuid {
259 fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
260 let i = self.as_u128();
261 writer.write_u64::<LittleEndian>((i >> 64) as u64)?;
262 writer
263 .write_u64::<LittleEndian>(i as u64)
264 .map_err(Into::into)
265 }
266}
267impl WriteColumn for Date<Utc> {
269 fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
270 let days = (self.naive_utc() - *EPOCH).num_days();
271
272 if days < 0 || days > u16::MAX as i64 {
273 return Err(ConversionError::UnsupportedConversion.into());
274 }
275
276 let days = (days as u16).to_le_bytes();
277 writer.write_all(&days[..]).map_err(Into::into)
278 }
279}
280impl WriteColumn for DateTime<Utc> {
282 fn write_column(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
283 let mut timestamp = self.timestamp();
284
285 match field.sql_type {
286 SqlType::DateTime => {
287 if timestamp < 0 || timestamp > u32::MAX as i64 {
288 return Err(ConversionError::UnsupportedConversion.into());
289 }
290 let timestamp = (timestamp as u32).to_le_bytes();
291 writer.write_all(×tamp[..]).map_err(Into::into)
292 }
293 SqlType::DateTime64(p, _) => {
294 debug_assert!(p < 9);
295 timestamp *= SCALE[p as usize];
297 writer
298 .write_i64::<LittleEndian>(timestamp)
299 .map_err(Into::into)
300 }
301 _ => unreachable!(),
302 }
303 }
304}
305
306macro_rules! to_column_numeric {
307 ($t:ty, $f: ident, $endian: ty) => {
308 impl WriteColumn for $t {
309 #[inline]
310 fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
311 writer.$f::<$endian>(*self).map_err(Into::into)
312 }
313 }
314 };
315 ($t:ty, $f: ident) => {
316 impl WriteColumn for $t {
317 #[inline]
318 fn write_column(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
319 writer.$f(*self).map_err(Into::into)
320 }
321 }
322 };
323}
324
325to_column_numeric!(i8, write_i8);
326to_column_numeric!(u8, write_u8);
327to_column_numeric!(i16, write_i16, LittleEndian);
328to_column_numeric!(u16, write_u16, LittleEndian);
329to_column_numeric!(i32, write_i32, LittleEndian);
330to_column_numeric!(u32, write_u32, LittleEndian);
331to_column_numeric!(i64, write_i64, LittleEndian);
332to_column_numeric!(u64, write_u64, LittleEndian);
333
334#[cfg(feature = "int128")]
335to_column_numeric!(i128, write_i128, LittleEndian);
336#[cfg(feature = "int128")]
337to_column_numeric!(u128, write_u128, LittleEndian);
338
339to_column_numeric!(f32, write_f32, LittleEndian);
340to_column_numeric!(f64, write_f64, LittleEndian);
341
342impl<T: WriteColumn + DecimalBits> WriteColumn for Decimal<T> {
344 fn write_column(&self, field: &Field, writer: &mut dyn Write) -> Result<()> {
345 if let SqlType::Decimal(p, s) = field.sql_type {
346 debug_assert!(T::fit(p));
347 if s != self.scale {
348 return Err(ConversionError::UnsupportedConversion.into());
349 }
350 } else {
351 unreachable!()
352 }
353 self.underlying.write_column(field, writer)
354 }
355}
356
357struct BinaryCompatibleOutColumn<T: Sized> {
361 sql_type: SqlType,
362 data: Vec<T>,
363}
364
365fn encode_data_bc(data: &[u8], writer: &mut dyn Write) -> io::Result<()> {
366 writer.write_all(data)
367}
368
369impl<'a, T: Sized + Send + Sync> AsOutColumn for BinaryCompatibleOutColumn<T> {
370 fn len(&self) -> usize {
371 self.data.len()
372 }
373 fn encode(&self, _field: &Field, writer: &mut dyn Write) -> Result<()> {
374 encode_data_bc(unsafe { as_bytes_bufer(self.data.as_ref()) }, writer).map_err(Into::into)
375 }
376 fn is_compatible(&self, field: &Field) -> bool {
377 self.sql_type == field.sql_type
378 }
379}
380
381macro_rules! impl_intocolumn_bc {
382 ($fs: ty, $sql: path) => {
383 impl<'b> IntoColumn<'b> for $fs {
384 fn to_column(this: Vec<$fs>) -> Box<dyn AsOutColumn + 'b> {
385 Box::new(BinaryCompatibleOutColumn {
386 data: this,
387 sql_type: $sql,
388 })
389 }
390 }
391 };
392}
393
394macro_rules! impl_intocolumn_simple {
395 ($fs: ty, $sql: expr) => {
396 impl<'b> IntoColumn<'b> for $fs
397 where
398 $fs: 'b,
399 {
400 fn to_column(this: Vec<$fs>) -> Box<dyn AsOutColumn + 'b> {
401 Box::new(SimpleOutputColumn {
402 data: this,
403 f: $sql,
404 })
405 }
406 }
407 };
408}
409
410macro_rules! impl_intocolumn_string {
411 ($fs: ty) => {
412 impl<'b> IntoColumn<'b> for $fs
413 where
414 $fs: 'b,
415 {
416 fn to_column(this: Vec<$fs>) -> Box<dyn AsOutColumn + 'b> {
417 Box::new(StringOutputColumn { data: this })
418 }
419 }
420 };
421}
422
423impl_intocolumn_bc!(u8, SqlType::UInt8);
424impl_intocolumn_bc!(i8, SqlType::Int8);
425impl_intocolumn_bc!(u16, SqlType::UInt16);
426impl_intocolumn_bc!(i16, SqlType::Int16);
427impl_intocolumn_bc!(u32, SqlType::UInt32);
428impl_intocolumn_bc!(i32, SqlType::Int32);
429impl_intocolumn_bc!(u64, SqlType::UInt64);
430impl_intocolumn_bc!(i64, SqlType::Int64);
431impl_intocolumn_bc!(f64, SqlType::Float64);
432impl_intocolumn_bc!(f32, SqlType::Float32);
433impl_intocolumn_bc!(ValueDate, SqlType::Date);
434impl_intocolumn_bc!(ValueDateTime, SqlType::DateTime);
435
436impl_intocolumn_simple!(Ipv4Addr, |f| f.sql_type == SqlType::Ipv4);
437impl_intocolumn_simple!(Ipv6Addr, |f| f.sql_type == SqlType::Ipv6);
438impl_intocolumn_simple!(Date<Utc>, |f| f.sql_type == SqlType::Date);
439
440impl_intocolumn_simple!(Decimal32, |f| {
441 match f.sql_type {
442 SqlType::Decimal(p, _) => i32::fit(p),
443 _ => false,
444 }
445});
446
447impl_intocolumn_simple!(Decimal64, |f| {
448 match f.sql_type {
449 SqlType::Decimal(p, _) => i64::fit(p),
450 _ => false,
451 }
452});
453
454#[cfg(feature = "int128")]
455impl_intocolumn_simple!(Decimal128, |f| {
456 match f.sql_type {
457 SqlType::Decimal(p, _) => i128::fit(p),
458 _ => false,
459 }
460});
461
462impl_intocolumn_simple!(DateTime<Utc>, |f| matches!(
463 f.sql_type,
464 SqlType::DateTime | SqlType::DateTime64(..)
465));
466
467impl_intocolumn_simple!(Uuid, |f| f.sql_type == SqlType::Uuid);
468impl_intocolumn_string!(&'b str);
469impl_intocolumn_string!(String);
470
471impl_intocolumn_simple!(Option<u8>, |f| f.sql_type == SqlType::UInt8);
473impl_intocolumn_simple!(Option<i8>, |f| f.sql_type == SqlType::Int8);
474impl_intocolumn_simple!(Option<u16>, |f| f.sql_type == SqlType::UInt16);
475impl_intocolumn_simple!(Option<i16>, |f| f.sql_type == SqlType::Int16);
476impl_intocolumn_simple!(Option<u32>, |f| f.sql_type == SqlType::UInt32);
477impl_intocolumn_simple!(Option<i32>, |f| f.sql_type == SqlType::Int32);
478impl_intocolumn_simple!(Option<u64>, |f| f.sql_type == SqlType::UInt64);
479impl_intocolumn_simple!(Option<i64>, |f| f.sql_type == SqlType::Int64);
480impl_intocolumn_simple!(Option<f32>, |f| f.sql_type == SqlType::Float32);
481impl_intocolumn_simple!(Option<f64>, |f| f.sql_type == SqlType::Float64);
482impl_intocolumn_simple!(Option<Uuid>, |f| f.sql_type == SqlType::Uuid);
483impl_intocolumn_simple!(Option<Ipv4Addr>, |f| f.sql_type == SqlType::Ipv4);
484impl_intocolumn_simple!(Option<Ipv6Addr>, |f| f.sql_type == SqlType::Ipv6);
485impl_intocolumn_simple!(Option<Date<Utc>>, |f| f.sql_type == SqlType::Date);
486impl_intocolumn_simple!(Option<DateTime<Utc>>, |f| f.sql_type == SqlType::DateTime);
487
488impl_intocolumn_simple!(Option<&'b str>, |f| f.sql_type == SqlType::String);
489impl_intocolumn_simple!(Option<String>, |f| f.sql_type == SqlType::String);
490
491impl_intocolumn_simple!(Option<Decimal32>, |f| {
492 match f.sql_type {
493 SqlType::Decimal(p, _) => i32::fit(p),
494 _ => false,
495 }
496});
497
498impl_intocolumn_simple!(Option<Decimal64>, |f| {
499 match f.sql_type {
500 SqlType::Decimal(p, _) => i64::fit(p),
501 _ => false,
502 }
503});
504
505#[cfg(feature = "int128")]
506impl_intocolumn_simple!(Option<Decimal128>, |f| {
507 match f.sql_type {
508 SqlType::Decimal(p, _) => i128::fit(p),
509 _ => false,
510 }
511});
512
513#[derive(Copy, Clone, Debug)]
514pub struct ValueIp4([u8; 4]);
515
516impl Into<Ipv4Addr> for ValueIp4 {
517 fn into(mut self) -> Ipv4Addr {
518 self.0.reverse();
519 self.0.into()
520 }
521}
522
523#[derive(Copy, Clone, Debug)]
524pub struct ValueIp6([u8; 16]);
525
526impl Into<Ipv6Addr> for ValueIp6 {
527 fn into(mut self) -> Ipv6Addr {
528 self.0.reverse();
529 self.0.into()
530 }
531}
532
533#[derive(Copy, Clone, Debug)]
534pub struct ValueUuid([u8; 16]);
535
536impl Into<Uuid> for ValueUuid {
537 fn into(mut self) -> Uuid {
538 self.0[0..8].reverse();
539 self.0[8..16].reverse();
540 Uuid::from_bytes(self.0)
541 }
542}
543
544#[derive(Copy, Clone, Debug)]
545pub struct ValueDate(pub [u8; 2]);
546
547#[derive(Copy, Clone, Debug)]
548pub struct ValueDateTime(pub [u8; 4]);
549
550#[derive(Copy, Clone, Debug)]
551pub struct ValueDateTime64(pub i64);
552
553#[derive(Copy, Clone, Debug)]
554pub struct ValueDecimal32(pub i32);
555
556#[derive(Copy, Clone, Debug)]
557pub struct ValueDecimal64(pub i64);
558
559#[cfg(feature = "int128")]
560#[derive(Copy, Clone, Debug)]
561pub struct ValueDecimal128(pub i128);
562
563impl ValueDate {
564 pub(super) fn to_date(&self) -> chrono::Date<chrono::offset::Utc> {
565 ValueDate::date_inner(i16::from_le_bytes(self.0))
566 }
567
568 pub(super) fn date_inner(dates: i16) -> chrono::Date<chrono::offset::Utc> {
569 let ce: i32 = 719163_i32 + dates as i32;
570 let nt = NaiveDate::from_num_days_from_ce(ce);
571 chrono::Date::from_utc(nt, Utc)
572 }
573}
574
575impl ValueDateTime {
576 pub(super) fn to_datetime(&self) -> DateTime<chrono::offset::Utc> {
577 ValueDateTime::datetime_inner(i32::from_le_bytes(self.0))
578 }
579
580 pub(super) fn datetime_inner(sec: i32) -> DateTime<chrono::offset::Utc> {
581 let nt = NaiveDateTime::from_timestamp(sec as i64, 0);
582 DateTime::from_utc(nt, Utc)
583 }
584}
585
586impl ValueDateTime64 {
587 pub(super) fn to_datetime(self, precision: u8) -> DateTime<chrono::offset::Utc> {
588 let magnitude = SCALE[precision as usize];
589 let sec = self.0.wrapping_div(magnitude);
590 let nsec = self.0.wrapping_rem(magnitude) * SCALE[(9 - precision) as usize];
592
593 let nt = NaiveDateTime::from_timestamp(sec, nsec as u32);
594 DateTime::from_utc(nt, Utc)
595 }
596
597 pub fn from_raw(base: i64) -> ValueDateTime64 {
598 ValueDateTime64(base)
599 }
600}