1use crate::compression::ArrowCompressionInfo;
19use crate::error::Error::IllegalArgument;
20use crate::error::{Error, Result};
21use crate::metadata::DataLakeFormat;
22use crate::metadata::datatype::{DataField, DataType, RowType};
23use crate::{BucketId, PartitionId, TableId};
24use core::fmt;
25use serde::{Deserialize, Serialize};
26use std::collections::{HashMap, HashSet};
27use std::fmt::{Display, Formatter};
28use std::sync::Arc;
29use strum_macros::EnumString;
30
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
32pub struct Column {
33 name: String,
34 data_type: DataType,
35 comment: Option<String>,
36}
37
38impl Column {
39 pub fn new<N: Into<String>>(name: N, data_type: DataType) -> Self {
40 Self {
41 name: name.into(),
42 data_type,
43 comment: None,
44 }
45 }
46
47 pub fn with_comment<C: Into<String>>(mut self, comment: C) -> Self {
48 self.comment = Some(comment.into());
49 self
50 }
51
52 pub fn with_data_type(&self, data_type: DataType) -> Self {
53 Self {
54 name: self.name.clone(),
55 data_type: data_type.clone(),
56 comment: self.comment.clone(),
57 }
58 }
59
60 pub fn name(&self) -> &str {
62 &self.name
63 }
64
65 pub fn data_type(&self) -> &DataType {
66 &self.data_type
67 }
68
69 pub fn comment(&self) -> Option<&str> {
70 self.comment.as_deref()
71 }
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
75pub struct PrimaryKey {
76 constraint_name: String,
77 column_names: Vec<String>,
78}
79
80impl PrimaryKey {
81 pub fn new<N: Into<String>>(constraint_name: N, column_names: Vec<String>) -> Self {
82 Self {
83 constraint_name: constraint_name.into(),
84 column_names,
85 }
86 }
87
88 pub fn constraint_name(&self) -> &str {
90 &self.constraint_name
91 }
92
93 pub fn column_names(&self) -> &[String] {
94 &self.column_names
95 }
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
99pub struct Schema {
100 columns: Vec<Column>,
101 primary_key: Option<PrimaryKey>,
102 row_type: RowType,
103 auto_increment_col_names: Vec<String>,
104}
105
106impl Schema {
107 pub fn empty() -> Result<Self> {
108 Self::builder().build()
109 }
110
111 pub fn builder() -> SchemaBuilder {
112 SchemaBuilder::new()
113 }
114
115 pub fn columns(&self) -> &[Column] {
116 &self.columns
117 }
118
119 pub fn primary_key(&self) -> Option<&PrimaryKey> {
120 self.primary_key.as_ref()
121 }
122
123 pub fn row_type(&self) -> &RowType {
124 &self.row_type
125 }
126
127 pub fn primary_key_indexes(&self) -> Vec<usize> {
128 self.primary_key
129 .as_ref()
130 .map(|pk| {
131 pk.column_names
132 .iter()
133 .filter_map(|name| self.columns.iter().position(|c| &c.name == name))
134 .collect()
135 })
136 .unwrap_or_default()
137 }
138
139 pub fn primary_key_column_names(&self) -> Vec<&str> {
140 self.primary_key
141 .as_ref()
142 .map(|pk| pk.column_names.iter().map(|s| s.as_str()).collect())
143 .unwrap_or_default()
144 }
145
146 pub fn column_names(&self) -> Vec<&str> {
147 self.columns.iter().map(|c| c.name.as_str()).collect()
148 }
149
150 pub fn auto_increment_col_names(&self) -> &Vec<String> {
151 &self.auto_increment_col_names
152 }
153}
154
155#[derive(Debug, Default)]
156pub struct SchemaBuilder {
157 columns: Vec<Column>,
158 primary_key: Option<PrimaryKey>,
159 auto_increment_col_names: Vec<String>,
160}
161
162impl SchemaBuilder {
163 pub fn new() -> Self {
164 Self::default()
165 }
166
167 pub fn with_row_type(mut self, row_type: &DataType) -> Self {
168 match row_type {
169 DataType::Row(row) => {
170 for data_field in row.fields() {
171 self = self.column(&data_field.name, data_field.data_type.clone())
172 }
173 self
174 }
175 _ => {
176 panic!("data type must be row type")
177 }
178 }
179 }
180
181 pub fn column<N: Into<String>>(mut self, name: N, data_type: DataType) -> Self {
182 self.columns.push(Column::new(name.into(), data_type));
183 self
184 }
185
186 pub fn with_columns(mut self, columns: Vec<Column>) -> Self {
187 self.columns.extend_from_slice(columns.as_ref());
188 self
189 }
190
191 pub fn with_comment<C: Into<String>>(mut self, comment: C) -> Self {
192 if let Some(last) = self.columns.last_mut() {
193 *last = last.clone().with_comment(comment.into());
194 }
195 self
196 }
197
198 pub fn primary_key<I, S>(self, column_names: I) -> Self
199 where
200 I: IntoIterator<Item = S>,
201 S: Into<String>,
202 {
203 let names: Vec<String> = column_names.into_iter().map(|s| s.into()).collect();
204
205 let constraint_name = format!("PK_{}", names.join("_"));
206
207 self.primary_key_named(&constraint_name, names)
208 }
209
210 pub fn primary_key_named<N: Into<String>, P: Into<String>>(
211 mut self,
212 constraint_name: N,
213 column_names: Vec<P>,
214 ) -> Self {
215 self.primary_key = Some(PrimaryKey::new(
216 constraint_name.into(),
217 column_names.into_iter().map(|s| s.into()).collect(),
218 ));
219 self
220 }
221
222 pub fn enable_auto_increment<N: Into<String>>(mut self, column_name: N) -> Result<Self> {
227 if !self.auto_increment_col_names.is_empty() {
228 return Err(IllegalArgument {
229 message: "Multiple auto increment columns are not supported yet.".to_string(),
230 });
231 }
232
233 self.auto_increment_col_names.push(column_name.into());
234 Ok(self)
235 }
236
237 pub fn build(&self) -> Result<Schema> {
238 let columns = Self::normalize_columns(&self.columns, self.primary_key.as_ref())?;
239
240 let column_names: HashSet<_> = columns.iter().map(|c| &c.name).collect();
241 for auto_inc_col in &self.auto_increment_col_names {
242 if !column_names.contains(auto_inc_col) {
243 return Err(IllegalArgument {
244 message: format!(
245 "Auto increment column '{auto_inc_col}' is not found in the schema columns."
246 ),
247 });
248 }
249 }
250
251 let data_fields = columns
252 .iter()
253 .map(|c| DataField {
254 name: c.name.clone(),
255 data_type: c.data_type.clone(),
256 description: c.comment.clone(),
257 })
258 .collect();
259
260 Ok(Schema {
261 columns,
262 primary_key: self.primary_key.clone(),
263 row_type: RowType::new(data_fields),
264 auto_increment_col_names: self.auto_increment_col_names.clone(),
265 })
266 }
267
268 fn normalize_columns(
269 columns: &[Column],
270 primary_key: Option<&PrimaryKey>,
271 ) -> Result<Vec<Column>> {
272 let names: Vec<_> = columns.iter().map(|c| &c.name).collect();
273 if let Some(duplicates) = Self::find_duplicates(&names) {
274 return Err(Error::invalid_table(format!(
275 "Duplicate column names found: {duplicates:?}"
276 )));
277 }
278
279 let Some(pk) = primary_key else {
280 return Ok(columns.to_vec());
281 };
282
283 let pk_set: HashSet<_> = pk.column_names.iter().collect();
284 let all_columns: HashSet<_> = columns.iter().map(|c| &c.name).collect();
285 if !pk_set.is_subset(&all_columns) {
286 return Err(Error::invalid_table(format!(
287 "Primary key columns {pk_set:?} not found in schema"
288 )));
289 }
290
291 Ok(columns
292 .iter()
293 .map(|col| {
294 if pk_set.contains(&col.name) && col.data_type.is_nullable() {
295 col.with_data_type(col.data_type.as_non_nullable())
296 } else {
297 col.clone()
298 }
299 })
300 .collect())
301 }
302
303 fn find_duplicates<'a>(names: &'a [&String]) -> Option<HashSet<&'a String>> {
304 let mut seen = HashSet::new();
305 let mut duplicates = HashSet::new();
306
307 for name in names {
308 if !seen.insert(name) {
309 duplicates.insert(*name);
310 }
311 }
312
313 if duplicates.is_empty() {
314 None
315 } else {
316 Some(duplicates)
317 }
318 }
319}
320
321#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
323pub struct TableDistribution {
324 bucket_count: Option<i32>,
325 bucket_keys: Vec<String>,
326}
327
328impl TableDistribution {
329 pub fn bucket_keys(&self) -> &[String] {
330 &self.bucket_keys
331 }
332
333 pub fn bucket_count(&self) -> Option<i32> {
334 self.bucket_count
335 }
336}
337
338#[derive(Debug, Default)]
339pub struct TableDescriptorBuilder {
340 schema: Option<Schema>,
341 properties: HashMap<String, String>,
342 custom_properties: HashMap<String, String>,
343 partition_keys: Arc<[String]>,
344 comment: Option<String>,
345 table_distribution: Option<TableDistribution>,
346}
347
348impl TableDescriptorBuilder {
349 pub fn new() -> Self {
350 Self::default()
351 }
352
353 pub fn schema(mut self, schema: Schema) -> Self {
354 self.schema = Some(schema);
355 self
356 }
357
358 pub fn log_format(mut self, log_format: LogFormat) -> Self {
359 self.properties
360 .insert("table.log.format".to_string(), log_format.to_string());
361 self
362 }
363
364 pub fn kv_format(mut self, kv_format: KvFormat) -> Self {
365 self.properties
366 .insert("table.kv.format".to_string(), kv_format.to_string());
367 self
368 }
369
370 pub fn property<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
371 self.properties.insert(key.into(), value.into());
372 self
373 }
374
375 pub fn properties<K: Into<String>, V: Into<String>>(
376 mut self,
377 properties: HashMap<K, V>,
378 ) -> Self {
379 for (k, v) in properties {
380 self.properties.insert(k.into(), v.into());
381 }
382 self
383 }
384
385 pub fn custom_property<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
386 self.custom_properties.insert(key.into(), value.into());
387 self
388 }
389
390 pub fn custom_properties<K: Into<String>, V: Into<String>>(
391 mut self,
392 custom_properties: HashMap<K, V>,
393 ) -> Self {
394 for (k, v) in custom_properties {
395 self.custom_properties.insert(k.into(), v.into());
396 }
397 self
398 }
399
400 pub fn partitioned_by<P: Into<String>>(mut self, partition_keys: Vec<P>) -> Self {
401 self.partition_keys = Arc::from(
402 partition_keys
403 .into_iter()
404 .map(|s| s.into())
405 .collect::<Vec<String>>(),
406 );
407 self
408 }
409
410 pub fn distributed_by(mut self, bucket_count: Option<i32>, bucket_keys: Vec<String>) -> Self {
411 self.table_distribution = Some(TableDistribution {
412 bucket_count,
413 bucket_keys,
414 });
415 self
416 }
417
418 pub fn comment<S: Into<String>>(mut self, comment: S) -> Self {
419 self.comment = Some(comment.into());
420 self
421 }
422
423 pub fn build(self) -> Result<TableDescriptor> {
424 let schema = self.schema.expect("Schema must be set");
425 let table_distribution = TableDescriptor::normalize_distribution(
426 &schema,
427 &self.partition_keys,
428 self.table_distribution,
429 )?;
430 Ok(TableDescriptor {
431 schema,
432 comment: self.comment,
433 partition_keys: self.partition_keys,
434 table_distribution,
435 properties: self.properties,
436 custom_properties: self.custom_properties,
437 })
438 }
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
442pub struct TableDescriptor {
443 schema: Schema,
444 comment: Option<String>,
445 partition_keys: Arc<[String]>,
446 table_distribution: Option<TableDistribution>,
447 properties: HashMap<String, String>,
448 custom_properties: HashMap<String, String>,
449}
450
451impl TableDescriptor {
452 pub fn builder() -> TableDescriptorBuilder {
453 TableDescriptorBuilder::new()
454 }
455
456 pub fn schema(&self) -> &Schema {
457 &self.schema
458 }
459
460 pub fn bucket_keys(&self) -> Vec<&str> {
461 self.table_distribution
462 .as_ref()
463 .map(|td| td.bucket_keys.iter().map(|s| s.as_str()).collect())
464 .unwrap_or_default()
465 }
466
467 pub fn is_default_bucket_key(&self) -> Result<bool> {
468 if self.schema.primary_key().is_some() {
469 Ok(self.bucket_keys()
470 == Self::default_bucket_key_of_primary_key_table(
471 self.schema(),
472 &self.partition_keys,
473 )?
474 .iter()
475 .map(|s| s.as_str())
476 .collect::<Vec<_>>())
477 } else {
478 Ok(self.bucket_keys().is_empty())
479 }
480 }
481
482 pub fn is_partitioned(&self) -> bool {
483 !self.partition_keys.is_empty()
484 }
485
486 pub fn has_primary_key(&self) -> bool {
487 self.schema.primary_key().is_some()
488 }
489
490 pub fn partition_keys(&self) -> &[String] {
491 &self.partition_keys
492 }
493
494 pub fn table_distribution(&self) -> Option<&TableDistribution> {
495 self.table_distribution.as_ref()
496 }
497
498 pub fn properties(&self) -> &HashMap<String, String> {
499 &self.properties
500 }
501
502 pub fn custom_properties(&self) -> &HashMap<String, String> {
503 &self.custom_properties
504 }
505
506 pub fn replication_factor(&self) -> Result<i32> {
507 self.properties
508 .get("table.replication.factor")
509 .ok_or_else(|| Error::invalid_table("Replication factor is not set"))?
510 .parse()
511 .map_err(|_e| Error::invalid_table("Replication factor can't be converted to int"))
512 }
513
514 pub fn with_properties<K: Into<String>, V: Into<String>>(
515 &self,
516 new_properties: HashMap<K, V>,
517 ) -> Self {
518 let mut properties = HashMap::new();
519 for (k, v) in new_properties {
520 properties.insert(k.into(), v.into());
521 }
522 Self {
523 properties,
524 ..self.clone()
525 }
526 }
527
528 pub fn with_replication_factor(&self, new_replication_factor: i32) -> Self {
529 let mut properties = self.properties.clone();
530 properties.insert(
531 "table.replication.factor".to_string(),
532 new_replication_factor.to_string(),
533 );
534 self.with_properties(properties)
535 }
536
537 pub fn with_bucket_count(&self, new_bucket_count: i32) -> Self {
538 Self {
539 table_distribution: Some(TableDistribution {
540 bucket_count: Some(new_bucket_count),
541 bucket_keys: self
542 .table_distribution
543 .as_ref()
544 .map(|td| td.bucket_keys.clone())
545 .unwrap_or_default(),
546 }),
547 ..self.clone()
548 }
549 }
550
551 pub fn comment(&self) -> Option<&str> {
552 self.comment.as_deref()
553 }
554
555 fn default_bucket_key_of_primary_key_table(
556 schema: &Schema,
557 partition_keys: &[String],
558 ) -> Result<Vec<String>> {
559 let mut bucket_keys = schema
560 .primary_key()
561 .expect("Primary key must be set")
562 .column_names()
563 .to_vec();
564
565 bucket_keys.retain(|k| !partition_keys.contains(k));
566
567 if bucket_keys.is_empty() {
568 return Err(Error::invalid_table(format!(
569 "Primary Key constraint {:?} should not be same with partition fields {:?}.",
570 schema.primary_key().unwrap().column_names(),
571 partition_keys
572 )));
573 }
574
575 Ok(bucket_keys)
576 }
577
578 fn normalize_distribution(
579 schema: &Schema,
580 partition_keys: &[String],
581 origin_distribution: Option<TableDistribution>,
582 ) -> Result<Option<TableDistribution>> {
583 if let Some(distribution) = origin_distribution {
584 if distribution
585 .bucket_keys
586 .iter()
587 .any(|k| partition_keys.contains(k))
588 {
589 return Err(Error::invalid_table(format!(
590 "Bucket key {:?} shouldn't include any column in partition keys {:?}.",
591 distribution.bucket_keys, partition_keys
592 )));
593 }
594
595 return if let Some(pk) = schema.primary_key() {
596 if distribution.bucket_keys.is_empty() {
597 Ok(Some(TableDistribution {
598 bucket_count: distribution.bucket_count,
599 bucket_keys: Self::default_bucket_key_of_primary_key_table(
600 schema,
601 partition_keys,
602 )?,
603 }))
604 } else {
605 let pk_columns: HashSet<_> = pk.column_names().iter().collect();
606 if !distribution
607 .bucket_keys
608 .iter()
609 .all(|k| pk_columns.contains(k))
610 {
611 return Err(Error::invalid_table(format!(
612 "Bucket keys must be a subset of primary keys excluding partition keys for primary-key tables. \
613 The primary keys are {:?}, the partition keys are {:?}, but the user-defined bucket keys are {:?}.",
614 pk.column_names(),
615 partition_keys,
616 distribution.bucket_keys
617 )));
618 }
619 Ok(Some(distribution))
620 }
621 } else {
622 Ok(Some(distribution))
623 };
624 } else if schema.primary_key().is_some() {
625 return Ok(Some(TableDistribution {
626 bucket_count: None,
627 bucket_keys: Self::default_bucket_key_of_primary_key_table(schema, partition_keys)?,
628 }));
629 }
630
631 Ok(None)
632 }
633}
634
635#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
636pub enum LogFormat {
637 ARROW,
638 INDEXED,
639}
640
641impl Display for LogFormat {
642 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
643 match self {
644 LogFormat::ARROW => {
645 write!(f, "ARROW")?;
646 }
647 LogFormat::INDEXED => {
648 write!(f, "INDEXED")?;
649 }
650 }
651 Ok(())
652 }
653}
654
655impl LogFormat {
656 pub fn parse(s: &str) -> Result<Self> {
657 match s.to_uppercase().as_str() {
658 "ARROW" => Ok(LogFormat::ARROW),
659 "INDEXED" => Ok(LogFormat::INDEXED),
660 _ => Err(Error::invalid_table(format!("Unknown log format: {s}"))),
661 }
662 }
663}
664
665#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, EnumString)]
666pub enum KvFormat {
667 INDEXED,
668 COMPACTED,
669}
670
671impl Display for KvFormat {
672 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
673 match self {
674 KvFormat::COMPACTED => write!(f, "COMPACTED")?,
675 KvFormat::INDEXED => write!(f, "INDEXED")?,
676 }
677 Ok(())
678 }
679}
680
681impl KvFormat {
682 pub fn parse(s: &str) -> Result<Self> {
683 match s.to_uppercase().as_str() {
684 "INDEXED" => Ok(KvFormat::INDEXED),
685 "COMPACTED" => Ok(KvFormat::COMPACTED),
686 _ => Err(Error::invalid_table(format!("Unknown kv format: {s}"))),
687 }
688 }
689}
690
691#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
692pub struct TablePath {
693 database: String,
694 table: String,
695}
696
697impl Display for TablePath {
698 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
699 write!(f, "{}.{}", self.database, self.table)
700 }
701}
702
703const MAX_NAME_LENGTH: usize = 200;
704
705const INTERNAL_NAME_PREFIX: &str = "__";
706
707impl TablePath {
708 pub fn new<D: Into<String>, T: Into<String>>(db: D, tbl: T) -> Self {
709 TablePath {
710 database: db.into(),
711 table: tbl.into(),
712 }
713 }
714
715 #[inline]
716 pub fn database(&self) -> &str {
717 &self.database
718 }
719
720 #[inline]
721 pub fn table(&self) -> &str {
722 &self.table
723 }
724
725 pub fn detect_invalid_name(identifier: &str) -> Option<String> {
726 if identifier.is_empty() {
727 return Some("the empty string is not allowed".to_string());
728 }
729 if identifier == "." {
730 return Some("'.' is not allowed".to_string());
731 }
732 if identifier == ".." {
733 return Some("'..' is not allowed".to_string());
734 }
735 if identifier.len() > MAX_NAME_LENGTH {
736 return Some(format!(
737 "the length of '{identifier}' is longer than the max allowed length {MAX_NAME_LENGTH}"
738 ));
739 }
740 if Self::contains_invalid_pattern(identifier) {
741 return Some(format!(
742 "'{identifier}' contains one or more characters other than ASCII alphanumerics, '_' and '-'"
743 ));
744 }
745 None
746 }
747
748 pub fn validate_prefix(identifier: &str) -> Option<String> {
749 if identifier.starts_with(INTERNAL_NAME_PREFIX) {
750 return Some(format!(
751 "'{INTERNAL_NAME_PREFIX}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server"
752 ));
753 }
754 None
755 }
756
757 fn contains_invalid_pattern(identifier: &str) -> bool {
759 for c in identifier.chars() {
760 let valid_char = c.is_ascii_alphanumeric() || c == '_' || c == '-';
761 if !valid_char {
762 return true;
763 }
764 }
765 false
766 }
767}
768
769#[derive(Debug, Clone, PartialEq, Eq, Hash)]
773pub struct PhysicalTablePath {
774 table_path: Arc<TablePath>,
775 partition_name: Option<String>,
776}
777
778impl PhysicalTablePath {
779 pub fn of(table_path: Arc<TablePath>) -> Self {
780 Self {
781 table_path,
782 partition_name: None,
783 }
784 }
785
786 pub fn of_partitioned(table_path: Arc<TablePath>, partition_name: Option<String>) -> Self {
787 Self {
788 table_path,
789 partition_name,
790 }
791 }
792
793 pub fn of_with_names<D: Into<String>, T: Into<String>, P: Into<String>>(
794 database_name: D,
795 table_name: T,
796 partition_name: Option<P>,
797 ) -> Self {
798 Self {
799 table_path: Arc::new(TablePath::new(database_name, table_name)),
800 partition_name: partition_name.map(|p| p.into()),
801 }
802 }
803
804 pub fn get_table_path(&self) -> &TablePath {
805 &self.table_path
806 }
807
808 pub fn get_database_name(&self) -> &str {
809 self.table_path.database()
810 }
811
812 pub fn get_table_name(&self) -> &str {
813 self.table_path.table()
814 }
815
816 pub fn get_partition_name(&self) -> Option<&String> {
817 self.partition_name.as_ref()
818 }
819}
820
821impl Display for PhysicalTablePath {
822 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
823 match &self.partition_name {
824 Some(partition) => write!(f, "{}(p={})", self.table_path, partition),
825 None => write!(f, "{}", self.table_path),
826 }
827 }
828}
829
830#[derive(Debug, Clone)]
831pub struct TableInfo {
832 pub table_path: TablePath,
833 pub table_id: TableId,
834 pub schema_id: i32,
835 pub schema: Schema,
836 pub row_type: RowType,
837 pub primary_keys: Vec<String>,
838 pub physical_primary_keys: Vec<String>,
839 pub bucket_keys: Vec<String>,
840 pub partition_keys: Arc<[String]>,
841 pub num_buckets: i32,
842 pub properties: HashMap<String, String>,
843 pub table_config: TableConfig,
844 pub custom_properties: HashMap<String, String>,
845 pub comment: Option<String>,
846 pub created_time: i64,
847 pub modified_time: i64,
848}
849
850impl TableInfo {
851 pub fn row_type(&self) -> &RowType {
852 &self.row_type
853 }
854}
855
856#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
857pub struct AutoPartitionStrategy {
858 auto_partition_enabled: bool,
859 auto_partition_key: Option<String>,
860 auto_partition_time_unit: String,
861 auto_partition_num_precreate: i32,
862 auto_partition_num_retention: i32,
863 auto_partition_timezone: String,
864}
865
866impl AutoPartitionStrategy {
867 pub fn from(properties: &HashMap<String, String>) -> Self {
868 Self {
869 auto_partition_enabled: properties
870 .get("table.auto-partition.enabled")
871 .and_then(|s| s.parse().ok())
872 .unwrap_or(false),
873 auto_partition_key: properties
874 .get("table.auto-partition.key")
875 .map(|s| s.to_string()),
876 auto_partition_time_unit: properties
877 .get("table.auto-partition.time-unit")
878 .map(|s| s.to_string())
879 .unwrap_or_else(|| "DAY".to_string()),
880 auto_partition_num_precreate: properties
881 .get("table.auto-partition.num-precreate")
882 .and_then(|s| s.parse().ok())
883 .unwrap_or(2),
884 auto_partition_num_retention: properties
885 .get("table.auto-partition.num-retention")
886 .and_then(|s| s.parse().ok())
887 .unwrap_or(7),
888 auto_partition_timezone: properties
889 .get("table.auto-partition.time-zone")
890 .map(|s| s.to_string())
891 .unwrap_or_else(|| {
892 jiff::tz::TimeZone::system()
893 .iana_name()
894 .unwrap_or("UTC")
895 .to_string()
896 }),
897 }
898 }
899
900 pub fn is_auto_partition_enabled(&self) -> bool {
901 self.auto_partition_enabled
902 }
903
904 pub fn key(&self) -> Option<&str> {
905 self.auto_partition_key.as_deref()
906 }
907
908 pub fn time_unit(&self) -> &str {
909 &self.auto_partition_time_unit
910 }
911
912 pub fn num_precreate(&self) -> i32 {
913 self.auto_partition_num_precreate
914 }
915
916 pub fn num_retention(&self) -> i32 {
917 self.auto_partition_num_retention
918 }
919
920 pub fn timezone(&self) -> &str {
921 &self.auto_partition_timezone
922 }
923}
924
925#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
926pub struct TableConfig {
927 pub properties: HashMap<String, String>,
928}
929
930impl TableConfig {
931 pub fn from_properties(properties: HashMap<String, String>) -> Self {
932 TableConfig { properties }
933 }
934
935 pub fn get_arrow_compression_info(&self) -> Result<ArrowCompressionInfo> {
936 ArrowCompressionInfo::from_conf(&self.properties)
937 }
938
939 pub fn get_datalake_format(&self) -> Result<Option<DataLakeFormat>> {
941 self.properties
942 .get("table.datalake.format")
943 .map(|f| f.parse().map_err(Error::from))
944 .transpose()
945 }
946
947 pub fn get_kv_format(&self) -> Result<KvFormat> {
948 const DEFAULT_KV_FORMAT: &str = "COMPACTED";
950 let kv_format = self
951 .properties
952 .get("table.kv.format")
953 .map(String::as_str)
954 .unwrap_or(DEFAULT_KV_FORMAT);
955 kv_format.parse().map_err(Into::into)
956 }
957
958 pub fn get_log_format(&self) -> Result<LogFormat> {
959 const DEFAULT_LOG_FORMAT: &str = "ARROW";
961 let log_format = self
962 .properties
963 .get("table.log.format")
964 .map(String::as_str)
965 .unwrap_or(DEFAULT_LOG_FORMAT);
966 LogFormat::parse(log_format)
967 }
968
969 pub fn get_auto_partition_strategy(&self) -> AutoPartitionStrategy {
970 AutoPartitionStrategy::from(&self.properties)
971 }
972}
973
974impl TableInfo {
975 pub fn of(
976 table_path: TablePath,
977 table_id: i64,
978 schema_id: i32,
979 table_descriptor: TableDescriptor,
980 created_time: i64,
981 modified_time: i64,
982 ) -> TableInfo {
983 let TableDescriptor {
984 schema,
985 table_distribution,
986 comment,
987 partition_keys,
988 properties,
989 custom_properties,
990 } = table_descriptor;
991 let TableDistribution {
992 bucket_count,
993 bucket_keys,
994 } = table_distribution.unwrap();
995 TableInfo::new(
996 table_path,
997 table_id,
998 schema_id,
999 schema,
1000 bucket_keys,
1001 partition_keys,
1002 bucket_count.unwrap(),
1003 properties,
1004 custom_properties,
1005 comment,
1006 created_time,
1007 modified_time,
1008 )
1009 }
1010
1011 #[allow(clippy::too_many_arguments)]
1012 pub fn new(
1013 table_path: TablePath,
1014 table_id: TableId,
1015 schema_id: i32,
1016 schema: Schema,
1017 bucket_keys: Vec<String>,
1018 partition_keys: Arc<[String]>,
1019 num_buckets: i32,
1020 properties: HashMap<String, String>,
1021 custom_properties: HashMap<String, String>,
1022 comment: Option<String>,
1023 created_time: i64,
1024 modified_time: i64,
1025 ) -> Self {
1026 let row_type = schema.row_type.clone();
1027 let primary_keys: Vec<String> = schema
1028 .primary_key_column_names()
1029 .iter()
1030 .map(|col| (*col).to_string())
1031 .collect();
1032 let physical_primary_keys =
1033 Self::generate_physical_primary_key(&primary_keys, &partition_keys);
1034 let table_config = TableConfig::from_properties(properties.clone());
1035
1036 TableInfo {
1037 table_path,
1038 table_id,
1039 schema_id,
1040 schema,
1041 row_type,
1042 primary_keys,
1043 physical_primary_keys,
1044 bucket_keys,
1045 partition_keys,
1046 num_buckets,
1047 properties,
1048 table_config,
1049 custom_properties,
1050 comment,
1051 created_time,
1052 modified_time,
1053 }
1054 }
1055
1056 pub fn get_table_path(&self) -> &TablePath {
1057 &self.table_path
1058 }
1059
1060 pub fn get_table_id(&self) -> i64 {
1061 self.table_id
1062 }
1063
1064 pub fn get_schema_id(&self) -> i32 {
1065 self.schema_id
1066 }
1067
1068 pub fn get_schema(&self) -> &Schema {
1069 &self.schema
1070 }
1071
1072 pub fn get_row_type(&self) -> &RowType {
1073 &self.row_type
1074 }
1075
1076 pub fn has_primary_key(&self) -> bool {
1077 !self.primary_keys.is_empty()
1078 }
1079
1080 pub fn get_primary_keys(&self) -> &Vec<String> {
1081 &self.primary_keys
1082 }
1083
1084 pub fn get_physical_primary_keys(&self) -> &[String] {
1085 &self.physical_primary_keys
1086 }
1087
1088 pub fn has_bucket_key(&self) -> bool {
1089 !self.bucket_keys.is_empty()
1090 }
1091
1092 pub fn is_default_bucket_key(&self) -> bool {
1093 if self.has_primary_key() {
1094 self.bucket_keys == self.physical_primary_keys
1095 } else {
1096 self.bucket_keys.is_empty()
1097 }
1098 }
1099
1100 pub fn get_bucket_keys(&self) -> &[String] {
1101 &self.bucket_keys
1102 }
1103
1104 pub fn is_partitioned(&self) -> bool {
1105 !self.partition_keys.is_empty()
1106 }
1107
1108 pub fn is_auto_partitioned(&self) -> bool {
1109 self.is_partitioned()
1110 && self
1111 .table_config
1112 .get_auto_partition_strategy()
1113 .is_auto_partition_enabled()
1114 }
1115
1116 pub fn get_partition_keys(&self) -> &Arc<[String]> {
1117 &self.partition_keys
1118 }
1119
1120 pub fn get_num_buckets(&self) -> i32 {
1121 self.num_buckets
1122 }
1123
1124 pub fn get_properties(&self) -> &HashMap<String, String> {
1125 &self.properties
1126 }
1127
1128 pub fn get_table_config(&self) -> &TableConfig {
1129 &self.table_config
1130 }
1131
1132 pub fn get_custom_properties(&self) -> &HashMap<String, String> {
1133 &self.custom_properties
1134 }
1135
1136 pub fn get_comment(&self) -> Option<&str> {
1137 self.comment.as_deref()
1138 }
1139
1140 pub fn get_created_time(&self) -> i64 {
1141 self.created_time
1142 }
1143
1144 pub fn get_modified_time(&self) -> i64 {
1145 self.modified_time
1146 }
1147
1148 pub fn to_table_descriptor(&self) -> Result<TableDescriptor> {
1149 let mut builder = TableDescriptor::builder()
1150 .schema(self.schema.clone())
1151 .partitioned_by(self.partition_keys.to_vec())
1152 .distributed_by(Some(self.num_buckets), self.bucket_keys.clone())
1153 .properties(self.properties.clone())
1154 .custom_properties(self.custom_properties.clone());
1155
1156 if let Some(comment) = &self.comment {
1157 builder = builder.comment(comment.clone());
1158 }
1159
1160 builder.build()
1161 }
1162
1163 fn generate_physical_primary_key(
1164 primary_keys: &[String],
1165 partition_keys: &[String],
1166 ) -> Vec<String> {
1167 primary_keys
1168 .iter()
1169 .filter(|pk| !partition_keys.contains(*pk))
1170 .cloned()
1171 .collect()
1172 }
1173}
1174
1175impl Display for TableInfo {
1176 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1177 write!(
1178 f,
1179 "TableInfo{{ table_path={:?}, table_id={}, schema_id={}, schema={:?}, physical_primary_keys={:?}, bucket_keys={:?}, partition_keys={:?}, num_buckets={}, properties={:?}, custom_properties={:?}, comment={:?}, created_time={}, modified_time={} }}",
1180 self.table_path,
1181 self.table_id,
1182 self.schema_id,
1183 self.schema,
1184 self.physical_primary_keys,
1185 self.bucket_keys,
1186 self.partition_keys,
1187 self.num_buckets,
1188 self.properties,
1189 self.custom_properties,
1190 self.comment,
1191 self.created_time,
1192 self.modified_time
1193 )
1194 }
1195}
1196
1197#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
1198pub struct TableBucket {
1199 table_id: TableId,
1200 partition_id: Option<PartitionId>,
1201 bucket: BucketId,
1202}
1203
1204impl TableBucket {
1205 pub fn new(table_id: TableId, bucket: BucketId) -> Self {
1206 Self {
1207 table_id,
1208 partition_id: None,
1209 bucket,
1210 }
1211 }
1212
1213 pub fn new_with_partition(
1214 table_id: TableId,
1215 partition_id: Option<PartitionId>,
1216 bucket: BucketId,
1217 ) -> Self {
1218 TableBucket {
1219 table_id,
1220 partition_id,
1221 bucket,
1222 }
1223 }
1224
1225 pub fn table_id(&self) -> TableId {
1226 self.table_id
1227 }
1228
1229 pub fn bucket_id(&self) -> BucketId {
1230 self.bucket
1231 }
1232
1233 pub fn partition_id(&self) -> Option<PartitionId> {
1234 self.partition_id
1235 }
1236}
1237
1238impl Display for TableBucket {
1239 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1240 if let Some(partition_id) = self.partition_id {
1241 write!(
1242 f,
1243 "TableBucket(table_id={}, partition_id={}, bucket={})",
1244 self.table_id, partition_id, self.bucket
1245 )
1246 } else {
1247 write!(
1248 f,
1249 "TableBucket(table_id={}, bucket={})",
1250 self.table_id, self.bucket
1251 )
1252 }
1253 }
1254}
1255
1256#[derive(Debug, Clone, Serialize, Deserialize)]
1257pub struct LakeSnapshot {
1258 pub snapshot_id: i64,
1259 pub table_buckets_offset: HashMap<TableBucket, i64>,
1260}
1261
1262impl LakeSnapshot {
1263 pub fn new(snapshot_id: i64, table_buckets_offset: HashMap<TableBucket, i64>) -> Self {
1264 Self {
1265 snapshot_id,
1266 table_buckets_offset,
1267 }
1268 }
1269
1270 pub fn snapshot_id(&self) -> i64 {
1271 self.snapshot_id
1272 }
1273
1274 pub fn table_buckets_offset(&self) -> &HashMap<TableBucket, i64> {
1275 &self.table_buckets_offset
1276 }
1277}
1278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::DataTypes;

    #[test]
    fn test_validate() {
        let path = TablePath::new("db_2-abc3".to_string(), "table-1_abc_2".to_string());
        assert!(TablePath::detect_invalid_name(path.database()).is_none());
        assert!(TablePath::detect_invalid_name(path.table()).is_none());
        assert_eq!(path.to_string(), "db_2-abc3.table-1_abc_2");

        assert!(
            TablePath::validate_prefix("__table-1")
                .unwrap()
                .contains("'__' is not allowed as prefix")
        );

        let long_name = "a".repeat(200);
        assert!(TablePath::detect_invalid_name(&long_name).is_none());

        assert_invalid_name("*abc", "'*abc' contains one or more characters other than");
        assert_invalid_name(
            "table.abc",
            "'table.abc' contains one or more characters other than",
        );
        assert_invalid_name("", "the empty string is not allowed");
        assert_invalid_name(" ", "' ' contains one or more characters other than");
        assert_invalid_name(".", "'.' is not allowed");
        assert_invalid_name("..", "'..' is not allowed");
        let invalid_long_name = "a".repeat(201);
        assert_invalid_name(
            &invalid_long_name,
            &format!(
                "the length of '{invalid_long_name}' is longer than the max allowed length {MAX_NAME_LENGTH}"
            ),
        );
    }

    /// Asserts that `name` is rejected with a message containing `expected_message`.
    fn assert_invalid_name(name: &str, expected_message: &str) {
        let result = TablePath::detect_invalid_name(name);
        assert!(
            result.is_some(),
            "Expected '{name}' to be invalid, but it was valid"
        );
        assert!(
            result.as_ref().unwrap().contains(expected_message),
            "Expected message containing '{}', but got '{}'",
            expected_message,
            result.unwrap()
        );
    }

    /// Builds a `TableInfo` for `db.tbl` with bucket key `id` and the given partitioning and
    /// properties. The other inputs are fixed placeholder values.
    fn table_info_with(
        schema: &Schema,
        partition_keys: Vec<String>,
        properties: &HashMap<String, String>,
    ) -> TableInfo {
        TableInfo::new(
            TablePath::new("db".to_string(), "tbl".to_string()),
            1,
            1,
            schema.clone(),
            vec!["id".to_string()],
            Arc::from(partition_keys),
            1,
            properties.clone(),
            HashMap::new(),
            None,
            0,
            0,
        )
    }

    #[test]
    fn test_is_auto_partitioned() {
        let schema = Schema::builder()
            .column("id", DataTypes::int())
            .column("name", DataTypes::string())
            .primary_key(vec!["id".to_string()])
            .build()
            .unwrap();

        let mut properties = HashMap::new();
        // Not partitioned, no auto-partition property.
        assert!(!table_info_with(&schema, vec![], &properties).is_auto_partitioned());

        // Enabled but not partitioned.
        properties.insert(
            "table.auto-partition.enabled".to_string(),
            "true".to_string(),
        );
        assert!(!table_info_with(&schema, vec![], &properties).is_auto_partitioned());

        // Partitioned but disabled.
        properties.insert(
            "table.auto-partition.enabled".to_string(),
            "false".to_string(),
        );
        assert!(
            !table_info_with(&schema, vec!["name".to_string()], &properties)
                .is_auto_partitioned()
        );

        // Partitioned and enabled.
        properties.insert(
            "table.auto-partition.enabled".to_string(),
            "true".to_string(),
        );
        assert!(
            table_info_with(&schema, vec!["name".to_string()], &properties).is_auto_partitioned()
        );
    }

    #[test]
    fn test_schema_rejects_duplicate_columns() {
        let result = Schema::builder()
            .column("id", DataTypes::int())
            .column("id", DataTypes::string())
            .build();
        assert!(result.is_err());
    }

    #[test]
    fn test_schema_primary_key_is_non_nullable() {
        let schema = Schema::builder()
            .column("id", DataTypes::int())
            .column("name", DataTypes::string())
            .primary_key(vec!["id"])
            .build()
            .unwrap();
        assert!(!schema.columns()[0].data_type().is_nullable());
        assert_eq!(schema.primary_key_indexes(), vec![0]);
        assert_eq!(schema.primary_key_column_names(), vec!["id"]);
    }

    #[test]
    fn test_schema_rejects_unknown_primary_key() {
        let result = Schema::builder()
            .column("id", DataTypes::int())
            .primary_key(vec!["missing"])
            .build();
        assert!(result.is_err());
    }

    #[test]
    fn test_schema_rejects_unknown_auto_increment_column() {
        let builder = Schema::builder()
            .column("id", DataTypes::int())
            .enable_auto_increment("missing")
            .unwrap();
        assert!(builder.build().is_err());
    }

    #[test]
    fn test_default_bucket_keys_exclude_partition_keys() {
        let schema = Schema::builder()
            .column("id", DataTypes::int())
            .column("dt", DataTypes::string())
            .primary_key(vec!["id", "dt"])
            .build()
            .unwrap();
        let descriptor = TableDescriptor::builder()
            .schema(schema)
            .partitioned_by(vec!["dt"])
            .build()
            .unwrap();
        assert!(descriptor.is_partitioned());
        assert_eq!(descriptor.bucket_keys(), vec!["id"]);
        assert!(descriptor.is_default_bucket_key().unwrap());
    }

    #[test]
    fn test_primary_key_equal_to_partition_keys_is_rejected() {
        let schema = Schema::builder()
            .column("dt", DataTypes::string())
            .primary_key(vec!["dt"])
            .build()
            .unwrap();
        let result = TableDescriptor::builder()
            .schema(schema)
            .partitioned_by(vec!["dt"])
            .build();
        assert!(result.is_err());
    }

    #[test]
    fn test_bucket_keys_must_not_overlap_partition_keys() {
        let schema = Schema::builder()
            .column("id", DataTypes::int())
            .column("dt", DataTypes::string())
            .build()
            .unwrap();
        let result = TableDescriptor::builder()
            .schema(schema)
            .partitioned_by(vec!["dt"])
            .distributed_by(Some(3), vec!["dt".to_string()])
            .build();
        assert!(result.is_err());
    }

    #[test]
    fn test_log_table_without_distribution() {
        let schema = Schema::builder()
            .column("id", DataTypes::int())
            .build()
            .unwrap();
        let descriptor = TableDescriptor::builder().schema(schema).build().unwrap();
        assert!(descriptor.table_distribution().is_none());
        assert!(descriptor.bucket_keys().is_empty());
        assert!(descriptor.is_default_bucket_key().unwrap());
    }

    #[test]
    fn test_replication_factor_round_trip() {
        let schema = Schema::builder()
            .column("id", DataTypes::int())
            .build()
            .unwrap();
        let descriptor = TableDescriptor::builder().schema(schema).build().unwrap();
        assert!(descriptor.replication_factor().is_err());
        assert_eq!(
            descriptor
                .with_replication_factor(3)
                .replication_factor()
                .unwrap(),
            3
        );
    }
}