// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Helper struct to manage table schemas with partition columns

use arrow::datatypes::{FieldRef, SchemaBuilder, SchemaRef};
use std::sync::Arc;
/// Helper to hold table schema information for partitioned data sources.
///
/// When reading partitioned data (such as Hive-style partitioning), a table's schema
/// consists of two parts:
/// 1. **File schema**: The schema of the actual data files on disk
/// 2. **Partition columns**: Columns that are encoded in the directory structure,
///    not stored in the files themselves
///
/// # Example: Partitioned Table
///
/// Consider a table with the following directory structure:
/// ```text
/// /data/date=2025-10-10/region=us-west/data.parquet
/// /data/date=2025-10-11/region=us-east/data.parquet
/// ```
///
/// In this case:
/// - **File schema**: The schema of `data.parquet` files (e.g., `[user_id, amount]`)
/// - **Partition columns**: `[date, region]` extracted from the directory path
/// - **Table schema**: The full schema combining both (e.g., `[user_id, amount, date, region]`)
///
/// # When to Use
///
/// Use `TableSchema` when:
/// - Reading partitioned data sources (Parquet, CSV, etc. with Hive-style partitioning)
/// - You need to efficiently access different schema representations without reconstructing them
/// - You want to avoid repeatedly concatenating file and partition schemas
///
/// For non-partitioned data or when working with a single schema representation,
/// working directly with Arrow's `Schema` or `SchemaRef` is simpler.
///
/// # Performance
///
/// This struct pre-computes and caches the full table schema, allowing cheap references
/// to any representation without repeated allocations or reconstructions.
///
/// All fields are private; access them through the accessor methods
/// (`file_schema`, `table_partition_cols`, `table_schema`), which return
/// cheap references to the cached values.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// The schema of the data files themselves, without partition columns.
    ///
    /// For example, if your Parquet files contain `[user_id, amount]`,
    /// this field holds that schema.
    file_schema: SchemaRef,

    /// Columns that are derived from the directory structure (partitioning scheme).
    ///
    /// For Hive-style partitioning like `/date=2025-10-10/region=us-west/`,
    /// this contains the `date` and `region` fields.
    ///
    /// These columns are NOT present in the data files but are appended to each
    /// row during query execution based on the file's location.
    ///
    /// Wrapped in `Arc` so that cloning a `TableSchema` shares (rather than
    /// copies) the partition-column list.
    table_partition_cols: Arc<Vec<FieldRef>>,

    /// The complete table schema: file_schema columns followed by partition columns.
    ///
    /// This is pre-computed during construction by concatenating `file_schema`
    /// and `table_partition_cols`, so it can be returned as a cheap reference.
    table_schema: SchemaRef,
}
81
82impl TableSchema {
83    /// Create a new TableSchema from a file schema and partition columns.
84    ///
85    /// The table schema is automatically computed by appending the partition columns
86    /// to the file schema.
87    ///
88    /// You should prefer calling this method over
89    /// chaining [`TableSchema::from_file_schema`] and [`TableSchema::with_table_partition_cols`]
90    /// if you have both the file schema and partition columns available at construction time
91    /// since it avoids re-computing the table schema.
92    ///
93    /// # Arguments
94    ///
95    /// * `file_schema` - Schema of the data files (without partition columns)
96    /// * `table_partition_cols` - Partition columns to append to each row
97    ///
98    /// # Example
99    ///
100    /// ```
101    /// # use std::sync::Arc;
102    /// # use arrow::datatypes::{Schema, Field, DataType};
103    /// # use datafusion_datasource::TableSchema;
104    /// let file_schema = Arc::new(Schema::new(vec![
105    ///     Field::new("user_id", DataType::Int64, false),
106    ///     Field::new("amount", DataType::Float64, false),
107    /// ]));
108    ///
109    /// let partition_cols = vec![
110    ///     Arc::new(Field::new("date", DataType::Utf8, false)),
111    ///     Arc::new(Field::new("region", DataType::Utf8, false)),
112    /// ];
113    ///
114    /// let table_schema = TableSchema::new(file_schema, partition_cols);
115    ///
116    /// // Table schema will have 4 columns: user_id, amount, date, region
117    /// assert_eq!(table_schema.table_schema().fields().len(), 4);
118    /// ```
119    pub fn new(file_schema: SchemaRef, table_partition_cols: Vec<FieldRef>) -> Self {
120        let mut builder = SchemaBuilder::from(file_schema.as_ref());
121        builder.extend(table_partition_cols.iter().cloned());
122        Self {
123            file_schema,
124            table_partition_cols: Arc::new(table_partition_cols),
125            table_schema: Arc::new(builder.finish()),
126        }
127    }
128
129    /// Create a new TableSchema with no partition columns.
130    ///
131    /// You should prefer calling [`TableSchema::new`] if you have partition columns at
132    /// construction time since it avoids re-computing the table schema.
133    pub fn from_file_schema(file_schema: SchemaRef) -> Self {
134        Self::new(file_schema, vec![])
135    }
136
137    /// Add partition columns to an existing TableSchema, returning a new instance.
138    ///
139    /// You should prefer calling [`TableSchema::new`] instead of chaining [`TableSchema::from_file_schema`]
140    /// into [`TableSchema::with_table_partition_cols`] if you have partition columns at construction time
141    /// since it avoids re-computing the table schema.
142    pub fn with_table_partition_cols(mut self, partition_cols: Vec<FieldRef>) -> Self {
143        if self.table_partition_cols.is_empty() {
144            self.table_partition_cols = Arc::new(partition_cols);
145        } else {
146            // Append to existing partition columns
147            let table_partition_cols = Arc::get_mut(&mut self.table_partition_cols).expect(
148                "Expected to be the sole owner of table_partition_cols since this function accepts mut self",
149            );
150            table_partition_cols.extend(partition_cols);
151        }
152        let mut builder = SchemaBuilder::from(self.file_schema.as_ref());
153        builder.extend(self.table_partition_cols.iter().cloned());
154        self.table_schema = Arc::new(builder.finish());
155        self
156    }
157
158    /// Get the file schema (without partition columns).
159    ///
160    /// This is the schema of the actual data files on disk.
161    pub fn file_schema(&self) -> &SchemaRef {
162        &self.file_schema
163    }
164
165    /// Get the table partition columns.
166    ///
167    /// These are the columns derived from the directory structure that
168    /// will be appended to each row during query execution.
169    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
170        &self.table_partition_cols
171    }
172
173    /// Get the full table schema (file schema + partition columns).
174    ///
175    /// This is the complete schema that will be seen by queries, combining
176    /// both the columns from the files and the partition columns.
177    pub fn table_schema(&self) -> &SchemaRef {
178        &self.table_schema
179    }
180}
181
182impl From<SchemaRef> for TableSchema {
183    fn from(schema: SchemaRef) -> Self {
184        Self::from_file_schema(schema)
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::TableSchema;
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    #[test]
    fn test_table_schema_creation() {
        let file_schema = Arc::new(Schema::new(vec![
            Field::new("user_id", DataType::Int64, false),
            Field::new("amount", DataType::Float64, false),
        ]));
        let partition_cols = vec![
            Arc::new(Field::new("date", DataType::Utf8, false)),
            Arc::new(Field::new("region", DataType::Utf8, false)),
        ];

        let schema = TableSchema::new(Arc::clone(&file_schema), partition_cols.clone());

        // The file schema is passed through untouched.
        assert_eq!(schema.file_schema().as_ref(), file_schema.as_ref());

        // Partition columns are stored in the order given.
        assert_eq!(
            schema.table_partition_cols().as_slice(),
            partition_cols.as_slice()
        );

        // The full table schema is the file columns followed by the
        // partition columns.
        let expected = Schema::new(vec![
            Field::new("user_id", DataType::Int64, false),
            Field::new("amount", DataType::Float64, false),
            Field::new("date", DataType::Utf8, false),
            Field::new("region", DataType::Utf8, false),
        ]);
        assert_eq!(schema.table_schema().as_ref(), &expected);
    }

    #[test]
    fn test_add_multiple_partition_columns() {
        let file_schema =
            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        // Start with one partition column, then append two more.
        let schema = TableSchema::new(
            Arc::clone(&file_schema),
            vec![Arc::new(Field::new("country", DataType::Utf8, false))],
        )
        .with_table_partition_cols(vec![
            Arc::new(Field::new("city", DataType::Utf8, false)),
            Arc::new(Field::new("year", DataType::Int32, false)),
        ]);

        // The file schema is unaffected by appending partition columns.
        assert_eq!(schema.file_schema().as_ref(), file_schema.as_ref());

        // Existing partition columns come first, new ones are appended.
        let names: Vec<&str> = schema
            .table_partition_cols()
            .iter()
            .map(|f| f.name().as_str())
            .collect();
        assert_eq!(names, ["country", "city", "year"]);

        // The cached table schema is rebuilt with all partition columns.
        let expected = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("country", DataType::Utf8, false),
            Field::new("city", DataType::Utf8, false),
            Field::new("year", DataType::Int32, false),
        ]);
        assert_eq!(schema.table_schema().as_ref(), &expected);
    }
}