1use std::collections::HashMap;
5
6use arrow_schema::DataType;
7use async_recursion::async_recursion;
8use lance_arrow::bfloat16::ARROW_EXT_NAME_KEY;
9use lance_arrow::DataTypeExt;
10use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
11use lance_core::{Error, Result};
12use lance_io::traits::Reader;
13use lance_io::utils::{read_binary_array, read_fixed_stride_array};
14use snafu::location;
15
16use crate::format::pb;
17
18#[allow(clippy::fallible_impl_from)]
19impl From<&pb::Field> for Field {
20 fn from(field: &pb::Field) -> Self {
21 let mut lance_metadata: HashMap<String, String> = field
22 .metadata
23 .iter()
24 .map(|(key, value)| {
25 let string_value = String::from_utf8_lossy(value).to_string();
26 (key.clone(), string_value)
27 })
28 .collect();
29 if !field.extension_name.is_empty() {
30 lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
31 }
32 Self {
33 name: field.name.clone(),
34 id: field.id,
35 parent_id: field.parent_id,
36 logical_type: LogicalType::from(field.logical_type.as_str()),
37 metadata: lance_metadata,
38 encoding: match field.encoding {
39 1 => Some(Encoding::Plain),
40 2 => Some(Encoding::VarBinary),
41 3 => Some(Encoding::Dictionary),
42 4 => Some(Encoding::RLE),
43 _ => None,
44 },
45 nullable: field.nullable,
46 children: vec![],
47 dictionary: field.dictionary.as_ref().map(Dictionary::from),
48 storage_class: field.storage_class.parse().unwrap(),
49 unenforced_primary_key: field.unenforced_primary_key,
50 }
51 }
52}
53
54impl From<&Field> for pb::Field {
55 fn from(field: &Field) -> Self {
56 let pb_metadata = field
57 .metadata
58 .clone()
59 .into_iter()
60 .map(|(key, value)| (key, value.into_bytes()))
61 .collect();
62 Self {
63 id: field.id,
64 parent_id: field.parent_id,
65 name: field.name.clone(),
66 logical_type: field.logical_type.to_string(),
67 encoding: match field.encoding {
68 Some(Encoding::Plain) => 1,
69 Some(Encoding::VarBinary) => 2,
70 Some(Encoding::Dictionary) => 3,
71 Some(Encoding::RLE) => 4,
72 _ => 0,
73 },
74 nullable: field.nullable,
75 dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
76 metadata: pb_metadata,
77 extension_name: field
78 .extension_name()
79 .map(|name| name.to_owned())
80 .unwrap_or_default(),
81 r#type: 0,
82 storage_class: field.storage_class.to_string(),
83 unenforced_primary_key: field.unenforced_primary_key,
84 }
85 }
86}
87
88pub struct Fields(pub Vec<pb::Field>);
89
90impl From<&Field> for Fields {
91 fn from(field: &Field) -> Self {
92 let mut protos = vec![pb::Field::from(field)];
93 protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
94 Self(protos)
95 }
96}
97
98impl From<&Fields> for Schema {
100 fn from(fields: &Fields) -> Self {
101 let mut schema = Self {
102 fields: vec![],
103 metadata: HashMap::default(),
104 };
105
106 fields.0.iter().for_each(|f| {
107 if f.parent_id == -1 {
108 schema.fields.push(Field::from(f));
109 } else {
110 let parent = schema.mut_field_by_id(f.parent_id).unwrap();
111 parent.children.push(Field::from(f));
112 }
113 });
114
115 schema
116 }
117}
118
119pub struct FieldsWithMeta {
120 pub fields: Fields,
121 pub metadata: HashMap<String, Vec<u8>>,
122}
123
124impl From<FieldsWithMeta> for Schema {
126 fn from(fields_with_meta: FieldsWithMeta) -> Self {
127 let lance_metadata = fields_with_meta
128 .metadata
129 .into_iter()
130 .map(|(key, value)| {
131 let string_value = String::from_utf8_lossy(&value).to_string();
132 (key, string_value)
133 })
134 .collect();
135
136 let schema_with_fields = Self::from(&fields_with_meta.fields);
137 Self {
138 fields: schema_with_fields.fields,
139 metadata: lance_metadata,
140 }
141 }
142}
143
144impl From<&Schema> for Fields {
146 fn from(schema: &Schema) -> Self {
147 let mut protos = vec![];
148 schema.fields.iter().for_each(|f| {
149 protos.extend(Self::from(f).0);
150 });
151 Self(protos)
152 }
153}
154
155impl From<&Schema> for FieldsWithMeta {
157 fn from(schema: &Schema) -> Self {
158 let fields = schema.into();
159 let metadata = schema
160 .metadata
161 .clone()
162 .into_iter()
163 .map(|(key, value)| (key, value.into_bytes()))
164 .collect();
165 Self { fields, metadata }
166 }
167}
168
169impl From<&pb::Dictionary> for Dictionary {
170 fn from(proto: &pb::Dictionary) -> Self {
171 Self {
172 offset: proto.offset as usize,
173 length: proto.length as usize,
174 values: None,
175 }
176 }
177}
178
179impl From<&Dictionary> for pb::Dictionary {
180 fn from(d: &Dictionary) -> Self {
181 Self {
182 offset: d.offset as i64,
183 length: d.length as i64,
184 }
185 }
186}
187
188impl From<Encoding> for pb::Encoding {
189 fn from(e: Encoding) -> Self {
190 match e {
191 Encoding::Plain => Self::Plain,
192 Encoding::VarBinary => Self::VarBinary,
193 Encoding::Dictionary => Self::Dictionary,
194 Encoding::RLE => Self::Rle,
195 }
196 }
197}
198
199#[async_recursion]
200async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
201 if let DataType::Dictionary(_, value_type) = field.data_type() {
202 assert!(field.dictionary.is_some());
203 if let Some(dict_info) = field.dictionary.as_mut() {
204 use DataType::*;
205 match value_type.as_ref() {
206 _ if value_type.is_binary_like() => {
207 dict_info.values = Some(
208 read_binary_array(
209 reader,
210 value_type.as_ref(),
211 true, dict_info.offset,
213 dict_info.length,
214 ..,
215 )
216 .await?,
217 );
218 }
219 Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
220 dict_info.values = Some(
221 read_fixed_stride_array(
222 reader,
223 value_type.as_ref(),
224 dict_info.offset,
225 dict_info.length,
226 ..,
227 )
228 .await?,
229 );
230 }
231 _ => {
232 return Err(Error::Schema {
233 message: format!(
234 "Does not support {} as dictionary value type",
235 value_type
236 ),
237 location: location!(),
238 });
239 }
240 }
241 } else {
242 panic!("Should not reach here: dictionary field does not load dictionary info")
243 }
244 Ok(())
245 } else {
246 for child in field.children.as_mut_slice() {
247 load_field_dictionary(child, reader).await?;
248 }
249 Ok(())
250 }
251}
252
253pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
256 for field in schema.fields.as_mut_slice() {
257 load_field_dictionary(field, reader).await?;
258 }
259 Ok(())
260}
261
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use lance_core::datatypes::Schema;

    use crate::datatypes::{Fields, FieldsWithMeta};

    /// Flattening a schema with a nested struct yields six protobuf fields
    /// whose ids are 0 through 5, in that order.
    #[test]
    fn test_schema_set_ids() {
        let struct_children = ArrowFields::from(vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Struct(struct_children), true),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let Fields(protos) = Fields::from(&schema);
        let ids = protos.iter().map(|proto| proto.id).collect::<Vec<_>>();
        assert_eq!(ids, (0..6).collect::<Vec<_>>());
    }

    /// Schema metadata survives a round trip through `FieldsWithMeta`.
    #[test]
    fn test_schema_metadata() {
        let metadata: HashMap<String, String> = [("k1", "v1"), ("k2", "v2")]
            .iter()
            .map(|&(key, value)| (String::from(key), String::from(value)))
            .collect();

        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );

        let expected_schema = Schema::try_from(&arrow_schema).unwrap();
        let round_tripped = Schema::from(FieldsWithMeta::from(&expected_schema));
        assert_eq!(expected_schema, round_tripped);
    }
}