1use crate::catalog::{ArrayCatalogView, SqlCatalog};
12use crate::error::{Result, SqlError};
13use crate::parser::array_stmt::{DeleteArrayAst, InsertArrayAst};
14use crate::types::SqlPlan;
15use crate::types_array::{ArrayAttrLiteral, ArrayAttrType, ArrayCoordLiteral, ArrayDimType};
16
17pub fn plan_insert_array(ast: &InsertArrayAst, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
18 let view = catalog
19 .lookup_array(&ast.name)
20 .ok_or_else(|| SqlError::Parse {
21 detail: format!("INSERT INTO ARRAY {}: array not found", ast.name),
22 })?;
23 if ast.rows.is_empty() {
24 return Err(SqlError::Parse {
25 detail: format!("INSERT INTO ARRAY {}: at least one row required", ast.name),
26 });
27 }
28 for (ri, row) in ast.rows.iter().enumerate() {
29 validate_coords(&ast.name, ri, &row.coords, &view)?;
30 validate_attrs(&ast.name, ri, &row.attrs, &view)?;
31 }
32 Ok(vec![SqlPlan::InsertArray {
33 name: ast.name.clone(),
34 rows: ast.rows.clone(),
35 }])
36}
37
38pub fn plan_delete_array(ast: &DeleteArrayAst, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
39 let view = catalog
40 .lookup_array(&ast.name)
41 .ok_or_else(|| SqlError::Parse {
42 detail: format!("DELETE FROM ARRAY {}: array not found", ast.name),
43 })?;
44 if ast.coords.is_empty() {
45 return Err(SqlError::Parse {
46 detail: format!(
47 "DELETE FROM ARRAY {}: at least one coord tuple required",
48 ast.name
49 ),
50 });
51 }
52 for (ri, row) in ast.coords.iter().enumerate() {
53 validate_coords(&ast.name, ri, row, &view)?;
54 }
55 Ok(vec![SqlPlan::DeleteArray {
56 name: ast.name.clone(),
57 coords: ast.coords.clone(),
58 }])
59}
60
61fn validate_coords(
62 array: &str,
63 row: usize,
64 coords: &[ArrayCoordLiteral],
65 view: &ArrayCatalogView,
66) -> Result<()> {
67 if coords.len() != view.dims.len() {
68 return Err(SqlError::Parse {
69 detail: format!(
70 "ARRAY {array} row {row}: coord arity {} != dim count {}",
71 coords.len(),
72 view.dims.len()
73 ),
74 });
75 }
76 for (i, c) in coords.iter().enumerate() {
77 if !coord_compatible(c, view.dims[i].dtype) {
78 return Err(SqlError::TypeMismatch {
79 detail: format!(
80 "ARRAY {array} row {row}: coord for dim `{}` (declared {:?}) is incompatible",
81 view.dims[i].name, view.dims[i].dtype
82 ),
83 });
84 }
85 }
86 Ok(())
87}
88
89fn validate_attrs(
90 array: &str,
91 row: usize,
92 attrs: &[ArrayAttrLiteral],
93 view: &ArrayCatalogView,
94) -> Result<()> {
95 if attrs.len() != view.attrs.len() {
96 return Err(SqlError::Parse {
97 detail: format!(
98 "ARRAY {array} row {row}: attr arity {} != attr count {}",
99 attrs.len(),
100 view.attrs.len()
101 ),
102 });
103 }
104 for (i, a) in attrs.iter().enumerate() {
105 let spec = &view.attrs[i];
106 match a {
107 ArrayAttrLiteral::Null if !spec.nullable => {
108 return Err(SqlError::TypeMismatch {
109 detail: format!("ARRAY {array} row {row}: attr `{}` is NOT NULL", spec.name),
110 });
111 }
112 ArrayAttrLiteral::Null => {}
113 other if !attr_compatible(other, spec.dtype) => {
114 return Err(SqlError::TypeMismatch {
115 detail: format!(
116 "ARRAY {array} row {row}: attr `{}` (declared {:?}) is incompatible",
117 spec.name, spec.dtype
118 ),
119 });
120 }
121 _ => {}
122 }
123 }
124 Ok(())
125}
126
127fn coord_compatible(c: &ArrayCoordLiteral, dtype: ArrayDimType) -> bool {
128 matches!(
129 (c, dtype),
130 (ArrayCoordLiteral::Int64(_), ArrayDimType::Int64)
131 | (ArrayCoordLiteral::Int64(_), ArrayDimType::TimestampMs)
132 | (ArrayCoordLiteral::Int64(_), ArrayDimType::Float64)
133 | (ArrayCoordLiteral::Float64(_), ArrayDimType::Float64)
134 | (ArrayCoordLiteral::String(_), ArrayDimType::String)
135 )
136}
137
138fn attr_compatible(a: &ArrayAttrLiteral, dtype: ArrayAttrType) -> bool {
139 matches!(
140 (a, dtype),
141 (ArrayAttrLiteral::Int64(_), ArrayAttrType::Int64)
142 | (ArrayAttrLiteral::Int64(_), ArrayAttrType::Float64)
143 | (ArrayAttrLiteral::Float64(_), ArrayAttrType::Float64)
144 | (ArrayAttrLiteral::String(_), ArrayAttrType::String)
145 | (ArrayAttrLiteral::Bytes(_), ArrayAttrType::Bytes)
146 )
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::SqlCatalogError;
    use crate::types::CollectionInfo;
    use crate::types_array::{ArrayAttrAst, ArrayDimAst, ArrayDomainBound, ArrayInsertRow};

    /// Catalog stand-in: `get_collection` always yields `Ok(None)` and
    /// `lookup_array` hands back a clone of `view` whatever name is asked for.
    struct StubCatalog {
        view: Option<ArrayCatalogView>,
    }

    impl SqlCatalog for StubCatalog {
        fn get_collection(
            &self,
            _: nodedb_types::DatabaseId,
            _: &str,
        ) -> std::result::Result<Option<CollectionInfo>, SqlCatalogError> {
            Ok(None)
        }

        fn lookup_array(&self, _: &str) -> Option<ArrayCatalogView> {
            self.view.clone()
        }
    }

    /// Array `g` with two `Int64` dimensions (`chrom`, `pos`) and one nullable
    /// `Float64` attribute `v`.
    fn view_2d() -> ArrayCatalogView {
        let chrom = ArrayDimAst {
            name: "chrom".into(),
            dtype: ArrayDimType::Int64,
            lo: ArrayDomainBound::Int64(1),
            hi: ArrayDomainBound::Int64(23),
        };
        let pos = ArrayDimAst {
            name: "pos".into(),
            dtype: ArrayDimType::Int64,
            lo: ArrayDomainBound::Int64(0),
            hi: ArrayDomainBound::Int64(10_000_000),
        };
        let value = ArrayAttrAst {
            name: "v".into(),
            dtype: ArrayAttrType::Float64,
            nullable: true,
        };
        ArrayCatalogView {
            name: "g".into(),
            dims: vec![chrom, pos],
            attrs: vec![value],
            tile_extents: vec![1, 1_000_000],
        }
    }

    /// Catalog that resolves every lookup to `view_2d()`.
    fn catalog_2d() -> StubCatalog {
        StubCatalog {
            view: Some(view_2d()),
        }
    }

    /// Single-row INSERT into `g` with the given coordinates and attributes.
    fn insert_one(coords: Vec<ArrayCoordLiteral>, attrs: Vec<ArrayAttrLiteral>) -> InsertArrayAst {
        InsertArrayAst {
            name: "g".into(),
            rows: vec![ArrayInsertRow { coords, attrs }],
        }
    }

    #[test]
    fn insert_unknown_array() {
        let cat = StubCatalog { view: None };
        let ast = InsertArrayAst {
            name: "g".into(),
            rows: vec![],
        };
        assert!(plan_insert_array(&ast, &cat).is_err());
    }

    #[test]
    fn insert_arity_mismatch_rejected() {
        let cat = catalog_2d();
        // One coordinate against a two-dimensional array.
        let ast = insert_one(
            vec![ArrayCoordLiteral::Int64(1)],
            vec![ArrayAttrLiteral::Float64(1.0)],
        );
        assert!(plan_insert_array(&ast, &cat).is_err());
    }

    #[test]
    fn insert_happy() {
        let cat = catalog_2d();
        let ast = insert_one(
            vec![ArrayCoordLiteral::Int64(1), ArrayCoordLiteral::Int64(100)],
            vec![ArrayAttrLiteral::Float64(99.5)],
        );
        let plans = plan_insert_array(&ast, &cat).unwrap();
        // Exactly one plan, and it is the insert variant.
        assert!(matches!(
            plans.as_slice(),
            [SqlPlan::InsertArray { .. }]
        ));
    }

    #[test]
    fn delete_happy() {
        let cat = catalog_2d();
        let tuple = vec![ArrayCoordLiteral::Int64(1), ArrayCoordLiteral::Int64(100)];
        let ast = DeleteArrayAst {
            name: "g".into(),
            coords: vec![tuple],
        };
        let plans = plan_delete_array(&ast, &cat).unwrap();
        // Exactly one plan, and it is the delete variant.
        assert!(matches!(
            plans.as_slice(),
            [SqlPlan::DeleteArray { .. }]
        ));
    }
}