Skip to main content

flowscope_core/
analyzer.rs

1use crate::linter::document::{LintDocument, LintStatement};
2use crate::linter::Linter;
3use crate::types::*;
4use sqlparser::ast::Statement;
5use std::borrow::Cow;
6use std::collections::HashSet;
7use std::ops::Range;
8use std::sync::Arc;
9#[cfg(feature = "tracing")]
10use tracing::info_span;
11
12/// Maximum SQL input size (10MB) to prevent memory exhaustion.
13/// This matches the TypeScript validation limit.
14const MAX_SQL_LENGTH: usize = 10 * 1024 * 1024;
15
16mod complexity;
17mod context;
18pub(crate) mod cross_statement;
19mod ddl;
20mod diagnostics;
21mod expression;
22mod functions;
23mod global;
24pub mod helpers;
25mod input;
26mod query;
27pub(crate) mod schema_registry;
28mod select_analyzer;
29mod statements;
30mod transform;
31pub mod visitor;
32
33use cross_statement::CrossStatementTracker;
34use helpers::{build_column_schemas_with_constraints, find_identifier_span};
35use input::{collect_statements, StatementInput};
36use schema_registry::SchemaRegistry;
37
38// Re-export for use in other analyzer modules
39pub(crate) use schema_registry::TableResolution;
40
41/// Main entry point for SQL analysis
42#[must_use]
43pub fn analyze(request: &AnalyzeRequest) -> AnalyzeResult {
44    #[cfg(feature = "tracing")]
45    let _span =
46        info_span!("analyze_request", statement_count = %request.sql.matches(';').count() + 1)
47            .entered();
48    let mut analyzer = Analyzer::new(request);
49    analyzer.analyze()
50}
51
52/// Split SQL into statement spans.
53#[must_use]
54pub fn split_statements(request: &StatementSplitRequest) -> StatementSplitResult {
55    // Validate input size to prevent memory exhaustion
56    if request.sql.len() > MAX_SQL_LENGTH {
57        return StatementSplitResult::from_error(format!(
58            "SQL exceeds maximum length of {} bytes ({} bytes provided)",
59            MAX_SQL_LENGTH,
60            request.sql.len()
61        ));
62    }
63
64    StatementSplitResult {
65        statements: input::split_statement_spans_with_dialect(&request.sql, request.dialect),
66        error: None,
67    }
68}
69
70/// Internal analyzer state.
71///
72/// The analyzer is organized into focused components:
73/// - `schema`: Manages schema metadata, resolution, and normalization
74/// - `tracker`: Tracks cross-statement dependencies and lineage
75/// - `issues`: Collects warnings and errors during analysis
76/// - `statement_lineages`: Stores per-statement analysis results
77pub(crate) struct Analyzer<'a> {
78    pub(crate) request: &'a AnalyzeRequest,
79    pub(crate) issues: Vec<Issue>,
80    pub(crate) statement_lineages: Vec<StatementLineage>,
81    /// Schema registry for table/column resolution.
82    pub(crate) schema: SchemaRegistry,
83    /// Cross-statement dependency tracker.
84    pub(crate) tracker: CrossStatementTracker,
85    /// Whether column lineage is enabled.
86    pub(crate) column_lineage_enabled: bool,
87    /// Source slice for the currently analyzed statement (for span lookups).
88    current_statement_source: Option<StatementSourceSlice<'a>>,
89    /// Statements that already emitted a recursion-depth warning.
90    depth_limit_statements: HashSet<usize>,
91    /// SQL linter (None if linting is disabled).
92    linter: Option<Linter>,
93}
94
95impl<'a> Analyzer<'a> {
96    fn new(request: &'a AnalyzeRequest) -> Self {
97        // Check if column lineage is enabled (default: true)
98        let column_lineage_enabled = request
99            .options
100            .as_ref()
101            .and_then(|o| o.enable_column_lineage)
102            .unwrap_or(true);
103
104        let (schema, init_issues) = SchemaRegistry::new(request.schema.as_ref(), request.dialect);
105
106        // Initialize linter only when explicitly requested via options.lint
107        let linter = request
108            .options
109            .as_ref()
110            .and_then(|o| o.lint.clone())
111            .filter(|c| c.enabled)
112            .map(Linter::new);
113
114        Self {
115            request,
116            issues: init_issues,
117            statement_lineages: Vec::new(),
118            schema,
119            tracker: CrossStatementTracker::new(),
120            column_lineage_enabled,
121            current_statement_source: None,
122            depth_limit_statements: HashSet::new(),
123            linter,
124        }
125    }
126
127    /// Finds the span of an identifier in the SQL text.
128    ///
129    /// This is used to attach source locations to issues for better error reporting.
130    pub(crate) fn find_span(&self, identifier: &str) -> Option<Span> {
131        if let Some(source) = &self.current_statement_source {
132            let statement_sql = &source.sql[source.range.clone()];
133            return find_identifier_span(statement_sql, identifier, 0).map(|span| {
134                Span::new(
135                    source.range.start + span.start,
136                    source.range.start + span.end,
137                )
138            });
139        }
140
141        find_identifier_span(&self.request.sql, identifier, 0)
142    }
143
144    /// Returns the correct node ID and type for a relation (view vs table).
145    pub(crate) fn relation_identity(&self, canonical: &str) -> (Arc<str>, NodeType) {
146        self.tracker.relation_identity(canonical)
147    }
148
149    /// Returns the node ID for a relation.
150    pub(crate) fn relation_node_id(&self, canonical: &str) -> Arc<str> {
151        self.tracker.relation_node_id(canonical)
152    }
153
154    /// Check if implied schema capture is allowed (default: true).
155    pub(crate) fn allow_implied(&self) -> bool {
156        self.schema.allow_implied()
157    }
158
159    /// Canonicalizes a table reference using schema resolution.
160    pub(crate) fn canonicalize_table_reference(&self, name: &str) -> TableResolution {
161        self.schema.canonicalize_table_reference(name)
162    }
163
164    /// Normalizes an identifier according to dialect case sensitivity.
165    pub(crate) fn normalize_identifier(&self, name: &str) -> String {
166        self.schema.normalize_identifier(name)
167    }
168
169    /// Normalizes a qualified table name.
170    pub(crate) fn normalize_table_name(&self, name: &str) -> String {
171        self.schema.normalize_table_name(name)
172    }
173
174    /// Emits a warning when expression traversal exceeds the recursion guard.
175    pub(crate) fn emit_depth_limit_warning(&mut self, statement_index: usize) {
176        if self.depth_limit_statements.insert(statement_index) {
177            self.issues.push(
178                Issue::warning(
179                    issue_codes::APPROXIMATE_LINEAGE,
180                    format!(
181                        "Expression recursion depth exceeded (>{}). Lineage may be incomplete.",
182                        expression::MAX_RECURSION_DEPTH
183                    ),
184                )
185                .with_statement(statement_index),
186            );
187        }
188    }
189
190    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(dialect = ?self.request.dialect, stmt_count)))]
191    fn analyze(&mut self) -> AnalyzeResult {
192        let (all_statements, mut preflight_issues) = collect_statements(self.request);
193        self.issues.append(&mut preflight_issues);
194
195        #[cfg(feature = "tracing")]
196        tracing::Span::current().record("stmt_count", all_statements.len());
197
198        self.precollect_ddl(&all_statements);
199
200        if all_statements.is_empty() {
201            self.run_lint_documents_without_statements();
202            return self.build_result();
203        }
204
205        self.run_lint_documents(&all_statements);
206
207        // Analyze all statements
208        for (
209            index,
210            StatementInput {
211                statement,
212                source_name,
213                source_sql,
214                source_range,
215                templating_applied,
216                ..
217            },
218        ) in all_statements.into_iter().enumerate()
219        {
220            #[cfg(feature = "tracing")]
221            let _stmt_span = info_span!(
222                "analyze_statement",
223                index,
224                source = source_name.as_deref().map_or("inline", String::as_str),
225                stmt_type = ?statement
226            )
227            .entered();
228
229            // Extract resolved SQL when templating was applied
230            let resolved_sql = if templating_applied {
231                Some(source_sql[source_range.clone()].to_string())
232            } else {
233                None
234            };
235            self.current_statement_source = Some(StatementSourceSlice {
236                sql: source_sql,
237                range: source_range.clone(),
238            });
239
240            let source_name_owned = source_name.as_deref().map(String::from);
241            let result = self.analyze_statement(
242                index,
243                &statement,
244                source_name_owned,
245                source_range,
246                resolved_sql,
247            );
248            self.current_statement_source = None;
249
250            match result {
251                Ok(lineage) => {
252                    self.statement_lineages.push(lineage);
253                }
254                Err(e) => {
255                    self.issues.push(
256                        Issue::error(issue_codes::PARSE_ERROR, e.to_string()).with_statement(index),
257                    );
258                }
259            }
260        }
261
262        self.build_result()
263    }
264}
265
266struct StatementSourceSlice<'a> {
267    sql: Cow<'a, str>,
268    range: Range<usize>,
269}
270
271impl<'a> Analyzer<'a> {
272    fn run_lint_documents(&mut self, statements: &[StatementInput<'a>]) {
273        let Some(linter) = self.linter.as_ref() else {
274            return;
275        };
276
277        let mut start = 0usize;
278        while start < statements.len() {
279            let source_name_key = statements[start]
280                .source_name
281                .as_deref()
282                .map(|name| name.as_str());
283            let source_sql_key = statements[start].source_sql.as_ref();
284            let source_untemplated_sql_key = statements[start].source_sql_untemplated.as_deref();
285
286            let mut end = start + 1;
287            while end < statements.len()
288                && statements[end]
289                    .source_name
290                    .as_deref()
291                    .map(|name| name.as_str())
292                    == source_name_key
293                && statements[end].source_sql.as_ref() == source_sql_key
294                && statements[end].source_sql_untemplated.as_deref() == source_untemplated_sql_key
295            {
296                end += 1;
297            }
298
299            let mut lint_statements = Vec::with_capacity(end - start);
300            let mut source_statement_ranges = Vec::with_capacity(end - start);
301            for (offset, statement_input) in statements[start..end].iter().enumerate() {
302                lint_statements.push(LintStatement {
303                    statement: &statement_input.statement,
304                    statement_index: offset,
305                    statement_range: statement_input.source_range.clone(),
306                });
307                source_statement_ranges.push(statement_input.source_range_untemplated.clone());
308            }
309
310            let parser_fallback_used = statements[start..end]
311                .iter()
312                .any(|statement_input| statement_input.parser_fallback_used);
313            let document = LintDocument::new_with_parser_fallback_and_source(
314                source_sql_key,
315                source_untemplated_sql_key,
316                self.request.dialect,
317                lint_statements,
318                parser_fallback_used,
319                Some(source_statement_ranges),
320            );
321            self.issues.extend(linter.check_document(&document));
322
323            start = end;
324        }
325    }
326
327    fn run_lint_documents_without_statements(&mut self) {
328        let Some(linter) = self.linter.as_ref() else {
329            return;
330        };
331
332        if let Some(files) = &self.request.files {
333            if files.is_empty() {
334                return;
335            }
336            for file in files {
337                let document = LintDocument::new(&file.content, self.request.dialect, Vec::new());
338                self.issues.extend(linter.check_document(&document));
339            }
340            return;
341        }
342
343        if !self.request.sql.is_empty() {
344            let document = LintDocument::new(&self.request.sql, self.request.dialect, Vec::new());
345            self.issues.extend(linter.check_document(&document));
346        }
347    }
348
349    /// Pre-registers CREATE TABLE/VIEW targets so earlier statements can resolve them.
350    fn precollect_ddl(&mut self, statements: &[StatementInput]) {
351        for (index, stmt_input) in statements.iter().enumerate() {
352            match &stmt_input.statement {
353                Statement::CreateTable(create) => {
354                    self.precollect_create_table(create, index);
355                }
356                Statement::CreateView { name, .. } => {
357                    self.precollect_create_view(name);
358                }
359                _ => {}
360            }
361        }
362    }
363
364    /// Handles CREATE TABLE statements during DDL pre-collection.
365    fn precollect_create_table(
366        &mut self,
367        create: &sqlparser::ast::CreateTable,
368        statement_index: usize,
369    ) {
370        let canonical = self.normalize_table_name(&create.name.to_string());
371
372        if create.query.is_none() {
373            let (column_schemas, table_constraints) =
374                build_column_schemas_with_constraints(&create.columns, &create.constraints);
375
376            self.schema.seed_implied_schema_with_constraints(
377                &canonical,
378                column_schemas,
379                table_constraints,
380                create.temporary,
381                statement_index,
382            );
383        } else {
384            // This is a CTAS (CREATE TABLE ... AS SELECT ...).
385            // We mark the table as known to prevent UNRESOLVED_REFERENCE
386            // errors, but we don't have column schema yet.
387            self.schema.mark_table_known(&canonical);
388        }
389    }
390
391    /// Handles CREATE VIEW statements during DDL pre-collection.
392    fn precollect_create_view(&mut self, name: &sqlparser::ast::ObjectName) {
393        let canonical = self.normalize_table_name(&name.to_string());
394        self.schema.mark_table_known(&canonical);
395    }
396}
397
398#[cfg(test)]
399mod tests;