Skip to main content

lance_file/
datatypes.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::DataType;
5use async_recursion::async_recursion;
6use lance_arrow::ARROW_EXT_NAME_KEY;
7use lance_arrow::DataTypeExt;
8use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
9use lance_core::{Error, Result};
10use lance_io::traits::Reader;
11use lance_io::utils::{read_binary_array, read_fixed_stride_array};
12use std::collections::HashMap;
13
14use crate::format::pb;
15
16#[allow(clippy::fallible_impl_from)]
17impl From<&pb::Field> for Field {
18    fn from(field: &pb::Field) -> Self {
19        let lance_metadata: HashMap<String, String> = field
20            .metadata
21            .iter()
22            .map(|(key, value)| {
23                let string_value = String::from_utf8_lossy(value).to_string();
24                (key.clone(), string_value)
25            })
26            .collect();
27        let mut lance_metadata = lance_metadata;
28        if !field.extension_name.is_empty() {
29            lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
30        }
31        Self {
32            name: field.name.clone(),
33            id: field.id,
34            parent_id: field.parent_id,
35            logical_type: LogicalType::from(field.logical_type.as_str()),
36            metadata: lance_metadata,
37            encoding: match field.encoding {
38                1 => Some(Encoding::Plain),
39                2 => Some(Encoding::VarBinary),
40                3 => Some(Encoding::Dictionary),
41                4 => Some(Encoding::RLE),
42                _ => None,
43            },
44            nullable: field.nullable,
45            children: vec![],
46            dictionary: field.dictionary.as_ref().map(Dictionary::from),
47            unenforced_primary_key_position: if field.unenforced_primary_key_position > 0 {
48                Some(field.unenforced_primary_key_position)
49            } else if field.unenforced_primary_key {
50                Some(0)
51            } else {
52                None
53            },
54        }
55    }
56}
57
58impl From<&Field> for pb::Field {
59    fn from(field: &Field) -> Self {
60        let pb_metadata = field
61            .metadata
62            .iter()
63            .map(|(key, value)| (key.clone(), value.clone().into_bytes()))
64            .collect();
65        Self {
66            id: field.id,
67            parent_id: field.parent_id,
68            name: field.name.clone(),
69            logical_type: field.logical_type.to_string(),
70            encoding: match field.encoding {
71                Some(Encoding::Plain) => 1,
72                Some(Encoding::VarBinary) => 2,
73                Some(Encoding::Dictionary) => 3,
74                Some(Encoding::RLE) => 4,
75                _ => 0,
76            },
77            nullable: field.nullable,
78            dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
79            metadata: pb_metadata,
80            extension_name: field
81                .extension_name()
82                .map(|name| name.to_owned())
83                .unwrap_or_default(),
84            r#type: 0,
85            unenforced_primary_key: field.unenforced_primary_key_position.is_some(),
86            unenforced_primary_key_position: field.unenforced_primary_key_position.unwrap_or(0),
87        }
88    }
89}
90
91pub struct Fields(pub Vec<pb::Field>);
92
93impl From<&Field> for Fields {
94    fn from(field: &Field) -> Self {
95        let mut protos = vec![pb::Field::from(field)];
96        protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
97        Self(protos)
98    }
99}
100
101/// Convert list of protobuf `Field` to a Schema.
102impl From<&Fields> for Schema {
103    fn from(fields: &Fields) -> Self {
104        let mut schema = Self {
105            fields: vec![],
106            metadata: HashMap::default(),
107        };
108
109        fields.0.iter().for_each(|f| {
110            if f.parent_id == -1 {
111                schema.fields.push(Field::from(f));
112            } else {
113                let parent = schema.mut_field_by_id(f.parent_id).unwrap();
114                parent.children.push(Field::from(f));
115            }
116        });
117
118        schema
119    }
120}
121
122pub struct FieldsWithMeta {
123    pub fields: Fields,
124    pub metadata: HashMap<String, Vec<u8>>,
125}
126
127/// Convert list of protobuf `Field` and Metadata to a Schema.
128impl From<FieldsWithMeta> for Schema {
129    fn from(fields_with_meta: FieldsWithMeta) -> Self {
130        let lance_metadata = fields_with_meta
131            .metadata
132            .into_iter()
133            .map(|(key, value)| {
134                let string_value = String::from_utf8_lossy(&value).to_string();
135                (key, string_value)
136            })
137            .collect();
138
139        let schema_with_fields = Self::from(&fields_with_meta.fields);
140        Self {
141            fields: schema_with_fields.fields,
142            metadata: lance_metadata,
143        }
144    }
145}
146
147/// Convert a Schema to a list of protobuf Field.
148impl From<&Schema> for Fields {
149    fn from(schema: &Schema) -> Self {
150        let mut protos = vec![];
151        schema.fields.iter().for_each(|f| {
152            protos.extend(Self::from(f).0);
153        });
154        Self(protos)
155    }
156}
157
158/// Convert a Schema to a list of protobuf Field and Metadata
159impl From<&Schema> for FieldsWithMeta {
160    fn from(schema: &Schema) -> Self {
161        let fields = schema.into();
162        let metadata = schema
163            .metadata
164            .clone()
165            .into_iter()
166            .map(|(key, value)| (key, value.into_bytes()))
167            .collect();
168        Self { fields, metadata }
169    }
170}
171
172impl From<&pb::Dictionary> for Dictionary {
173    fn from(proto: &pb::Dictionary) -> Self {
174        Self {
175            offset: proto.offset as usize,
176            length: proto.length as usize,
177            values: None,
178        }
179    }
180}
181
182impl From<&Dictionary> for pb::Dictionary {
183    fn from(d: &Dictionary) -> Self {
184        Self {
185            offset: d.offset as i64,
186            length: d.length as i64,
187        }
188    }
189}
190
191impl From<Encoding> for pb::Encoding {
192    fn from(e: Encoding) -> Self {
193        match e {
194            Encoding::Plain => Self::Plain,
195            Encoding::VarBinary => Self::VarBinary,
196            Encoding::Dictionary => Self::Dictionary,
197            Encoding::RLE => Self::Rle,
198        }
199    }
200}
201
202#[async_recursion]
203async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
204    if let DataType::Dictionary(_, value_type) = field.data_type() {
205        assert!(field.dictionary.is_some());
206        if let Some(dict_info) = field.dictionary.as_mut() {
207            use DataType::*;
208            match value_type.as_ref() {
209                _ if value_type.is_binary_like() => {
210                    dict_info.values = Some(
211                        read_binary_array(
212                            reader,
213                            value_type.as_ref(),
214                            true, // Empty values are null
215                            dict_info.offset,
216                            dict_info.length,
217                            ..,
218                        )
219                        .await?,
220                    );
221                }
222                Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
223                    dict_info.values = Some(
224                        read_fixed_stride_array(
225                            reader,
226                            value_type.as_ref(),
227                            dict_info.offset,
228                            dict_info.length,
229                            ..,
230                        )
231                        .await?,
232                    );
233                }
234                _ => {
235                    return Err(Error::schema(format!(
236                        "Does not support {} as dictionary value type",
237                        value_type
238                    )));
239                }
240            }
241        } else {
242            panic!("Should not reach here: dictionary field does not load dictionary info")
243        }
244        Ok(())
245    } else {
246        for child in field.children.as_mut_slice() {
247            load_field_dictionary(child, reader).await?;
248        }
249        Ok(())
250    }
251}
252
253/// Load dictionary value array from manifest files.
254// TODO: pub(crate)
255pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
256    for field in schema.fields.as_mut_slice() {
257        load_field_dictionary(field, reader).await?;
258    }
259    Ok(())
260}
261
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use lance_core::datatypes::Schema;

    use super::{Fields, FieldsWithMeta};

    /// Flattening a schema with a nested struct yields protobuf fields whose ids
    /// are 0 through 5 in emission order.
    #[test]
    fn test_schema_set_ids() {
        let struct_children = ArrowFields::from(vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Struct(struct_children), true),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let ids: Vec<_> = Fields::from(&schema).0.iter().map(|proto| proto.id).collect();
        assert_eq!(ids, (0..6).collect::<Vec<_>>());
    }

    /// Schema-level metadata survives a round trip through `FieldsWithMeta`.
    #[test]
    fn test_schema_metadata() {
        let metadata = HashMap::from([
            (String::from("k1"), String::from("v1")),
            (String::from("k2"), String::from("v2")),
        ]);
        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );

        let expected_schema = Schema::try_from(&arrow_schema).unwrap();
        let round_tripped = Schema::from(FieldsWithMeta::from(&expected_schema));
        assert_eq!(expected_schema, round_tripped);
    }
}