Skip to main content

lance_file/
datatypes.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::DataType;
5use async_recursion::async_recursion;
6use lance_arrow::ARROW_EXT_NAME_KEY;
7use lance_arrow::DataTypeExt;
8use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
9use lance_core::{Error, Result};
10use lance_io::traits::Reader;
11use lance_io::utils::{read_binary_array, read_fixed_stride_array};
12use std::collections::HashMap;
13
14use crate::format::pb;
15
16#[allow(clippy::fallible_impl_from)]
17impl From<&pb::Field> for Field {
18    fn from(field: &pb::Field) -> Self {
19        let lance_metadata: HashMap<String, String> = field
20            .metadata
21            .iter()
22            .map(|(key, value)| {
23                let string_value = String::from_utf8_lossy(value).to_string();
24                (key.clone(), string_value)
25            })
26            .collect();
27        let mut lance_metadata = lance_metadata;
28        if !field.extension_name.is_empty() {
29            lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
30        }
31        Self {
32            name: field.name.clone(),
33            id: field.id,
34            parent_id: field.parent_id,
35            logical_type: LogicalType::from(field.logical_type.as_str()),
36            metadata: lance_metadata,
37            encoding: match field.encoding {
38                1 => Some(Encoding::Plain),
39                2 => Some(Encoding::VarBinary),
40                3 => Some(Encoding::Dictionary),
41                4 => Some(Encoding::RLE),
42                _ => None,
43            },
44            nullable: field.nullable,
45            children: vec![],
46            dictionary: field.dictionary.as_ref().map(Dictionary::from),
47            unenforced_primary_key_position: if field.unenforced_primary_key_position > 0 {
48                Some(field.unenforced_primary_key_position)
49            } else if field.unenforced_primary_key {
50                Some(0)
51            } else {
52                None
53            },
54            unenforced_clustering_key_position: if field.unenforced_clustering_key_position > 0 {
55                Some(field.unenforced_clustering_key_position)
56            } else {
57                None
58            },
59        }
60    }
61}
62
63impl From<&Field> for pb::Field {
64    fn from(field: &Field) -> Self {
65        let pb_metadata = field
66            .metadata
67            .iter()
68            .map(|(key, value)| (key.clone(), value.clone().into_bytes()))
69            .collect();
70        Self {
71            id: field.id,
72            parent_id: field.parent_id,
73            name: field.name.clone(),
74            logical_type: field.logical_type.to_string(),
75            encoding: match field.encoding {
76                Some(Encoding::Plain) => 1,
77                Some(Encoding::VarBinary) => 2,
78                Some(Encoding::Dictionary) => 3,
79                Some(Encoding::RLE) => 4,
80                _ => 0,
81            },
82            nullable: field.nullable,
83            dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
84            metadata: pb_metadata,
85            extension_name: field
86                .extension_name()
87                .map(|name| name.to_owned())
88                .unwrap_or_default(),
89            r#type: 0,
90            unenforced_primary_key: field.unenforced_primary_key_position.is_some(),
91            unenforced_primary_key_position: field.unenforced_primary_key_position.unwrap_or(0),
92            unenforced_clustering_key: false,
93            unenforced_clustering_key_position: field
94                .unenforced_clustering_key_position
95                .unwrap_or(0),
96        }
97    }
98}
99
100pub struct Fields(pub Vec<pb::Field>);
101
102impl From<&Field> for Fields {
103    fn from(field: &Field) -> Self {
104        let mut protos = vec![pb::Field::from(field)];
105        protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
106        Self(protos)
107    }
108}
109
110/// Convert list of protobuf `Field` to a Schema.
111impl From<&Fields> for Schema {
112    fn from(fields: &Fields) -> Self {
113        let mut schema = Self {
114            fields: vec![],
115            metadata: HashMap::default(),
116        };
117
118        fields.0.iter().for_each(|f| {
119            if f.parent_id == -1 {
120                schema.fields.push(Field::from(f));
121            } else {
122                let parent = schema.mut_field_by_id(f.parent_id).unwrap();
123                parent.children.push(Field::from(f));
124            }
125        });
126
127        schema
128    }
129}
130
131pub struct FieldsWithMeta {
132    pub fields: Fields,
133    pub metadata: HashMap<String, Vec<u8>>,
134}
135
136/// Convert list of protobuf `Field` and Metadata to a Schema.
137impl From<FieldsWithMeta> for Schema {
138    fn from(fields_with_meta: FieldsWithMeta) -> Self {
139        let lance_metadata = fields_with_meta
140            .metadata
141            .into_iter()
142            .map(|(key, value)| {
143                let string_value = String::from_utf8_lossy(&value).to_string();
144                (key, string_value)
145            })
146            .collect();
147
148        let schema_with_fields = Self::from(&fields_with_meta.fields);
149        Self {
150            fields: schema_with_fields.fields,
151            metadata: lance_metadata,
152        }
153    }
154}
155
156/// Convert a Schema to a list of protobuf Field.
157impl From<&Schema> for Fields {
158    fn from(schema: &Schema) -> Self {
159        let mut protos = vec![];
160        schema.fields.iter().for_each(|f| {
161            protos.extend(Self::from(f).0);
162        });
163        Self(protos)
164    }
165}
166
167/// Convert a Schema to a list of protobuf Field and Metadata
168impl From<&Schema> for FieldsWithMeta {
169    fn from(schema: &Schema) -> Self {
170        let fields = schema.into();
171        let metadata = schema
172            .metadata
173            .clone()
174            .into_iter()
175            .map(|(key, value)| (key, value.into_bytes()))
176            .collect();
177        Self { fields, metadata }
178    }
179}
180
181impl From<&pb::Dictionary> for Dictionary {
182    fn from(proto: &pb::Dictionary) -> Self {
183        Self {
184            offset: proto.offset as usize,
185            length: proto.length as usize,
186            values: None,
187        }
188    }
189}
190
191impl From<&Dictionary> for pb::Dictionary {
192    fn from(d: &Dictionary) -> Self {
193        Self {
194            offset: d.offset as i64,
195            length: d.length as i64,
196        }
197    }
198}
199
200impl From<Encoding> for pb::Encoding {
201    fn from(e: Encoding) -> Self {
202        match e {
203            Encoding::Plain => Self::Plain,
204            Encoding::VarBinary => Self::VarBinary,
205            Encoding::Dictionary => Self::Dictionary,
206            Encoding::RLE => Self::Rle,
207        }
208    }
209}
210
211#[async_recursion]
212async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
213    if let DataType::Dictionary(_, value_type) = field.data_type() {
214        assert!(field.dictionary.is_some());
215        if let Some(dict_info) = field.dictionary.as_mut() {
216            use DataType::*;
217            match value_type.as_ref() {
218                _ if value_type.is_binary_like() => {
219                    dict_info.values = Some(
220                        read_binary_array(
221                            reader,
222                            value_type.as_ref(),
223                            true, // Empty values are null
224                            dict_info.offset,
225                            dict_info.length,
226                            ..,
227                        )
228                        .await?,
229                    );
230                }
231                Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
232                    dict_info.values = Some(
233                        read_fixed_stride_array(
234                            reader,
235                            value_type.as_ref(),
236                            dict_info.offset,
237                            dict_info.length,
238                            ..,
239                        )
240                        .await?,
241                    );
242                }
243                _ => {
244                    return Err(Error::schema(format!(
245                        "Does not support {} as dictionary value type",
246                        value_type
247                    )));
248                }
249            }
250        } else {
251            panic!("Should not reach here: dictionary field does not load dictionary info")
252        }
253        Ok(())
254    } else {
255        for child in field.children.as_mut_slice() {
256            load_field_dictionary(child, reader).await?;
257        }
258        Ok(())
259    }
260}
261
262/// Load dictionary value array from manifest files.
263// TODO: pub(crate)
264pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
265    for field in schema.fields.as_mut_slice() {
266        load_field_dictionary(field, reader).await?;
267    }
268    Ok(())
269}
270
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use lance_core::datatypes::Schema;

    use super::{Fields, FieldsWithMeta};

    /// Arrow field metadata key used below to mark clustering-key columns.
    const CLUSTERING_KEY_POSITION: &str = "lance-schema:unenforced-clustering-key:position";

    /// Builds field metadata holding only the clustering-key position entry.
    fn clustering_key_metadata(position: &str) -> HashMap<String, String> {
        HashMap::from([(CLUSTERING_KEY_POSITION.to_owned(), position.to_owned())])
    }

    /// Flattening a nested schema yields protobuf fields with ids 0..6.
    #[test]
    fn test_schema_set_ids() {
        let struct_children = vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ];
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new(
                "b",
                DataType::Struct(ArrowFields::from(struct_children)),
                true,
            ),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let protos = Fields::from(&schema);
        let ids: Vec<_> = protos.0.iter().map(|p| p.id).collect();
        assert_eq!(ids, (0..6).collect::<Vec<_>>());
    }

    /// Schema-level metadata survives a round trip through `FieldsWithMeta`.
    #[test]
    fn test_schema_metadata() {
        let metadata: HashMap<String, String> = HashMap::from([
            (String::from("k1"), String::from("v1")),
            (String::from("k2"), String::from("v2")),
        ]);

        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );

        let expected_schema = Schema::try_from(&arrow_schema).unwrap();
        let fields_with_meta = FieldsWithMeta::from(&expected_schema);

        let schema = Schema::from(fields_with_meta);
        assert_eq!(expected_schema, schema);
    }

    /// Clustering-key positions survive a round trip through protobuf.
    #[test]
    fn test_clustering_key_roundtrip() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("region", DataType::Utf8, true)
                .with_metadata(clustering_key_metadata("1")),
            ArrowField::new("date", DataType::Int32, false)
                .with_metadata(clustering_key_metadata("2")),
            ArrowField::new("value", DataType::Float64, true),
        ]);

        let schema = Schema::try_from(&arrow_schema).unwrap();
        let ck = schema.unenforced_clustering_key();
        assert_eq!(ck.len(), 2);
        assert_eq!(ck[0].name, "region");
        assert_eq!(ck[1].name, "date");

        // Round-trip through protobuf
        let fields_with_meta = FieldsWithMeta::from(&schema);
        let restored = Schema::from(fields_with_meta);

        let ck2 = restored.unenforced_clustering_key();
        assert_eq!(ck2.len(), 2);
        assert_eq!(ck2[0].name, "region");
        assert_eq!(ck2[1].name, "date");
        assert_eq!(ck2[0].unenforced_clustering_key_position, Some(1));
        assert_eq!(ck2[1].unenforced_clustering_key_position, Some(2));

        // Non-clustering-key field should not have position
        let value_field = restored.field("value").unwrap();
        assert!(!value_field.is_unenforced_clustering_key());
    }
}