1use std::collections::HashSet;
12
13use crate::error::{Result, SqlError};
14use crate::parser::array_stmt::{AlterArrayAst, CreateArrayAst, DropArrayAst};
15use crate::types::SqlPlan;
16use crate::types_array::{ArrayDimType, ArrayDomainBound};
17
18pub fn plan_create_array(ast: &CreateArrayAst) -> Result<SqlPlan> {
19 if ast.name.is_empty() {
20 return Err(SqlError::Parse {
21 detail: "CREATE ARRAY: array name must not be empty".into(),
22 });
23 }
24 if ast.dims.is_empty() {
25 return Err(SqlError::Parse {
26 detail: format!("CREATE ARRAY {}: at least one dim is required", ast.name),
27 });
28 }
29 if ast.attrs.is_empty() {
30 return Err(SqlError::Parse {
31 detail: format!("CREATE ARRAY {}: at least one attr is required", ast.name),
32 });
33 }
34 if ast.tile_extents.len() != ast.dims.len() {
35 return Err(SqlError::Parse {
36 detail: format!(
37 "CREATE ARRAY {}: TILE_EXTENTS arity {} != DIMS arity {}",
38 ast.name,
39 ast.tile_extents.len(),
40 ast.dims.len()
41 ),
42 });
43 }
44 let mut seen_dims: HashSet<&str> = HashSet::new();
45 for d in &ast.dims {
46 if d.name.is_empty() {
47 return Err(SqlError::Parse {
48 detail: format!("CREATE ARRAY {}: empty dim name", ast.name),
49 });
50 }
51 if !seen_dims.insert(d.name.as_str()) {
52 return Err(SqlError::Parse {
53 detail: format!("CREATE ARRAY {}: duplicate dim name `{}`", ast.name, d.name),
54 });
55 }
56 if !domain_bounds_match_type(&d.lo, &d.hi, d.dtype) {
57 return Err(SqlError::Parse {
58 detail: format!(
59 "CREATE ARRAY {}: domain bounds for dim `{}` do not match dim type",
60 ast.name, d.name
61 ),
62 });
63 }
64 if !domain_lo_le_hi(&d.lo, &d.hi) {
65 return Err(SqlError::Parse {
66 detail: format!("CREATE ARRAY {}: dim `{}` lo > hi", ast.name, d.name),
67 });
68 }
69 }
70 let mut seen_attrs: HashSet<&str> = HashSet::new();
71 for a in &ast.attrs {
72 if a.name.is_empty() {
73 return Err(SqlError::Parse {
74 detail: format!("CREATE ARRAY {}: empty attr name", ast.name),
75 });
76 }
77 if !seen_attrs.insert(a.name.as_str()) {
78 return Err(SqlError::Parse {
79 detail: format!(
80 "CREATE ARRAY {}: duplicate attr name `{}`",
81 ast.name, a.name
82 ),
83 });
84 }
85 }
86 for (i, te) in ast.tile_extents.iter().enumerate() {
87 if *te <= 0 {
88 return Err(SqlError::Parse {
89 detail: format!(
90 "CREATE ARRAY {}: tile extent for dim `{}` must be > 0 (got {te})",
91 ast.name, ast.dims[i].name
92 ),
93 });
94 }
95 }
96
97 if !(1..=16).contains(&ast.prefix_bits) {
98 return Err(SqlError::Parse {
99 detail: format!(
100 "CREATE ARRAY {}: prefix_bits {} is not in range 1–16",
101 ast.name, ast.prefix_bits
102 ),
103 });
104 }
105 Ok(SqlPlan::CreateArray {
106 name: ast.name.clone(),
107 dims: ast.dims.clone(),
108 attrs: ast.attrs.clone(),
109 tile_extents: ast.tile_extents.clone(),
110 cell_order: ast.cell_order,
111 tile_order: ast.tile_order,
112 prefix_bits: ast.prefix_bits,
113 audit_retain_ms: ast.audit_retain_ms,
114 minimum_audit_retain_ms: ast.minimum_audit_retain_ms,
115 })
116}
117
118pub fn plan_alter_array(ast: &AlterArrayAst) -> Result<SqlPlan> {
119 if ast.name.is_empty() {
120 return Err(SqlError::Parse {
121 detail: "ALTER ARRAY: array name must not be empty".into(),
122 });
123 }
124 if ast.set.is_empty() {
125 return Err(SqlError::Parse {
126 detail: format!("ALTER ARRAY {}: SET clause is empty", ast.name),
127 });
128 }
129
130 let mut audit_retain_ms: Option<Option<i64>> = None;
131 let mut minimum_audit_retain_ms: Option<u64> = None;
132
133 for (key, value) in &ast.set {
134 match key.as_str() {
135 "audit_retain_ms" => {
136 audit_retain_ms = Some(*value);
137 }
138 "minimum_audit_retain_ms" => {
139 let n = value.ok_or_else(|| SqlError::Parse {
140 detail: format!(
141 "ALTER ARRAY {}: minimum_audit_retain_ms cannot be NULL",
142 ast.name
143 ),
144 })?;
145 minimum_audit_retain_ms = Some(n as u64);
146 }
147 other => {
148 return Err(SqlError::Parse {
149 detail: format!(
150 "ALTER ARRAY {}: unknown SET key `{other}`; \
151 expected `audit_retain_ms` or `minimum_audit_retain_ms`",
152 ast.name
153 ),
154 });
155 }
156 }
157 }
158
159 Ok(SqlPlan::AlterArray {
160 name: ast.name.clone(),
161 audit_retain_ms,
162 minimum_audit_retain_ms,
163 })
164}
165
166pub fn plan_drop_array(ast: &DropArrayAst) -> Result<SqlPlan> {
167 if ast.name.is_empty() {
168 return Err(SqlError::Parse {
169 detail: "DROP ARRAY: name must not be empty".into(),
170 });
171 }
172 Ok(SqlPlan::DropArray {
173 name: ast.name.clone(),
174 if_exists: ast.if_exists,
175 })
176}
177
178fn domain_bounds_match_type(
179 lo: &ArrayDomainBound,
180 hi: &ArrayDomainBound,
181 dtype: ArrayDimType,
182) -> bool {
183 matches!(
184 (lo, hi, dtype),
185 (
186 ArrayDomainBound::Int64(_),
187 ArrayDomainBound::Int64(_),
188 ArrayDimType::Int64
189 ) | (
190 ArrayDomainBound::Float64(_),
191 ArrayDomainBound::Float64(_),
192 ArrayDimType::Float64
193 ) | (
194 ArrayDomainBound::TimestampMs(_),
195 ArrayDomainBound::TimestampMs(_),
196 ArrayDimType::TimestampMs,
197 ) | (
198 ArrayDomainBound::String(_),
199 ArrayDomainBound::String(_),
200 ArrayDimType::String
201 )
202 )
203}
204
205fn domain_lo_le_hi(lo: &ArrayDomainBound, hi: &ArrayDomainBound) -> bool {
206 match (lo, hi) {
207 (ArrayDomainBound::Int64(a), ArrayDomainBound::Int64(b)) => a <= b,
208 (ArrayDomainBound::Float64(a), ArrayDomainBound::Float64(b)) => a <= b,
209 (ArrayDomainBound::TimestampMs(a), ArrayDomainBound::TimestampMs(b)) => a <= b,
210 (ArrayDomainBound::String(a), ArrayDomainBound::String(b)) => a <= b,
211 _ => false,
212 }
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218 use crate::types_array::{ArrayAttrAst, ArrayAttrType, ArrayDimAst};
219
220 fn ok_ast() -> CreateArrayAst {
221 CreateArrayAst {
222 name: "g".into(),
223 dims: vec![ArrayDimAst {
224 name: "x".into(),
225 dtype: ArrayDimType::Int64,
226 lo: ArrayDomainBound::Int64(0),
227 hi: ArrayDomainBound::Int64(10),
228 }],
229 attrs: vec![ArrayAttrAst {
230 name: "v".into(),
231 dtype: ArrayAttrType::Int64,
232 nullable: false,
233 }],
234 tile_extents: vec![4],
235 cell_order: Default::default(),
236 tile_order: Default::default(),
237 prefix_bits: 8,
238 audit_retain_ms: None,
239 minimum_audit_retain_ms: None,
240 }
241 }
242
243 #[test]
244 fn happy_path() {
245 let plan = plan_create_array(&ok_ast()).unwrap();
246 assert!(matches!(plan, SqlPlan::CreateArray { .. }));
247 }
248
249 #[test]
250 fn rejects_extent_arity() {
251 let mut a = ok_ast();
252 a.tile_extents = vec![1, 2];
253 assert!(plan_create_array(&a).is_err());
254 }
255
256 #[test]
257 fn rejects_dup_dim() {
258 let mut a = ok_ast();
259 a.dims.push(a.dims[0].clone());
260 a.tile_extents = vec![4, 4];
261 assert!(plan_create_array(&a).is_err());
262 }
263
264 #[test]
265 fn rejects_lo_gt_hi() {
266 let mut a = ok_ast();
267 a.dims[0].lo = ArrayDomainBound::Int64(5);
268 a.dims[0].hi = ArrayDomainBound::Int64(1);
269 assert!(plan_create_array(&a).is_err());
270 }
271}