Skip to main content

nodedb_sql/planner/
array_dml.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Planner for `INSERT INTO ARRAY` and `DELETE FROM ARRAY`.
4//!
5//! Validation against the catalog: array exists, coord arity matches
6//! dim count, attr arity matches attr count, type tags coerce. Type
7//! coercion is purposely loose at the SQL level — `Int → Float` is
8//! accepted, the converter performs the actual cast on the way to the
9//! engine's typed `CoordValue` / `CellValue`.
10
11use crate::catalog::{ArrayCatalogView, SqlCatalog};
12use crate::error::{Result, SqlError};
13use crate::parser::array_stmt::{DeleteArrayAst, InsertArrayAst};
14use crate::types::SqlPlan;
15use crate::types_array::{ArrayAttrLiteral, ArrayAttrType, ArrayCoordLiteral, ArrayDimType};
16
17pub fn plan_insert_array(ast: &InsertArrayAst, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
18    let view = catalog
19        .lookup_array(&ast.name)
20        .ok_or_else(|| SqlError::Parse {
21            detail: format!("INSERT INTO ARRAY {}: array not found", ast.name),
22        })?;
23    if ast.rows.is_empty() {
24        return Err(SqlError::Parse {
25            detail: format!("INSERT INTO ARRAY {}: at least one row required", ast.name),
26        });
27    }
28    for (ri, row) in ast.rows.iter().enumerate() {
29        validate_coords(&ast.name, ri, &row.coords, &view)?;
30        validate_attrs(&ast.name, ri, &row.attrs, &view)?;
31    }
32    Ok(vec![SqlPlan::InsertArray {
33        name: ast.name.clone(),
34        rows: ast.rows.clone(),
35    }])
36}
37
38pub fn plan_delete_array(ast: &DeleteArrayAst, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
39    let view = catalog
40        .lookup_array(&ast.name)
41        .ok_or_else(|| SqlError::Parse {
42            detail: format!("DELETE FROM ARRAY {}: array not found", ast.name),
43        })?;
44    if ast.coords.is_empty() {
45        return Err(SqlError::Parse {
46            detail: format!(
47                "DELETE FROM ARRAY {}: at least one coord tuple required",
48                ast.name
49            ),
50        });
51    }
52    for (ri, row) in ast.coords.iter().enumerate() {
53        validate_coords(&ast.name, ri, row, &view)?;
54    }
55    Ok(vec![SqlPlan::DeleteArray {
56        name: ast.name.clone(),
57        coords: ast.coords.clone(),
58    }])
59}
60
61fn validate_coords(
62    array: &str,
63    row: usize,
64    coords: &[ArrayCoordLiteral],
65    view: &ArrayCatalogView,
66) -> Result<()> {
67    if coords.len() != view.dims.len() {
68        return Err(SqlError::Parse {
69            detail: format!(
70                "ARRAY {array} row {row}: coord arity {} != dim count {}",
71                coords.len(),
72                view.dims.len()
73            ),
74        });
75    }
76    for (i, c) in coords.iter().enumerate() {
77        if !coord_compatible(c, view.dims[i].dtype) {
78            return Err(SqlError::TypeMismatch {
79                detail: format!(
80                    "ARRAY {array} row {row}: coord for dim `{}` (declared {:?}) is incompatible",
81                    view.dims[i].name, view.dims[i].dtype
82                ),
83            });
84        }
85    }
86    Ok(())
87}
88
89fn validate_attrs(
90    array: &str,
91    row: usize,
92    attrs: &[ArrayAttrLiteral],
93    view: &ArrayCatalogView,
94) -> Result<()> {
95    if attrs.len() != view.attrs.len() {
96        return Err(SqlError::Parse {
97            detail: format!(
98                "ARRAY {array} row {row}: attr arity {} != attr count {}",
99                attrs.len(),
100                view.attrs.len()
101            ),
102        });
103    }
104    for (i, a) in attrs.iter().enumerate() {
105        let spec = &view.attrs[i];
106        match a {
107            ArrayAttrLiteral::Null if !spec.nullable => {
108                return Err(SqlError::TypeMismatch {
109                    detail: format!("ARRAY {array} row {row}: attr `{}` is NOT NULL", spec.name),
110                });
111            }
112            ArrayAttrLiteral::Null => {}
113            other if !attr_compatible(other, spec.dtype) => {
114                return Err(SqlError::TypeMismatch {
115                    detail: format!(
116                        "ARRAY {array} row {row}: attr `{}` (declared {:?}) is incompatible",
117                        spec.name, spec.dtype
118                    ),
119                });
120            }
121            _ => {}
122        }
123    }
124    Ok(())
125}
126
127fn coord_compatible(c: &ArrayCoordLiteral, dtype: ArrayDimType) -> bool {
128    matches!(
129        (c, dtype),
130        (ArrayCoordLiteral::Int64(_), ArrayDimType::Int64)
131            | (ArrayCoordLiteral::Int64(_), ArrayDimType::TimestampMs)
132            | (ArrayCoordLiteral::Int64(_), ArrayDimType::Float64)
133            | (ArrayCoordLiteral::Float64(_), ArrayDimType::Float64)
134            | (ArrayCoordLiteral::String(_), ArrayDimType::String)
135    )
136}
137
138fn attr_compatible(a: &ArrayAttrLiteral, dtype: ArrayAttrType) -> bool {
139    matches!(
140        (a, dtype),
141        (ArrayAttrLiteral::Int64(_), ArrayAttrType::Int64)
142            | (ArrayAttrLiteral::Int64(_), ArrayAttrType::Float64)
143            | (ArrayAttrLiteral::Float64(_), ArrayAttrType::Float64)
144            | (ArrayAttrLiteral::String(_), ArrayAttrType::String)
145            | (ArrayAttrLiteral::Bytes(_), ArrayAttrType::Bytes)
146    )
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::SqlCatalogError;
    use crate::types::CollectionInfo;
    use crate::types_array::{ArrayAttrAst, ArrayDimAst, ArrayDomainBound, ArrayInsertRow};

    /// Catalog double: `lookup_array` hands back a clone of `view` for any
    /// name, and `get_collection` always answers `Ok(None)`.
    struct StubCatalog {
        view: Option<ArrayCatalogView>,
    }

    impl SqlCatalog for StubCatalog {
        fn get_collection(
            &self,
            _: nodedb_types::DatabaseId,
            _: &str,
        ) -> std::result::Result<Option<CollectionInfo>, SqlCatalogError> {
            Ok(None)
        }

        fn lookup_array(&self, _: &str) -> Option<ArrayCatalogView> {
            self.view.clone()
        }
    }

    /// Array `g` with two `Int64` dims (`chrom`, `pos`) and one nullable
    /// `Float64` attribute `v`.
    fn view_2d() -> ArrayCatalogView {
        let chrom = ArrayDimAst {
            name: "chrom".into(),
            dtype: ArrayDimType::Int64,
            lo: ArrayDomainBound::Int64(1),
            hi: ArrayDomainBound::Int64(23),
        };
        let pos = ArrayDimAst {
            name: "pos".into(),
            dtype: ArrayDimType::Int64,
            lo: ArrayDomainBound::Int64(0),
            hi: ArrayDomainBound::Int64(10_000_000),
        };
        let value = ArrayAttrAst {
            name: "v".into(),
            dtype: ArrayAttrType::Float64,
            nullable: true,
        };
        ArrayCatalogView {
            name: "g".into(),
            dims: vec![chrom, pos],
            attrs: vec![value],
            tile_extents: vec![1, 1_000_000],
        }
    }

    /// Stub catalog that resolves every lookup to [`view_2d`].
    fn catalog_2d() -> StubCatalog {
        StubCatalog {
            view: Some(view_2d()),
        }
    }

    #[test]
    fn insert_unknown_array() {
        let cat = StubCatalog { view: None };
        let ast = InsertArrayAst {
            name: "g".into(),
            rows: Vec::new(),
        };
        assert!(plan_insert_array(&ast, &cat).is_err());
    }

    #[test]
    fn insert_arity_mismatch_rejected() {
        let cat = catalog_2d();
        // One coordinate for a two-dimensional array.
        let short_row = ArrayInsertRow {
            coords: vec![ArrayCoordLiteral::Int64(1)],
            attrs: vec![ArrayAttrLiteral::Float64(1.0)],
        };
        let ast = InsertArrayAst {
            name: "g".into(),
            rows: vec![short_row],
        };
        assert!(plan_insert_array(&ast, &cat).is_err());
    }

    #[test]
    fn insert_happy() {
        let cat = catalog_2d();
        let row = ArrayInsertRow {
            coords: vec![ArrayCoordLiteral::Int64(1), ArrayCoordLiteral::Int64(100)],
            attrs: vec![ArrayAttrLiteral::Float64(99.5)],
        };
        let ast = InsertArrayAst {
            name: "g".into(),
            rows: vec![row],
        };
        let plans = plan_insert_array(&ast, &cat).unwrap();
        // Exactly one plan, and it is the insert variant.
        assert!(matches!(plans.as_slice(), [SqlPlan::InsertArray { .. }]));
    }

    #[test]
    fn delete_happy() {
        let cat = catalog_2d();
        let tuple = vec![ArrayCoordLiteral::Int64(1), ArrayCoordLiteral::Int64(100)];
        let ast = DeleteArrayAst {
            name: "g".into(),
            coords: vec![tuple],
        };
        let plans = plan_delete_array(&ast, &cat).unwrap();
        // Exactly one plan, and it is the delete variant.
        assert!(matches!(plans.as_slice(), [SqlPlan::DeleteArray { .. }]));
    }
}