fluss/row/encode/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod compacted_key_encoder;
19mod compacted_row_encoder;
20
21use crate::error::{Error, Result};
22use crate::metadata::{DataLakeFormat, KvFormat, RowType};
23use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
24use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
25use crate::row::{Datum, InternalRow};
26use bytes::Bytes;
27
/// Encodes the key fields of a row into bytes.
///
/// [`KeyEncoderFactory::of`] returns a boxed implementation chosen by the table's data lake
/// format. The `Send + Sync` bound lets an encoder be moved to, or shared with, other threads.
// NOTE(review): the reason for `dead_code` being allowed here is not visible in this file —
// presumably the trait is not yet used by every caller; confirm and document.
#[allow(dead_code)]
pub trait KeyEncoder: Send + Sync {
    /// Encodes the key of `row` into bytes.
    ///
    /// Takes `&mut self`, so implementations may keep mutable state between calls.
    ///
    /// # Errors
    /// Returns an error if the key of `row` cannot be encoded.
    fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes>;
}
33
/// Stateless factory for [`KeyEncoder`] implementations; see [`KeyEncoderFactory::of`].
#[derive(Debug, Clone, Copy, Default)]
pub struct KeyEncoderFactory;
35
36impl KeyEncoderFactory {
37 /// Create a key encoder to encode the key bytes of the input row.
38 /// # Arguments
39 /// * `row_type` - the row type of the input row
40 /// * `key_fields` - the key fields to encode
41 /// * `lake_format` - the data lake format
42 ///
43 /// # Returns
44 /// key encoder
45 pub fn of(
46 row_type: &RowType,
47 key_fields: &[String],
48 data_lake_format: &Option<DataLakeFormat>,
49 ) -> Result<Box<dyn KeyEncoder>> {
50 match data_lake_format {
51 Some(DataLakeFormat::Paimon) => Err(Error::UnsupportedOperation {
52 message: "KeyEncoder for Paimon format is not yet implemented".to_string(),
53 }),
54 Some(DataLakeFormat::Lance) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
55 row_type, key_fields,
56 )?)),
57 Some(DataLakeFormat::Iceberg) => Err(Error::UnsupportedOperation {
58 message: "KeyEncoder for Iceberg format is not yet implemented".to_string(),
59 }),
60 None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
61 row_type, key_fields,
62 )?)),
63 }
64 }
65}
66
/// An encoder that writes rows into binary form, one row at a time.
///
/// To write a row:
///
/// 1. call [`RowEncoder::start_new_row()`] to begin the row;
/// 2. call [`RowEncoder::encode_field()`] to write each of the row's fields;
/// 3. call [`RowEncoder::finish_row()`] to complete the row and obtain its bytes.
///
/// [`RowEncoder::close()`] closes the encoder.
#[allow(dead_code)]
pub trait RowEncoder: Send + Sync {
    /// Starts writing a new row.
    ///
    /// # Returns
    /// * `Ok(())` if successful
    fn start_new_row(&mut self) -> Result<()>;

    /// Writes `value` as the field at position `pos` of the current row.
    ///
    /// # Arguments
    /// * `pos` - the position of the field to write
    /// * `value` - the value of the field to write
    ///
    /// # Returns
    /// * `Ok(())` if successful
    fn encode_field(&mut self, pos: usize, value: Datum) -> Result<()>;

    /// Finishes writing the current row and returns the written row's bytes.
    ///
    /// Implementations may reuse an internal buffer across rows. To be safe, call
    /// [`RowEncoder::start_new_row()`] only after the returned row is no longer needed.
    /// NOTE(review): the return type is an owned [`Bytes`], so the borrow checker does not
    /// enforce this ordering — confirm whether any implementation actually relies on it.
    ///
    /// # Returns
    /// * the written row
    fn finish_row(&mut self) -> Result<Bytes>;

    /// Closes the row encoder.
    ///
    /// # Returns
    /// * `Ok(())` if successful
    fn close(&mut self) -> Result<()>;
}
106
/// Stateless factory for [`RowEncoder`] implementations; see [`RowEncoderFactory::create`].
///
/// Declared as a unit struct, like `KeyEncoderFactory`; the braced form
/// `RowEncoderFactory {}` is still accepted by the compiler.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, Default)]
pub struct RowEncoderFactory;
109
110#[allow(dead_code)]
111impl RowEncoderFactory {
112 pub fn create(kv_format: KvFormat, row_type: RowType) -> Result<impl RowEncoder> {
113 Self::create_for_field_types(kv_format, row_type)
114 }
115
116 pub fn create_for_field_types(
117 kv_format: KvFormat,
118 row_type: RowType,
119 ) -> Result<impl RowEncoder> {
120 match kv_format {
121 KvFormat::INDEXED => {
122 todo!()
123 }
124 KvFormat::COMPACTED => CompactedRowEncoder::new(row_type),
125 }
126 }
127}