1use arrow_schema::{DataType, Field, UnionFields};
4
5use crate::{DynError, cell::DynCell, dyn_builder::DynColumnBuilder};
6
/// A single row of dynamically typed cells, one entry per column builder.
///
/// A `None` entry is appended to its column as a null; a `Some(cell)` entry is
/// handed to the column builder's `append_dyn`.
pub struct DynRow(pub Vec<Option<DynCell>>);
9
10impl DynRow {
11 pub fn append_into(self, cols: &mut [Box<dyn DynColumnBuilder>]) -> Result<(), DynError> {
18 if self.0.len() != cols.len() {
20 return Err(DynError::ArityMismatch {
21 expected: cols.len(),
22 got: self.0.len(),
23 });
24 }
25
26 for (i, (cell_opt, b)) in self.0.iter().zip(cols.iter()).enumerate() {
29 match cell_opt {
30 None => {}
31 Some(cell) => {
32 let dt = b.data_type();
33 if !accepts_cell(dt, cell) {
34 return Err(DynError::TypeMismatch {
35 col: i,
36 expected: dt.clone(),
37 });
38 }
39 }
40 }
41 }
42
43 let mut cells = self.0.into_iter();
45 for (i, b) in cols.iter_mut().enumerate() {
46 match cells.next() {
47 None => unreachable!("cells length pre-checked to match columns"),
49 Some(None) => b.append_null(),
50 Some(Some(v)) => {
51 b.append_dyn(v).map_err(|e| e.at_col(i))?;
52 }
53 }
54 }
55 Ok(())
56 }
57
58 pub fn append_into_with_fields(
66 self,
67 fields: &arrow_schema::Fields,
68 cols: &mut [Box<dyn DynColumnBuilder>],
69 ) -> Result<(), DynError> {
70 if self.0.len() != cols.len() {
72 return Err(DynError::ArityMismatch {
73 expected: cols.len(),
74 got: self.0.len(),
75 });
76 }
77
78 for (i, (cell_opt, b)) in self.0.iter().zip(cols.iter()).enumerate() {
80 if let Some(cell) = cell_opt {
81 let dt = b.data_type();
82 if let Err(message) = validate_cell_against_field(dt, cell) {
83 let name = fields.get(i).map_or("?", |f| f.name().as_str());
84 return Err(DynError::Append {
85 col: i,
86 message: format!("{} at column '{}'", message, name),
87 });
88 }
89 }
90 }
91
92 let mut cells = self.0.into_iter();
94 for (i, b) in cols.iter_mut().enumerate() {
95 match cells.next() {
96 None => unreachable!("cells length pre-checked to match columns"),
97 Some(None) => b.append_null(),
98 Some(Some(v)) => {
99 b.append_dyn(v).map_err(|e| e.at_col(i))?;
100 }
101 }
102 }
103 Ok(())
104 }
105}
106
107#[allow(clippy::match_same_arms)]
108fn accepts_cell(dt: &DataType, cell: &DynCell) -> bool {
109 match (dt, cell) {
110 (_, DynCell::Null) => true,
111 (DataType::Boolean, DynCell::Bool(_)) => true,
112 (DataType::Int8, DynCell::I8(_)) => true,
113 (DataType::Int16, DynCell::I16(_)) => true,
114 (DataType::Int32, DynCell::I32(_)) => true,
115 (DataType::Int64, DynCell::I64(_)) => true,
116 (DataType::UInt8, DynCell::U8(_)) => true,
117 (DataType::UInt16, DynCell::U16(_)) => true,
118 (DataType::UInt32, DynCell::U32(_)) => true,
119 (DataType::UInt64, DynCell::U64(_)) => true,
120 (DataType::Float32, DynCell::F32(_)) => true,
121 (DataType::Float64, DynCell::F64(_)) => true,
122 (DataType::Date32, DynCell::I32(_)) => true,
123 (DataType::Date64, DynCell::I64(_)) => true,
124 (DataType::Timestamp(_, _), DynCell::I64(_)) => true,
125 (DataType::Time32(_), DynCell::I32(_)) => true,
126 (DataType::Time64(_), DynCell::I64(_)) => true,
127 (DataType::Duration(_), DynCell::I64(_)) => true,
128 (DataType::Utf8, DynCell::Str(_)) => true,
129 (DataType::Binary, DynCell::Bin(_)) => true,
130 (DataType::Struct(_), DynCell::Struct(_)) => true,
131 (DataType::List(_), DynCell::List(_)) => true,
132 (DataType::LargeList(_), DynCell::List(_)) => true,
133 (DataType::FixedSizeList(_, _), DynCell::FixedSizeList(_)) => true,
134 (DataType::Map(entry_field, _), DynCell::Map(entries)) => {
135 let DataType::Struct(entry_fields) = entry_field.data_type() else {
136 return false;
137 };
138 if entry_fields.len() != 2 {
139 return false;
140 }
141 let Some(key_field) = entry_fields.first() else {
142 return false;
143 };
144 let Some(value_field) = entry_fields.get(1) else {
145 return false;
146 };
147 entries.iter().all(|(key_cell, value_cell)| {
148 if matches!(key_cell, DynCell::Null) {
149 return false;
150 }
151 if !accepts_cell(key_field.data_type(), key_cell) {
152 return false;
153 }
154 match value_cell {
155 Some(cell) => accepts_cell(value_field.data_type(), cell),
156 None => true,
157 }
158 })
159 }
160 (DataType::Union(fields, _), DynCell::Union { type_id, value }) => {
161 let field = fields
162 .iter()
163 .find_map(|(tag, field)| if tag == *type_id { Some(field) } else { None });
164 match field {
165 None => false,
166 Some(field) => match value.as_deref() {
167 None => true,
168 Some(inner) => accepts_cell(field.data_type(), inner),
169 },
170 }
171 }
172 (DataType::Dictionary(_, value), c) => match &**value {
174 DataType::Utf8 | DataType::LargeUtf8 => matches!(c, DynCell::Str(_)),
175 DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
176 matches!(c, DynCell::Bin(_))
177 }
178 DataType::Int8 => matches!(c, DynCell::I8(_)),
180 DataType::Int16 => matches!(c, DynCell::I16(_)),
181 DataType::Int32 => matches!(c, DynCell::I32(_)),
182 DataType::Int64 => matches!(c, DynCell::I64(_)),
183 DataType::UInt8 => matches!(c, DynCell::U8(_)),
184 DataType::UInt16 => matches!(c, DynCell::U16(_)),
185 DataType::UInt32 => matches!(c, DynCell::U32(_)),
186 DataType::UInt64 => matches!(c, DynCell::U64(_)),
187 DataType::Float32 => matches!(c, DynCell::F32(_)),
188 DataType::Float64 => matches!(c, DynCell::F64(_)),
189 _ => false,
190 },
191 _ => false,
192 }
193}
194
195fn validate_map_cell(cell: &DynCell, entry_field: &Field) -> Result<(), String> {
196 let entries = match cell {
197 DynCell::Map(entries) => entries,
198 other => return Err(format!("expected map value, found {}", other.type_name())),
199 };
200
201 let DataType::Struct(children) = entry_field.data_type() else {
202 return Err("map entry field is not a struct".to_string());
203 };
204 if children.len() != 2 {
205 return Err(format!(
206 "map entry struct must have 2 fields (keys, values), found {}",
207 children.len()
208 ));
209 }
210
211 let key_field = &children[0];
212 let value_field = &children[1];
213 let value_nullable = value_field.is_nullable();
214
215 for (idx, (key_cell, value_cell)) in entries.iter().enumerate() {
216 if matches!(key_cell, DynCell::Null) {
217 return Err(format!("entry {} has a null map key", idx));
218 }
219 if !accepts_cell(key_field.data_type(), key_cell) {
220 return Err(format!(
221 "map key {} expected {:?}, found {}",
222 idx,
223 key_field.data_type(),
224 key_cell.type_name()
225 ));
226 }
227
228 match value_cell {
229 None => {
230 if !value_nullable {
231 return Err(format!(
232 "map value {} is null but '{}' is not nullable",
233 idx,
234 value_field.name()
235 ));
236 }
237 }
238 Some(DynCell::Null) => {
239 if !value_nullable {
240 return Err(format!(
241 "map value {} is null but '{}' is not nullable",
242 idx,
243 value_field.name()
244 ));
245 }
246 }
247 Some(inner) => {
248 if !accepts_cell(value_field.data_type(), inner) {
249 return Err(format!(
250 "map value {} expected {:?}, found {}",
251 idx,
252 value_field.data_type(),
253 inner.type_name()
254 ));
255 }
256 }
257 }
258 }
259 Ok(())
260}
261
262fn validate_union_cell(cell: &DynCell, fields: &UnionFields) -> Result<(), String> {
263 let DynCell::Union { type_id, value } = cell else {
264 return Err(format!("expected union value, found {}", cell.type_name()));
265 };
266
267 let Some(field) = fields
268 .iter()
269 .find_map(|(tag, field)| if tag == *type_id { Some(field) } else { None })
270 else {
271 return Err(format!("union value uses unknown type id {}", type_id));
272 };
273
274 match value.as_deref() {
275 None => Ok(()),
276 Some(inner) => validate_cell_against_field(field.data_type(), inner),
277 }
278}
279
280fn validate_cell_against_field(dt: &DataType, cell: &DynCell) -> Result<(), String> {
281 match dt {
282 DataType::Map(entry_field, _) => validate_map_cell(cell, entry_field.as_ref()),
283 DataType::Union(fields, _) => validate_union_cell(cell, fields),
284 _ if accepts_cell(dt, cell) => Ok(()),
285 _ => Err(format!(
286 "type mismatch: expected {:?}, found {}",
287 dt,
288 cell.type_name()
289 )),
290 }
291}