Skip to main content

flowscope_core/
analyzer.rs

1use crate::linter::document::{LintDocument, LintStatement};
2use crate::linter::Linter;
3use crate::types::*;
4use sqlparser::ast::{CreateView, Statement};
5use std::borrow::Cow;
6use std::collections::HashSet;
7use std::ops::Range;
8use std::sync::Arc;
9#[cfg(feature = "tracing")]
10use tracing::info_span;
11
12/// Maximum SQL input size (10MB) to prevent memory exhaustion.
13/// This matches the TypeScript validation limit.
14const MAX_SQL_LENGTH: usize = 10 * 1024 * 1024;
15
16mod complexity;
17mod context;
18pub(crate) mod cross_statement;
19mod ddl;
20mod descriptions;
21mod diagnostics;
22mod expression;
23mod functions;
24mod global;
25pub mod helpers;
26mod input;
27mod query;
28pub(crate) mod schema_registry;
29mod select_analyzer;
30mod statements;
31mod transform;
32pub mod visitor;
33
34use context::StatementContext;
35use cross_statement::CrossStatementTracker;
36use descriptions::DescriptionKey;
37use helpers::{
38    build_column_schemas_with_constraints, find_identifier_span, find_relation_occurrence_spans,
39};
40use input::{collect_statements, StatementInput};
41use schema_registry::SchemaRegistry;
42use statements::{
43    detect_dbt_model_materialization, extract_model_name, DbtMaterializationDetection,
44    StatementSource,
45};
46use std::collections::HashMap;
47
48// Re-export for use in other analyzer modules
49pub(crate) use schema_registry::TableResolution;
50
51/// Main entry point for SQL analysis
52#[must_use]
53pub fn analyze(request: &AnalyzeRequest) -> AnalyzeResult {
54    #[cfg(feature = "tracing")]
55    let _span =
56        info_span!("analyze_request", statement_count = %request.sql.matches(';').count() + 1)
57            .entered();
58    let mut analyzer = Analyzer::new(request);
59    analyzer.analyze()
60}
61
62/// Split SQL into statement spans.
63#[must_use]
64pub fn split_statements(request: &StatementSplitRequest) -> StatementSplitResult {
65    // Validate input size to prevent memory exhaustion
66    if request.sql.len() > MAX_SQL_LENGTH {
67        return StatementSplitResult::from_error(format!(
68            "SQL exceeds maximum length of {} bytes ({} bytes provided)",
69            MAX_SQL_LENGTH,
70            request.sql.len()
71        ));
72    }
73
74    StatementSplitResult {
75        statements: input::split_statement_spans_with_dialect(&request.sql, request.dialect),
76        error: None,
77    }
78}
79
80/// Internal analyzer state.
81///
82/// The analyzer is organized into focused components:
83/// - `schema`: Manages schema metadata, resolution, and normalization
84/// - `tracker`: Tracks cross-statement dependencies and lineage
85/// - `issues`: Collects warnings and errors during analysis
86/// - `statement_lineages`: Stores per-statement analysis results
87pub(crate) struct Analyzer<'a> {
88    pub(crate) request: &'a AnalyzeRequest,
89    pub(crate) issues: Vec<Issue>,
90    pub(crate) statement_lineages: Vec<StatementLineage>,
91    /// Schema registry for table/column resolution.
92    pub(crate) schema: SchemaRegistry,
93    /// Cross-statement dependency tracker.
94    pub(crate) tracker: CrossStatementTracker,
95    /// Whether column lineage is enabled.
96    pub(crate) column_lineage_enabled: bool,
97    /// Source slice for the currently analyzed statement (for span lookups).
98    current_statement_source: Option<StatementSourceSlice<'a>>,
99    /// Statements that already emitted a recursion-depth warning.
100    depth_limit_statements: HashSet<usize>,
101    /// SQL linter (None if linting is disabled).
102    linter: Option<Linter>,
103    /// Descriptions harvested from structured SQL comments.
104    /// Keyed by canonical table path (plus column for column targets).
105    pub(crate) descriptions: HashMap<DescriptionKey, Arc<str>>,
106}
107
108impl<'a> Analyzer<'a> {
109    fn new(request: &'a AnalyzeRequest) -> Self {
110        // Check if column lineage is enabled (default: true)
111        let column_lineage_enabled = request
112            .options
113            .as_ref()
114            .and_then(|o| o.enable_column_lineage)
115            .unwrap_or(true);
116
117        let (schema, init_issues) = SchemaRegistry::new(request.schema.as_ref(), request.dialect);
118
119        // Initialize linter only when explicitly requested via options.lint
120        let linter = request
121            .options
122            .as_ref()
123            .and_then(|o| o.lint.clone())
124            .filter(|c| c.enabled)
125            .map(Linter::new);
126
127        Self {
128            request,
129            issues: init_issues,
130            statement_lineages: Vec::new(),
131            schema,
132            tracker: CrossStatementTracker::new(),
133            column_lineage_enabled,
134            current_statement_source: None,
135            depth_limit_statements: HashSet::new(),
136            linter,
137            descriptions: HashMap::new(),
138        }
139    }
140
141    /// Returns the current statement SQL slice plus its absolute offset.
142    fn current_sql_slice(&self, _caller: &'static str) -> Option<(&str, usize)> {
143        if let Some(source) = &self.current_statement_source {
144            return match source.sql.get(source.range.start..source.range.end) {
145                Some(slice) => Some((slice, source.range.start)),
146                None => {
147                    #[cfg(feature = "tracing")]
148                    tracing::warn!(
149                        caller = _caller,
150                        start = source.range.start,
151                        end = source.range.end,
152                        sql_len = source.sql.len(),
153                        "current statement source range is invalid"
154                    );
155                    None
156                }
157            };
158        }
159
160        Some((self.request.sql.as_str(), 0))
161    }
162
163    /// Finds the span of an identifier in the SQL text.
164    ///
165    /// This is used to attach source locations to issues for better error reporting.
166    pub(crate) fn find_span(&self, identifier: &str) -> Option<Span> {
167        let (sql, offset) = self.current_sql_slice("find_span")?;
168        find_identifier_span(sql, identifier, 0)
169            .map(|span| Span::new(offset + span.start, offset + span.end))
170    }
171
172    /// Locates the next identifier span inside the current statement using the
173    /// statement-local search cursor stored on `ctx`.
174    ///
175    /// # Traversal-order contract
176    ///
177    /// Callers must invoke this in roughly left-to-right lexical order within a
178    /// single statement. Each successful call advances `ctx.span_search_cursor`
179    /// past the matched span, so a caller that processes AST nodes out of text
180    /// order will either skip matches or associate them with the wrong node
181    /// instance (notably for self-joins and repeated names). The
182    /// `debug_assert!` below catches backward movement in debug builds; it is
183    /// intentionally silent in release so a mildly-out-of-order visitor does
184    /// not panic in production — but callers should still treat left-to-right
185    /// traversal as an invariant.
186    pub(crate) fn locate_statement_span<F>(
187        &self,
188        ctx: &mut StatementContext,
189        identifier: &str,
190        finder: F,
191    ) -> Option<Span>
192    where
193        F: Fn(&str, &str, usize) -> Option<Span>,
194    {
195        let search_start = ctx.span_search_cursor;
196
197        let (sql, offset) = self.current_sql_slice("locate_statement_span")?;
198
199        let span = if let Some(span) = finder(sql, identifier, search_start) {
200            span
201        } else {
202            if search_start > 0 {
203                if let Some(earlier) = finder(sql, identifier, 0) {
204                    if earlier.end <= search_start {
205                        #[cfg(feature = "tracing")]
206                        tracing::warn!(
207                            identifier,
208                            search_start,
209                            earlier_start = earlier.start,
210                            earlier_end = earlier.end,
211                            "locate_statement_span exhausted its cursor before matching; traversal may be out of lexical order"
212                        );
213                    }
214                }
215            }
216            return None;
217        };
218        debug_assert!(
219            span.end >= ctx.span_search_cursor,
220            "Span cursor moved backward: {} -> {} (identifier: '{}')",
221            ctx.span_search_cursor,
222            span.end,
223            identifier
224        );
225
226        ctx.span_search_cursor = span.end;
227        Some(Span::new(offset + span.start, offset + span.end))
228    }
229
230    /// Locates the next occurrence of a relation name and narrows the match to
231    /// the node label token (the final identifier component).
232    ///
233    /// For example, `public.users` maps to the span of `users`, not the whole
234    /// qualified path. This preserves the existing `nameSpans` semantics while
235    /// still assigning occurrences per node instance in lexical order.
236    pub(crate) fn locate_relation_name_span(
237        &self,
238        ctx: &mut StatementContext,
239        raw_name: &str,
240    ) -> Option<Span> {
241        let search_start = *ctx.relation_span_cursor(raw_name);
242
243        let (sql, offset) = self.current_sql_slice("locate_relation_name_span")?;
244
245        let (full_span, name_span) = find_relation_occurrence_spans(sql, raw_name, search_start)?;
246        *ctx.relation_span_cursor(raw_name) = full_span.end;
247        Some(Span::new(offset + name_span.start, offset + name_span.end))
248    }
249
250    /// Returns the correct node ID and type for a relation (view vs table).
251    pub(crate) fn relation_identity(&self, canonical: &str) -> (Arc<str>, NodeType) {
252        self.tracker.relation_identity(canonical)
253    }
254
255    /// Returns the node ID for a relation.
256    pub(crate) fn relation_node_id(&self, canonical: &str) -> Arc<str> {
257        self.tracker.relation_node_id(canonical)
258    }
259
260    /// Check if implied schema capture is allowed (default: true).
261    pub(crate) fn allow_implied(&self) -> bool {
262        self.schema.allow_implied()
263    }
264
265    /// Canonicalizes a table reference using schema resolution.
266    pub(crate) fn canonicalize_table_reference(&self, name: &str) -> TableResolution {
267        self.schema.canonicalize_table_reference(name)
268    }
269
270    /// Normalizes an identifier according to dialect case sensitivity.
271    pub(crate) fn normalize_identifier(&self, name: &str) -> String {
272        self.schema.normalize_identifier(name)
273    }
274
275    /// Normalizes a qualified table name.
276    pub(crate) fn normalize_table_name(&self, name: &str) -> String {
277        self.schema.normalize_table_name(name)
278    }
279
280    /// Emits a warning when expression traversal exceeds the recursion guard.
281    pub(crate) fn emit_depth_limit_warning(&mut self, statement_index: usize) {
282        if self.depth_limit_statements.insert(statement_index) {
283            self.issues.push(
284                Issue::warning(
285                    issue_codes::APPROXIMATE_LINEAGE,
286                    format!(
287                        "Expression recursion depth exceeded (>{}). Lineage may be incomplete.",
288                        expression::MAX_RECURSION_DEPTH
289                    ),
290                )
291                .with_statement(statement_index),
292            );
293        }
294    }
295
296    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(dialect = ?self.request.dialect, stmt_count)))]
297    fn analyze(&mut self) -> AnalyzeResult {
298        let (all_statements, mut preflight_issues) = collect_statements(self.request);
299        self.issues.append(&mut preflight_issues);
300
301        #[cfg(feature = "tracing")]
302        tracing::Span::current().record("stmt_count", all_statements.len());
303
304        self.precollect_ddl(&all_statements);
305
306        if all_statements.is_empty() {
307            self.run_lint_documents_without_statements();
308            return self.build_result();
309        }
310
311        self.run_lint_documents(&all_statements);
312
313        // Analyze all statements
314        for (
315            index,
316            StatementInput {
317                statement,
318                source_name,
319                source_sql,
320                source_range,
321                source_sql_untemplated,
322                source_range_untemplated,
323                templating_applied,
324                ..
325            },
326        ) in all_statements.into_iter().enumerate()
327        {
328            #[cfg(feature = "tracing")]
329            let _stmt_span = info_span!(
330                "analyze_statement",
331                index,
332                source = source_name.as_deref().map_or("inline", String::as_str),
333                stmt_type = ?statement
334            )
335            .entered();
336
337            // Extract resolved SQL when templating was applied
338            let resolved_sql = if templating_applied {
339                Some(source_sql[source_range.clone()].to_string())
340            } else {
341                None
342            };
343            let original_sql = source_sql_untemplated.as_deref().and_then(|sql| {
344                source_range_untemplated
345                    .as_ref()
346                    .and_then(|range| sql.get(range.clone()).map(str::to_string))
347            });
348            self.current_statement_source = Some(StatementSourceSlice {
349                sql: source_sql,
350                range: source_range.clone(),
351            });
352
353            let source_name_owned = source_name.as_deref().map(String::from);
354            let result = self.analyze_statement(
355                index,
356                &statement,
357                source_name_owned,
358                StatementSource {
359                    source_range,
360                    original_source_range: source_range_untemplated,
361                    original_sql,
362                    resolved_sql,
363                },
364            );
365            self.current_statement_source = None;
366
367            match result {
368                Ok(lineage) => {
369                    self.statement_lineages.push(lineage);
370                }
371                Err(e) => {
372                    self.issues.push(
373                        Issue::error(issue_codes::PARSE_ERROR, e.to_string()).with_statement(index),
374                    );
375                }
376            }
377        }
378
379        self.build_result()
380    }
381}
382
383struct StatementSourceSlice<'a> {
384    sql: Cow<'a, str>,
385    range: Range<usize>,
386}
387
388impl<'a> Analyzer<'a> {
389    fn run_lint_documents(&mut self, statements: &[StatementInput<'a>]) {
390        let Some(linter) = self.linter.as_ref() else {
391            return;
392        };
393
394        let mut start = 0usize;
395        while start < statements.len() {
396            let source_name_key = statements[start]
397                .source_name
398                .as_deref()
399                .map(|name| name.as_str());
400            let source_sql_key = statements[start].source_sql.as_ref();
401            let source_untemplated_sql_key = statements[start].source_sql_untemplated.as_deref();
402
403            let mut end = start + 1;
404            while end < statements.len()
405                && statements[end]
406                    .source_name
407                    .as_deref()
408                    .map(|name| name.as_str())
409                    == source_name_key
410                && statements[end].source_sql.as_ref() == source_sql_key
411                && statements[end].source_sql_untemplated.as_deref() == source_untemplated_sql_key
412            {
413                end += 1;
414            }
415
416            let mut lint_statements = Vec::with_capacity(end - start);
417            let mut source_statement_ranges = Vec::with_capacity(end - start);
418            for (offset, statement_input) in statements[start..end].iter().enumerate() {
419                lint_statements.push(LintStatement {
420                    statement: &statement_input.statement,
421                    statement_index: offset,
422                    statement_range: statement_input.source_range.clone(),
423                });
424                source_statement_ranges.push(statement_input.source_range_untemplated.clone());
425            }
426
427            let parser_fallback_used = statements[start..end]
428                .iter()
429                .any(|statement_input| statement_input.parser_fallback_used);
430            let document = LintDocument::new_with_parser_fallback_and_source(
431                source_sql_key,
432                source_untemplated_sql_key,
433                self.request.dialect,
434                lint_statements,
435                parser_fallback_used,
436                Some(source_statement_ranges),
437            );
438            self.issues.extend(linter.check_document(&document));
439
440            start = end;
441        }
442    }
443
444    fn run_lint_documents_without_statements(&mut self) {
445        let Some(linter) = self.linter.as_ref() else {
446            return;
447        };
448
449        if let Some(files) = &self.request.files {
450            if files.is_empty() {
451                return;
452            }
453            for file in files {
454                let document = LintDocument::new(&file.content, self.request.dialect, Vec::new());
455                self.issues.extend(linter.check_document(&document));
456            }
457            return;
458        }
459
460        if !self.request.sql.is_empty() {
461            let document = LintDocument::new(&self.request.sql, self.request.dialect, Vec::new());
462            self.issues.extend(linter.check_document(&document));
463        }
464    }
465
466    /// Pre-registers CREATE TABLE/VIEW targets so earlier statements can resolve them.
467    fn precollect_ddl(&mut self, statements: &[StatementInput]) {
468        self.descriptions = self.collect_description_map(statements);
469
470        if self.is_dbt_mode() {
471            self.precollect_dbt_models(statements);
472        }
473
474        for (index, stmt_input) in statements.iter().enumerate() {
475            match &stmt_input.statement {
476                Statement::CreateTable(create) => {
477                    self.precollect_create_table(create, index);
478                }
479                Statement::CreateView(CreateView { name, .. }) => {
480                    self.precollect_create_view(name);
481                }
482                _ => {}
483            }
484        }
485    }
486
487    /// Pre-registers dbt model relations so forward `ref(...)` consumers and
488    /// later producers share one canonical node identity.
489    ///
490    /// This pass only **declares type** — it does not record a producer
491    /// statement index. Producer indices are set later by the main analysis
492    /// pass via `record_view_produced` / `record_produced`. Keeping that
493    /// single source of truth for production avoids duplicating producer
494    /// state and prevents `declared_views` from drifting out of sync with
495    /// `produced_views`.
496    ///
497    /// Two other responsibilities handled here:
498    /// - Detecting duplicate model names across files and surfacing a warning
499    ///   so real project configuration errors aren't silently masked. The
500    ///   first definition's materialization wins for node identity; later
501    ///   producer statements still overwrite the producer index via the
502    ///   normal `record_produced` path, so cross-statement edges point at the
503    ///   most recently seen producer.
504    /// - Surfacing a warning when `materialized=` is present but can't be
505    ///   resolved (dynamic Jinja or adapter-specific value), so users know
506    ///   we fell back to the default node type.
507    fn precollect_dbt_models(&mut self, statements: &[StatementInput]) {
508        let mut first_source: HashMap<String, String> = HashMap::new();
509
510        for (index, stmt_input) in statements.iter().enumerate() {
511            let Statement::Query(_) = &stmt_input.statement else {
512                continue;
513            };
514
515            let Some(source_name) = stmt_input.source_name.as_deref() else {
516                continue;
517            };
518
519            let original_sql = stmt_input
520                .source_sql_untemplated
521                .as_deref()
522                .and_then(|sql| {
523                    stmt_input
524                        .source_range_untemplated
525                        .as_ref()
526                        .and_then(|range| sql.get(range.clone()))
527                });
528            let model_name = self.normalize_table_name(extract_model_name(source_name));
529
530            if let Some(existing) = first_source.get(&model_name) {
531                self.issues.push(
532                    Issue::warning(
533                        issue_codes::SCHEMA_CONFLICT,
534                        format!(
535                            "Duplicate dbt model '{model_name}' defined in both '{existing}' and '{source_name}'. Node identity (table/view/ephemeral) is taken from the first definition ('{existing}'); cross-statement lineage edges still point to the last statement that produces the model."
536                        ),
537                    )
538                    .with_statement(index),
539                );
540                continue;
541            }
542            first_source.insert(model_name.clone(), source_name.to_string());
543
544            let detection = original_sql
545                .map(detect_dbt_model_materialization)
546                .unwrap_or(DbtMaterializationDetection::NotConfigured);
547
548            match detection.node_type() {
549                Some(NodeType::View) => self.tracker.declare_view(&model_name),
550                Some(NodeType::Cte) => self.tracker.declare_ephemeral(&model_name),
551                // Table is the default in `relation_identity`, but we still
552                // need to declare it so forward `ref(...)` consumers resolve
553                // to a known relation instead of emitting
554                // `UNRESOLVED_REFERENCE` before the producer statement runs.
555                // `None` (NotConfigured / Unresolved) also defaults to table.
556                Some(NodeType::Table) | None => self.tracker.declare_table(&model_name),
557                Some(NodeType::Output | NodeType::Column) => {
558                    unreachable!("dbt materializations must map to relation-like node types")
559                }
560            }
561
562            if matches!(detection, DbtMaterializationDetection::Unresolved) {
563                self.issues.push(
564                    Issue::warning(
565                        issue_codes::APPROXIMATE_LINEAGE,
566                        format!(
567                            "dbt model '{model_name}' has a dynamic or unsupported `materialized` value; defaulting to table"
568                        ),
569                    )
570                    .with_statement(index),
571                );
572            }
573        }
574    }
575
576    /// Handles CREATE TABLE statements during DDL pre-collection.
577    fn precollect_create_table(
578        &mut self,
579        create: &sqlparser::ast::CreateTable,
580        statement_index: usize,
581    ) {
582        let canonical = self.normalize_table_name(&create.name.to_string());
583
584        if create.query.is_none() {
585            let (column_schemas, table_constraints) =
586                build_column_schemas_with_constraints(&create.columns, &create.constraints);
587
588            self.schema.seed_implied_schema_with_constraints(
589                &canonical,
590                column_schemas,
591                table_constraints,
592                create.temporary,
593                statement_index,
594            );
595        } else {
596            // This is a CTAS (CREATE TABLE ... AS SELECT ...).
597            // We mark the table as known to prevent UNRESOLVED_REFERENCE
598            // errors, but we don't have column schema yet.
599            self.schema.mark_table_known(&canonical);
600        }
601    }
602
603    /// Handles CREATE VIEW statements during DDL pre-collection.
604    fn precollect_create_view(&mut self, name: &sqlparser::ast::ObjectName) {
605        let canonical = self.normalize_table_name(&name.to_string());
606        self.schema.mark_table_known(&canonical);
607    }
608}
609
610#[cfg(test)]
611mod tests;