flowscope_core/
analyzer.rs

1use crate::types::*;
2use sqlparser::ast::Statement;
3use std::collections::HashSet;
4use std::ops::Range;
5use std::sync::Arc;
6#[cfg(feature = "tracing")]
7use tracing::info_span;
8
/// Upper bound on accepted SQL input, in bytes: 10 MiB (`10 * 2^20`).
///
/// Inputs longer than this are rejected up front to guard against memory
/// exhaustion. The value mirrors the limit enforced by the TypeScript-side
/// validation.
const MAX_SQL_LENGTH: usize = 10 << 20;
12
13mod complexity;
14mod context;
15pub(crate) mod cross_statement;
16mod ddl;
17mod diagnostics;
18mod expression;
19mod functions;
20mod global;
21pub mod helpers;
22mod input;
23mod query;
24pub(crate) mod schema_registry;
25mod select_analyzer;
26mod statements;
27mod transform;
28pub mod visitor;
29
30use cross_statement::CrossStatementTracker;
31use helpers::{build_column_schemas_with_constraints, find_identifier_span};
32use input::{collect_statements, StatementInput};
33use schema_registry::SchemaRegistry;
34
35// Re-export for use in other analyzer modules
36pub(crate) use schema_registry::TableResolution;
37
38/// Main entry point for SQL analysis
39#[must_use]
40pub fn analyze(request: &AnalyzeRequest) -> AnalyzeResult {
41    #[cfg(feature = "tracing")]
42    let _span =
43        info_span!("analyze_request", statement_count = %request.sql.matches(';').count() + 1)
44            .entered();
45    let mut analyzer = Analyzer::new(request);
46    analyzer.analyze()
47}
48
49/// Split SQL into statement spans.
50///
51/// Note: The `dialect` field in the request is reserved for future dialect-specific
52/// splitting behavior. The current implementation uses a universal tokenizer that
53/// handles common SQL constructs (strings, comments, dollar-quoting).
54#[must_use]
55pub fn split_statements(request: &StatementSplitRequest) -> StatementSplitResult {
56    // Validate input size to prevent memory exhaustion
57    if request.sql.len() > MAX_SQL_LENGTH {
58        return StatementSplitResult::from_error(format!(
59            "SQL exceeds maximum length of {} bytes ({} bytes provided)",
60            MAX_SQL_LENGTH,
61            request.sql.len()
62        ));
63    }
64
65    StatementSplitResult {
66        statements: input::split_statement_spans(&request.sql),
67        error: None,
68    }
69}
70
71/// Internal analyzer state.
72///
73/// The analyzer is organized into focused components:
74/// - `schema`: Manages schema metadata, resolution, and normalization
75/// - `tracker`: Tracks cross-statement dependencies and lineage
76/// - `issues`: Collects warnings and errors during analysis
77/// - `statement_lineages`: Stores per-statement analysis results
78pub(crate) struct Analyzer<'a> {
79    pub(crate) request: &'a AnalyzeRequest,
80    pub(crate) issues: Vec<Issue>,
81    pub(crate) statement_lineages: Vec<StatementLineage>,
82    /// Schema registry for table/column resolution.
83    pub(crate) schema: SchemaRegistry,
84    /// Cross-statement dependency tracker.
85    pub(crate) tracker: CrossStatementTracker,
86    /// Whether column lineage is enabled.
87    pub(crate) column_lineage_enabled: bool,
88    /// Source slice for the currently analyzed statement (for span lookups).
89    current_statement_source: Option<StatementSourceSlice<'a>>,
90    /// Statements that already emitted a recursion-depth warning.
91    depth_limit_statements: HashSet<usize>,
92}
93
94impl<'a> Analyzer<'a> {
95    fn new(request: &'a AnalyzeRequest) -> Self {
96        // Check if column lineage is enabled (default: true)
97        let column_lineage_enabled = request
98            .options
99            .as_ref()
100            .and_then(|o| o.enable_column_lineage)
101            .unwrap_or(true);
102
103        let (schema, init_issues) = SchemaRegistry::new(request.schema.as_ref(), request.dialect);
104
105        Self {
106            request,
107            issues: init_issues,
108            statement_lineages: Vec::new(),
109            schema,
110            tracker: CrossStatementTracker::new(),
111            column_lineage_enabled,
112            current_statement_source: None,
113            depth_limit_statements: HashSet::new(),
114        }
115    }
116
117    /// Finds the span of an identifier in the SQL text.
118    ///
119    /// This is used to attach source locations to issues for better error reporting.
120    pub(crate) fn find_span(&self, identifier: &str) -> Option<Span> {
121        if let Some(source) = &self.current_statement_source {
122            let statement_sql = &source.sql[source.range.clone()];
123            return find_identifier_span(statement_sql, identifier, 0).map(|span| {
124                Span::new(
125                    source.range.start + span.start,
126                    source.range.start + span.end,
127                )
128            });
129        }
130
131        find_identifier_span(&self.request.sql, identifier, 0)
132    }
133
134    /// Returns the correct node ID and type for a relation (view vs table).
135    pub(crate) fn relation_identity(&self, canonical: &str) -> (Arc<str>, NodeType) {
136        self.tracker.relation_identity(canonical)
137    }
138
139    /// Returns the node ID for a relation.
140    pub(crate) fn relation_node_id(&self, canonical: &str) -> Arc<str> {
141        self.tracker.relation_node_id(canonical)
142    }
143
144    /// Check if implied schema capture is allowed (default: true).
145    pub(crate) fn allow_implied(&self) -> bool {
146        self.schema.allow_implied()
147    }
148
149    /// Canonicalizes a table reference using schema resolution.
150    pub(crate) fn canonicalize_table_reference(&self, name: &str) -> TableResolution {
151        self.schema.canonicalize_table_reference(name)
152    }
153
154    /// Normalizes an identifier according to dialect case sensitivity.
155    pub(crate) fn normalize_identifier(&self, name: &str) -> String {
156        self.schema.normalize_identifier(name)
157    }
158
159    /// Normalizes a qualified table name.
160    pub(crate) fn normalize_table_name(&self, name: &str) -> String {
161        self.schema.normalize_table_name(name)
162    }
163
164    /// Emits a warning when expression traversal exceeds the recursion guard.
165    pub(crate) fn emit_depth_limit_warning(&mut self, statement_index: usize) {
166        if self.depth_limit_statements.insert(statement_index) {
167            self.issues.push(
168                Issue::warning(
169                    issue_codes::APPROXIMATE_LINEAGE,
170                    format!(
171                        "Expression recursion depth exceeded (>{}). Lineage may be incomplete.",
172                        expression::MAX_RECURSION_DEPTH
173                    ),
174                )
175                .with_statement(statement_index),
176            );
177        }
178    }
179
180    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(dialect = ?self.request.dialect, stmt_count)))]
181    fn analyze(&mut self) -> AnalyzeResult {
182        let (all_statements, mut preflight_issues) = collect_statements(self.request);
183        self.issues.append(&mut preflight_issues);
184
185        #[cfg(feature = "tracing")]
186        tracing::Span::current().record("stmt_count", all_statements.len());
187
188        self.precollect_ddl(&all_statements);
189
190        if all_statements.is_empty() {
191            return self.build_result();
192        }
193
194        // Analyze all statements
195        for (
196            index,
197            StatementInput {
198                statement,
199                source_name,
200                source_sql,
201                source_range,
202            },
203        ) in all_statements.into_iter().enumerate()
204        {
205            #[cfg(feature = "tracing")]
206            let _stmt_span = info_span!(
207                "analyze_statement",
208                index,
209                source = source_name.as_deref().map_or("inline", String::as_str),
210                stmt_type = ?statement
211            )
212            .entered();
213            self.current_statement_source = Some(StatementSourceSlice {
214                sql: source_sql,
215                range: source_range,
216            });
217
218            let source_name_owned = source_name.as_deref().map(String::from);
219            let result = self.analyze_statement(index, &statement, source_name_owned);
220            self.current_statement_source = None;
221
222            match result {
223                Ok(lineage) => {
224                    self.statement_lineages.push(lineage);
225                }
226                Err(e) => {
227                    self.issues.push(
228                        Issue::error(issue_codes::PARSE_ERROR, e.to_string()).with_statement(index),
229                    );
230                }
231            }
232        }
233
234        self.build_result()
235    }
236}
237
/// A source text paired with the range of one statement inside it.
struct StatementSourceSlice<'a> {
    /// The full source text the statement came from.
    sql: &'a str,
    /// Byte range of the statement within `sql`; used to slice `sql` directly.
    range: Range<usize>,
}
242
243impl<'a> Analyzer<'a> {
244    /// Pre-registers CREATE TABLE/VIEW targets so earlier statements can resolve them.
245    fn precollect_ddl(&mut self, statements: &[StatementInput]) {
246        for (index, stmt_input) in statements.iter().enumerate() {
247            match &stmt_input.statement {
248                Statement::CreateTable(create) => {
249                    self.precollect_create_table(create, index);
250                }
251                Statement::CreateView { name, .. } => {
252                    self.precollect_create_view(name);
253                }
254                _ => {}
255            }
256        }
257    }
258
259    /// Handles CREATE TABLE statements during DDL pre-collection.
260    fn precollect_create_table(
261        &mut self,
262        create: &sqlparser::ast::CreateTable,
263        statement_index: usize,
264    ) {
265        let canonical = self.normalize_table_name(&create.name.to_string());
266
267        if create.query.is_none() {
268            let (column_schemas, table_constraints) =
269                build_column_schemas_with_constraints(&create.columns, &create.constraints);
270
271            self.schema.seed_implied_schema_with_constraints(
272                &canonical,
273                column_schemas,
274                table_constraints,
275                create.temporary,
276                statement_index,
277            );
278        } else {
279            // This is a CTAS (CREATE TABLE ... AS SELECT ...).
280            // We mark the table as known to prevent UNRESOLVED_REFERENCE
281            // errors, but we don't have column schema yet.
282            self.schema.mark_table_known(&canonical);
283        }
284    }
285
286    /// Handles CREATE VIEW statements during DDL pre-collection.
287    fn precollect_create_view(&mut self, name: &sqlparser::ast::ObjectName) {
288        let canonical = self.normalize_table_name(&name.to_string());
289        self.schema.mark_table_known(&canonical);
290    }
291}
292
293#[cfg(test)]
294mod tests;