1use arrow_schema::DataType;
5use async_recursion::async_recursion;
6use lance_arrow::DataTypeExt;
7use lance_arrow::ARROW_EXT_NAME_KEY;
8use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
9use lance_core::{Error, Result};
10use lance_io::traits::Reader;
11use lance_io::utils::{read_binary_array, read_fixed_stride_array};
12use snafu::location;
13use std::collections::HashMap;
14
15use crate::format::pb;
16
17#[allow(clippy::fallible_impl_from)]
18impl From<&pb::Field> for Field {
19 fn from(field: &pb::Field) -> Self {
20 let lance_metadata: HashMap<String, String> = field
21 .metadata
22 .iter()
23 .map(|(key, value)| {
24 let string_value = String::from_utf8_lossy(value).to_string();
25 (key.clone(), string_value)
26 })
27 .collect();
28 let mut lance_metadata = lance_metadata;
29 if !field.extension_name.is_empty() {
30 lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
31 }
32 Self {
33 name: field.name.clone(),
34 id: field.id,
35 parent_id: field.parent_id,
36 logical_type: LogicalType::from(field.logical_type.as_str()),
37 metadata: lance_metadata,
38 encoding: match field.encoding {
39 1 => Some(Encoding::Plain),
40 2 => Some(Encoding::VarBinary),
41 3 => Some(Encoding::Dictionary),
42 4 => Some(Encoding::RLE),
43 _ => None,
44 },
45 nullable: field.nullable,
46 children: vec![],
47 dictionary: field.dictionary.as_ref().map(Dictionary::from),
48 unenforced_primary_key_position: if field.unenforced_primary_key_position > 0 {
49 Some(field.unenforced_primary_key_position)
50 } else if field.unenforced_primary_key {
51 Some(0)
52 } else {
53 None
54 },
55 }
56 }
57}
58
59impl From<&Field> for pb::Field {
60 fn from(field: &Field) -> Self {
61 let pb_metadata = field
62 .metadata
63 .iter()
64 .map(|(key, value)| (key.clone(), value.clone().into_bytes()))
65 .collect();
66 Self {
67 id: field.id,
68 parent_id: field.parent_id,
69 name: field.name.clone(),
70 logical_type: field.logical_type.to_string(),
71 encoding: match field.encoding {
72 Some(Encoding::Plain) => 1,
73 Some(Encoding::VarBinary) => 2,
74 Some(Encoding::Dictionary) => 3,
75 Some(Encoding::RLE) => 4,
76 _ => 0,
77 },
78 nullable: field.nullable,
79 dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
80 metadata: pb_metadata,
81 extension_name: field
82 .extension_name()
83 .map(|name| name.to_owned())
84 .unwrap_or_default(),
85 r#type: 0,
86 unenforced_primary_key: field.unenforced_primary_key_position.is_some(),
87 unenforced_primary_key_position: field.unenforced_primary_key_position.unwrap_or(0),
88 }
89 }
90}
91
/// A flat list of protobuf field messages.
///
/// The conversions in this file emit every field before its children and
/// rebuild the tree from each message's `parent_id`, where `-1` marks a
/// top-level field.
pub struct Fields(pub Vec<pb::Field>);
93
94impl From<&Field> for Fields {
95 fn from(field: &Field) -> Self {
96 let mut protos = vec![pb::Field::from(field)];
97 protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
98 Self(protos)
99 }
100}
101
102impl From<&Fields> for Schema {
104 fn from(fields: &Fields) -> Self {
105 let mut schema = Self {
106 fields: vec![],
107 metadata: HashMap::default(),
108 };
109
110 fields.0.iter().for_each(|f| {
111 if f.parent_id == -1 {
112 schema.fields.push(Field::from(f));
113 } else {
114 let parent = schema.mut_field_by_id(f.parent_id).unwrap();
115 parent.children.push(Field::from(f));
116 }
117 });
118
119 schema
120 }
121}
122
/// Protobuf fields paired with schema-level metadata.
pub struct FieldsWithMeta {
    /// The flat list of protobuf field messages.
    pub fields: Fields,
    /// Schema metadata, with values held as raw bytes.
    pub metadata: HashMap<String, Vec<u8>>,
}
127
128impl From<FieldsWithMeta> for Schema {
130 fn from(fields_with_meta: FieldsWithMeta) -> Self {
131 let lance_metadata = fields_with_meta
132 .metadata
133 .into_iter()
134 .map(|(key, value)| {
135 let string_value = String::from_utf8_lossy(&value).to_string();
136 (key, string_value)
137 })
138 .collect();
139
140 let schema_with_fields = Self::from(&fields_with_meta.fields);
141 Self {
142 fields: schema_with_fields.fields,
143 metadata: lance_metadata,
144 }
145 }
146}
147
148impl From<&Schema> for Fields {
150 fn from(schema: &Schema) -> Self {
151 let mut protos = vec![];
152 schema.fields.iter().for_each(|f| {
153 protos.extend(Self::from(f).0);
154 });
155 Self(protos)
156 }
157}
158
159impl From<&Schema> for FieldsWithMeta {
161 fn from(schema: &Schema) -> Self {
162 let fields = schema.into();
163 let metadata = schema
164 .metadata
165 .clone()
166 .into_iter()
167 .map(|(key, value)| (key, value.into_bytes()))
168 .collect();
169 Self { fields, metadata }
170 }
171}
172
173impl From<&pb::Dictionary> for Dictionary {
174 fn from(proto: &pb::Dictionary) -> Self {
175 Self {
176 offset: proto.offset as usize,
177 length: proto.length as usize,
178 values: None,
179 }
180 }
181}
182
183impl From<&Dictionary> for pb::Dictionary {
184 fn from(d: &Dictionary) -> Self {
185 Self {
186 offset: d.offset as i64,
187 length: d.length as i64,
188 }
189 }
190}
191
192impl From<Encoding> for pb::Encoding {
193 fn from(e: Encoding) -> Self {
194 match e {
195 Encoding::Plain => Self::Plain,
196 Encoding::VarBinary => Self::VarBinary,
197 Encoding::Dictionary => Self::Dictionary,
198 Encoding::RLE => Self::Rle,
199 }
200 }
201}
202
203#[async_recursion]
204async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
205 if let DataType::Dictionary(_, value_type) = field.data_type() {
206 assert!(field.dictionary.is_some());
207 if let Some(dict_info) = field.dictionary.as_mut() {
208 use DataType::*;
209 match value_type.as_ref() {
210 _ if value_type.is_binary_like() => {
211 dict_info.values = Some(
212 read_binary_array(
213 reader,
214 value_type.as_ref(),
215 true, dict_info.offset,
217 dict_info.length,
218 ..,
219 )
220 .await?,
221 );
222 }
223 Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
224 dict_info.values = Some(
225 read_fixed_stride_array(
226 reader,
227 value_type.as_ref(),
228 dict_info.offset,
229 dict_info.length,
230 ..,
231 )
232 .await?,
233 );
234 }
235 _ => {
236 return Err(Error::Schema {
237 message: format!(
238 "Does not support {} as dictionary value type",
239 value_type
240 ),
241 location: location!(),
242 });
243 }
244 }
245 } else {
246 panic!("Should not reach here: dictionary field does not load dictionary info")
247 }
248 Ok(())
249 } else {
250 for child in field.children.as_mut_slice() {
251 load_field_dictionary(child, reader).await?;
252 }
253 Ok(())
254 }
255}
256
257pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
260 for field in schema.fields.as_mut_slice() {
261 load_field_dictionary(field, reader).await?;
262 }
263 Ok(())
264}
265
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use lance_core::datatypes::Schema;

    use super::{Fields, FieldsWithMeta};

    /// Flattening a schema with a nested struct yields field ids 0 through 5
    /// in order.
    #[test]
    fn test_schema_set_ids() {
        let struct_members = ArrowFields::from(vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Struct(struct_members), true),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let Fields(protos) = Fields::from(&schema);
        let ids: Vec<_> = protos.iter().map(|proto| proto.id).collect();
        assert_eq!(ids, (0..6).collect::<Vec<_>>());
    }

    /// Schema metadata survives a round trip through `FieldsWithMeta`.
    #[test]
    fn test_schema_metadata() {
        let metadata: HashMap<String, String> = [("k1", "v1"), ("k2", "v2")]
            .iter()
            .map(|&(key, value)| (String::from(key), String::from(value)))
            .collect();

        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );
        let expected_schema = Schema::try_from(&arrow_schema).unwrap();

        let round_tripped = Schema::from(FieldsWithMeta::from(&expected_schema));
        assert_eq!(expected_schema, round_tripped);
    }
}