Skip to main content

fluss/row/
row_decoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Row decoder for deserializing binary row formats.
19//!
20//! Mirrors the Java org.apache.fluss.row.decode package.
21
22use crate::error::{Error, Result};
23use crate::metadata::{KvFormat, RowType};
24use crate::row::compacted::{CompactedRow, CompactedRowDeserializer};
25use std::sync::Arc;
26
27/// Decoder for creating BinaryRow from bytes.
28///
29/// This trait provides an abstraction for decoding different row formats
30/// (COMPACTED, INDEXED, etc.) from binary data.
31///
32/// Reference: org.apache.fluss.row.decode.RowDecoder
33pub trait RowDecoder: Send + Sync {
34    /// Decode bytes into a CompactedRow.
35    ///
36    /// The lifetime 'a ties the returned row to the input data, ensuring
37    /// the data remains valid as long as the row is used.
38    fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a>;
39}
40
41/// Decoder for CompactedRow format.
42///
43/// Uses the existing CompactedRow infrastructure for decoding.
44/// This is a thin wrapper that implements the RowDecoder trait.
45///
46/// Reference: org.apache.fluss.row.decode.CompactedRowDecoder
47pub struct CompactedRowDecoder {
48    field_count: usize,
49    deserializer: Arc<CompactedRowDeserializer<'static>>,
50}
51
52impl CompactedRowDecoder {
53    /// Create a new CompactedRowDecoder with the given row type.
54    pub fn new(row_type: RowType) -> Self {
55        let field_count = row_type.fields().len();
56        let deserializer = Arc::new(CompactedRowDeserializer::new_from_owned(row_type));
57
58        Self {
59            field_count,
60            deserializer,
61        }
62    }
63}
64
65impl RowDecoder for CompactedRowDecoder {
66    fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a> {
67        // Use existing CompactedRow::deserialize() infrastructure
68        CompactedRow::deserialize(Arc::clone(&self.deserializer), self.field_count, data)
69    }
70}
71
72/// Factory for creating RowDecoders based on KvFormat.
73///
74/// Reference: org.apache.fluss.row.decode.RowDecoder.create()
75pub struct RowDecoderFactory;
76
77impl RowDecoderFactory {
78    /// Create a RowDecoder for the given format and row type.
79    pub fn create(kv_format: KvFormat, row_type: RowType) -> Result<Arc<dyn RowDecoder>> {
80        match kv_format {
81            KvFormat::COMPACTED => Ok(Arc::new(CompactedRowDecoder::new(row_type))),
82            KvFormat::INDEXED => Err(Error::UnsupportedOperation {
83                message: "INDEXED format is not yet supported".to_string(),
84            }),
85        }
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::DataTypes;
    use crate::row::InternalRow;
    use crate::row::binary::BinaryWriter;
    use crate::row::compacted::CompactedRowWriter;

    /// Round-trips an (INT, STRING) row through `CompactedRowDecoder` directly.
    #[test]
    fn test_compacted_row_decoder() {
        // Write a CompactedRow
        let mut writer = CompactedRowWriter::new(2);
        writer.write_int(42);
        writer.write_string("hello");

        let data = writer.to_bytes();

        // Create decoder with RowType
        let row_type = RowType::with_data_types(vec![DataTypes::int(), DataTypes::string()]);
        let decoder = CompactedRowDecoder::new(row_type);

        // Decode
        let row = decoder.decode(&data);

        // Verify
        assert_eq!(row.get_field_count(), 2);
        assert_eq!(row.get_int(0).unwrap(), 42);
        assert_eq!(row.get_string(1).unwrap(), "hello");
    }

    /// Round-trips an (INT, STRING) row through the decoder returned by the
    /// factory for the COMPACTED format.
    #[test]
    fn test_row_decoder_factory() {
        let row_type = RowType::with_data_types(vec![DataTypes::int(), DataTypes::string()]);
        let decoder = RowDecoderFactory::create(KvFormat::COMPACTED, row_type).unwrap();

        // Write a row
        let mut writer = CompactedRowWriter::new(2);
        writer.write_int(100);
        writer.write_string("world");
        let data = writer.to_bytes();

        // Decode
        let row = decoder.decode(&data);

        // Verify
        assert_eq!(row.get_field_count(), 2);
        assert_eq!(row.get_int(0).unwrap(), 100);
        assert_eq!(row.get_string(1).unwrap(), "world");
    }

    /// The factory must report INDEXED as unsupported rather than hand back
    /// a decoder for a different format.
    #[test]
    fn test_row_decoder_factory_rejects_indexed() {
        let row_type = RowType::with_data_types(vec![DataTypes::int(), DataTypes::string()]);
        let result = RowDecoderFactory::create(KvFormat::INDEXED, row_type);

        // `matches!` avoids requiring `Debug` on the trait object in the Ok arm.
        assert!(matches!(result, Err(Error::UnsupportedOperation { .. })));
    }
}