1use arrow_schema::DataType;
5use async_recursion::async_recursion;
6use lance_arrow::ARROW_EXT_NAME_KEY;
7use lance_arrow::DataTypeExt;
8use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
9use lance_core::{Error, Result};
10use lance_io::traits::Reader;
11use lance_io::utils::{read_binary_array, read_fixed_stride_array};
12use std::collections::HashMap;
13
14use crate::format::pb;
15
16#[allow(clippy::fallible_impl_from)]
17impl From<&pb::Field> for Field {
18 fn from(field: &pb::Field) -> Self {
19 let lance_metadata: HashMap<String, String> = field
20 .metadata
21 .iter()
22 .map(|(key, value)| {
23 let string_value = String::from_utf8_lossy(value).to_string();
24 (key.clone(), string_value)
25 })
26 .collect();
27 let mut lance_metadata = lance_metadata;
28 if !field.extension_name.is_empty() {
29 lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
30 }
31 Self {
32 name: field.name.clone(),
33 id: field.id,
34 parent_id: field.parent_id,
35 logical_type: LogicalType::from(field.logical_type.as_str()),
36 metadata: lance_metadata,
37 encoding: match field.encoding {
38 1 => Some(Encoding::Plain),
39 2 => Some(Encoding::VarBinary),
40 3 => Some(Encoding::Dictionary),
41 4 => Some(Encoding::RLE),
42 _ => None,
43 },
44 nullable: field.nullable,
45 children: vec![],
46 dictionary: field.dictionary.as_ref().map(Dictionary::from),
47 unenforced_primary_key_position: if field.unenforced_primary_key_position > 0 {
48 Some(field.unenforced_primary_key_position)
49 } else if field.unenforced_primary_key {
50 Some(0)
51 } else {
52 None
53 },
54 }
55 }
56}
57
58impl From<&Field> for pb::Field {
59 fn from(field: &Field) -> Self {
60 let pb_metadata = field
61 .metadata
62 .iter()
63 .map(|(key, value)| (key.clone(), value.clone().into_bytes()))
64 .collect();
65 Self {
66 id: field.id,
67 parent_id: field.parent_id,
68 name: field.name.clone(),
69 logical_type: field.logical_type.to_string(),
70 encoding: match field.encoding {
71 Some(Encoding::Plain) => 1,
72 Some(Encoding::VarBinary) => 2,
73 Some(Encoding::Dictionary) => 3,
74 Some(Encoding::RLE) => 4,
75 _ => 0,
76 },
77 nullable: field.nullable,
78 dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
79 metadata: pb_metadata,
80 extension_name: field
81 .extension_name()
82 .map(|name| name.to_owned())
83 .unwrap_or_default(),
84 r#type: 0,
85 unenforced_primary_key: field.unenforced_primary_key_position.is_some(),
86 unenforced_primary_key_position: field.unenforced_primary_key_position.unwrap_or(0),
87 }
88 }
89}
90
91pub struct Fields(pub Vec<pb::Field>);
92
93impl From<&Field> for Fields {
94 fn from(field: &Field) -> Self {
95 let mut protos = vec![pb::Field::from(field)];
96 protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
97 Self(protos)
98 }
99}
100
101impl From<&Fields> for Schema {
103 fn from(fields: &Fields) -> Self {
104 let mut schema = Self {
105 fields: vec![],
106 metadata: HashMap::default(),
107 };
108
109 fields.0.iter().for_each(|f| {
110 if f.parent_id == -1 {
111 schema.fields.push(Field::from(f));
112 } else {
113 let parent = schema.mut_field_by_id(f.parent_id).unwrap();
114 parent.children.push(Field::from(f));
115 }
116 });
117
118 schema
119 }
120}
121
122pub struct FieldsWithMeta {
123 pub fields: Fields,
124 pub metadata: HashMap<String, Vec<u8>>,
125}
126
127impl From<FieldsWithMeta> for Schema {
129 fn from(fields_with_meta: FieldsWithMeta) -> Self {
130 let lance_metadata = fields_with_meta
131 .metadata
132 .into_iter()
133 .map(|(key, value)| {
134 let string_value = String::from_utf8_lossy(&value).to_string();
135 (key, string_value)
136 })
137 .collect();
138
139 let schema_with_fields = Self::from(&fields_with_meta.fields);
140 Self {
141 fields: schema_with_fields.fields,
142 metadata: lance_metadata,
143 }
144 }
145}
146
147impl From<&Schema> for Fields {
149 fn from(schema: &Schema) -> Self {
150 let mut protos = vec![];
151 schema.fields.iter().for_each(|f| {
152 protos.extend(Self::from(f).0);
153 });
154 Self(protos)
155 }
156}
157
158impl From<&Schema> for FieldsWithMeta {
160 fn from(schema: &Schema) -> Self {
161 let fields = schema.into();
162 let metadata = schema
163 .metadata
164 .clone()
165 .into_iter()
166 .map(|(key, value)| (key, value.into_bytes()))
167 .collect();
168 Self { fields, metadata }
169 }
170}
171
172impl From<&pb::Dictionary> for Dictionary {
173 fn from(proto: &pb::Dictionary) -> Self {
174 Self {
175 offset: proto.offset as usize,
176 length: proto.length as usize,
177 values: None,
178 }
179 }
180}
181
182impl From<&Dictionary> for pb::Dictionary {
183 fn from(d: &Dictionary) -> Self {
184 Self {
185 offset: d.offset as i64,
186 length: d.length as i64,
187 }
188 }
189}
190
191impl From<Encoding> for pb::Encoding {
192 fn from(e: Encoding) -> Self {
193 match e {
194 Encoding::Plain => Self::Plain,
195 Encoding::VarBinary => Self::VarBinary,
196 Encoding::Dictionary => Self::Dictionary,
197 Encoding::RLE => Self::Rle,
198 }
199 }
200}
201
202#[async_recursion]
203async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
204 if let DataType::Dictionary(_, value_type) = field.data_type() {
205 assert!(field.dictionary.is_some());
206 if let Some(dict_info) = field.dictionary.as_mut() {
207 use DataType::*;
208 match value_type.as_ref() {
209 _ if value_type.is_binary_like() => {
210 dict_info.values = Some(
211 read_binary_array(
212 reader,
213 value_type.as_ref(),
214 true, dict_info.offset,
216 dict_info.length,
217 ..,
218 )
219 .await?,
220 );
221 }
222 Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
223 dict_info.values = Some(
224 read_fixed_stride_array(
225 reader,
226 value_type.as_ref(),
227 dict_info.offset,
228 dict_info.length,
229 ..,
230 )
231 .await?,
232 );
233 }
234 _ => {
235 return Err(Error::schema(format!(
236 "Does not support {} as dictionary value type",
237 value_type
238 )));
239 }
240 }
241 } else {
242 panic!("Should not reach here: dictionary field does not load dictionary info")
243 }
244 Ok(())
245 } else {
246 for child in field.children.as_mut_slice() {
247 load_field_dictionary(child, reader).await?;
248 }
249 Ok(())
250 }
251}
252
253pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
256 for field in schema.fields.as_mut_slice() {
257 load_field_dictionary(field, reader).await?;
258 }
259 Ok(())
260}
261
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use lance_core::datatypes::Schema;

    use super::{Fields, FieldsWithMeta};

    /// Flattening a schema with a nested struct yields protobuf fields with ids 0 through 5.
    #[test]
    fn test_schema_set_ids() {
        let struct_children = ArrowFields::from(vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Struct(struct_children), true),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let protos = Fields::from(&schema);
        let ids: Vec<_> = protos.0.iter().map(|proto| proto.id).collect();
        let expected: Vec<_> = (0..6).collect();
        assert_eq!(ids, expected);
    }

    /// Schema metadata survives a round trip through `FieldsWithMeta`.
    #[test]
    fn test_schema_metadata() {
        let metadata: HashMap<String, String> = HashMap::from([
            (String::from("k1"), String::from("v1")),
            (String::from("k2"), String::from("v2")),
        ]);
        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );
        let expected_schema = Schema::try_from(&arrow_schema).unwrap();

        let round_tripped = Schema::from(FieldsWithMeta::from(&expected_schema));
        assert_eq!(expected_schema, round_tripped);
    }
}