Skip to main content

fluss/row/encode/
mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18mod compacted_key_encoder;
19mod compacted_row_encoder;
20
21use crate::error::{Error, Result};
22use crate::metadata::{DataLakeFormat, KvFormat, RowType};
23use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
24use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
25use crate::row::{Datum, InternalRow};
26use bytes::Bytes;
27
28/// An interface for encoding key of row into bytes.
29#[allow(dead_code)]
30pub trait KeyEncoder: Send + Sync {
31    fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes>;
32}
33
34pub struct KeyEncoderFactory;
35
36impl KeyEncoderFactory {
37    /// Create a key encoder to encode the key bytes of the input row.
38    /// # Arguments
39    /// * `row_type` - the row type of the input row
40    /// * `key_fields` - the key fields to encode
41    /// * `lake_format` - the data lake format
42    ///
43    /// # Returns
44    /// key encoder
45    pub fn of(
46        row_type: &RowType,
47        key_fields: &[String],
48        data_lake_format: &Option<DataLakeFormat>,
49    ) -> Result<Box<dyn KeyEncoder>> {
50        match data_lake_format {
51            Some(DataLakeFormat::Paimon) => Err(Error::UnsupportedOperation {
52                message: "KeyEncoder for Paimon format is not yet implemented".to_string(),
53            }),
54            Some(DataLakeFormat::Lance) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
55                row_type, key_fields,
56            )?)),
57            Some(DataLakeFormat::Iceberg) => Err(Error::UnsupportedOperation {
58                message: "KeyEncoder for Iceberg format is not yet implemented".to_string(),
59            }),
60            None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
61                row_type, key_fields,
62            )?)),
63        }
64    }
65}
66
67/// An encoder to write binary row data. It's used to write rows
68/// one by one. When writing a new row:
69///
70/// 1. call method [`RowEncoder::start_new_row()`] to start the writing.
71/// 2. call method [`RowEncoder::encode_field()`] to write the row's field.
72/// 3. call method [`RowEncoder::finish_row()`] to finish the writing and get the written row.
73#[allow(dead_code)]
74pub trait RowEncoder: Send + Sync {
75    /// Start to write a new row.
76    ///
77    /// # Returns
78    /// * Ok(()) if successful
79    fn start_new_row(&mut self) -> Result<()>;
80
81    /// Write the row's field in given pos with given value.
82    ///
83    /// # Arguments
84    /// * pos - the position of the field to write.
85    /// * value - the value of the field to write.
86    ///
87    /// # Returns
88    /// * Ok(()) if successful
89    fn encode_field(&mut self, pos: usize, value: Datum) -> Result<()>;
90
91    /// Finish write the row, returns the written row.
92    ///
93    /// Note that returned row borrows from [`RowEncoder`]'s internal buffer which is reused for subsequent rows
94    /// [`RowEncoder::start_new_row()`] should only be called after the returned row goes out of scope.
95    ///
96    /// # Returns
97    /// * the written row
98    fn finish_row(&mut self) -> Result<Bytes>;
99
100    /// Closes the row encoder
101    ///
102    /// # Returns
103    /// * Ok(()) if successful
104    fn close(&mut self) -> Result<()>;
105}
106
107#[allow(dead_code)]
108pub struct RowEncoderFactory {}
109
110#[allow(dead_code)]
111impl RowEncoderFactory {
112    pub fn create(kv_format: KvFormat, row_type: RowType) -> Result<impl RowEncoder> {
113        Self::create_for_field_types(kv_format, row_type)
114    }
115
116    pub fn create_for_field_types(
117        kv_format: KvFormat,
118        row_type: RowType,
119    ) -> Result<impl RowEncoder> {
120        match kv_format {
121            KvFormat::INDEXED => {
122                todo!()
123            }
124            KvFormat::COMPACTED => CompactedRowEncoder::new(row_type),
125        }
126    }
127}