1use arrow_schema::DataType;
5use async_recursion::async_recursion;
6use lance_arrow::ARROW_EXT_NAME_KEY;
7use lance_arrow::DataTypeExt;
8use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
9use lance_core::{Error, Result};
10use lance_io::traits::Reader;
11use lance_io::utils::{read_binary_array, read_fixed_stride_array};
12use std::collections::HashMap;
13
14use crate::format::pb;
15
16#[allow(clippy::fallible_impl_from)]
17impl From<&pb::Field> for Field {
18 fn from(field: &pb::Field) -> Self {
19 let lance_metadata: HashMap<String, String> = field
20 .metadata
21 .iter()
22 .map(|(key, value)| {
23 let string_value = String::from_utf8_lossy(value).to_string();
24 (key.clone(), string_value)
25 })
26 .collect();
27 let mut lance_metadata = lance_metadata;
28 if !field.extension_name.is_empty() {
29 lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
30 }
31 Self {
32 name: field.name.clone(),
33 id: field.id,
34 parent_id: field.parent_id,
35 logical_type: LogicalType::from(field.logical_type.as_str()),
36 metadata: lance_metadata,
37 encoding: match field.encoding {
38 1 => Some(Encoding::Plain),
39 2 => Some(Encoding::VarBinary),
40 3 => Some(Encoding::Dictionary),
41 4 => Some(Encoding::RLE),
42 _ => None,
43 },
44 nullable: field.nullable,
45 children: vec![],
46 dictionary: field.dictionary.as_ref().map(Dictionary::from),
47 unenforced_primary_key_position: if field.unenforced_primary_key_position > 0 {
48 Some(field.unenforced_primary_key_position)
49 } else if field.unenforced_primary_key {
50 Some(0)
51 } else {
52 None
53 },
54 unenforced_clustering_key_position: if field.unenforced_clustering_key_position > 0 {
55 Some(field.unenforced_clustering_key_position)
56 } else {
57 None
58 },
59 }
60 }
61}
62
63impl From<&Field> for pb::Field {
64 fn from(field: &Field) -> Self {
65 let pb_metadata = field
66 .metadata
67 .iter()
68 .map(|(key, value)| (key.clone(), value.clone().into_bytes()))
69 .collect();
70 Self {
71 id: field.id,
72 parent_id: field.parent_id,
73 name: field.name.clone(),
74 logical_type: field.logical_type.to_string(),
75 encoding: match field.encoding {
76 Some(Encoding::Plain) => 1,
77 Some(Encoding::VarBinary) => 2,
78 Some(Encoding::Dictionary) => 3,
79 Some(Encoding::RLE) => 4,
80 _ => 0,
81 },
82 nullable: field.nullable,
83 dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
84 metadata: pb_metadata,
85 extension_name: field
86 .extension_name()
87 .map(|name| name.to_owned())
88 .unwrap_or_default(),
89 r#type: 0,
90 unenforced_primary_key: field.unenforced_primary_key_position.is_some(),
91 unenforced_primary_key_position: field.unenforced_primary_key_position.unwrap_or(0),
92 unenforced_clustering_key: false,
93 unenforced_clustering_key_position: field
94 .unenforced_clustering_key_position
95 .unwrap_or(0),
96 }
97 }
98}
99
100pub struct Fields(pub Vec<pb::Field>);
101
102impl From<&Field> for Fields {
103 fn from(field: &Field) -> Self {
104 let mut protos = vec![pb::Field::from(field)];
105 protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
106 Self(protos)
107 }
108}
109
110impl From<&Fields> for Schema {
112 fn from(fields: &Fields) -> Self {
113 let mut schema = Self {
114 fields: vec![],
115 metadata: HashMap::default(),
116 };
117
118 fields.0.iter().for_each(|f| {
119 if f.parent_id == -1 {
120 schema.fields.push(Field::from(f));
121 } else {
122 let parent = schema.mut_field_by_id(f.parent_id).unwrap();
123 parent.children.push(Field::from(f));
124 }
125 });
126
127 schema
128 }
129}
130
131pub struct FieldsWithMeta {
132 pub fields: Fields,
133 pub metadata: HashMap<String, Vec<u8>>,
134}
135
136impl From<FieldsWithMeta> for Schema {
138 fn from(fields_with_meta: FieldsWithMeta) -> Self {
139 let lance_metadata = fields_with_meta
140 .metadata
141 .into_iter()
142 .map(|(key, value)| {
143 let string_value = String::from_utf8_lossy(&value).to_string();
144 (key, string_value)
145 })
146 .collect();
147
148 let schema_with_fields = Self::from(&fields_with_meta.fields);
149 Self {
150 fields: schema_with_fields.fields,
151 metadata: lance_metadata,
152 }
153 }
154}
155
156impl From<&Schema> for Fields {
158 fn from(schema: &Schema) -> Self {
159 let mut protos = vec![];
160 schema.fields.iter().for_each(|f| {
161 protos.extend(Self::from(f).0);
162 });
163 Self(protos)
164 }
165}
166
167impl From<&Schema> for FieldsWithMeta {
169 fn from(schema: &Schema) -> Self {
170 let fields = schema.into();
171 let metadata = schema
172 .metadata
173 .clone()
174 .into_iter()
175 .map(|(key, value)| (key, value.into_bytes()))
176 .collect();
177 Self { fields, metadata }
178 }
179}
180
181impl From<&pb::Dictionary> for Dictionary {
182 fn from(proto: &pb::Dictionary) -> Self {
183 Self {
184 offset: proto.offset as usize,
185 length: proto.length as usize,
186 values: None,
187 }
188 }
189}
190
191impl From<&Dictionary> for pb::Dictionary {
192 fn from(d: &Dictionary) -> Self {
193 Self {
194 offset: d.offset as i64,
195 length: d.length as i64,
196 }
197 }
198}
199
200impl From<Encoding> for pb::Encoding {
201 fn from(e: Encoding) -> Self {
202 match e {
203 Encoding::Plain => Self::Plain,
204 Encoding::VarBinary => Self::VarBinary,
205 Encoding::Dictionary => Self::Dictionary,
206 Encoding::RLE => Self::Rle,
207 }
208 }
209}
210
211#[async_recursion]
212async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
213 if let DataType::Dictionary(_, value_type) = field.data_type() {
214 assert!(field.dictionary.is_some());
215 if let Some(dict_info) = field.dictionary.as_mut() {
216 use DataType::*;
217 match value_type.as_ref() {
218 _ if value_type.is_binary_like() => {
219 dict_info.values = Some(
220 read_binary_array(
221 reader,
222 value_type.as_ref(),
223 true, dict_info.offset,
225 dict_info.length,
226 ..,
227 )
228 .await?,
229 );
230 }
231 Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
232 dict_info.values = Some(
233 read_fixed_stride_array(
234 reader,
235 value_type.as_ref(),
236 dict_info.offset,
237 dict_info.length,
238 ..,
239 )
240 .await?,
241 );
242 }
243 _ => {
244 return Err(Error::schema(format!(
245 "Does not support {} as dictionary value type",
246 value_type
247 )));
248 }
249 }
250 } else {
251 panic!("Should not reach here: dictionary field does not load dictionary info")
252 }
253 Ok(())
254 } else {
255 for child in field.children.as_mut_slice() {
256 load_field_dictionary(child, reader).await?;
257 }
258 Ok(())
259 }
260}
261
262pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
265 for field in schema.fields.as_mut_slice() {
266 load_field_dictionary(field, reader).await?;
267 }
268 Ok(())
269}
270
#[cfg(test)]
mod tests {
    use arrow_schema::DataType;
    use arrow_schema::Field as ArrowField;
    use arrow_schema::Fields as ArrowFields;
    use arrow_schema::Schema as ArrowSchema;
    use lance_core::datatypes::Schema;
    use std::collections::HashMap;

