Skip to main content

lance_file/
datatypes.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::DataType;
5use async_recursion::async_recursion;
6use lance_arrow::DataTypeExt;
7use lance_arrow::ARROW_EXT_NAME_KEY;
8use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
9use lance_core::{Error, Result};
10use lance_io::traits::Reader;
11use lance_io::utils::{read_binary_array, read_fixed_stride_array};
12use snafu::location;
13use std::collections::HashMap;
14
15use crate::format::pb;
16
17#[allow(clippy::fallible_impl_from)]
18impl From<&pb::Field> for Field {
19    fn from(field: &pb::Field) -> Self {
20        let lance_metadata: HashMap<String, String> = field
21            .metadata
22            .iter()
23            .map(|(key, value)| {
24                let string_value = String::from_utf8_lossy(value).to_string();
25                (key.clone(), string_value)
26            })
27            .collect();
28        let mut lance_metadata = lance_metadata;
29        if !field.extension_name.is_empty() {
30            lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
31        }
32        Self {
33            name: field.name.clone(),
34            id: field.id,
35            parent_id: field.parent_id,
36            logical_type: LogicalType::from(field.logical_type.as_str()),
37            metadata: lance_metadata,
38            encoding: match field.encoding {
39                1 => Some(Encoding::Plain),
40                2 => Some(Encoding::VarBinary),
41                3 => Some(Encoding::Dictionary),
42                4 => Some(Encoding::RLE),
43                _ => None,
44            },
45            nullable: field.nullable,
46            children: vec![],
47            dictionary: field.dictionary.as_ref().map(Dictionary::from),
48            unenforced_primary_key_position: if field.unenforced_primary_key_position > 0 {
49                Some(field.unenforced_primary_key_position)
50            } else if field.unenforced_primary_key {
51                Some(0)
52            } else {
53                None
54            },
55        }
56    }
57}
58
59impl From<&Field> for pb::Field {
60    fn from(field: &Field) -> Self {
61        let pb_metadata = field
62            .metadata
63            .iter()
64            .map(|(key, value)| (key.clone(), value.clone().into_bytes()))
65            .collect();
66        Self {
67            id: field.id,
68            parent_id: field.parent_id,
69            name: field.name.clone(),
70            logical_type: field.logical_type.to_string(),
71            encoding: match field.encoding {
72                Some(Encoding::Plain) => 1,
73                Some(Encoding::VarBinary) => 2,
74                Some(Encoding::Dictionary) => 3,
75                Some(Encoding::RLE) => 4,
76                _ => 0,
77            },
78            nullable: field.nullable,
79            dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
80            metadata: pb_metadata,
81            extension_name: field
82                .extension_name()
83                .map(|name| name.to_owned())
84                .unwrap_or_default(),
85            r#type: 0,
86            unenforced_primary_key: field.unenforced_primary_key_position.is_some(),
87            unenforced_primary_key_position: field.unenforced_primary_key_position.unwrap_or(0),
88        }
89    }
90}
91
92pub struct Fields(pub Vec<pb::Field>);
93
94impl From<&Field> for Fields {
95    fn from(field: &Field) -> Self {
96        let mut protos = vec![pb::Field::from(field)];
97        protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
98        Self(protos)
99    }
100}
101
102/// Convert list of protobuf `Field` to a Schema.
103impl From<&Fields> for Schema {
104    fn from(fields: &Fields) -> Self {
105        let mut schema = Self {
106            fields: vec![],
107            metadata: HashMap::default(),
108        };
109
110        fields.0.iter().for_each(|f| {
111            if f.parent_id == -1 {
112                schema.fields.push(Field::from(f));
113            } else {
114                let parent = schema.mut_field_by_id(f.parent_id).unwrap();
115                parent.children.push(Field::from(f));
116            }
117        });
118
119        schema
120    }
121}
122
123pub struct FieldsWithMeta {
124    pub fields: Fields,
125    pub metadata: HashMap<String, Vec<u8>>,
126}
127
128/// Convert list of protobuf `Field` and Metadata to a Schema.
129impl From<FieldsWithMeta> for Schema {
130    fn from(fields_with_meta: FieldsWithMeta) -> Self {
131        let lance_metadata = fields_with_meta
132            .metadata
133            .into_iter()
134            .map(|(key, value)| {
135                let string_value = String::from_utf8_lossy(&value).to_string();
136                (key, string_value)
137            })
138            .collect();
139
140        let schema_with_fields = Self::from(&fields_with_meta.fields);
141        Self {
142            fields: schema_with_fields.fields,
143            metadata: lance_metadata,
144        }
145    }
146}
147
148/// Convert a Schema to a list of protobuf Field.
149impl From<&Schema> for Fields {
150    fn from(schema: &Schema) -> Self {
151        let mut protos = vec![];
152        schema.fields.iter().for_each(|f| {
153            protos.extend(Self::from(f).0);
154        });
155        Self(protos)
156    }
157}
158
159/// Convert a Schema to a list of protobuf Field and Metadata
160impl From<&Schema> for FieldsWithMeta {
161    fn from(schema: &Schema) -> Self {
162        let fields = schema.into();
163        let metadata = schema
164            .metadata
165            .clone()
166            .into_iter()
167            .map(|(key, value)| (key, value.into_bytes()))
168            .collect();
169        Self { fields, metadata }
170    }
171}
172
173impl From<&pb::Dictionary> for Dictionary {
174    fn from(proto: &pb::Dictionary) -> Self {
175        Self {
176            offset: proto.offset as usize,
177            length: proto.length as usize,
178            values: None,
179        }
180    }
181}
182
183impl From<&Dictionary> for pb::Dictionary {
184    fn from(d: &Dictionary) -> Self {
185        Self {
186            offset: d.offset as i64,
187            length: d.length as i64,
188        }
189    }
190}
191
192impl From<Encoding> for pb::Encoding {
193    fn from(e: Encoding) -> Self {
194        match e {
195            Encoding::Plain => Self::Plain,
196            Encoding::VarBinary => Self::VarBinary,
197            Encoding::Dictionary => Self::Dictionary,
198            Encoding::RLE => Self::Rle,
199        }
200    }
201}
202
203#[async_recursion]
204async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
205    if let DataType::Dictionary(_, value_type) = field.data_type() {
206        assert!(field.dictionary.is_some());
207        if let Some(dict_info) = field.dictionary.as_mut() {
208            use DataType::*;
209            match value_type.as_ref() {
210                _ if value_type.is_binary_like() => {
211                    dict_info.values = Some(
212                        read_binary_array(
213                            reader,
214                            value_type.as_ref(),
215                            true, // Empty values are null
216                            dict_info.offset,
217                            dict_info.length,
218                            ..,
219                        )
220                        .await?,
221                    );
222                }
223                Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
224                    dict_info.values = Some(
225                        read_fixed_stride_array(
226                            reader,
227                            value_type.as_ref(),
228                            dict_info.offset,
229                            dict_info.length,
230                            ..,
231                        )
232                        .await?,
233                    );
234                }
235                _ => {
236                    return Err(Error::Schema {
237                        message: format!(
238                            "Does not support {} as dictionary value type",
239                            value_type
240                        ),
241                        location: location!(),
242                    });
243                }
244            }
245        } else {
246            panic!("Should not reach here: dictionary field does not load dictionary info")
247        }
248        Ok(())
249    } else {
250        for child in field.children.as_mut_slice() {
251            load_field_dictionary(child, reader).await?;
252        }
253        Ok(())
254    }
255}
256
257/// Load dictionary value array from manifest files.
258// TODO: pub(crate)
259pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
260    for field in schema.fields.as_mut_slice() {
261        load_field_dictionary(field, reader).await?;
262    }
263    Ok(())
264}
265
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use lance_core::datatypes::Schema;

    use super::{Fields, FieldsWithMeta};

    /// Flattening a schema with a nested struct yields protobuf fields carrying ids 0..6,
    /// in order.
    #[test]
    fn test_schema_set_ids() {
        let struct_children = ArrowFields::from(vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Struct(struct_children), true),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let protos = Fields::from(&schema);
        let ids = protos.0.iter().map(|proto| proto.id).collect::<Vec<_>>();
        assert_eq!(ids, (0..6).collect::<Vec<_>>());
    }

    /// Schema-level metadata survives a round trip through `FieldsWithMeta`.
    #[test]
    fn test_schema_metadata() {
        let metadata = HashMap::from([
            (String::from("k1"), String::from("v1")),
            (String::from("k2"), String::from("v2")),
        ]);
        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );

        let expected_schema = Schema::try_from(&arrow_schema).unwrap();
        let round_tripped = Schema::from(FieldsWithMeta::from(&expected_schema));
        assert_eq!(expected_schema, round_tripped);
    }
}