Skip to main content

fluss/row/binary/
binary_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::error::Error::IllegalArgument;
19use crate::error::Result;
20use crate::metadata::DataType;
21use crate::row::Datum;
22use crate::row::binary::BinaryRowFormat;
23
24/// Writer to write a composite data format, like row, array,
25#[allow(dead_code)]
26pub trait BinaryWriter {
27    /// Reset writer to prepare next write
28    fn reset(&mut self);
29
30    /// Set null to this field
31    fn set_null_at(&mut self, pos: usize);
32
33    fn write_boolean(&mut self, value: bool);
34
35    fn write_byte(&mut self, value: u8);
36
37    fn write_bytes(&mut self, value: &[u8]);
38
39    fn write_char(&mut self, value: &str, length: usize);
40
41    fn write_string(&mut self, value: &str);
42
43    fn write_short(&mut self, value: i16);
44
45    fn write_int(&mut self, value: i32);
46
47    fn write_long(&mut self, value: i64);
48
49    fn write_float(&mut self, value: f32);
50
51    fn write_double(&mut self, value: f64);
52
53    fn write_binary(&mut self, bytes: &[u8], length: usize);
54
55    fn write_decimal(&mut self, value: &crate::row::Decimal, precision: u32);
56
57    /// Writes a TIME value.
58    ///
59    /// Note: TIME is physically stored as an i32 (milliseconds since midnight).
60    /// This method exists for type safety and semantic clarity, even though it's
61    /// currently equivalent to `write_int()`. The precision parameter is accepted
62    /// for API consistency with TIMESTAMP types, though TIME encoding doesn't
63    /// currently vary by precision.
64    fn write_time(&mut self, value: i32, precision: u32);
65
66    fn write_timestamp_ntz(&mut self, value: &crate::row::datum::TimestampNtz, precision: u32);
67
68    fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, precision: u32);
69
70    // TODO InternalArray, ArraySerializer
71    // fn write_array(&mut self, pos: i32, value: i64);
72
73    // TODO Row serializer
74    // fn write_row(&mut self, pos: i32, value: &InternalRow);
75
76    /// Finally, complete write to set real size to binary.
77    fn complete(&mut self);
78}
79
80pub enum ValueWriter {
81    Nullable(InnerValueWriter),
82    NonNullable(InnerValueWriter),
83}
84
85impl ValueWriter {
86    pub fn create_value_writer(
87        element_type: &DataType,
88        binary_row_format: Option<&BinaryRowFormat>,
89    ) -> Result<ValueWriter> {
90        let value_writer =
91            InnerValueWriter::create_inner_value_writer(element_type, binary_row_format)?;
92        if element_type.is_nullable() {
93            Ok(Self::Nullable(value_writer))
94        } else {
95            Ok(Self::NonNullable(value_writer))
96        }
97    }
98
99    pub fn write_value<W: BinaryWriter>(
100        &self,
101        writer: &mut W,
102        pos: usize,
103        value: &Datum,
104    ) -> Result<()> {
105        match self {
106            Self::Nullable(inner_value_writer) => {
107                if let Datum::Null = value {
108                    writer.set_null_at(pos);
109                    Ok(())
110                } else {
111                    inner_value_writer.write_value(writer, pos, value)
112                }
113            }
114            Self::NonNullable(inner_value_writer) => {
115                inner_value_writer.write_value(writer, pos, value)
116            }
117        }
118    }
119}
120
/// Type-specific write strategy for a single field/element.
///
/// Each variant names the logical type whose encoding is used; variants with a
/// payload carry the type parameters needed at write time. The enum only holds
/// `u32` values, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InnerValueWriter {
    Char,
    String,
    Boolean,
    Binary,
    Bytes,
    TinyInt,
    SmallInt,
    Int,
    BigInt,
    Float,
    Double,
    /// `(precision, scale)`.
    Decimal(u32, u32),
    Date,
    /// Precision (not used in wire format, but kept for consistency).
    Time(u32),
    /// Precision.
    TimestampNtz(u32),
    /// Precision.
    TimestampLtz(u32),
    // TODO Array, Row
}
141
142/// Accessor for writing the fields/elements of a binary writer during runtime, the
143/// fields/elements must be written in the order.
144impl InnerValueWriter {
145    pub fn create_inner_value_writer(
146        data_type: &DataType,
147        _: Option<&BinaryRowFormat>,
148    ) -> Result<InnerValueWriter> {
149        match data_type {
150            DataType::Char(_) => Ok(InnerValueWriter::Char),
151            DataType::String(_) => Ok(InnerValueWriter::String),
152            DataType::Boolean(_) => Ok(InnerValueWriter::Boolean),
153            DataType::Binary(_) => Ok(InnerValueWriter::Binary),
154            DataType::Bytes(_) => Ok(InnerValueWriter::Bytes),
155            DataType::TinyInt(_) => Ok(InnerValueWriter::TinyInt),
156            DataType::SmallInt(_) => Ok(InnerValueWriter::SmallInt),
157            DataType::Int(_) => Ok(InnerValueWriter::Int),
158            DataType::BigInt(_) => Ok(InnerValueWriter::BigInt),
159            DataType::Float(_) => Ok(InnerValueWriter::Float),
160            DataType::Double(_) => Ok(InnerValueWriter::Double),
161            DataType::Decimal(d) => {
162                // Validation is done at DecimalType construction time
163                Ok(InnerValueWriter::Decimal(d.precision(), d.scale()))
164            }
165            DataType::Date(_) => Ok(InnerValueWriter::Date),
166            DataType::Time(t) => {
167                // Validation is done at TimeType construction time
168                Ok(InnerValueWriter::Time(t.precision()))
169            }
170            DataType::Timestamp(t) => {
171                // Validation is done at TimestampType construction time
172                Ok(InnerValueWriter::TimestampNtz(t.precision()))
173            }
174            DataType::TimestampLTz(t) => {
175                // Validation is done at TimestampLTzType construction time
176                Ok(InnerValueWriter::TimestampLtz(t.precision()))
177            }
178            _ => unimplemented!(
179                "ValueWriter for DataType {:?} is currently not implemented",
180                data_type
181            ),
182        }
183    }
184    pub fn write_value<W: BinaryWriter>(
185        &self,
186        writer: &mut W,
187        _pos: usize,
188        value: &Datum,
189    ) -> Result<()> {
190        match (self, value) {
191            (InnerValueWriter::Char, Datum::String(v)) => {
192                writer.write_char(v, v.len());
193            }
194            (InnerValueWriter::String, Datum::String(v)) => {
195                writer.write_string(v);
196            }
197            (InnerValueWriter::Boolean, Datum::Bool(v)) => {
198                writer.write_boolean(*v);
199            }
200            (InnerValueWriter::Binary, Datum::Blob(v)) => {
201                let b = v.as_ref();
202                writer.write_binary(b, b.len());
203            }
204            (InnerValueWriter::Bytes, Datum::Blob(v)) => {
205                writer.write_bytes(v.as_ref());
206            }
207            (InnerValueWriter::TinyInt, Datum::Int8(v)) => {
208                writer.write_byte(*v as u8);
209            }
210            (InnerValueWriter::SmallInt, Datum::Int16(v)) => {
211                writer.write_short(*v);
212            }
213            (InnerValueWriter::Int, Datum::Int32(v)) => {
214                writer.write_int(*v);
215            }
216            (InnerValueWriter::BigInt, Datum::Int64(v)) => {
217                writer.write_long(*v);
218            }
219            (InnerValueWriter::Float, Datum::Float32(v)) => {
220                writer.write_float(v.into_inner());
221            }
222            (InnerValueWriter::Double, Datum::Float64(v)) => {
223                writer.write_double(v.into_inner());
224            }
225            (InnerValueWriter::Decimal(p, _s), Datum::Decimal(v)) => {
226                writer.write_decimal(v, *p);
227            }
228            (InnerValueWriter::Date, Datum::Date(d)) => {
229                writer.write_int(d.get_inner());
230            }
231            (InnerValueWriter::Time(p), Datum::Time(t)) => {
232                writer.write_time(t.get_inner(), *p);
233            }
234            (InnerValueWriter::TimestampNtz(p), Datum::TimestampNtz(ts)) => {
235                writer.write_timestamp_ntz(ts, *p);
236            }
237            (InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => {
238                writer.write_timestamp_ltz(ts, *p);
239            }
240            _ => {
241                return Err(IllegalArgument {
242                    message: format!("{self:?} used to write value {value:?}"),
243                });
244            }
245        }
246        Ok(())
247    }
248}