Skip to main content

nodedb_sql/planner/
array_ddl.rs

1// SPDX-License-Identifier: Apache-2.0
2
//! Planner for `CREATE ARRAY`, `ALTER ARRAY`, and `DROP ARRAY`.
//!
//! Validation is engine-agnostic: name non-empty, dims non-empty, attrs
//! non-empty, `tile_extents.len() == dims.len()`, no duplicate dim/attr
//! names, per-dim domain bounds match the dim type and are well formed
//! (lo <= hi), every tile extent is positive, and `prefix_bits` lies in
//! 1..=16. The schema hash and the typed `ArraySchema` are computed in the
//! Origin converter, where `nodedb-array` is available.
10
11use std::collections::HashSet;
12
13use crate::error::{Result, SqlError};
14use crate::parser::array_stmt::{AlterArrayAst, CreateArrayAst, DropArrayAst};
15use crate::types::SqlPlan;
16use crate::types_array::{ArrayDimType, ArrayDomainBound};
17
18pub fn plan_create_array(ast: &CreateArrayAst) -> Result<SqlPlan> {
19    if ast.name.is_empty() {
20        return Err(SqlError::Parse {
21            detail: "CREATE ARRAY: array name must not be empty".into(),
22        });
23    }
24    if ast.dims.is_empty() {
25        return Err(SqlError::Parse {
26            detail: format!("CREATE ARRAY {}: at least one dim is required", ast.name),
27        });
28    }
29    if ast.attrs.is_empty() {
30        return Err(SqlError::Parse {
31            detail: format!("CREATE ARRAY {}: at least one attr is required", ast.name),
32        });
33    }
34    if ast.tile_extents.len() != ast.dims.len() {
35        return Err(SqlError::Parse {
36            detail: format!(
37                "CREATE ARRAY {}: TILE_EXTENTS arity {} != DIMS arity {}",
38                ast.name,
39                ast.tile_extents.len(),
40                ast.dims.len()
41            ),
42        });
43    }
44    let mut seen_dims: HashSet<&str> = HashSet::new();
45    for d in &ast.dims {
46        if d.name.is_empty() {
47            return Err(SqlError::Parse {
48                detail: format!("CREATE ARRAY {}: empty dim name", ast.name),
49            });
50        }
51        if !seen_dims.insert(d.name.as_str()) {
52            return Err(SqlError::Parse {
53                detail: format!("CREATE ARRAY {}: duplicate dim name `{}`", ast.name, d.name),
54            });
55        }
56        if !domain_bounds_match_type(&d.lo, &d.hi, d.dtype) {
57            return Err(SqlError::Parse {
58                detail: format!(
59                    "CREATE ARRAY {}: domain bounds for dim `{}` do not match dim type",
60                    ast.name, d.name
61                ),
62            });
63        }
64        if !domain_lo_le_hi(&d.lo, &d.hi) {
65            return Err(SqlError::Parse {
66                detail: format!("CREATE ARRAY {}: dim `{}` lo > hi", ast.name, d.name),
67            });
68        }
69    }
70    let mut seen_attrs: HashSet<&str> = HashSet::new();
71    for a in &ast.attrs {
72        if a.name.is_empty() {
73            return Err(SqlError::Parse {
74                detail: format!("CREATE ARRAY {}: empty attr name", ast.name),
75            });
76        }
77        if !seen_attrs.insert(a.name.as_str()) {
78            return Err(SqlError::Parse {
79                detail: format!(
80                    "CREATE ARRAY {}: duplicate attr name `{}`",
81                    ast.name, a.name
82                ),
83            });
84        }
85    }
86    for (i, te) in ast.tile_extents.iter().enumerate() {
87        if *te <= 0 {
88            return Err(SqlError::Parse {
89                detail: format!(
90                    "CREATE ARRAY {}: tile extent for dim `{}` must be > 0 (got {te})",
91                    ast.name, ast.dims[i].name
92                ),
93            });
94        }
95    }
96
97    if !(1..=16).contains(&ast.prefix_bits) {
98        return Err(SqlError::Parse {
99            detail: format!(
100                "CREATE ARRAY {}: prefix_bits {} is not in range 1–16",
101                ast.name, ast.prefix_bits
102            ),
103        });
104    }
105    Ok(SqlPlan::CreateArray {
106        name: ast.name.clone(),
107        dims: ast.dims.clone(),
108        attrs: ast.attrs.clone(),
109        tile_extents: ast.tile_extents.clone(),
110        cell_order: ast.cell_order,
111        tile_order: ast.tile_order,
112        prefix_bits: ast.prefix_bits,
113        audit_retain_ms: ast.audit_retain_ms,
114        minimum_audit_retain_ms: ast.minimum_audit_retain_ms,
115    })
116}
117
118pub fn plan_alter_array(ast: &AlterArrayAst) -> Result<SqlPlan> {
119    if ast.name.is_empty() {
120        return Err(SqlError::Parse {
121            detail: "ALTER ARRAY: array name must not be empty".into(),
122        });
123    }
124    if ast.set.is_empty() {
125        return Err(SqlError::Parse {
126            detail: format!("ALTER ARRAY {}: SET clause is empty", ast.name),
127        });
128    }
129
130    let mut audit_retain_ms: Option<Option<i64>> = None;
131    let mut minimum_audit_retain_ms: Option<u64> = None;
132
133    for (key, value) in &ast.set {
134        match key.as_str() {
135            "audit_retain_ms" => {
136                audit_retain_ms = Some(*value);
137            }
138            "minimum_audit_retain_ms" => {
139                let n = value.ok_or_else(|| SqlError::Parse {
140                    detail: format!(
141                        "ALTER ARRAY {}: minimum_audit_retain_ms cannot be NULL",
142                        ast.name
143                    ),
144                })?;
145                minimum_audit_retain_ms = Some(n as u64);
146            }
147            other => {
148                return Err(SqlError::Parse {
149                    detail: format!(
150                        "ALTER ARRAY {}: unknown SET key `{other}`; \
151                         expected `audit_retain_ms` or `minimum_audit_retain_ms`",
152                        ast.name
153                    ),
154                });
155            }
156        }
157    }
158
159    Ok(SqlPlan::AlterArray {
160        name: ast.name.clone(),
161        audit_retain_ms,
162        minimum_audit_retain_ms,
163    })
164}
165
166pub fn plan_drop_array(ast: &DropArrayAst) -> Result<SqlPlan> {
167    if ast.name.is_empty() {
168        return Err(SqlError::Parse {
169            detail: "DROP ARRAY: name must not be empty".into(),
170        });
171    }
172    Ok(SqlPlan::DropArray {
173        name: ast.name.clone(),
174        if_exists: ast.if_exists,
175    })
176}
177
178fn domain_bounds_match_type(
179    lo: &ArrayDomainBound,
180    hi: &ArrayDomainBound,
181    dtype: ArrayDimType,
182) -> bool {
183    matches!(
184        (lo, hi, dtype),
185        (
186            ArrayDomainBound::Int64(_),
187            ArrayDomainBound::Int64(_),
188            ArrayDimType::Int64
189        ) | (
190            ArrayDomainBound::Float64(_),
191            ArrayDomainBound::Float64(_),
192            ArrayDimType::Float64
193        ) | (
194            ArrayDomainBound::TimestampMs(_),
195            ArrayDomainBound::TimestampMs(_),
196            ArrayDimType::TimestampMs,
197        ) | (
198            ArrayDomainBound::String(_),
199            ArrayDomainBound::String(_),
200            ArrayDimType::String
201        )
202    )
203}
204
205fn domain_lo_le_hi(lo: &ArrayDomainBound, hi: &ArrayDomainBound) -> bool {
206    match (lo, hi) {
207        (ArrayDomainBound::Int64(a), ArrayDomainBound::Int64(b)) => a <= b,
208        (ArrayDomainBound::Float64(a), ArrayDomainBound::Float64(b)) => a <= b,
209        (ArrayDomainBound::TimestampMs(a), ArrayDomainBound::TimestampMs(b)) => a <= b,
210        (ArrayDomainBound::String(a), ArrayDomainBound::String(b)) => a <= b,
211        _ => false,
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types_array::{ArrayAttrAst, ArrayAttrType, ArrayDimAst};

    /// Builds a minimal `CREATE ARRAY` AST that passes every planner check:
    /// one Int64 dim `x` over `[0, 10]`, one Int64 attr `v`, one tile extent.
    /// Each test below breaks exactly one property of this fixture.
    fn ok_ast() -> CreateArrayAst {
        let x_dim = ArrayDimAst {
            name: "x".into(),
            dtype: ArrayDimType::Int64,
            lo: ArrayDomainBound::Int64(0),
            hi: ArrayDomainBound::Int64(10),
        };
        let v_attr = ArrayAttrAst {
            name: "v".into(),
            dtype: ArrayAttrType::Int64,
            nullable: false,
        };

        CreateArrayAst {
            name: "g".into(),
            dims: vec![x_dim],
            attrs: vec![v_attr],
            tile_extents: vec![4],
            cell_order: Default::default(),
            tile_order: Default::default(),
            prefix_bits: 8,
            audit_retain_ms: None,
            minimum_audit_retain_ms: None,
        }
    }

    /// The untouched fixture plans to a `CreateArray` node.
    #[test]
    fn happy_path() {
        let planned = plan_create_array(&ok_ast()).unwrap();
        assert!(matches!(planned, SqlPlan::CreateArray { .. }));
    }

    /// Two tile extents for a single dim must be rejected.
    #[test]
    fn rejects_extent_arity() {
        let ast = CreateArrayAst {
            tile_extents: vec![1, 2],
            ..ok_ast()
        };
        assert!(plan_create_array(&ast).is_err());
    }

    /// A second dim with the same name must be rejected even when the
    /// tile-extent arity is kept consistent.
    #[test]
    fn rejects_dup_dim() {
        let mut ast = ok_ast();
        let twin = ast.dims[0].clone();
        ast.dims.push(twin);
        // Keep the arity check satisfied so the duplicate-name check is the
        // one that fires: extents become [4, 4].
        ast.tile_extents.push(4);
        assert!(plan_create_array(&ast).is_err());
    }

    /// A dim whose lower bound exceeds its upper bound must be rejected.
    #[test]
    fn rejects_lo_gt_hi() {
        let mut ast = ok_ast();
        let dim = &mut ast.dims[0];
        dim.lo = ArrayDomainBound::Int64(5);
        dim.hi = ArrayDomainBound::Int64(1);
        assert!(plan_create_array(&ast).is_err());
    }
}
271}