1use std::collections::HashMap;
6
7use nodedb_types::DatabaseId;
8
9use crate::error::{Result, SqlError};
10use crate::parser::normalize::{
11 normalize_ident, normalize_object_name_checked, table_name_from_factor,
12};
13use crate::types::{
14 ArrayCatalogView, CollectionInfo, ColumnInfo, EngineType, SqlCatalog, SqlDataType,
15};
16use crate::types_array::{ArrayAttrType, ArrayDimType};
17
18#[derive(Debug, Clone)]
20pub struct ResolvedTable {
21 pub name: String,
22 pub alias: Option<String>,
23 pub info: CollectionInfo,
24}
25
26impl ResolvedTable {
27 pub fn ref_name(&self) -> &str {
29 self.alias.as_deref().unwrap_or(&self.name)
30 }
31}
32
33#[derive(Debug, Default)]
35pub struct TableScope {
36 pub tables: HashMap<String, ResolvedTable>,
38 order: Vec<String>,
40}
41
42impl TableScope {
43 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn add(&mut self, table: ResolvedTable) -> Result<()> {
49 let key = table.ref_name().to_string();
50 if self.tables.contains_key(&key) {
51 return Err(SqlError::Parse {
52 detail: format!("duplicate table reference: {key}"),
53 });
54 }
55 self.order.push(key.clone());
56 self.tables.insert(key, table);
57 Ok(())
58 }
59
60 pub fn resolve_column(
65 &self,
66 table_ref: Option<&str>,
67 column: &str,
68 ) -> Result<(String, String)> {
69 let col = column.to_lowercase();
70
71 if let Some(tref) = table_ref {
72 let tref_lower = tref.to_lowercase();
73 let table = self
74 .tables
75 .get(&tref_lower)
76 .ok_or_else(|| SqlError::UnknownTable {
77 name: tref_lower.clone(),
78 })?;
79 self.validate_column(table, &col)?;
80 return Ok((table.name.clone(), col));
81 }
82
83 let mut matches = Vec::new();
85 for key in &self.order {
86 let table = &self.tables[key];
87 if self.column_exists(table, &col) {
88 matches.push(table.name.clone());
89 }
90 }
91
92 match matches.len() {
93 0 => {
94 if self.tables.len() == 1 {
96 let table = self
97 .tables
98 .values()
99 .next()
100 .expect("invariant: self.tables.len() == 1 checked immediately above");
101 if table.info.engine == EngineType::DocumentSchemaless {
102 return Ok((table.name.clone(), col));
103 }
104 }
105 Err(SqlError::UnknownColumn {
106 table: self
107 .order
108 .first()
109 .cloned()
110 .unwrap_or_else(|| "<unknown>".into()),
111 column: col,
112 })
113 }
114 1 => Ok((
115 matches
116 .into_iter()
117 .next()
118 .expect("invariant: matches.len() == 1 guaranteed by this match arm"),
119 col,
120 )),
121 _ => Err(SqlError::AmbiguousColumn { column: col }),
122 }
123 }
124
125 fn column_exists(&self, table: &ResolvedTable, column: &str) -> bool {
126 if table.info.engine == EngineType::DocumentSchemaless {
128 return true;
129 }
130 table.info.columns.iter().any(|c| c.name == column)
131 }
132
133 fn validate_column(&self, table: &ResolvedTable, column: &str) -> Result<()> {
134 if self.column_exists(table, column) {
135 Ok(())
136 } else {
137 Err(SqlError::UnknownColumn {
138 table: table.name.clone(),
139 column: column.into(),
140 })
141 }
142 }
143
144 pub fn single_table(&self) -> Option<&ResolvedTable> {
146 if self.tables.len() == 1 {
147 self.tables.values().next()
148 } else {
149 Option::None
150 }
151 }
152
153 pub fn resolve_from(
155 catalog: &dyn SqlCatalog,
156 from: &[sqlparser::ast::TableWithJoins],
157 ) -> Result<Self> {
158 let mut scope = Self::new();
159 for table_with_joins in from {
160 scope.resolve_table_factor(catalog, &table_with_joins.relation)?;
161 for join in &table_with_joins.joins {
162 scope.resolve_table_factor(catalog, &join.relation)?;
163 }
164 }
165 Ok(scope)
166 }
167
168 fn resolve_table_factor(
169 &mut self,
170 catalog: &dyn SqlCatalog,
171 factor: &sqlparser::ast::TableFactor,
172 ) -> Result<()> {
173 if let Some(resolved) = resolve_array_tvf(catalog, factor)? {
177 self.add(resolved)?;
178 return Ok(());
179 }
180 if let sqlparser::ast::TableFactor::Derived {
184 lateral: true,
185 alias: Some(alias),
186 ..
187 } = factor
188 {
189 let alias_str = normalize_ident(&alias.name);
190 self.add(ResolvedTable {
191 name: alias_str.clone(),
192 alias: Some(alias_str.clone()),
193 info: CollectionInfo {
194 name: alias_str,
195 engine: EngineType::DocumentSchemaless,
196 columns: Vec::new(),
197 primary_key: None,
198 has_auto_tier: false,
199 indexes: Vec::new(),
200 bitemporal: false,
201 primary: nodedb_types::PrimaryEngine::Document,
202 vector_primary: None,
203 },
204 })?;
205 return Ok(());
206 }
207 if let Some((name, alias)) = table_name_from_factor(factor)? {
208 let info = catalog
209 .get_collection(DatabaseId::DEFAULT, &name)?
210 .ok_or_else(|| SqlError::UnknownTable { name: name.clone() })?;
211 self.add(ResolvedTable { name, alias, info })?;
212 }
213 Ok(())
214 }
215}
216
217fn resolve_array_tvf(
221 catalog: &dyn SqlCatalog,
222 factor: &sqlparser::ast::TableFactor,
223) -> Result<Option<ResolvedTable>> {
224 let (fn_name, args, alias) = match factor {
225 sqlparser::ast::TableFactor::Table {
226 name,
227 args: Some(args),
228 alias,
229 ..
230 } => (
231 normalize_object_name_checked(name)?,
232 args,
233 alias.as_ref().map(|a| normalize_ident(&a.name)),
234 ),
235 _ => return Ok(None),
236 };
237 if !matches!(
238 fn_name.as_str(),
239 "array_slice" | "array_project" | "array_agg" | "array_elementwise"
240 ) {
241 return Ok(None);
242 }
243
244 let first = args.args.first().ok_or_else(|| SqlError::Unsupported {
246 detail: format!("{fn_name}: missing array-name argument"),
247 })?;
248 let array_name = extract_string_literal_arg(first).ok_or_else(|| SqlError::Unsupported {
249 detail: format!("{fn_name}: array-name argument must be a string literal"),
250 })?;
251 let view = catalog
252 .lookup_array(&array_name)
253 .ok_or_else(|| SqlError::UnknownTable {
254 name: array_name.clone(),
255 })?;
256
257 let info = CollectionInfo {
258 name: view.name.clone(),
259 engine: EngineType::Array,
260 columns: array_columns(&view),
261 primary_key: None,
262 has_auto_tier: false,
263 indexes: Vec::new(),
264 bitemporal: false,
265 primary: nodedb_types::PrimaryEngine::Document,
266 vector_primary: None,
267 };
268 Ok(Some(ResolvedTable {
269 name: view.name,
270 alias,
271 info,
272 }))
273}
274
275fn array_columns(view: &ArrayCatalogView) -> Vec<ColumnInfo> {
276 let mut cols = Vec::with_capacity(view.dims.len() + view.attrs.len());
277 for d in &view.dims {
278 cols.push(ColumnInfo {
279 name: d.name.clone(),
280 data_type: dim_type_to_sql(d.dtype),
281 nullable: false,
282 is_primary_key: false,
283 default: None,
284 });
285 }
286 for a in &view.attrs {
287 cols.push(ColumnInfo {
288 name: a.name.clone(),
289 data_type: attr_type_to_sql(a.dtype),
290 nullable: a.nullable,
291 is_primary_key: false,
292 default: None,
293 });
294 }
295 cols
296}
297
298fn dim_type_to_sql(t: ArrayDimType) -> SqlDataType {
299 match t {
300 ArrayDimType::Int64 => SqlDataType::Int64,
301 ArrayDimType::Float64 => SqlDataType::Float64,
302 ArrayDimType::TimestampMs => SqlDataType::Timestamp,
303 ArrayDimType::String => SqlDataType::String,
304 }
305}
306
307fn attr_type_to_sql(t: ArrayAttrType) -> SqlDataType {
308 match t {
309 ArrayAttrType::Int64 => SqlDataType::Int64,
310 ArrayAttrType::Float64 => SqlDataType::Float64,
311 ArrayAttrType::String => SqlDataType::String,
312 ArrayAttrType::Bytes => SqlDataType::Bytes,
313 }
314}
315
316fn extract_string_literal_arg(arg: &sqlparser::ast::FunctionArg) -> Option<String> {
317 use sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, Value};
318 let expr = match arg {
319 FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
320 FunctionArg::Named {
321 arg: FunctionArgExpr::Expr(e),
322 ..
323 } => e,
324 _ => return None,
325 };
326 match expr {
327 Expr::Value(v) => match &v.value {
328 Value::SingleQuotedString(s) => Some(s.clone()),
329 _ => None,
330 },
331 _ => None,
332 }
333}
334
335#[cfg(test)]
336mod tests {
337 use super::*;
338 use crate::types::{CollectionInfo, ColumnInfo, EngineType, SqlDataType};
339 use nodedb_types::PrimaryEngine;
340
341 fn strict_collection(name: &str, columns: Vec<&str>) -> CollectionInfo {
342 CollectionInfo {
343 name: name.into(),
344 engine: EngineType::DocumentStrict,
345 columns: columns
346 .into_iter()
347 .map(|c| ColumnInfo {
348 name: c.into(),
349 data_type: SqlDataType::String,
350 nullable: true,
351 is_primary_key: false,
352 default: None,
353 })
354 .collect(),
355 primary_key: None,
356 has_auto_tier: false,
357 indexes: Vec::new(),
358 bitemporal: false,
359 primary: PrimaryEngine::Document,
360 vector_primary: None,
361 }
362 }
363
364 fn schemaless_collection(name: &str) -> CollectionInfo {
365 CollectionInfo {
366 name: name.into(),
367 engine: EngineType::DocumentSchemaless,
368 columns: Vec::new(),
369 primary_key: None,
370 has_auto_tier: false,
371 indexes: Vec::new(),
372 bitemporal: false,
373 primary: PrimaryEngine::Document,
374 vector_primary: None,
375 }
376 }
377
378 fn scope_with(info: CollectionInfo) -> TableScope {
379 let mut scope = TableScope::new();
380 scope
381 .add(ResolvedTable {
382 name: info.name.clone(),
383 alias: None,
384 info,
385 })
386 .expect("add failed");
387 scope
388 }
389
390 #[test]
398 fn quoted_identifier_resolves_as_column() {
399 let scope = scope_with(strict_collection("users", vec!["userid", "email"]));
400 let (table, col) = scope
401 .resolve_column(None, "userid")
402 .expect("should resolve");
403 assert_eq!(table, "users");
404 assert_eq!(col, "userid");
405 }
406
407 #[test]
413 fn unknown_column_in_strict_collection_yields_unknown_column_error() {
414 let scope = scope_with(strict_collection("users", vec!["id", "email"]));
415 let err = scope
416 .resolve_column(None, "ghost_col")
417 .expect_err("should fail for unknown column");
418 assert!(
419 matches!(err, SqlError::UnknownColumn { ref column, .. } if column == "ghost_col"),
420 "expected UnknownColumn(ghost_col), got {err:?}"
421 );
422 assert!(
424 !matches!(err, SqlError::Unsupported { .. }),
425 "must not surface Unsupported for a missing column"
426 );
427 }
428
429 #[test]
432 fn any_column_accepted_in_schemaless_collection() {
433 let scope = scope_with(schemaless_collection("events"));
434 let (table, col) = scope
435 .resolve_column(None, "ghost_col")
436 .expect("schemaless should accept any column");
437 assert_eq!(table, "events");
438 assert_eq!(col, "ghost_col");
439 }
440
441 #[test]
443 fn qualified_column_resolves_correctly() {
444 let scope = scope_with(strict_collection("t", vec!["col", "other"]));
445 let (table, col) = scope
446 .resolve_column(Some("t"), "col")
447 .expect("qualified column should resolve");
448 assert_eq!(table, "t");
449 assert_eq!(col, "col");
450 }
451
452 #[test]
455 fn qualified_unknown_column_in_strict_collection() {
456 let scope = scope_with(strict_collection("t", vec!["id"]));
457 let err = scope
458 .resolve_column(Some("t"), "missing")
459 .expect_err("should fail");
460 assert!(
461 matches!(err, SqlError::UnknownColumn { .. }),
462 "expected UnknownColumn, got {err:?}"
463 );
464 }
465}