1use crate::linter::document::{LintDocument, LintStatement};
2use crate::linter::Linter;
3use crate::types::*;
4use sqlparser::ast::{CreateView, Statement};
5use std::borrow::Cow;
6use std::collections::HashSet;
7use std::ops::Range;
8use std::sync::Arc;
9#[cfg(feature = "tracing")]
10use tracing::info_span;
11
/// Upper bound on accepted SQL input (10 MiB); `split_statements` rejects
/// larger requests before doing any splitting work.
const MAX_SQL_LENGTH: usize = 10 * 1024 * 1024;
15
16mod complexity;
17mod context;
18pub(crate) mod cross_statement;
19mod ddl;
20mod descriptions;
21mod diagnostics;
22mod expression;
23mod functions;
24mod global;
25pub mod helpers;
26mod input;
27mod query;
28pub(crate) mod schema_registry;
29mod select_analyzer;
30mod statements;
31mod transform;
32pub mod visitor;
33
34use context::StatementContext;
35use cross_statement::CrossStatementTracker;
36use descriptions::DescriptionKey;
37use helpers::{
38 build_column_schemas_with_constraints, find_identifier_span, find_relation_occurrence_spans,
39};
40use input::{collect_statements, StatementInput};
41use schema_registry::SchemaRegistry;
42use statements::{
43 detect_dbt_model_materialization, extract_model_name, DbtMaterializationDetection,
44 StatementSource,
45};
46use std::collections::HashMap;
47
48pub(crate) use schema_registry::TableResolution;
50
51#[must_use]
53pub fn analyze(request: &AnalyzeRequest) -> AnalyzeResult {
54 #[cfg(feature = "tracing")]
55 let _span =
56 info_span!("analyze_request", statement_count = %request.sql.matches(';').count() + 1)
57 .entered();
58 let mut analyzer = Analyzer::new(request);
59 analyzer.analyze()
60}
61
62#[must_use]
64pub fn split_statements(request: &StatementSplitRequest) -> StatementSplitResult {
65 if request.sql.len() > MAX_SQL_LENGTH {
67 return StatementSplitResult::from_error(format!(
68 "SQL exceeds maximum length of {} bytes ({} bytes provided)",
69 MAX_SQL_LENGTH,
70 request.sql.len()
71 ));
72 }
73
74 StatementSplitResult {
75 statements: input::split_statement_spans_with_dialect(&request.sql, request.dialect),
76 error: None,
77 }
78}
79
/// Stateful driver that walks a request's statements, accumulating lineage,
/// issues, and cross-statement context as it goes.
pub(crate) struct Analyzer<'a> {
    /// The request under analysis (SQL text, dialect, options, files).
    pub(crate) request: &'a AnalyzeRequest,
    /// All issues collected so far (schema init, parse, lint, analysis).
    pub(crate) issues: Vec<Issue>,
    /// Per-statement lineage results, pushed in statement order.
    pub(crate) statement_lineages: Vec<StatementLineage>,
    /// Registry of known/implied table schemas and identifier normalization.
    pub(crate) schema: SchemaRegistry,
    /// Cross-statement relation tracking (identities, declared node types).
    pub(crate) tracker: CrossStatementTracker,
    /// Whether column-level lineage is computed; defaults to true unless
    /// disabled via request options.
    pub(crate) column_lineage_enabled: bool,
    /// Source text and byte range of the statement currently being analyzed;
    /// `None` outside of per-statement analysis.
    current_statement_source: Option<StatementSourceSlice<'a>>,
    /// Statement indexes that have already emitted a recursion-depth warning
    /// (used to deduplicate that warning per statement).
    depth_limit_statements: HashSet<usize>,
    /// Configured linter, present only when linting is enabled in options.
    linter: Option<Linter>,
    /// Descriptions collected during the DDL pre-pass, keyed for later lookup.
    pub(crate) descriptions: HashMap<DescriptionKey, Arc<str>>,
}
107
108impl<'a> Analyzer<'a> {
109 fn new(request: &'a AnalyzeRequest) -> Self {
110 let column_lineage_enabled = request
112 .options
113 .as_ref()
114 .and_then(|o| o.enable_column_lineage)
115 .unwrap_or(true);
116
117 let (schema, init_issues) = SchemaRegistry::new(request.schema.as_ref(), request.dialect);
118
119 let linter = request
121 .options
122 .as_ref()
123 .and_then(|o| o.lint.clone())
124 .filter(|c| c.enabled)
125 .map(Linter::new);
126
127 Self {
128 request,
129 issues: init_issues,
130 statement_lineages: Vec::new(),
131 schema,
132 tracker: CrossStatementTracker::new(),
133 column_lineage_enabled,
134 current_statement_source: None,
135 depth_limit_statements: HashSet::new(),
136 linter,
137 descriptions: HashMap::new(),
138 }
139 }
140
141 fn current_sql_slice(&self, _caller: &'static str) -> Option<(&str, usize)> {
143 if let Some(source) = &self.current_statement_source {
144 return match source.sql.get(source.range.start..source.range.end) {
145 Some(slice) => Some((slice, source.range.start)),
146 None => {
147 #[cfg(feature = "tracing")]
148 tracing::warn!(
149 caller = _caller,
150 start = source.range.start,
151 end = source.range.end,
152 sql_len = source.sql.len(),
153 "current statement source range is invalid"
154 );
155 None
156 }
157 };
158 }
159
160 Some((self.request.sql.as_str(), 0))
161 }
162
163 pub(crate) fn find_span(&self, identifier: &str) -> Option<Span> {
167 let (sql, offset) = self.current_sql_slice("find_span")?;
168 find_identifier_span(sql, identifier, 0)
169 .map(|span| Span::new(offset + span.start, offset + span.end))
170 }
171
172 pub(crate) fn locate_statement_span<F>(
187 &self,
188 ctx: &mut StatementContext,
189 identifier: &str,
190 finder: F,
191 ) -> Option<Span>
192 where
193 F: Fn(&str, &str, usize) -> Option<Span>,
194 {
195 let search_start = ctx.span_search_cursor;
196
197 let (sql, offset) = self.current_sql_slice("locate_statement_span")?;
198
199 let span = if let Some(span) = finder(sql, identifier, search_start) {
200 span
201 } else {
202 if search_start > 0 {
203 if let Some(earlier) = finder(sql, identifier, 0) {
204 if earlier.end <= search_start {
205 #[cfg(feature = "tracing")]
206 tracing::warn!(
207 identifier,
208 search_start,
209 earlier_start = earlier.start,
210 earlier_end = earlier.end,
211 "locate_statement_span exhausted its cursor before matching; traversal may be out of lexical order"
212 );
213 }
214 }
215 }
216 return None;
217 };
218 debug_assert!(
219 span.end >= ctx.span_search_cursor,
220 "Span cursor moved backward: {} -> {} (identifier: '{}')",
221 ctx.span_search_cursor,
222 span.end,
223 identifier
224 );
225
226 ctx.span_search_cursor = span.end;
227 Some(Span::new(offset + span.start, offset + span.end))
228 }
229
230 pub(crate) fn locate_relation_name_span(
237 &self,
238 ctx: &mut StatementContext,
239 raw_name: &str,
240 ) -> Option<Span> {
241 let search_start = *ctx.relation_span_cursor(raw_name);
242
243 let (sql, offset) = self.current_sql_slice("locate_relation_name_span")?;
244
245 let (full_span, name_span) = find_relation_occurrence_spans(sql, raw_name, search_start)?;
246 *ctx.relation_span_cursor(raw_name) = full_span.end;
247 Some(Span::new(offset + name_span.start, offset + name_span.end))
248 }
249
    /// Stable (node id, node type) identity for a canonical relation name,
    /// as tracked across statements.
    pub(crate) fn relation_identity(&self, canonical: &str) -> (Arc<str>, NodeType) {
        self.tracker.relation_identity(canonical)
    }

    /// Stable graph node id for a canonical relation name.
    pub(crate) fn relation_node_id(&self, canonical: &str) -> Arc<str> {
        self.tracker.relation_node_id(canonical)
    }

    /// Whether tables absent from the provided schema may be implied.
    pub(crate) fn allow_implied(&self) -> bool {
        self.schema.allow_implied()
    }

    /// Resolves a (possibly qualified) table reference via the schema registry.
    pub(crate) fn canonicalize_table_reference(&self, name: &str) -> TableResolution {
        self.schema.canonicalize_table_reference(name)
    }

    /// Normalizes a bare identifier via the schema registry.
    pub(crate) fn normalize_identifier(&self, name: &str) -> String {
        self.schema.normalize_identifier(name)
    }

    /// Normalizes a table name via the schema registry.
    pub(crate) fn normalize_table_name(&self, name: &str) -> String {
        self.schema.normalize_table_name(name)
    }
279
280 pub(crate) fn emit_depth_limit_warning(&mut self, statement_index: usize) {
282 if self.depth_limit_statements.insert(statement_index) {
283 self.issues.push(
284 Issue::warning(
285 issue_codes::APPROXIMATE_LINEAGE,
286 format!(
287 "Expression recursion depth exceeded (>{}). Lineage may be incomplete.",
288 expression::MAX_RECURSION_DEPTH
289 ),
290 )
291 .with_statement(statement_index),
292 );
293 }
294 }
295
    /// Drives the full analysis: collects statements, runs the DDL pre-pass
    /// and linting, then analyzes each statement in order, accumulating
    /// lineage and issues into the final result.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(dialect = ?self.request.dialect, stmt_count)))]
    fn analyze(&mut self) -> AnalyzeResult {
        // Statement collection can itself report issues (e.g. parse problems).
        let (all_statements, mut preflight_issues) = collect_statements(self.request);
        self.issues.append(&mut preflight_issues);

        #[cfg(feature = "tracing")]
        tracing::Span::current().record("stmt_count", all_statements.len());

        // Pre-pass: register DDL, descriptions, and (in dbt mode) models before
        // any per-statement analysis runs.
        self.precollect_ddl(&all_statements);

        if all_statements.is_empty() {
            // Nothing parsed; still lint the raw sources so text-level rules run.
            self.run_lint_documents_without_statements();
            return self.build_result();
        }

        self.run_lint_documents(&all_statements);

        for (
            index,
            StatementInput {
                statement,
                source_name,
                source_sql,
                source_range,
                source_sql_untemplated,
                source_range_untemplated,
                templating_applied,
                ..
            },
        ) in all_statements.into_iter().enumerate()
        {
            #[cfg(feature = "tracing")]
            let _stmt_span = info_span!(
                "analyze_statement",
                index,
                source = source_name.as_deref().map_or("inline", String::as_str),
                stmt_type = ?statement
            )
            .entered();

            // When templating was applied, capture the resolved (post-template)
            // statement text for the lineage output.
            let resolved_sql = if templating_applied {
                Some(source_sql[source_range.clone()].to_string())
            } else {
                None
            };
            // Pre-template statement text, available only when both the
            // untemplated source and its range are known and in bounds.
            let original_sql = source_sql_untemplated.as_deref().and_then(|sql| {
                source_range_untemplated
                    .as_ref()
                    .and_then(|range| sql.get(range.clone()).map(str::to_string))
            });
            // Expose the current statement's source to the span-lookup helpers
            // (current_sql_slice and friends) for the duration of this iteration.
            self.current_statement_source = Some(StatementSourceSlice {
                sql: source_sql,
                range: source_range.clone(),
            });

            let source_name_owned = source_name.as_deref().map(String::from);
            let result = self.analyze_statement(
                index,
                &statement,
                source_name_owned,
                StatementSource {
                    source_range,
                    original_source_range: source_range_untemplated,
                    original_sql,
                    resolved_sql,
                },
            );
            // Always clear the slice so stale ranges never leak into the next
            // statement (or into calls made outside per-statement analysis).
            self.current_statement_source = None;

            match result {
                Ok(lineage) => {
                    self.statement_lineages.push(lineage);
                }
                Err(e) => {
                    // A failed statement becomes an issue; analysis continues
                    // with the remaining statements.
                    self.issues.push(
                        Issue::error(issue_codes::PARSE_ERROR, e.to_string()).with_statement(index),
                    );
                }
            }
        }

        self.build_result()
    }
381}
382
/// Source text (borrowed or owned via `Cow`) plus the byte range of the
/// statement currently under analysis within that text.
struct StatementSourceSlice<'a> {
    /// Full source SQL that `range` indexes into.
    sql: Cow<'a, str>,
    /// Byte range of the current statement within `sql`.
    range: Range<usize>,
}
387
388impl<'a> Analyzer<'a> {
    /// Runs the linter over the parsed statements, grouping consecutive
    /// statements that share the same source (name, templated SQL, and
    /// untemplated SQL) into one `LintDocument` each, so document-level lint
    /// rules see whole source files rather than single statements.
    fn run_lint_documents(&mut self, statements: &[StatementInput<'a>]) {
        let Some(linter) = self.linter.as_ref() else {
            return;
        };

        let mut start = 0usize;
        while start < statements.len() {
            // Keys identifying the source document the group at `start` belongs to.
            let source_name_key = statements[start]
                .source_name
                .as_deref()
                .map(|name| name.as_str());
            let source_sql_key = statements[start].source_sql.as_ref();
            let source_untemplated_sql_key = statements[start].source_sql_untemplated.as_deref();

            // Extend the group while the following statements match all three keys.
            let mut end = start + 1;
            while end < statements.len()
                && statements[end]
                    .source_name
                    .as_deref()
                    .map(|name| name.as_str())
                    == source_name_key
                && statements[end].source_sql.as_ref() == source_sql_key
                && statements[end].source_sql_untemplated.as_deref() == source_untemplated_sql_key
            {
                end += 1;
            }

            // Build the document's statement list; note `statement_index` is
            // local to the document (offset within the group), not global.
            let mut lint_statements = Vec::with_capacity(end - start);
            let mut source_statement_ranges = Vec::with_capacity(end - start);
            for (offset, statement_input) in statements[start..end].iter().enumerate() {
                lint_statements.push(LintStatement {
                    statement: &statement_input.statement,
                    statement_index: offset,
                    statement_range: statement_input.source_range.clone(),
                });
                source_statement_ranges.push(statement_input.source_range_untemplated.clone());
            }

            // One statement parsed via fallback flags the whole document.
            let parser_fallback_used = statements[start..end]
                .iter()
                .any(|statement_input| statement_input.parser_fallback_used);
            let document = LintDocument::new_with_parser_fallback_and_source(
                source_sql_key,
                source_untemplated_sql_key,
                self.request.dialect,
                lint_statements,
                parser_fallback_used,
                Some(source_statement_ranges),
            );
            self.issues.extend(linter.check_document(&document));

            start = end;
        }
    }
443
444 fn run_lint_documents_without_statements(&mut self) {
445 let Some(linter) = self.linter.as_ref() else {
446 return;
447 };
448
449 if let Some(files) = &self.request.files {
450 if files.is_empty() {
451 return;
452 }
453 for file in files {
454 let document = LintDocument::new(&file.content, self.request.dialect, Vec::new());
455 self.issues.extend(linter.check_document(&document));
456 }
457 return;
458 }
459
460 if !self.request.sql.is_empty() {
461 let document = LintDocument::new(&self.request.sql, self.request.dialect, Vec::new());
462 self.issues.extend(linter.check_document(&document));
463 }
464 }
465
466 fn precollect_ddl(&mut self, statements: &[StatementInput]) {
468 self.descriptions = self.collect_description_map(statements);
469
470 if self.is_dbt_mode() {
471 self.precollect_dbt_models(statements);
472 }
473
474 for (index, stmt_input) in statements.iter().enumerate() {
475 match &stmt_input.statement {
476 Statement::CreateTable(create) => {
477 self.precollect_create_table(create, index);
478 }
479 Statement::CreateView(CreateView { name, .. }) => {
480 self.precollect_create_view(name);
481 }
482 _ => {}
483 }
484 }
485 }
486
    /// Pre-declares dbt models to the cross-statement tracker.
    ///
    /// For each query statement that came from a named source, derives the
    /// model name from that source name, detects its materialization from the
    /// untemplated SQL, and declares the model as a view, ephemeral, or table.
    /// Duplicate model names and unresolvable materializations emit warnings.
    fn precollect_dbt_models(&mut self, statements: &[StatementInput]) {
        // First source seen per model name, kept for duplicate reporting.
        let mut first_source: HashMap<String, String> = HashMap::new();

        for (index, stmt_input) in statements.iter().enumerate() {
            // Only top-level queries are considered models here.
            let Statement::Query(_) = &stmt_input.statement else {
                continue;
            };

            // Models must come from a named source; inline SQL is skipped.
            let Some(source_name) = stmt_input.source_name.as_deref() else {
                continue;
            };

            // Untemplated statement text, used for materialization detection
            // (in bounds only when both the source and its range are present).
            let original_sql = stmt_input
                .source_sql_untemplated
                .as_deref()
                .and_then(|sql| {
                    stmt_input
                        .source_range_untemplated
                        .as_ref()
                        .and_then(|range| sql.get(range.clone()))
                });
            let model_name = self.normalize_table_name(extract_model_name(source_name));

            // Only the first definition establishes node identity; later
            // duplicates warn and are skipped.
            if let Some(existing) = first_source.get(&model_name) {
                self.issues.push(
                    Issue::warning(
                        issue_codes::SCHEMA_CONFLICT,
                        format!(
                            "Duplicate dbt model '{model_name}' defined in both '{existing}' and '{source_name}'. Node identity (table/view/ephemeral) is taken from the first definition ('{existing}'); cross-statement lineage edges still point to the last statement that produces the model."
                        ),
                    )
                    .with_statement(index),
                );
                continue;
            }
            first_source.insert(model_name.clone(), source_name.to_string());

            // Missing untemplated SQL counts as "not configured".
            let detection = original_sql
                .map(detect_dbt_model_materialization)
                .unwrap_or(DbtMaterializationDetection::NotConfigured);

            match detection.node_type() {
                Some(NodeType::View) => self.tracker.declare_view(&model_name),
                Some(NodeType::Cte) => self.tracker.declare_ephemeral(&model_name),
                // Explicit table, or no node type at all, defaults to table.
                Some(NodeType::Table) | None => self.tracker.declare_table(&model_name),
                Some(NodeType::Output | NodeType::Column) => {
                    unreachable!("dbt materializations must map to relation-like node types")
                }
            }

            // An unresolved (dynamic/unsupported) materialization still maps to
            // table above, but we surface the approximation to the user.
            if matches!(detection, DbtMaterializationDetection::Unresolved) {
                self.issues.push(
                    Issue::warning(
                        issue_codes::APPROXIMATE_LINEAGE,
                        format!(
                            "dbt model '{model_name}' has a dynamic or unsupported `materialized` value; defaulting to table"
                        ),
                    )
                    .with_statement(index),
                );
            }
        }
    }
575
576 fn precollect_create_table(
578 &mut self,
579 create: &sqlparser::ast::CreateTable,
580 statement_index: usize,
581 ) {
582 let canonical = self.normalize_table_name(&create.name.to_string());
583
584 if create.query.is_none() {
585 let (column_schemas, table_constraints) =
586 build_column_schemas_with_constraints(&create.columns, &create.constraints);
587
588 self.schema.seed_implied_schema_with_constraints(
589 &canonical,
590 column_schemas,
591 table_constraints,
592 create.temporary,
593 statement_index,
594 );
595 } else {
596 self.schema.mark_table_known(&canonical);
600 }
601 }
602
603 fn precollect_create_view(&mut self, name: &sqlparser::ast::ObjectName) {
605 let canonical = self.normalize_table_name(&name.to_string());
606 self.schema.mark_table_known(&canonical);
607 }
608}
609
610#[cfg(test)]
611mod tests;