Skip to main content

fluss/row/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod column;
19
20pub(crate) mod datum;
21mod decimal;
22
23pub mod binary;
24pub(crate) mod column_writer;
25pub mod compacted;
26pub mod encode;
27pub mod field_getter;
28mod row_decoder;
29
30use crate::client::WriteFormat;
31use bytes::Bytes;
32pub use column::*;
33pub use compacted::CompactedRow;
34pub use datum::*;
35pub use decimal::{Decimal, MAX_COMPACT_PRECISION};
36pub use encode::KeyEncoder;
37pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory};
38
39pub struct BinaryRow<'a> {
40    data: BinaryDataWrapper<'a>,
41}
42
43pub enum BinaryDataWrapper<'a> {
44    Bytes(Bytes),
45    Ref(&'a [u8]),
46}
47
48impl<'a> BinaryRow<'a> {
49    /// Returns the binary representation of this row as a byte slice.
50    pub fn as_bytes(&'a self) -> &'a [u8] {
51        match &self.data {
52            BinaryDataWrapper::Bytes(bytes) => bytes.as_ref(),
53            BinaryDataWrapper::Ref(r) => r,
54        }
55    }
56}
57
58use crate::error::Error::IllegalArgument;
59use crate::error::Result;
60
61pub trait InternalRow: Send + Sync {
62    /// Returns the number of fields in this row
63    fn get_field_count(&self) -> usize;
64
65    /// Returns true if the element is null at the given position
66    fn is_null_at(&self, pos: usize) -> Result<bool>;
67
68    /// Returns the boolean value at the given position
69    fn get_boolean(&self, pos: usize) -> Result<bool>;
70
71    /// Returns the byte value at the given position
72    fn get_byte(&self, pos: usize) -> Result<i8>;
73
74    /// Returns the short value at the given position
75    fn get_short(&self, pos: usize) -> Result<i16>;
76
77    /// Returns the integer value at the given position
78    fn get_int(&self, pos: usize) -> Result<i32>;
79
80    /// Returns the long value at the given position
81    fn get_long(&self, pos: usize) -> Result<i64>;
82
83    /// Returns the float value at the given position
84    fn get_float(&self, pos: usize) -> Result<f32>;
85
86    /// Returns the double value at the given position
87    fn get_double(&self, pos: usize) -> Result<f64>;
88
89    /// Returns the string value at the given position with fixed length
90    fn get_char(&self, pos: usize, length: usize) -> Result<&str>;
91
92    /// Returns the string value at the given position
93    fn get_string(&self, pos: usize) -> Result<&str>;
94
95    /// Returns the decimal value at the given position
96    fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> Result<Decimal>;
97
98    /// Returns the date value at the given position (date as days since epoch)
99    fn get_date(&self, pos: usize) -> Result<Date>;
100
101    /// Returns the time value at the given position (time as milliseconds since midnight)
102    fn get_time(&self, pos: usize) -> Result<Time>;
103
104    /// Returns the timestamp value at the given position (timestamp without timezone)
105    ///
106    /// The precision is required to determine whether the timestamp value was stored
107    /// in a compact representation (precision <= 3) or with nanosecond precision.
108    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> Result<TimestampNtz>;
109
110    /// Returns the timestamp value at the given position (timestamp with local timezone)
111    ///
112    /// The precision is required to determine whether the timestamp value was stored
113    /// in a compact representation (precision <= 3) or with nanosecond precision.
114    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> Result<TimestampLtz>;
115
116    /// Returns the binary value at the given position with fixed length
117    fn get_binary(&self, pos: usize, length: usize) -> Result<&[u8]>;
118
119    /// Returns the binary value at the given position
120    fn get_bytes(&self, pos: usize) -> Result<&[u8]>;
121
122    /// Returns encoded bytes if already encoded
123    fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> {
124        None
125    }
126}
127
128#[derive(Debug)]
129pub struct GenericRow<'a> {
130    pub values: Vec<Datum<'a>>,
131}
132
133impl<'a> GenericRow<'a> {
134    fn get_value(&self, pos: usize) -> Result<&Datum<'a>> {
135        self.values.get(pos).ok_or_else(|| IllegalArgument {
136            message: format!(
137                "position {pos} out of bounds (row has {} fields)",
138                self.values.len()
139            ),
140        })
141    }
142
143    fn try_convert<T: TryFrom<&'a Datum<'a>>>(
144        &'a self,
145        pos: usize,
146        expected_type: &str,
147    ) -> Result<T> {
148        let datum = self.get_value(pos)?;
149        T::try_from(datum).map_err(|_| IllegalArgument {
150            message: format!(
151                "type mismatch at position {pos}: expected {expected_type}, got {datum:?}"
152            ),
153        })
154    }
155}
156
157impl<'a> InternalRow for GenericRow<'a> {
158    fn get_field_count(&self) -> usize {
159        self.values.len()
160    }
161
162    fn is_null_at(&self, pos: usize) -> Result<bool> {
163        Ok(self.get_value(pos)?.is_null())
164    }
165
166    fn get_boolean(&self, pos: usize) -> Result<bool> {
167        self.try_convert(pos, "Boolean")
168    }
169
170    fn get_byte(&self, pos: usize) -> Result<i8> {
171        self.try_convert(pos, "TinyInt")
172    }
173
174    fn get_short(&self, pos: usize) -> Result<i16> {
175        self.try_convert(pos, "SmallInt")
176    }
177
178    fn get_int(&self, pos: usize) -> Result<i32> {
179        self.try_convert(pos, "Int")
180    }
181
182    fn get_long(&self, pos: usize) -> Result<i64> {
183        self.try_convert(pos, "BigInt")
184    }
185
186    fn get_float(&self, pos: usize) -> Result<f32> {
187        self.try_convert(pos, "Float")
188    }
189
190    fn get_double(&self, pos: usize) -> Result<f64> {
191        self.try_convert(pos, "Double")
192    }
193
194    fn get_char(&self, pos: usize, _length: usize) -> Result<&str> {
195        // don't check length, following java client
196        self.get_string(pos)
197    }
198
199    fn get_string(&self, pos: usize) -> Result<&str> {
200        self.try_convert(pos, "String")
201    }
202
203    fn get_decimal(&self, pos: usize, _precision: usize, _scale: usize) -> Result<Decimal> {
204        match self.get_value(pos)? {
205            Datum::Decimal(d) => Ok(d.clone()),
206            other => Err(IllegalArgument {
207                message: format!(
208                    "type mismatch at position {pos}: expected Decimal, got {other:?}"
209                ),
210            }),
211        }
212    }
213
214    fn get_date(&self, pos: usize) -> Result<Date> {
215        match self.get_value(pos)? {
216            Datum::Date(d) => Ok(*d),
217            Datum::Int32(i) => Ok(Date::new(*i)),
218            other => Err(IllegalArgument {
219                message: format!(
220                    "type mismatch at position {pos}: expected Date or Int32, got {other:?}"
221                ),
222            }),
223        }
224    }
225
226    fn get_time(&self, pos: usize) -> Result<Time> {
227        match self.get_value(pos)? {
228            Datum::Time(t) => Ok(*t),
229            Datum::Int32(i) => Ok(Time::new(*i)),
230            other => Err(IllegalArgument {
231                message: format!(
232                    "type mismatch at position {pos}: expected Time or Int32, got {other:?}"
233                ),
234            }),
235        }
236    }
237
238    fn get_timestamp_ntz(&self, pos: usize, _precision: u32) -> Result<TimestampNtz> {
239        match self.get_value(pos)? {
240            Datum::TimestampNtz(t) => Ok(*t),
241            other => Err(IllegalArgument {
242                message: format!(
243                    "type mismatch at position {pos}: expected TimestampNtz, got {other:?}"
244                ),
245            }),
246        }
247    }
248
249    fn get_timestamp_ltz(&self, pos: usize, _precision: u32) -> Result<TimestampLtz> {
250        match self.get_value(pos)? {
251            Datum::TimestampLtz(t) => Ok(*t),
252            other => Err(IllegalArgument {
253                message: format!(
254                    "type mismatch at position {pos}: expected TimestampLtz, got {other:?}"
255                ),
256            }),
257        }
258    }
259
260    fn get_binary(&self, pos: usize, _length: usize) -> Result<&[u8]> {
261        match self.get_value(pos)? {
262            Datum::Blob(b) => Ok(b.as_ref()),
263            other => Err(IllegalArgument {
264                message: format!("type mismatch at position {pos}: expected Binary, got {other:?}"),
265            }),
266        }
267    }
268
269    fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
270        match self.get_value(pos)? {
271            Datum::Blob(b) => Ok(b.as_ref()),
272            other => Err(IllegalArgument {
273                message: format!("type mismatch at position {pos}: expected Bytes, got {other:?}"),
274            }),
275        }
276    }
277}
278
279impl<'a> GenericRow<'a> {
280    pub fn from_data(data: Vec<impl Into<Datum<'a>>>) -> GenericRow<'a> {
281        GenericRow {
282            values: data.into_iter().map(Into::into).collect(),
283        }
284    }
285
286    /// Creates a GenericRow with the specified number of fields, all initialized to null.
287    ///
288    /// This is useful when you need to create a row with a specific field count
289    /// but only want to set some fields (e.g., for KV delete operations where
290    /// only primary key fields need to be set).
291    ///
292    /// # Example
293    /// ```
294    /// use fluss::row::GenericRow;
295    ///
296    /// let mut row = GenericRow::new(3);
297    /// row.set_field(0, 42); // Only set the primary key
298    /// // Fields 1 and 2 remain null
299    /// ```
300    pub fn new(field_count: usize) -> GenericRow<'a> {
301        GenericRow {
302            values: vec![Datum::Null; field_count],
303        }
304    }
305
306    /// Sets the field at the given position to the specified value.
307    ///
308    /// # Panics
309    /// Panics if `pos` is out of bounds (>= field count).
310    pub fn set_field(&mut self, pos: usize, value: impl Into<Datum<'a>>) {
311        self.values[pos] = value.into();
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn is_null_at_checks_datum_nullity() {
        let mut row = GenericRow::new(2);
        row.set_field(0, Datum::Null);
        row.set_field(1, 42_i32);

        let nullity: Vec<bool> = (0..2).map(|pos| row.is_null_at(pos).unwrap()).collect();
        assert_eq!(nullity, vec![true, false]);
    }

    #[test]
    fn is_null_at_out_of_bounds_returns_error() {
        let row = GenericRow::from_data(vec![42_i32]);
        let message = row.is_null_at(5).unwrap_err().to_string();
        assert!(
            message.contains("out of bounds"),
            "Expected out of bounds error, got: {message}"
        );
    }

    #[test]
    fn new_initializes_nulls() {
        let row = GenericRow::new(3);
        assert_eq!(row.get_field_count(), 3);
        for pos in 0..3 {
            assert!(row.is_null_at(pos).unwrap());
        }
    }

    #[test]
    fn partial_row_for_delete() {
        // Delete scenario: only the primary key (field 0) is populated.
        let mut delete_row = GenericRow::new(3);
        delete_row.set_field(0, 123_i32);

        assert_eq!(delete_row.get_field_count(), 3);
        assert_eq!(delete_row.get_int(0).unwrap(), 123);
        // The non-key fields stay null.
        for pos in 1..3 {
            assert!(delete_row.is_null_at(pos).unwrap());
        }
    }

    #[test]
    fn type_mismatch_returns_error() {
        let row = GenericRow::from_data(vec![Datum::Int64(999)]);
        let message = row.get_string(0).unwrap_err().to_string();
        assert!(
            message.contains("type mismatch"),
            "Expected type mismatch error, got: {message}"
        );
    }

    #[test]
    fn out_of_bounds_returns_error() {
        let row = GenericRow::from_data(vec![42_i32]);
        let message = row.get_int(5).unwrap_err().to_string();
        assert!(
            message.contains("out of bounds"),
            "Expected out of bounds error, got: {message}"
        );
    }
}