lance_file/
datatypes.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::DataType;
5use async_recursion::async_recursion;
6use lance_arrow::DataTypeExt;
7use lance_arrow::ARROW_EXT_NAME_KEY;
8use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
9use lance_core::{Error, Result};
10use lance_io::traits::Reader;
11use lance_io::utils::{read_binary_array, read_fixed_stride_array};
12use snafu::location;
13use std::collections::HashMap;
14
15use crate::format::pb;
16
17#[allow(clippy::fallible_impl_from)]
18impl From<&pb::Field> for Field {
19    fn from(field: &pb::Field) -> Self {
20        let lance_metadata: HashMap<String, String> = field
21            .metadata
22            .iter()
23            .map(|(key, value)| {
24                let string_value = String::from_utf8_lossy(value).to_string();
25                (key.clone(), string_value)
26            })
27            .collect();
28        let mut lance_metadata = lance_metadata;
29        if !field.extension_name.is_empty() {
30            lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
31        }
32        Self {
33            name: field.name.clone(),
34            id: field.id,
35            parent_id: field.parent_id,
36            logical_type: LogicalType::from(field.logical_type.as_str()),
37            metadata: lance_metadata,
38            encoding: match field.encoding {
39                1 => Some(Encoding::Plain),
40                2 => Some(Encoding::VarBinary),
41                3 => Some(Encoding::Dictionary),
42                4 => Some(Encoding::RLE),
43                _ => None,
44            },
45            nullable: field.nullable,
46            children: vec![],
47            dictionary: field.dictionary.as_ref().map(Dictionary::from),
48            unenforced_primary_key: field.unenforced_primary_key,
49        }
50    }
51}
52
53impl From<&Field> for pb::Field {
54    fn from(field: &Field) -> Self {
55        let pb_metadata = field
56            .metadata
57            .iter()
58            .map(|(key, value)| (key.clone(), value.clone().into_bytes()))
59            .collect();
60        Self {
61            id: field.id,
62            parent_id: field.parent_id,
63            name: field.name.clone(),
64            logical_type: field.logical_type.to_string(),
65            encoding: match field.encoding {
66                Some(Encoding::Plain) => 1,
67                Some(Encoding::VarBinary) => 2,
68                Some(Encoding::Dictionary) => 3,
69                Some(Encoding::RLE) => 4,
70                _ => 0,
71            },
72            nullable: field.nullable,
73            dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
74            metadata: pb_metadata,
75            extension_name: field
76                .extension_name()
77                .map(|name| name.to_owned())
78                .unwrap_or_default(),
79            r#type: 0,
80            unenforced_primary_key: field.unenforced_primary_key,
81        }
82    }
83}
84
/// A flat list of protobuf fields.
///
/// When built from a [`Field`], the field itself comes first, followed by the
/// flattened lists of each of its children, in order.
pub struct Fields(pub Vec<pb::Field>);
86
87impl From<&Field> for Fields {
88    fn from(field: &Field) -> Self {
89        let mut protos = vec![pb::Field::from(field)];
90        protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
91        Self(protos)
92    }
93}
94
95/// Convert list of protobuf `Field` to a Schema.
96impl From<&Fields> for Schema {
97    fn from(fields: &Fields) -> Self {
98        let mut schema = Self {
99            fields: vec![],
100            metadata: HashMap::default(),
101        };
102
103        fields.0.iter().for_each(|f| {
104            if f.parent_id == -1 {
105                schema.fields.push(Field::from(f));
106            } else {
107                let parent = schema.mut_field_by_id(f.parent_id).unwrap();
108                parent.children.push(Field::from(f));
109            }
110        });
111
112        schema
113    }
114}
115
/// A flat list of protobuf fields together with schema-level metadata.
pub struct FieldsWithMeta {
    /// Flattened protobuf fields of the schema.
    pub fields: Fields,
    /// Schema metadata, with each value stored as raw bytes.
    pub metadata: HashMap<String, Vec<u8>>,
}
120
121/// Convert list of protobuf `Field` and Metadata to a Schema.
122impl From<FieldsWithMeta> for Schema {
123    fn from(fields_with_meta: FieldsWithMeta) -> Self {
124        let lance_metadata = fields_with_meta
125            .metadata
126            .into_iter()
127            .map(|(key, value)| {
128                let string_value = String::from_utf8_lossy(&value).to_string();
129                (key, string_value)
130            })
131            .collect();
132
133        let schema_with_fields = Self::from(&fields_with_meta.fields);
134        Self {
135            fields: schema_with_fields.fields,
136            metadata: lance_metadata,
137        }
138    }
139}
140
141/// Convert a Schema to a list of protobuf Field.
142impl From<&Schema> for Fields {
143    fn from(schema: &Schema) -> Self {
144        let mut protos = vec![];
145        schema.fields.iter().for_each(|f| {
146            protos.extend(Self::from(f).0);
147        });
148        Self(protos)
149    }
150}
151
152/// Convert a Schema to a list of protobuf Field and Metadata
153impl From<&Schema> for FieldsWithMeta {
154    fn from(schema: &Schema) -> Self {
155        let fields = schema.into();
156        let metadata = schema
157            .metadata
158            .clone()
159            .into_iter()
160            .map(|(key, value)| (key, value.into_bytes()))
161            .collect();
162        Self { fields, metadata }
163    }
164}
165
166impl From<&pb::Dictionary> for Dictionary {
167    fn from(proto: &pb::Dictionary) -> Self {
168        Self {
169            offset: proto.offset as usize,
170            length: proto.length as usize,
171            values: None,
172        }
173    }
174}
175
176impl From<&Dictionary> for pb::Dictionary {
177    fn from(d: &Dictionary) -> Self {
178        Self {
179            offset: d.offset as i64,
180            length: d.length as i64,
181        }
182    }
183}
184
185impl From<Encoding> for pb::Encoding {
186    fn from(e: Encoding) -> Self {
187        match e {
188            Encoding::Plain => Self::Plain,
189            Encoding::VarBinary => Self::VarBinary,
190            Encoding::Dictionary => Self::Dictionary,
191            Encoding::RLE => Self::Rle,
192        }
193    }
194}
195
196#[async_recursion]
197async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
198    if let DataType::Dictionary(_, value_type) = field.data_type() {
199        assert!(field.dictionary.is_some());
200        if let Some(dict_info) = field.dictionary.as_mut() {
201            use DataType::*;
202            match value_type.as_ref() {
203                _ if value_type.is_binary_like() => {
204                    dict_info.values = Some(
205                        read_binary_array(
206                            reader,
207                            value_type.as_ref(),
208                            true, // Empty values are null
209                            dict_info.offset,
210                            dict_info.length,
211                            ..,
212                        )
213                        .await?,
214                    );
215                }
216                Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
217                    dict_info.values = Some(
218                        read_fixed_stride_array(
219                            reader,
220                            value_type.as_ref(),
221                            dict_info.offset,
222                            dict_info.length,
223                            ..,
224                        )
225                        .await?,
226                    );
227                }
228                _ => {
229                    return Err(Error::Schema {
230                        message: format!(
231                            "Does not support {} as dictionary value type",
232                            value_type
233                        ),
234                        location: location!(),
235                    });
236                }
237            }
238        } else {
239            panic!("Should not reach here: dictionary field does not load dictionary info")
240        }
241        Ok(())
242    } else {
243        for child in field.children.as_mut_slice() {
244            load_field_dictionary(child, reader).await?;
245        }
246        Ok(())
247    }
248}
249
250/// Load dictionary value array from manifest files.
251// TODO: pub(crate)
252pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
253    for field in schema.fields.as_mut_slice() {
254        load_field_dictionary(field, reader).await?;
255    }
256    Ok(())
257}
258
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use lance_core::datatypes::Schema;

    use super::{Fields, FieldsWithMeta};

    /// Flattening a schema with a nested struct yields protobuf fields whose
    /// ids are 0 through 5, in order.
    #[test]
    fn test_schema_set_ids() {
        let struct_children = ArrowFields::from(vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Struct(struct_children), true),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let protos = Fields::from(&schema);
        let actual_ids: Vec<i32> = protos.0.iter().map(|proto| proto.id).collect();
        let expected_ids: Vec<i32> = (0..6).collect();
        assert_eq!(actual_ids, expected_ids);
    }

    /// Schema metadata survives a round trip through `FieldsWithMeta`.
    #[test]
    fn test_schema_metadata() {
        let metadata: HashMap<String, String> = HashMap::from([
            (String::from("k1"), String::from("v1")),
            (String::from("k2"), String::from("v2")),
        ]);

        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );

        let expected_schema = Schema::try_from(&arrow_schema).unwrap();
        let round_tripped = Schema::from(FieldsWithMeta::from(&expected_schema));
        assert_eq!(expected_schema, round_tripped);
    }
}