1use arrow_schema::DataType;
5use async_recursion::async_recursion;
6use lance_arrow::DataTypeExt;
7use lance_arrow::ARROW_EXT_NAME_KEY;
8use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
9use lance_core::{Error, Result};
10use lance_io::traits::Reader;
11use lance_io::utils::{read_binary_array, read_fixed_stride_array};
12use snafu::location;
13use std::collections::HashMap;
14
15use crate::format::pb;
16
17#[allow(clippy::fallible_impl_from)]
18impl From<&pb::Field> for Field {
19 fn from(field: &pb::Field) -> Self {
20 let lance_metadata: HashMap<String, String> = field
21 .metadata
22 .iter()
23 .map(|(key, value)| {
24 let string_value = String::from_utf8_lossy(value).to_string();
25 (key.clone(), string_value)
26 })
27 .collect();
28 let mut lance_metadata = lance_metadata;
29 if !field.extension_name.is_empty() {
30 lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
31 }
32 Self {
33 name: field.name.clone(),
34 id: field.id,
35 parent_id: field.parent_id,
36 logical_type: LogicalType::from(field.logical_type.as_str()),
37 metadata: lance_metadata,
38 encoding: match field.encoding {
39 1 => Some(Encoding::Plain),
40 2 => Some(Encoding::VarBinary),
41 3 => Some(Encoding::Dictionary),
42 4 => Some(Encoding::RLE),
43 _ => None,
44 },
45 nullable: field.nullable,
46 children: vec![],
47 dictionary: field.dictionary.as_ref().map(Dictionary::from),
48 unenforced_primary_key: field.unenforced_primary_key,
49 }
50 }
51}
52
53impl From<&Field> for pb::Field {
54 fn from(field: &Field) -> Self {
55 let pb_metadata = field
56 .metadata
57 .iter()
58 .map(|(key, value)| (key.clone(), value.clone().into_bytes()))
59 .collect();
60 Self {
61 id: field.id,
62 parent_id: field.parent_id,
63 name: field.name.clone(),
64 logical_type: field.logical_type.to_string(),
65 encoding: match field.encoding {
66 Some(Encoding::Plain) => 1,
67 Some(Encoding::VarBinary) => 2,
68 Some(Encoding::Dictionary) => 3,
69 Some(Encoding::RLE) => 4,
70 _ => 0,
71 },
72 nullable: field.nullable,
73 dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
74 metadata: pb_metadata,
75 extension_name: field
76 .extension_name()
77 .map(|name| name.to_owned())
78 .unwrap_or_default(),
79 r#type: 0,
80 unenforced_primary_key: field.unenforced_primary_key,
81 }
82 }
83}
84
85pub struct Fields(pub Vec<pb::Field>);
86
87impl From<&Field> for Fields {
88 fn from(field: &Field) -> Self {
89 let mut protos = vec![pb::Field::from(field)];
90 protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
91 Self(protos)
92 }
93}
94
95impl From<&Fields> for Schema {
97 fn from(fields: &Fields) -> Self {
98 let mut schema = Self {
99 fields: vec![],
100 metadata: HashMap::default(),
101 };
102
103 fields.0.iter().for_each(|f| {
104 if f.parent_id == -1 {
105 schema.fields.push(Field::from(f));
106 } else {
107 let parent = schema.mut_field_by_id(f.parent_id).unwrap();
108 parent.children.push(Field::from(f));
109 }
110 });
111
112 schema
113 }
114}
115
116pub struct FieldsWithMeta {
117 pub fields: Fields,
118 pub metadata: HashMap<String, Vec<u8>>,
119}
120
121impl From<FieldsWithMeta> for Schema {
123 fn from(fields_with_meta: FieldsWithMeta) -> Self {
124 let lance_metadata = fields_with_meta
125 .metadata
126 .into_iter()
127 .map(|(key, value)| {
128 let string_value = String::from_utf8_lossy(&value).to_string();
129 (key, string_value)
130 })
131 .collect();
132
133 let schema_with_fields = Self::from(&fields_with_meta.fields);
134 Self {
135 fields: schema_with_fields.fields,
136 metadata: lance_metadata,
137 }
138 }
139}
140
141impl From<&Schema> for Fields {
143 fn from(schema: &Schema) -> Self {
144 let mut protos = vec![];
145 schema.fields.iter().for_each(|f| {
146 protos.extend(Self::from(f).0);
147 });
148 Self(protos)
149 }
150}
151
152impl From<&Schema> for FieldsWithMeta {
154 fn from(schema: &Schema) -> Self {
155 let fields = schema.into();
156 let metadata = schema
157 .metadata
158 .clone()
159 .into_iter()
160 .map(|(key, value)| (key, value.into_bytes()))
161 .collect();
162 Self { fields, metadata }
163 }
164}
165
166impl From<&pb::Dictionary> for Dictionary {
167 fn from(proto: &pb::Dictionary) -> Self {
168 Self {
169 offset: proto.offset as usize,
170 length: proto.length as usize,
171 values: None,
172 }
173 }
174}
175
176impl From<&Dictionary> for pb::Dictionary {
177 fn from(d: &Dictionary) -> Self {
178 Self {
179 offset: d.offset as i64,
180 length: d.length as i64,
181 }
182 }
183}
184
185impl From<Encoding> for pb::Encoding {
186 fn from(e: Encoding) -> Self {
187 match e {
188 Encoding::Plain => Self::Plain,
189 Encoding::VarBinary => Self::VarBinary,
190 Encoding::Dictionary => Self::Dictionary,
191 Encoding::RLE => Self::Rle,
192 }
193 }
194}
195
196#[async_recursion]
197async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
198 if let DataType::Dictionary(_, value_type) = field.data_type() {
199 assert!(field.dictionary.is_some());
200 if let Some(dict_info) = field.dictionary.as_mut() {
201 use DataType::*;
202 match value_type.as_ref() {
203 _ if value_type.is_binary_like() => {
204 dict_info.values = Some(
205 read_binary_array(
206 reader,
207 value_type.as_ref(),
208 true, dict_info.offset,
210 dict_info.length,
211 ..,
212 )
213 .await?,
214 );
215 }
216 Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
217 dict_info.values = Some(
218 read_fixed_stride_array(
219 reader,
220 value_type.as_ref(),
221 dict_info.offset,
222 dict_info.length,
223 ..,
224 )
225 .await?,
226 );
227 }
228 _ => {
229 return Err(Error::Schema {
230 message: format!(
231 "Does not support {} as dictionary value type",
232 value_type
233 ),
234 location: location!(),
235 });
236 }
237 }
238 } else {
239 panic!("Should not reach here: dictionary field does not load dictionary info")
240 }
241 Ok(())
242 } else {
243 for child in field.children.as_mut_slice() {
244 load_field_dictionary(child, reader).await?;
245 }
246 Ok(())
247 }
248}
249
250pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
253 for field in schema.fields.as_mut_slice() {
254 load_field_dictionary(field, reader).await?;
255 }
256 Ok(())
257}
258
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use lance_core::datatypes::Schema;

    use super::{Fields, FieldsWithMeta};

    /// Flattening a schema with a nested struct yields field ids 0 through 5 in order.
    #[test]
    fn test_schema_set_ids() {
        let struct_children = vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ];
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new(
                "b",
                DataType::Struct(ArrowFields::from(struct_children)),
                true,
            ),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let flattened = Fields::from(&schema);
        let ids = flattened.0.iter().map(|p| p.id).collect::<Vec<_>>();
        assert_eq!(ids, (0..6).collect::<Vec<_>>());
    }

    /// Schema metadata survives a round trip through `FieldsWithMeta`.
    #[test]
    fn test_schema_metadata() {
        let metadata: HashMap<String, String> = [("k1", "v1"), ("k2", "v2")]
            .iter()
            .map(|(k, v)| (String::from(*k), String::from(*v)))
            .collect();

        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );

        let expected_schema = Schema::try_from(&arrow_schema).unwrap();
        let round_tripped = Schema::from(FieldsWithMeta::from(&expected_schema));
        assert_eq!(expected_schema, round_tripped);
    }
}