1use std::collections::HashMap;
6
7use nodedb_types::DatabaseId;
8
9use crate::error::{Result, SqlError};
10use crate::parser::normalize::{
11 normalize_ident, normalize_object_name_checked, table_name_from_factor,
12};
13use crate::types::{
14 ArrayCatalogView, CollectionInfo, ColumnInfo, EngineType, SqlCatalog, SqlDataType,
15};
16use crate::types_array::{ArrayAttrType, ArrayDimType};
17
/// A table reference that has been bound to catalog metadata.
#[derive(Debug, Clone)]
pub struct ResolvedTable {
    /// Name of the underlying collection (or array, or synthetic derived
    /// table) this reference points at.
    pub name: String,
    /// Alias given to the reference in the query, if any.
    pub alias: Option<String>,
    /// Metadata describing the referenced collection: engine, columns, etc.
    pub info: CollectionInfo,
}
25
26impl ResolvedTable {
27 pub fn ref_name(&self) -> &str {
29 self.alias.as_deref().unwrap_or(&self.name)
30 }
31}
32
/// The set of tables visible to a query, used to resolve column references.
#[derive(Debug, Default)]
pub struct TableScope {
    /// Tables in scope, keyed by their reference name (see
    /// `ResolvedTable::ref_name`).
    pub tables: HashMap<String, ResolvedTable>,
    /// Keys of `tables` in the order they were added via `add`, so that
    /// iteration does not depend on hash-map ordering.
    order: Vec<String>,
}
41
42impl TableScope {
43 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn add(&mut self, table: ResolvedTable) -> Result<()> {
49 let key = table.ref_name().to_string();
50 if self.tables.contains_key(&key) {
51 return Err(SqlError::Parse {
52 detail: format!("duplicate table reference: {key}"),
53 });
54 }
55 self.order.push(key.clone());
56 self.tables.insert(key, table);
57 Ok(())
58 }
59
60 pub fn resolve_column(
65 &self,
66 table_ref: Option<&str>,
67 column: &str,
68 ) -> Result<(String, String)> {
69 let col = column.to_lowercase();
70
71 if let Some(tref) = table_ref {
72 let tref_lower = tref.to_lowercase();
73 let table = self
74 .tables
75 .get(&tref_lower)
76 .ok_or_else(|| SqlError::UnknownTable {
77 name: tref_lower.clone(),
78 })?;
79 self.validate_column(table, &col)?;
80 return Ok((table.name.clone(), col));
81 }
82
83 let mut matches = Vec::new();
85 for key in &self.order {
86 let table = &self.tables[key];
87 if self.column_exists(table, &col) {
88 matches.push(table.name.clone());
89 }
90 }
91
92 match matches.len() {
93 0 => {
94 if self.tables.len() == 1 {
96 let table = self
97 .tables
98 .values()
99 .next()
100 .expect("invariant: self.tables.len() == 1 checked immediately above");
101 if table.info.engine == EngineType::DocumentSchemaless {
102 return Ok((table.name.clone(), col));
103 }
104 }
105 Err(SqlError::UnknownColumn {
106 table: self
107 .order
108 .first()
109 .cloned()
110 .unwrap_or_else(|| "<unknown>".into()),
111 column: col,
112 })
113 }
114 1 => Ok((
115 matches
116 .into_iter()
117 .next()
118 .expect("invariant: matches.len() == 1 guaranteed by this match arm"),
119 col,
120 )),
121 _ => Err(SqlError::AmbiguousColumn { column: col }),
122 }
123 }
124
125 fn column_exists(&self, table: &ResolvedTable, column: &str) -> bool {
126 if table.info.engine == EngineType::DocumentSchemaless {
128 return true;
129 }
130 table.info.columns.iter().any(|c| c.name == column)
131 }
132
133 fn validate_column(&self, table: &ResolvedTable, column: &str) -> Result<()> {
134 if self.column_exists(table, column) {
135 Ok(())
136 } else {
137 Err(SqlError::UnknownColumn {
138 table: table.name.clone(),
139 column: column.into(),
140 })
141 }
142 }
143
144 pub fn single_table(&self) -> Option<&ResolvedTable> {
146 if self.tables.len() == 1 {
147 self.tables.values().next()
148 } else {
149 Option::None
150 }
151 }
152
153 pub fn resolve_from(
155 catalog: &dyn SqlCatalog,
156 from: &[sqlparser::ast::TableWithJoins],
157 ) -> Result<Self> {
158 let mut scope = Self::new();
159 for table_with_joins in from {
160 scope.resolve_table_factor(catalog, &table_with_joins.relation)?;
161 for join in &table_with_joins.joins {
162 scope.resolve_table_factor(catalog, &join.relation)?;
163 }
164 }
165 Ok(scope)
166 }
167
168 fn resolve_table_factor(
169 &mut self,
170 catalog: &dyn SqlCatalog,
171 factor: &sqlparser::ast::TableFactor,
172 ) -> Result<()> {
173 if let Some(resolved) = resolve_array_tvf(catalog, factor)? {
177 self.add(resolved)?;
178 return Ok(());
179 }
180 if let sqlparser::ast::TableFactor::Derived {
184 lateral: true,
185 alias: Some(alias),
186 ..
187 } = factor
188 {
189 let alias_str = normalize_ident(&alias.name);
190 self.add(ResolvedTable {
191 name: alias_str.clone(),
192 alias: Some(alias_str.clone()),
193 info: CollectionInfo {
194 name: alias_str,
195 engine: EngineType::DocumentSchemaless,
196 columns: Vec::new(),
197 primary_key: None,
198 has_auto_tier: false,
199 indexes: Vec::new(),
200 bitemporal: false,
201 primary: nodedb_types::PrimaryEngine::Document,
202 vector_primary: None,
203 },
204 })?;
205 return Ok(());
206 }
207 if let Some((name, alias)) = table_name_from_factor(factor)? {
208 let info = catalog
209 .get_collection(DatabaseId::DEFAULT, &name)?
210 .ok_or_else(|| SqlError::UnknownTable { name: name.clone() })?;
211 self.add(ResolvedTable { name, alias, info })?;
212 }
213 Ok(())
214 }
215}
216
217fn resolve_array_tvf(
221 catalog: &dyn SqlCatalog,
222 factor: &sqlparser::ast::TableFactor,
223) -> Result<Option<ResolvedTable>> {
224 let (fn_name, args, alias) = match factor {
225 sqlparser::ast::TableFactor::Table {
226 name,
227 args: Some(args),
228 alias,
229 ..
230 } => (
231 normalize_object_name_checked(name)?,
232 args,
233 alias.as_ref().map(|a| normalize_ident(&a.name)),
234 ),
235 _ => return Ok(None),
236 };
237 if !matches!(
238 fn_name.as_str(),
239 "array_slice" | "array_project" | "array_agg" | "array_elementwise"
240 ) {
241 return Ok(None);
242 }
243
244 let first = args.args.first().ok_or_else(|| SqlError::Unsupported {
246 detail: format!("{fn_name}: missing array-name argument"),
247 })?;
248 let array_name = extract_string_literal_arg(first).ok_or_else(|| SqlError::Unsupported {
249 detail: format!("{fn_name}: array-name argument must be a string literal"),
250 })?;
251 let view = catalog
252 .lookup_array(&array_name)
253 .ok_or_else(|| SqlError::UnknownTable {
254 name: array_name.clone(),
255 })?;
256
257 let info = CollectionInfo {
258 name: view.name.clone(),
259 engine: EngineType::Array,
260 columns: array_columns(&view),
261 primary_key: None,
262 has_auto_tier: false,
263 indexes: Vec::new(),
264 bitemporal: false,
265 primary: nodedb_types::PrimaryEngine::Document,
266 vector_primary: None,
267 };
268 Ok(Some(ResolvedTable {
269 name: view.name,
270 alias,
271 info,
272 }))
273}
274
275fn array_columns(view: &ArrayCatalogView) -> Vec<ColumnInfo> {
276 let mut cols = Vec::with_capacity(view.dims.len() + view.attrs.len());
277 for d in &view.dims {
278 cols.push(ColumnInfo {
279 name: d.name.clone(),
280 data_type: dim_type_to_sql(d.dtype),
281 nullable: false,
282 is_primary_key: false,
283 default: None,
284 raw_type: None,
285 });
286 }
287 for a in &view.attrs {
288 cols.push(ColumnInfo {
289 name: a.name.clone(),
290 data_type: attr_type_to_sql(a.dtype),
291 nullable: a.nullable,
292 is_primary_key: false,
293 default: None,
294 raw_type: None,
295 });
296 }
297 cols
298}
299
300fn dim_type_to_sql(t: ArrayDimType) -> SqlDataType {
301 match t {
302 ArrayDimType::Int64 => SqlDataType::Int64,
303 ArrayDimType::Float64 => SqlDataType::Float64,
304 ArrayDimType::TimestampMs => SqlDataType::Timestamp,
305 ArrayDimType::String => SqlDataType::String,
306 }
307}
308
309fn attr_type_to_sql(t: ArrayAttrType) -> SqlDataType {
310 match t {
311 ArrayAttrType::Int64 => SqlDataType::Int64,
312 ArrayAttrType::Float64 => SqlDataType::Float64,
313 ArrayAttrType::String => SqlDataType::String,
314 ArrayAttrType::Bytes => SqlDataType::Bytes,
315 }
316}
317
318fn extract_string_literal_arg(arg: &sqlparser::ast::FunctionArg) -> Option<String> {
319 use sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, Value};
320 let expr = match arg {
321 FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
322 FunctionArg::Named {
323 arg: FunctionArgExpr::Expr(e),
324 ..
325 } => e,
326 _ => return None,
327 };
328 match expr {
329 Expr::Value(v) => match &v.value {
330 Value::SingleQuotedString(s) => Some(s.clone()),
331 _ => None,
332 },
333 _ => None,
334 }
335}
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{CollectionInfo, ColumnInfo, EngineType, SqlDataType};
    use nodedb_types::PrimaryEngine;

    /// Builds a document-primary collection with the given engine and
    /// columns; every other setting is left off or empty.
    fn collection(name: &str, engine: EngineType, columns: Vec<ColumnInfo>) -> CollectionInfo {
        CollectionInfo {
            name: name.into(),
            engine,
            columns,
            primary_key: None,
            has_auto_tier: false,
            indexes: Vec::new(),
            bitemporal: false,
            primary: PrimaryEngine::Document,
            vector_primary: None,
        }
    }

    /// A strict document collection whose columns are all nullable strings.
    fn strict_collection(name: &str, columns: Vec<&str>) -> CollectionInfo {
        let columns = columns
            .into_iter()
            .map(|column_name| ColumnInfo {
                name: column_name.into(),
                data_type: SqlDataType::String,
                nullable: true,
                is_primary_key: false,
                default: None,
                raw_type: None,
            })
            .collect();
        collection(name, EngineType::DocumentStrict, columns)
    }

    /// A schemaless document collection with no declared columns.
    fn schemaless_collection(name: &str) -> CollectionInfo {
        collection(name, EngineType::DocumentSchemaless, Vec::new())
    }

    /// A scope holding exactly one un-aliased table described by `info`.
    fn scope_with(info: CollectionInfo) -> TableScope {
        let table = ResolvedTable {
            name: info.name.clone(),
            alias: None,
            info,
        };
        let mut scope = TableScope::new();
        scope.add(table).expect("add failed");
        scope
    }

    /// An unqualified column declared on a strict collection resolves to
    /// that collection.
    #[test]
    fn quoted_identifier_resolves_as_column() {
        let scope = scope_with(strict_collection("users", vec!["userid", "email"]));
        let resolved = scope
            .resolve_column(None, "userid")
            .expect("should resolve");
        assert_eq!(resolved.0, "users");
        assert_eq!(resolved.1, "userid");
    }

    /// A column missing from a strict collection is reported as
    /// `UnknownColumn`, never as `Unsupported`.
    #[test]
    fn unknown_column_in_strict_collection_yields_unknown_column_error() {
        let scope = scope_with(strict_collection("users", vec!["id", "email"]));
        let err = scope
            .resolve_column(None, "ghost_col")
            .expect_err("should fail for unknown column");

        let is_unknown_ghost =
            matches!(err, SqlError::UnknownColumn { ref column, .. } if column == "ghost_col");
        assert!(
            is_unknown_ghost,
            "expected UnknownColumn(ghost_col), got {err:?}"
        );

        let is_unsupported = matches!(err, SqlError::Unsupported { .. });
        assert!(
            !is_unsupported,
            "must not surface Unsupported for a missing column"
        );
    }

    /// Schemaless collections accept column names they never declared.
    #[test]
    fn any_column_accepted_in_schemaless_collection() {
        let scope = scope_with(schemaless_collection("events"));
        let resolved = scope
            .resolve_column(None, "ghost_col")
            .expect("schemaless should accept any column");
        assert_eq!(resolved.0, "events");
        assert_eq!(resolved.1, "ghost_col");
    }

    /// A table-qualified column resolves against the named table.
    #[test]
    fn qualified_column_resolves_correctly() {
        let scope = scope_with(strict_collection("t", vec!["col", "other"]));
        let resolved = scope
            .resolve_column(Some("t"), "col")
            .expect("qualified column should resolve");
        assert_eq!(resolved.0, "t");
        assert_eq!(resolved.1, "col");
    }

    /// A table-qualified column missing from a strict collection fails
    /// with `UnknownColumn`.
    #[test]
    fn qualified_unknown_column_in_strict_collection() {
        let scope = scope_with(strict_collection("t", vec!["id"]));
        let err = scope
            .resolve_column(Some("t"), "missing")
            .expect_err("should fail");
        let is_unknown_column = matches!(err, SqlError::UnknownColumn { .. });
        assert!(is_unknown_column, "expected UnknownColumn, got {err:?}");
    }
}