typed_arrow_dyn/
rows.rs

1//! Dynamic row wrapper.
2
3use arrow_schema::{DataType, Field, UnionFields};
4
5use crate::{DynError, cell::DynCell, dyn_builder::DynColumnBuilder};
6
7/// A thin row wrapper used to append into a set of dynamic column builders.
8pub struct DynRow(pub Vec<Option<DynCell>>);
9
10impl DynRow {
11    /// Append this row into the builders (1:1 by index).
12    /// Returns an error if the number of cells does not match the number of
13    /// columns, or if any cell fails type validation for the target column.
14    ///
15    /// # Errors
16    /// Returns a `DynError` for arity mismatches or type/builder errors while appending.
17    pub fn append_into(self, cols: &mut [Box<dyn DynColumnBuilder>]) -> Result<(), DynError> {
18        // 1) Validate arity
19        if self.0.len() != cols.len() {
20            return Err(DynError::ArityMismatch {
21                expected: cols.len(),
22                got: self.0.len(),
23            });
24        }
25
26        // 2) Lightweight pre-validation to avoid partial writes when possible.
27        // Only validate type compatibility here; Arrow enforces nullability at finish.
28        for (i, (cell_opt, b)) in self.0.iter().zip(cols.iter()).enumerate() {
29            match cell_opt {
30                None => {}
31                Some(cell) => {
32                    let dt = b.data_type();
33                    if !accepts_cell(dt, cell) {
34                        return Err(DynError::TypeMismatch {
35                            col: i,
36                            expected: dt.clone(),
37                        });
38                    }
39                }
40            }
41        }
42
43        // 3) Perform the actual appends
44        let mut cells = self.0.into_iter();
45        for (i, b) in cols.iter_mut().enumerate() {
46            match cells.next() {
47                // End of iterator should be impossible due to arity check
48                None => unreachable!("cells length pre-checked to match columns"),
49                Some(None) => b.append_null(),
50                Some(Some(v)) => {
51                    b.append_dyn(v).map_err(|e| e.at_col(i))?;
52                }
53            }
54        }
55        Ok(())
56    }
57
58    /// Append this row into the builders using field metadata to enrich errors.
59    ///
60    /// Use this from `DynBuilders` so type mismatches can report column names
61    /// and expected vs found types.
62    ///
63    /// # Errors
64    /// Returns a `DynError` for arity mismatches or type/builder errors while appending.
65    pub fn append_into_with_fields(
66        self,
67        fields: &arrow_schema::Fields,
68        cols: &mut [Box<dyn DynColumnBuilder>],
69    ) -> Result<(), DynError> {
70        // 1) Validate arity
71        if self.0.len() != cols.len() {
72            return Err(DynError::ArityMismatch {
73                expected: cols.len(),
74                got: self.0.len(),
75            });
76        }
77
78        // 2) Pre-validate types to avoid partial writes
79        for (i, (cell_opt, b)) in self.0.iter().zip(cols.iter()).enumerate() {
80            if let Some(cell) = cell_opt {
81                let dt = b.data_type();
82                if let Err(message) = validate_cell_against_field(dt, cell) {
83                    let name = fields.get(i).map_or("?", |f| f.name().as_str());
84                    return Err(DynError::Append {
85                        col: i,
86                        message: format!("{} at column '{}'", message, name),
87                    });
88                }
89            }
90        }
91
92        // 3) Perform the actual appends
93        let mut cells = self.0.into_iter();
94        for (i, b) in cols.iter_mut().enumerate() {
95            match cells.next() {
96                None => unreachable!("cells length pre-checked to match columns"),
97                Some(None) => b.append_null(),
98                Some(Some(v)) => {
99                    b.append_dyn(v).map_err(|e| e.at_col(i))?;
100                }
101            }
102        }
103        Ok(())
104    }
105}
106
107#[allow(clippy::match_same_arms)]
108fn accepts_cell(dt: &DataType, cell: &DynCell) -> bool {
109    match (dt, cell) {
110        (_, DynCell::Null) => true,
111        (DataType::Boolean, DynCell::Bool(_)) => true,
112        (DataType::Int8, DynCell::I8(_)) => true,
113        (DataType::Int16, DynCell::I16(_)) => true,
114        (DataType::Int32, DynCell::I32(_)) => true,
115        (DataType::Int64, DynCell::I64(_)) => true,
116        (DataType::UInt8, DynCell::U8(_)) => true,
117        (DataType::UInt16, DynCell::U16(_)) => true,
118        (DataType::UInt32, DynCell::U32(_)) => true,
119        (DataType::UInt64, DynCell::U64(_)) => true,
120        (DataType::Float32, DynCell::F32(_)) => true,
121        (DataType::Float64, DynCell::F64(_)) => true,
122        (DataType::Date32, DynCell::I32(_)) => true,
123        (DataType::Date64, DynCell::I64(_)) => true,
124        (DataType::Timestamp(_, _), DynCell::I64(_)) => true,
125        (DataType::Time32(_), DynCell::I32(_)) => true,
126        (DataType::Time64(_), DynCell::I64(_)) => true,
127        (DataType::Duration(_), DynCell::I64(_)) => true,
128        (DataType::Utf8, DynCell::Str(_)) => true,
129        (DataType::Binary, DynCell::Bin(_)) => true,
130        (DataType::Struct(_), DynCell::Struct(_)) => true,
131        (DataType::List(_), DynCell::List(_)) => true,
132        (DataType::LargeList(_), DynCell::List(_)) => true,
133        (DataType::FixedSizeList(_, _), DynCell::FixedSizeList(_)) => true,
134        (DataType::Map(entry_field, _), DynCell::Map(entries)) => {
135            let DataType::Struct(entry_fields) = entry_field.data_type() else {
136                return false;
137            };
138            if entry_fields.len() != 2 {
139                return false;
140            }
141            let Some(key_field) = entry_fields.first() else {
142                return false;
143            };
144            let Some(value_field) = entry_fields.get(1) else {
145                return false;
146            };
147            entries.iter().all(|(key_cell, value_cell)| {
148                if matches!(key_cell, DynCell::Null) {
149                    return false;
150                }
151                if !accepts_cell(key_field.data_type(), key_cell) {
152                    return false;
153                }
154                match value_cell {
155                    Some(cell) => accepts_cell(value_field.data_type(), cell),
156                    None => true,
157                }
158            })
159        }
160        (DataType::Union(fields, _), DynCell::Union { type_id, value }) => {
161            let field = fields
162                .iter()
163                .find_map(|(tag, field)| if tag == *type_id { Some(field) } else { None });
164            match field {
165                None => false,
166                Some(field) => match value.as_deref() {
167                    None => true,
168                    Some(inner) => accepts_cell(field.data_type(), inner),
169                },
170            }
171        }
172        // Dictionary value-side validation (key width irrelevant here).
173        (DataType::Dictionary(_, value), c) => match &**value {
174            DataType::Utf8 | DataType::LargeUtf8 => matches!(c, DynCell::Str(_)),
175            DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
176                matches!(c, DynCell::Bin(_))
177            }
178            // Primitive dictionary values
179            DataType::Int8 => matches!(c, DynCell::I8(_)),
180            DataType::Int16 => matches!(c, DynCell::I16(_)),
181            DataType::Int32 => matches!(c, DynCell::I32(_)),
182            DataType::Int64 => matches!(c, DynCell::I64(_)),
183            DataType::UInt8 => matches!(c, DynCell::U8(_)),
184            DataType::UInt16 => matches!(c, DynCell::U16(_)),
185            DataType::UInt32 => matches!(c, DynCell::U32(_)),
186            DataType::UInt64 => matches!(c, DynCell::U64(_)),
187            DataType::Float32 => matches!(c, DynCell::F32(_)),
188            DataType::Float64 => matches!(c, DynCell::F64(_)),
189            _ => false,
190        },
191        _ => false,
192    }
193}
194
195fn validate_map_cell(cell: &DynCell, entry_field: &Field) -> Result<(), String> {
196    let entries = match cell {
197        DynCell::Map(entries) => entries,
198        other => return Err(format!("expected map value, found {}", other.type_name())),
199    };
200
201    let DataType::Struct(children) = entry_field.data_type() else {
202        return Err("map entry field is not a struct".to_string());
203    };
204    if children.len() != 2 {
205        return Err(format!(
206            "map entry struct must have 2 fields (keys, values), found {}",
207            children.len()
208        ));
209    }
210
211    let key_field = &children[0];
212    let value_field = &children[1];
213    let value_nullable = value_field.is_nullable();
214
215    for (idx, (key_cell, value_cell)) in entries.iter().enumerate() {
216        if matches!(key_cell, DynCell::Null) {
217            return Err(format!("entry {} has a null map key", idx));
218        }
219        if !accepts_cell(key_field.data_type(), key_cell) {
220            return Err(format!(
221                "map key {} expected {:?}, found {}",
222                idx,
223                key_field.data_type(),
224                key_cell.type_name()
225            ));
226        }
227
228        match value_cell {
229            None => {
230                if !value_nullable {
231                    return Err(format!(
232                        "map value {} is null but '{}' is not nullable",
233                        idx,
234                        value_field.name()
235                    ));
236                }
237            }
238            Some(DynCell::Null) => {
239                if !value_nullable {
240                    return Err(format!(
241                        "map value {} is null but '{}' is not nullable",
242                        idx,
243                        value_field.name()
244                    ));
245                }
246            }
247            Some(inner) => {
248                if !accepts_cell(value_field.data_type(), inner) {
249                    return Err(format!(
250                        "map value {} expected {:?}, found {}",
251                        idx,
252                        value_field.data_type(),
253                        inner.type_name()
254                    ));
255                }
256            }
257        }
258    }
259    Ok(())
260}
261
262fn validate_union_cell(cell: &DynCell, fields: &UnionFields) -> Result<(), String> {
263    let DynCell::Union { type_id, value } = cell else {
264        return Err(format!("expected union value, found {}", cell.type_name()));
265    };
266
267    let Some(field) = fields
268        .iter()
269        .find_map(|(tag, field)| if tag == *type_id { Some(field) } else { None })
270    else {
271        return Err(format!("union value uses unknown type id {}", type_id));
272    };
273
274    match value.as_deref() {
275        None => Ok(()),
276        Some(inner) => validate_cell_against_field(field.data_type(), inner),
277    }
278}
279
280fn validate_cell_against_field(dt: &DataType, cell: &DynCell) -> Result<(), String> {
281    match dt {
282        DataType::Map(entry_field, _) => validate_map_cell(cell, entry_field.as_ref()),
283        DataType::Union(fields, _) => validate_union_cell(cell, fields),
284        _ if accepts_cell(dt, cell) => Ok(()),
285        _ => Err(format!(
286            "type mismatch: expected {:?}, found {}",
287            dt,
288            cell.type_name()
289        )),
290    }
291}