    use super::{Fields, FieldsWithMeta};

    /// Arrow field metadata that places a column in the clustering key at `position`.
    fn clustering_key_metadata(position: &str) -> HashMap<String, String> {
        HashMap::from([(
            "lance-schema:unenforced-clustering-key:position".to_owned(),
            position.to_owned(),
        )])
    }

    #[test]
    fn test_schema_set_ids() {
        let struct_children = ArrowFields::from(vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Struct(struct_children), true),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        // Flattened order is a, b, b.f1, b.f2, b.f3, c; ids must be 0..6 in that order.
        let protos = Fields::from(&schema);
        let ids: Vec<i32> = protos.0.iter().map(|proto| proto.id).collect();
        assert_eq!(ids, (0..6).collect::<Vec<i32>>());
    }

    #[test]
    fn test_schema_metadata() {
        let metadata = HashMap::from([
            (String::from("k1"), String::from("v1")),
            (String::from("k2"), String::from("v2")),
        ]);
        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );

        let expected_schema = Schema::try_from(&arrow_schema).unwrap();
        let round_tripped = Schema::from(FieldsWithMeta::from(&expected_schema));
        assert_eq!(expected_schema, round_tripped);
    }

    #[test]
    fn test_clustering_key_roundtrip() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("region", DataType::Utf8, true)
                .with_metadata(clustering_key_metadata("1")),
            ArrowField::new("date", DataType::Int32, false)
                .with_metadata(clustering_key_metadata("2")),
            ArrowField::new("value", DataType::Float64, true),
        ]);

        let schema = Schema::try_from(&arrow_schema).unwrap();
        let original_keys = schema.unenforced_clustering_key();
        assert_eq!(original_keys.len(), 2);
        assert_eq!(original_keys[0].name, "region");
        assert_eq!(original_keys[1].name, "date");

        let restored = Schema::from(FieldsWithMeta::from(&schema));

        let restored_keys = restored.unenforced_clustering_key();
        assert_eq!(restored_keys.len(), 2);
        assert_eq!(restored_keys[0].name, "region");
        assert_eq!(restored_keys[1].name, "date");
        assert_eq!(restored_keys[0].unenforced_clustering_key_position, Some(1));
        assert_eq!(restored_keys[1].unenforced_clustering_key_position, Some(2));

        // A column without clustering metadata must stay out of the clustering key.
        let value_field = restored.field("value").unwrap();
        assert!(!value_field.is_unenforced_clustering_key());
    }
}