Skip to main content

fluss/metadata/
table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::compression::ArrowCompressionInfo;
19use crate::error::Error::IllegalArgument;
20use crate::error::{Error, Result};
21use crate::metadata::DataLakeFormat;
22use crate::metadata::datatype::{DataField, DataType, RowType};
23use crate::{BucketId, PartitionId, TableId};
24use core::fmt;
25use serde::{Deserialize, Serialize};
26use std::collections::{HashMap, HashSet};
27use std::fmt::{Display, Formatter};
28use std::sync::Arc;
29use strum_macros::EnumString;
30
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
32pub struct Column {
33    name: String,
34    data_type: DataType,
35    comment: Option<String>,
36}
37
38impl Column {
39    pub fn new<N: Into<String>>(name: N, data_type: DataType) -> Self {
40        Self {
41            name: name.into(),
42            data_type,
43            comment: None,
44        }
45    }
46
47    pub fn with_comment<C: Into<String>>(mut self, comment: C) -> Self {
48        self.comment = Some(comment.into());
49        self
50    }
51
52    pub fn with_data_type(&self, data_type: DataType) -> Self {
53        Self {
54            name: self.name.clone(),
55            data_type: data_type.clone(),
56            comment: self.comment.clone(),
57        }
58    }
59
60    // Getters...
61    pub fn name(&self) -> &str {
62        &self.name
63    }
64
65    pub fn data_type(&self) -> &DataType {
66        &self.data_type
67    }
68
69    pub fn comment(&self) -> Option<&str> {
70        self.comment.as_deref()
71    }
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
75pub struct PrimaryKey {
76    constraint_name: String,
77    column_names: Vec<String>,
78}
79
80impl PrimaryKey {
81    pub fn new<N: Into<String>>(constraint_name: N, column_names: Vec<String>) -> Self {
82        Self {
83            constraint_name: constraint_name.into(),
84            column_names,
85        }
86    }
87
88    // Getters...
89    pub fn constraint_name(&self) -> &str {
90        &self.constraint_name
91    }
92
93    pub fn column_names(&self) -> &[String] {
94        &self.column_names
95    }
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
99pub struct Schema {
100    columns: Vec<Column>,
101    primary_key: Option<PrimaryKey>,
102    row_type: RowType,
103    auto_increment_col_names: Vec<String>,
104}
105
106impl Schema {
107    pub fn empty() -> Result<Self> {
108        Self::builder().build()
109    }
110
111    pub fn builder() -> SchemaBuilder {
112        SchemaBuilder::new()
113    }
114
115    pub fn columns(&self) -> &[Column] {
116        &self.columns
117    }
118
119    pub fn primary_key(&self) -> Option<&PrimaryKey> {
120        self.primary_key.as_ref()
121    }
122
123    pub fn row_type(&self) -> &RowType {
124        &self.row_type
125    }
126
127    pub fn primary_key_indexes(&self) -> Vec<usize> {
128        self.primary_key
129            .as_ref()
130            .map(|pk| {
131                pk.column_names
132                    .iter()
133                    .filter_map(|name| self.columns.iter().position(|c| &c.name == name))
134                    .collect()
135            })
136            .unwrap_or_default()
137    }
138
139    pub fn primary_key_column_names(&self) -> Vec<&str> {
140        self.primary_key
141            .as_ref()
142            .map(|pk| pk.column_names.iter().map(|s| s.as_str()).collect())
143            .unwrap_or_default()
144    }
145
146    pub fn column_names(&self) -> Vec<&str> {
147        self.columns.iter().map(|c| c.name.as_str()).collect()
148    }
149
150    pub fn auto_increment_col_names(&self) -> &Vec<String> {
151        &self.auto_increment_col_names
152    }
153}
154
155#[derive(Debug, Default)]
156pub struct SchemaBuilder {
157    columns: Vec<Column>,
158    primary_key: Option<PrimaryKey>,
159    auto_increment_col_names: Vec<String>,
160}
161
162impl SchemaBuilder {
163    pub fn new() -> Self {
164        Self::default()
165    }
166
167    pub fn with_row_type(mut self, row_type: &DataType) -> Self {
168        match row_type {
169            DataType::Row(row) => {
170                for data_field in row.fields() {
171                    self = self.column(&data_field.name, data_field.data_type.clone())
172                }
173                self
174            }
175            _ => {
176                panic!("data type must be row type")
177            }
178        }
179    }
180
181    pub fn column<N: Into<String>>(mut self, name: N, data_type: DataType) -> Self {
182        self.columns.push(Column::new(name.into(), data_type));
183        self
184    }
185
186    pub fn with_columns(mut self, columns: Vec<Column>) -> Self {
187        self.columns.extend_from_slice(columns.as_ref());
188        self
189    }
190
191    pub fn with_comment<C: Into<String>>(mut self, comment: C) -> Self {
192        if let Some(last) = self.columns.last_mut() {
193            *last = last.clone().with_comment(comment.into());
194        }
195        self
196    }
197
198    pub fn primary_key<I, S>(self, column_names: I) -> Self
199    where
200        I: IntoIterator<Item = S>,
201        S: Into<String>,
202    {
203        let names: Vec<String> = column_names.into_iter().map(|s| s.into()).collect();
204
205        let constraint_name = format!("PK_{}", names.join("_"));
206
207        self.primary_key_named(&constraint_name, names)
208    }
209
210    pub fn primary_key_named<N: Into<String>, P: Into<String>>(
211        mut self,
212        constraint_name: N,
213        column_names: Vec<P>,
214    ) -> Self {
215        self.primary_key = Some(PrimaryKey::new(
216            constraint_name.into(),
217            column_names.into_iter().map(|s| s.into()).collect(),
218        ));
219        self
220    }
221
222    /// Declares a column to be auto-incremented. With an auto-increment column in the table,
223    /// whenever a new row is inserted into the table, the new row will be assigned with the next
224    /// available value from the auto-increment sequence. A table can have at most one auto
225    /// increment column.
226    pub fn enable_auto_increment<N: Into<String>>(mut self, column_name: N) -> Result<Self> {
227        if !self.auto_increment_col_names.is_empty() {
228            return Err(IllegalArgument {
229                message: "Multiple auto increment columns are not supported yet.".to_string(),
230            });
231        }
232
233        self.auto_increment_col_names.push(column_name.into());
234        Ok(self)
235    }
236
237    pub fn build(&self) -> Result<Schema> {
238        let columns = Self::normalize_columns(&self.columns, self.primary_key.as_ref())?;
239
240        let column_names: HashSet<_> = columns.iter().map(|c| &c.name).collect();
241        for auto_inc_col in &self.auto_increment_col_names {
242            if !column_names.contains(auto_inc_col) {
243                return Err(IllegalArgument {
244                    message: format!(
245                        "Auto increment column '{auto_inc_col}' is not found in the schema columns."
246                    ),
247                });
248            }
249        }
250
251        let data_fields = columns
252            .iter()
253            .map(|c| DataField {
254                name: c.name.clone(),
255                data_type: c.data_type.clone(),
256                description: c.comment.clone(),
257            })
258            .collect();
259
260        Ok(Schema {
261            columns,
262            primary_key: self.primary_key.clone(),
263            row_type: RowType::new(data_fields),
264            auto_increment_col_names: self.auto_increment_col_names.clone(),
265        })
266    }
267
268    fn normalize_columns(
269        columns: &[Column],
270        primary_key: Option<&PrimaryKey>,
271    ) -> Result<Vec<Column>> {
272        let names: Vec<_> = columns.iter().map(|c| &c.name).collect();
273        if let Some(duplicates) = Self::find_duplicates(&names) {
274            return Err(Error::invalid_table(format!(
275                "Duplicate column names found: {duplicates:?}"
276            )));
277        }
278
279        let Some(pk) = primary_key else {
280            return Ok(columns.to_vec());
281        };
282
283        let pk_set: HashSet<_> = pk.column_names.iter().collect();
284        let all_columns: HashSet<_> = columns.iter().map(|c| &c.name).collect();
285        if !pk_set.is_subset(&all_columns) {
286            return Err(Error::invalid_table(format!(
287                "Primary key columns {pk_set:?} not found in schema"
288            )));
289        }
290
291        Ok(columns
292            .iter()
293            .map(|col| {
294                if pk_set.contains(&col.name) && col.data_type.is_nullable() {
295                    col.with_data_type(col.data_type.as_non_nullable())
296                } else {
297                    col.clone()
298                }
299            })
300            .collect())
301    }
302
303    fn find_duplicates<'a>(names: &'a [&String]) -> Option<HashSet<&'a String>> {
304        let mut seen = HashSet::new();
305        let mut duplicates = HashSet::new();
306
307        for name in names {
308            if !seen.insert(name) {
309                duplicates.insert(*name);
310            }
311        }
312
313        if duplicates.is_empty() {
314            None
315        } else {
316            Some(duplicates)
317        }
318    }
319}
320
321/// distribution of table
322#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
323pub struct TableDistribution {
324    bucket_count: Option<i32>,
325    bucket_keys: Vec<String>,
326}
327
328impl TableDistribution {
329    pub fn bucket_keys(&self) -> &[String] {
330        &self.bucket_keys
331    }
332
333    pub fn bucket_count(&self) -> Option<i32> {
334        self.bucket_count
335    }
336}
337
338#[derive(Debug, Default)]
339pub struct TableDescriptorBuilder {
340    schema: Option<Schema>,
341    properties: HashMap<String, String>,
342    custom_properties: HashMap<String, String>,
343    partition_keys: Arc<[String]>,
344    comment: Option<String>,
345    table_distribution: Option<TableDistribution>,
346}
347
348impl TableDescriptorBuilder {
349    pub fn new() -> Self {
350        Self::default()
351    }
352
353    pub fn schema(mut self, schema: Schema) -> Self {
354        self.schema = Some(schema);
355        self
356    }
357
358    pub fn log_format(mut self, log_format: LogFormat) -> Self {
359        self.properties
360            .insert("table.log.format".to_string(), log_format.to_string());
361        self
362    }
363
364    pub fn kv_format(mut self, kv_format: KvFormat) -> Self {
365        self.properties
366            .insert("table.kv.format".to_string(), kv_format.to_string());
367        self
368    }
369
370    pub fn property<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
371        self.properties.insert(key.into(), value.into());
372        self
373    }
374
375    pub fn properties<K: Into<String>, V: Into<String>>(
376        mut self,
377        properties: HashMap<K, V>,
378    ) -> Self {
379        for (k, v) in properties {
380            self.properties.insert(k.into(), v.into());
381        }
382        self
383    }
384
385    pub fn custom_property<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
386        self.custom_properties.insert(key.into(), value.into());
387        self
388    }
389
390    pub fn custom_properties<K: Into<String>, V: Into<String>>(
391        mut self,
392        custom_properties: HashMap<K, V>,
393    ) -> Self {
394        for (k, v) in custom_properties {
395            self.custom_properties.insert(k.into(), v.into());
396        }
397        self
398    }
399
400    pub fn partitioned_by<P: Into<String>>(mut self, partition_keys: Vec<P>) -> Self {
401        self.partition_keys = Arc::from(
402            partition_keys
403                .into_iter()
404                .map(|s| s.into())
405                .collect::<Vec<String>>(),
406        );
407        self
408    }
409
410    pub fn distributed_by(mut self, bucket_count: Option<i32>, bucket_keys: Vec<String>) -> Self {
411        self.table_distribution = Some(TableDistribution {
412            bucket_count,
413            bucket_keys,
414        });
415        self
416    }
417
418    pub fn comment<S: Into<String>>(mut self, comment: S) -> Self {
419        self.comment = Some(comment.into());
420        self
421    }
422
423    pub fn build(self) -> Result<TableDescriptor> {
424        let schema = self.schema.expect("Schema must be set");
425        let table_distribution = TableDescriptor::normalize_distribution(
426            &schema,
427            &self.partition_keys,
428            self.table_distribution,
429        )?;
430        Ok(TableDescriptor {
431            schema,
432            comment: self.comment,
433            partition_keys: self.partition_keys,
434            table_distribution,
435            properties: self.properties,
436            custom_properties: self.custom_properties,
437        })
438    }
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
442pub struct TableDescriptor {
443    schema: Schema,
444    comment: Option<String>,
445    partition_keys: Arc<[String]>,
446    table_distribution: Option<TableDistribution>,
447    properties: HashMap<String, String>,
448    custom_properties: HashMap<String, String>,
449}
450
451impl TableDescriptor {
452    pub fn builder() -> TableDescriptorBuilder {
453        TableDescriptorBuilder::new()
454    }
455
456    pub fn schema(&self) -> &Schema {
457        &self.schema
458    }
459
460    pub fn bucket_keys(&self) -> Vec<&str> {
461        self.table_distribution
462            .as_ref()
463            .map(|td| td.bucket_keys.iter().map(|s| s.as_str()).collect())
464            .unwrap_or_default()
465    }
466
467    pub fn is_default_bucket_key(&self) -> Result<bool> {
468        if self.schema.primary_key().is_some() {
469            Ok(self.bucket_keys()
470                == Self::default_bucket_key_of_primary_key_table(
471                    self.schema(),
472                    &self.partition_keys,
473                )?
474                .iter()
475                .map(|s| s.as_str())
476                .collect::<Vec<_>>())
477        } else {
478            Ok(self.bucket_keys().is_empty())
479        }
480    }
481
482    pub fn is_partitioned(&self) -> bool {
483        !self.partition_keys.is_empty()
484    }
485
486    pub fn has_primary_key(&self) -> bool {
487        self.schema.primary_key().is_some()
488    }
489
490    pub fn partition_keys(&self) -> &[String] {
491        &self.partition_keys
492    }
493
494    pub fn table_distribution(&self) -> Option<&TableDistribution> {
495        self.table_distribution.as_ref()
496    }
497
498    pub fn properties(&self) -> &HashMap<String, String> {
499        &self.properties
500    }
501
502    pub fn custom_properties(&self) -> &HashMap<String, String> {
503        &self.custom_properties
504    }
505
506    pub fn replication_factor(&self) -> Result<i32> {
507        self.properties
508            .get("table.replication.factor")
509            .ok_or_else(|| Error::invalid_table("Replication factor is not set"))?
510            .parse()
511            .map_err(|_e| Error::invalid_table("Replication factor can't be converted to int"))
512    }
513
514    pub fn with_properties<K: Into<String>, V: Into<String>>(
515        &self,
516        new_properties: HashMap<K, V>,
517    ) -> Self {
518        let mut properties = HashMap::new();
519        for (k, v) in new_properties {
520            properties.insert(k.into(), v.into());
521        }
522        Self {
523            properties,
524            ..self.clone()
525        }
526    }
527
528    pub fn with_replication_factor(&self, new_replication_factor: i32) -> Self {
529        let mut properties = self.properties.clone();
530        properties.insert(
531            "table.replication.factor".to_string(),
532            new_replication_factor.to_string(),
533        );
534        self.with_properties(properties)
535    }
536
537    pub fn with_bucket_count(&self, new_bucket_count: i32) -> Self {
538        Self {
539            table_distribution: Some(TableDistribution {
540                bucket_count: Some(new_bucket_count),
541                bucket_keys: self
542                    .table_distribution
543                    .as_ref()
544                    .map(|td| td.bucket_keys.clone())
545                    .unwrap_or_default(),
546            }),
547            ..self.clone()
548        }
549    }
550
551    pub fn comment(&self) -> Option<&str> {
552        self.comment.as_deref()
553    }
554
555    fn default_bucket_key_of_primary_key_table(
556        schema: &Schema,
557        partition_keys: &[String],
558    ) -> Result<Vec<String>> {
559        let mut bucket_keys = schema
560            .primary_key()
561            .expect("Primary key must be set")
562            .column_names()
563            .to_vec();
564
565        bucket_keys.retain(|k| !partition_keys.contains(k));
566
567        if bucket_keys.is_empty() {
568            return Err(Error::invalid_table(format!(
569                "Primary Key constraint {:?} should not be same with partition fields {:?}.",
570                schema.primary_key().unwrap().column_names(),
571                partition_keys
572            )));
573        }
574
575        Ok(bucket_keys)
576    }
577
578    fn normalize_distribution(
579        schema: &Schema,
580        partition_keys: &[String],
581        origin_distribution: Option<TableDistribution>,
582    ) -> Result<Option<TableDistribution>> {
583        if let Some(distribution) = origin_distribution {
584            if distribution
585                .bucket_keys
586                .iter()
587                .any(|k| partition_keys.contains(k))
588            {
589                return Err(Error::invalid_table(format!(
590                    "Bucket key {:?} shouldn't include any column in partition keys {:?}.",
591                    distribution.bucket_keys, partition_keys
592                )));
593            }
594
595            return if let Some(pk) = schema.primary_key() {
596                if distribution.bucket_keys.is_empty() {
597                    Ok(Some(TableDistribution {
598                        bucket_count: distribution.bucket_count,
599                        bucket_keys: Self::default_bucket_key_of_primary_key_table(
600                            schema,
601                            partition_keys,
602                        )?,
603                    }))
604                } else {
605                    let pk_columns: HashSet<_> = pk.column_names().iter().collect();
606                    if !distribution
607                        .bucket_keys
608                        .iter()
609                        .all(|k| pk_columns.contains(k))
610                    {
611                        return Err(Error::invalid_table(format!(
612                            "Bucket keys must be a subset of primary keys excluding partition keys for primary-key tables. \
613                            The primary keys are {:?}, the partition keys are {:?}, but the user-defined bucket keys are {:?}.",
614                            pk.column_names(),
615                            partition_keys,
616                            distribution.bucket_keys
617                        )));
618                    }
619                    Ok(Some(distribution))
620                }
621            } else {
622                Ok(Some(distribution))
623            };
624        } else if schema.primary_key().is_some() {
625            return Ok(Some(TableDistribution {
626                bucket_count: None,
627                bucket_keys: Self::default_bucket_key_of_primary_key_table(schema, partition_keys)?,
628            }));
629        }
630
631        Ok(None)
632    }
633}
634
635#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
636pub enum LogFormat {
637    ARROW,
638    INDEXED,
639}
640
641impl Display for LogFormat {
642    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
643        match self {
644            LogFormat::ARROW => {
645                write!(f, "ARROW")?;
646            }
647            LogFormat::INDEXED => {
648                write!(f, "INDEXED")?;
649            }
650        }
651        Ok(())
652    }
653}
654
655impl LogFormat {
656    pub fn parse(s: &str) -> Result<Self> {
657        match s.to_uppercase().as_str() {
658            "ARROW" => Ok(LogFormat::ARROW),
659            "INDEXED" => Ok(LogFormat::INDEXED),
660            _ => Err(Error::invalid_table(format!("Unknown log format: {s}"))),
661        }
662    }
663}
664
665#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, EnumString)]
666pub enum KvFormat {
667    INDEXED,
668    COMPACTED,
669}
670
671impl Display for KvFormat {
672    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
673        match self {
674            KvFormat::COMPACTED => write!(f, "COMPACTED")?,
675            KvFormat::INDEXED => write!(f, "INDEXED")?,
676        }
677        Ok(())
678    }
679}
680
681impl KvFormat {
682    pub fn parse(s: &str) -> Result<Self> {
683        match s.to_uppercase().as_str() {
684            "INDEXED" => Ok(KvFormat::INDEXED),
685            "COMPACTED" => Ok(KvFormat::COMPACTED),
686            _ => Err(Error::invalid_table(format!("Unknown kv format: {s}"))),
687        }
688    }
689}
690
691#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
692pub struct TablePath {
693    database: String,
694    table: String,
695}
696
697impl Display for TablePath {
698    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
699        write!(f, "{}.{}", self.database, self.table)
700    }
701}
702
703const MAX_NAME_LENGTH: usize = 200;
704
705const INTERNAL_NAME_PREFIX: &str = "__";
706
707impl TablePath {
708    pub fn new<D: Into<String>, T: Into<String>>(db: D, tbl: T) -> Self {
709        TablePath {
710            database: db.into(),
711            table: tbl.into(),
712        }
713    }
714
715    #[inline]
716    pub fn database(&self) -> &str {
717        &self.database
718    }
719
720    #[inline]
721    pub fn table(&self) -> &str {
722        &self.table
723    }
724
725    pub fn detect_invalid_name(identifier: &str) -> Option<String> {
726        if identifier.is_empty() {
727            return Some("the empty string is not allowed".to_string());
728        }
729        if identifier == "." {
730            return Some("'.' is not allowed".to_string());
731        }
732        if identifier == ".." {
733            return Some("'..' is not allowed".to_string());
734        }
735        if identifier.len() > MAX_NAME_LENGTH {
736            return Some(format!(
737                "the length of '{identifier}' is longer than the max allowed length {MAX_NAME_LENGTH}"
738            ));
739        }
740        if Self::contains_invalid_pattern(identifier) {
741            return Some(format!(
742                "'{identifier}' contains one or more characters other than ASCII alphanumerics, '_' and '-'"
743            ));
744        }
745        None
746    }
747
748    pub fn validate_prefix(identifier: &str) -> Option<String> {
749        if identifier.starts_with(INTERNAL_NAME_PREFIX) {
750            return Some(format!(
751                "'{INTERNAL_NAME_PREFIX}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server"
752            ));
753        }
754        None
755    }
756
757    // Valid characters for Fluss table names are the ASCII alphanumerics, '_' and '-'.
758    fn contains_invalid_pattern(identifier: &str) -> bool {
759        for c in identifier.chars() {
760            let valid_char = c.is_ascii_alphanumeric() || c == '_' || c == '-';
761            if !valid_char {
762                return true;
763            }
764        }
765        false
766    }
767}
768
769/// A database name, table name and partition name combo. It's used to represent the physical path of
770/// a bucket. If the bucket belongs to a partition (i.e., the table is a partitioned table),
771/// `partition_name` will be `Some(...)`; otherwise, it will be `None`.
772#[derive(Debug, Clone, PartialEq, Eq, Hash)]
773pub struct PhysicalTablePath {
774    table_path: Arc<TablePath>,
775    partition_name: Option<String>,
776}
777
778impl PhysicalTablePath {
779    pub fn of(table_path: Arc<TablePath>) -> Self {
780        Self {
781            table_path,
782            partition_name: None,
783        }
784    }
785
786    pub fn of_partitioned(table_path: Arc<TablePath>, partition_name: Option<String>) -> Self {
787        Self {
788            table_path,
789            partition_name,
790        }
791    }
792
793    pub fn of_with_names<D: Into<String>, T: Into<String>, P: Into<String>>(
794        database_name: D,
795        table_name: T,
796        partition_name: Option<P>,
797    ) -> Self {
798        Self {
799            table_path: Arc::new(TablePath::new(database_name, table_name)),
800            partition_name: partition_name.map(|p| p.into()),
801        }
802    }
803
804    pub fn get_table_path(&self) -> &TablePath {
805        &self.table_path
806    }
807
808    pub fn get_database_name(&self) -> &str {
809        self.table_path.database()
810    }
811
812    pub fn get_table_name(&self) -> &str {
813        self.table_path.table()
814    }
815
816    pub fn get_partition_name(&self) -> Option<&String> {
817        self.partition_name.as_ref()
818    }
819}
820
821impl Display for PhysicalTablePath {
822    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
823        match &self.partition_name {
824            Some(partition) => write!(f, "{}(p={})", self.table_path, partition),
825            None => write!(f, "{}", self.table_path),
826        }
827    }
828}
829
830#[derive(Debug, Clone)]
831pub struct TableInfo {
832    pub table_path: TablePath,
833    pub table_id: TableId,
834    pub schema_id: i32,
835    pub schema: Schema,
836    pub row_type: RowType,
837    pub primary_keys: Vec<String>,
838    pub physical_primary_keys: Vec<String>,
839    pub bucket_keys: Vec<String>,
840    pub partition_keys: Arc<[String]>,
841    pub num_buckets: i32,
842    pub properties: HashMap<String, String>,
843    pub table_config: TableConfig,
844    pub custom_properties: HashMap<String, String>,
845    pub comment: Option<String>,
846    pub created_time: i64,
847    pub modified_time: i64,
848}
849
850impl TableInfo {
851    pub fn row_type(&self) -> &RowType {
852        &self.row_type
853    }
854}
855
856#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
857pub struct AutoPartitionStrategy {
858    auto_partition_enabled: bool,
859    auto_partition_key: Option<String>,
860    auto_partition_time_unit: String,
861    auto_partition_num_precreate: i32,
862    auto_partition_num_retention: i32,
863    auto_partition_timezone: String,
864}
865
866impl AutoPartitionStrategy {
867    pub fn from(properties: &HashMap<String, String>) -> Self {
868        Self {
869            auto_partition_enabled: properties
870                .get("table.auto-partition.enabled")
871                .and_then(|s| s.parse().ok())
872                .unwrap_or(false),
873            auto_partition_key: properties
874                .get("table.auto-partition.key")
875                .map(|s| s.to_string()),
876            auto_partition_time_unit: properties
877                .get("table.auto-partition.time-unit")
878                .map(|s| s.to_string())
879                .unwrap_or_else(|| "DAY".to_string()),
880            auto_partition_num_precreate: properties
881                .get("table.auto-partition.num-precreate")
882                .and_then(|s| s.parse().ok())
883                .unwrap_or(2),
884            auto_partition_num_retention: properties
885                .get("table.auto-partition.num-retention")
886                .and_then(|s| s.parse().ok())
887                .unwrap_or(7),
888            auto_partition_timezone: properties
889                .get("table.auto-partition.time-zone")
890                .map(|s| s.to_string())
891                .unwrap_or_else(|| {
892                    jiff::tz::TimeZone::system()
893                        .iana_name()
894                        .unwrap_or("UTC")
895                        .to_string()
896                }),
897        }
898    }
899
900    pub fn is_auto_partition_enabled(&self) -> bool {
901        self.auto_partition_enabled
902    }
903
904    pub fn key(&self) -> Option<&str> {
905        self.auto_partition_key.as_deref()
906    }
907
908    pub fn time_unit(&self) -> &str {
909        &self.auto_partition_time_unit
910    }
911
912    pub fn num_precreate(&self) -> i32 {
913        self.auto_partition_num_precreate
914    }
915
916    pub fn num_retention(&self) -> i32 {
917        self.auto_partition_num_retention
918    }
919
920    pub fn timezone(&self) -> &str {
921        &self.auto_partition_timezone
922    }
923}
924
925#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
926pub struct TableConfig {
927    pub properties: HashMap<String, String>,
928}
929
930impl TableConfig {
931    pub fn from_properties(properties: HashMap<String, String>) -> Self {
932        TableConfig { properties }
933    }
934
935    pub fn get_arrow_compression_info(&self) -> Result<ArrowCompressionInfo> {
936        ArrowCompressionInfo::from_conf(&self.properties)
937    }
938
939    /// Returns the data lake format if configured, or None if not set.
940    pub fn get_datalake_format(&self) -> Result<Option<DataLakeFormat>> {
941        self.properties
942            .get("table.datalake.format")
943            .map(|f| f.parse().map_err(Error::from))
944            .transpose()
945    }
946
947    pub fn get_kv_format(&self) -> Result<KvFormat> {
948        // TODO: Consolidate configurations logic, constants, defaults in a single place
949        const DEFAULT_KV_FORMAT: &str = "COMPACTED";
950        let kv_format = self
951            .properties
952            .get("table.kv.format")
953            .map(String::as_str)
954            .unwrap_or(DEFAULT_KV_FORMAT);
955        kv_format.parse().map_err(Into::into)
956    }
957
958    pub fn get_log_format(&self) -> Result<LogFormat> {
959        // TODO: Consolidate configurations logic, constants, defaults in a single place
960        const DEFAULT_LOG_FORMAT: &str = "ARROW";
961        let log_format = self
962            .properties
963            .get("table.log.format")
964            .map(String::as_str)
965            .unwrap_or(DEFAULT_LOG_FORMAT);
966        LogFormat::parse(log_format)
967    }
968
969    pub fn get_auto_partition_strategy(&self) -> AutoPartitionStrategy {
970        AutoPartitionStrategy::from(&self.properties)
971    }
972}
973
974impl TableInfo {
975    pub fn of(
976        table_path: TablePath,
977        table_id: i64,
978        schema_id: i32,
979        table_descriptor: TableDescriptor,
980        created_time: i64,
981        modified_time: i64,
982    ) -> TableInfo {
983        let TableDescriptor {
984            schema,
985            table_distribution,
986            comment,
987            partition_keys,
988            properties,
989            custom_properties,
990        } = table_descriptor;
991        let TableDistribution {
992            bucket_count,
993            bucket_keys,
994        } = table_distribution.unwrap();
995        TableInfo::new(
996            table_path,
997            table_id,
998            schema_id,
999            schema,
1000            bucket_keys,
1001            partition_keys,
1002            bucket_count.unwrap(),
1003            properties,
1004            custom_properties,
1005            comment,
1006            created_time,
1007            modified_time,
1008        )
1009    }
1010
1011    #[allow(clippy::too_many_arguments)]
1012    pub fn new(
1013        table_path: TablePath,
1014        table_id: TableId,
1015        schema_id: i32,
1016        schema: Schema,
1017        bucket_keys: Vec<String>,
1018        partition_keys: Arc<[String]>,
1019        num_buckets: i32,
1020        properties: HashMap<String, String>,
1021        custom_properties: HashMap<String, String>,
1022        comment: Option<String>,
1023        created_time: i64,
1024        modified_time: i64,
1025    ) -> Self {
1026        let row_type = schema.row_type.clone();
1027        let primary_keys: Vec<String> = schema
1028            .primary_key_column_names()
1029            .iter()
1030            .map(|col| (*col).to_string())
1031            .collect();
1032        let physical_primary_keys =
1033            Self::generate_physical_primary_key(&primary_keys, &partition_keys);
1034        let table_config = TableConfig::from_properties(properties.clone());
1035
1036        TableInfo {
1037            table_path,
1038            table_id,
1039            schema_id,
1040            schema,
1041            row_type,
1042            primary_keys,
1043            physical_primary_keys,
1044            bucket_keys,
1045            partition_keys,
1046            num_buckets,
1047            properties,
1048            table_config,
1049            custom_properties,
1050            comment,
1051            created_time,
1052            modified_time,
1053        }
1054    }
1055
1056    pub fn get_table_path(&self) -> &TablePath {
1057        &self.table_path
1058    }
1059
1060    pub fn get_table_id(&self) -> i64 {
1061        self.table_id
1062    }
1063
1064    pub fn get_schema_id(&self) -> i32 {
1065        self.schema_id
1066    }
1067
1068    pub fn get_schema(&self) -> &Schema {
1069        &self.schema
1070    }
1071
1072    pub fn get_row_type(&self) -> &RowType {
1073        &self.row_type
1074    }
1075
1076    pub fn has_primary_key(&self) -> bool {
1077        !self.primary_keys.is_empty()
1078    }
1079
1080    pub fn get_primary_keys(&self) -> &Vec<String> {
1081        &self.primary_keys
1082    }
1083
1084    pub fn get_physical_primary_keys(&self) -> &[String] {
1085        &self.physical_primary_keys
1086    }
1087
1088    pub fn has_bucket_key(&self) -> bool {
1089        !self.bucket_keys.is_empty()
1090    }
1091
1092    pub fn is_default_bucket_key(&self) -> bool {
1093        if self.has_primary_key() {
1094            self.bucket_keys == self.physical_primary_keys
1095        } else {
1096            self.bucket_keys.is_empty()
1097        }
1098    }
1099
1100    pub fn get_bucket_keys(&self) -> &[String] {
1101        &self.bucket_keys
1102    }
1103
1104    pub fn is_partitioned(&self) -> bool {
1105        !self.partition_keys.is_empty()
1106    }
1107
1108    pub fn is_auto_partitioned(&self) -> bool {
1109        self.is_partitioned()
1110            && self
1111                .table_config
1112                .get_auto_partition_strategy()
1113                .is_auto_partition_enabled()
1114    }
1115
1116    pub fn get_partition_keys(&self) -> &Arc<[String]> {
1117        &self.partition_keys
1118    }
1119
1120    pub fn get_num_buckets(&self) -> i32 {
1121        self.num_buckets
1122    }
1123
1124    pub fn get_properties(&self) -> &HashMap<String, String> {
1125        &self.properties
1126    }
1127
1128    pub fn get_table_config(&self) -> &TableConfig {
1129        &self.table_config
1130    }
1131
1132    pub fn get_custom_properties(&self) -> &HashMap<String, String> {
1133        &self.custom_properties
1134    }
1135
1136    pub fn get_comment(&self) -> Option<&str> {
1137        self.comment.as_deref()
1138    }
1139
1140    pub fn get_created_time(&self) -> i64 {
1141        self.created_time
1142    }
1143
1144    pub fn get_modified_time(&self) -> i64 {
1145        self.modified_time
1146    }
1147
1148    pub fn to_table_descriptor(&self) -> Result<TableDescriptor> {
1149        let mut builder = TableDescriptor::builder()
1150            .schema(self.schema.clone())
1151            .partitioned_by(self.partition_keys.to_vec())
1152            .distributed_by(Some(self.num_buckets), self.bucket_keys.clone())
1153            .properties(self.properties.clone())
1154            .custom_properties(self.custom_properties.clone());
1155
1156        if let Some(comment) = &self.comment {
1157            builder = builder.comment(comment.clone());
1158        }
1159
1160        builder.build()
1161    }
1162
1163    fn generate_physical_primary_key(
1164        primary_keys: &[String],
1165        partition_keys: &[String],
1166    ) -> Vec<String> {
1167        primary_keys
1168            .iter()
1169            .filter(|pk| !partition_keys.contains(*pk))
1170            .cloned()
1171            .collect()
1172    }
1173}
1174
1175impl Display for TableInfo {
1176    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1177        write!(
1178            f,
1179            "TableInfo{{ table_path={:?}, table_id={}, schema_id={}, schema={:?}, physical_primary_keys={:?}, bucket_keys={:?}, partition_keys={:?}, num_buckets={}, properties={:?}, custom_properties={:?}, comment={:?}, created_time={}, modified_time={} }}",
1180            self.table_path,
1181            self.table_id,
1182            self.schema_id,
1183            self.schema,
1184            self.physical_primary_keys,
1185            self.bucket_keys,
1186            self.partition_keys,
1187            self.num_buckets,
1188            self.properties,
1189            self.custom_properties,
1190            self.comment,
1191            self.created_time,
1192            self.modified_time
1193        )
1194    }
1195}
1196
1197#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
1198pub struct TableBucket {
1199    table_id: TableId,
1200    partition_id: Option<PartitionId>,
1201    bucket: BucketId,
1202}
1203
1204impl TableBucket {
1205    pub fn new(table_id: TableId, bucket: BucketId) -> Self {
1206        Self {
1207            table_id,
1208            partition_id: None,
1209            bucket,
1210        }
1211    }
1212
1213    pub fn new_with_partition(
1214        table_id: TableId,
1215        partition_id: Option<PartitionId>,
1216        bucket: BucketId,
1217    ) -> Self {
1218        TableBucket {
1219            table_id,
1220            partition_id,
1221            bucket,
1222        }
1223    }
1224
1225    pub fn table_id(&self) -> TableId {
1226        self.table_id
1227    }
1228
1229    pub fn bucket_id(&self) -> BucketId {
1230        self.bucket
1231    }
1232
1233    pub fn partition_id(&self) -> Option<PartitionId> {
1234        self.partition_id
1235    }
1236}
1237
1238impl Display for TableBucket {
1239    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1240        if let Some(partition_id) = self.partition_id {
1241            write!(
1242                f,
1243                "TableBucket(table_id={}, partition_id={}, bucket={})",
1244                self.table_id, partition_id, self.bucket
1245            )
1246        } else {
1247            write!(
1248                f,
1249                "TableBucket(table_id={}, bucket={})",
1250                self.table_id, self.bucket
1251            )
1252        }
1253    }
1254}
1255
1256#[derive(Debug, Clone, Serialize, Deserialize)]
1257pub struct LakeSnapshot {
1258    pub snapshot_id: i64,
1259    pub table_buckets_offset: HashMap<TableBucket, i64>,
1260}
1261
1262impl LakeSnapshot {
1263    pub fn new(snapshot_id: i64, table_buckets_offset: HashMap<TableBucket, i64>) -> Self {
1264        Self {
1265            snapshot_id,
1266            table_buckets_offset,
1267        }
1268    }
1269
1270    pub fn snapshot_id(&self) -> i64 {
1271        self.snapshot_id
1272    }
1273
1274    pub fn table_buckets_offset(&self) -> &HashMap<TableBucket, i64> {
1275        &self.table_buckets_offset
1276    }
1277}
1278
/// Tests for [`TablePath`] name validation and [`TableInfo`] auto-partition
/// detection.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::DataTypes;

    /// Asserts that `name` is rejected by `TablePath::detect_invalid_name` and
    /// that the rejection message contains `expected_message`.
    fn assert_invalid_name(name: &str, expected_message: &str) {
        match TablePath::detect_invalid_name(name) {
            None => panic!("Expected '{name}' to be invalid, but it was valid"),
            Some(message) => assert!(
                message.contains(expected_message),
                "Expected message containing '{}', but got '{}'",
                expected_message,
                message
            ),
        }
    }

    /// Builds a `TableInfo` for `db.tbl` (columns `id` and `name`, primary key
    /// `id`, one bucket keyed on `id`) with the given partition keys and
    /// properties.
    fn table_info_with(
        partition_keys: Vec<String>,
        properties: &HashMap<String, String>,
    ) -> TableInfo {
        let schema = Schema::builder()
            .column("id", DataTypes::int())
            .column("name", DataTypes::string())
            .primary_key(vec!["id".to_string()])
            .build()
            .unwrap();

        TableInfo::new(
            TablePath::new("db".to_string(), "tbl".to_string()),
            1,
            1,
            schema,
            vec!["id".to_string()],
            Arc::from(partition_keys),
            1,
            properties.clone(),
            HashMap::new(),
            None,
            0,
            0,
        )
    }

    #[test]
    fn test_validate() {
        // A well-formed path: both parts validate and render as `db.table`.
        let valid_path = TablePath::new("db_2-abc3".to_string(), "table-1_abc_2".to_string());
        assert!(TablePath::detect_invalid_name(valid_path.database()).is_none());
        assert!(TablePath::detect_invalid_name(valid_path.table()).is_none());
        assert_eq!(valid_path.to_string(), "db_2-abc3.table-1_abc_2");

        // The reserved `__` prefix is rejected.
        let prefix_error = TablePath::validate_prefix("__table-1").unwrap();
        assert!(prefix_error.contains("'__' is not allowed as prefix"));

        // A name of 200 characters is still accepted.
        let longest_valid_name = "a".repeat(200);
        assert!(TablePath::detect_invalid_name(&longest_valid_name).is_none());

        // Names that must be rejected, each with its expected message.
        assert_invalid_name("*abc", "'*abc' contains one or more characters other than");
        assert_invalid_name(
            "table.abc",
            "'table.abc' contains one or more characters other than",
        );
        assert_invalid_name("", "the empty string is not allowed");
        assert_invalid_name(" ", "' ' contains one or more characters other than");
        assert_invalid_name(".", "'.' is not allowed");
        assert_invalid_name("..", "'..' is not allowed");

        // One character past the limit is rejected.
        let invalid_long_name = "a".repeat(201);
        let too_long_message = format!(
            "the length of '{invalid_long_name}' is longer than the max allowed length {MAX_NAME_LENGTH}"
        );
        assert_invalid_name(&invalid_long_name, &too_long_message);
    }

    #[test]
    fn test_is_auto_partitioned() {
        const AUTO_PARTITION_ENABLED: &str = "table.auto-partition.enabled";
        let name_partition = || vec!["name".to_string()];
        let mut properties = HashMap::new();

        // 1. Not partitioned, auto partition disabled
        assert!(!table_info_with(vec![], &properties).is_auto_partitioned());

        // 2. Not partitioned, auto partition enabled
        properties.insert(AUTO_PARTITION_ENABLED.to_string(), "true".to_string());
        assert!(!table_info_with(vec![], &properties).is_auto_partitioned());

        // 3. Partitioned, auto partition disabled
        properties.insert(AUTO_PARTITION_ENABLED.to_string(), "false".to_string());
        assert!(!table_info_with(name_partition(), &properties).is_auto_partitioned());

        // 4. Partitioned, auto partition enabled
        properties.insert(AUTO_PARTITION_ENABLED.to_string(), "true".to_string());
        assert!(table_info_with(name_partition(), &properties).is_auto_partitioned());
    }
}