flowscope_core/
analyzer.rs

1use crate::types::*;
2use sqlparser::ast::Statement;
3use std::borrow::Cow;
4use std::collections::HashSet;
5use std::ops::Range;
6use std::sync::Arc;
7#[cfg(feature = "tracing")]
8use tracing::info_span;
9
/// Upper bound, in bytes, on accepted SQL input (10MB) to prevent memory exhaustion.
/// Kept equal to the limit applied by the TypeScript validation.
const MAX_SQL_LENGTH: usize = 10 << 20;
13
14mod complexity;
15mod context;
16pub(crate) mod cross_statement;
17mod ddl;
18mod diagnostics;
19mod expression;
20mod functions;
21mod global;
22pub mod helpers;
23mod input;
24mod query;
25pub(crate) mod schema_registry;
26mod select_analyzer;
27mod statements;
28mod transform;
29pub mod visitor;
30
31use cross_statement::CrossStatementTracker;
32use helpers::{build_column_schemas_with_constraints, find_identifier_span};
33use input::{collect_statements, StatementInput};
34use schema_registry::SchemaRegistry;
35
36// Re-export for use in other analyzer modules
37pub(crate) use schema_registry::TableResolution;
38
39/// Main entry point for SQL analysis
40#[must_use]
41pub fn analyze(request: &AnalyzeRequest) -> AnalyzeResult {
42    #[cfg(feature = "tracing")]
43    let _span =
44        info_span!("analyze_request", statement_count = %request.sql.matches(';').count() + 1)
45            .entered();
46    let mut analyzer = Analyzer::new(request);
47    analyzer.analyze()
48}
49
50/// Split SQL into statement spans.
51///
52/// Note: The `dialect` field in the request is reserved for future dialect-specific
53/// splitting behavior. The current implementation uses a universal tokenizer that
54/// handles common SQL constructs (strings, comments, dollar-quoting).
55#[must_use]
56pub fn split_statements(request: &StatementSplitRequest) -> StatementSplitResult {
57    // Validate input size to prevent memory exhaustion
58    if request.sql.len() > MAX_SQL_LENGTH {
59        return StatementSplitResult::from_error(format!(
60            "SQL exceeds maximum length of {} bytes ({} bytes provided)",
61            MAX_SQL_LENGTH,
62            request.sql.len()
63        ));
64    }
65
66    StatementSplitResult {
67        statements: input::split_statement_spans(&request.sql),
68        error: None,
69    }
70}
71
72/// Internal analyzer state.
73///
74/// The analyzer is organized into focused components:
75/// - `schema`: Manages schema metadata, resolution, and normalization
76/// - `tracker`: Tracks cross-statement dependencies and lineage
77/// - `issues`: Collects warnings and errors during analysis
78/// - `statement_lineages`: Stores per-statement analysis results
79pub(crate) struct Analyzer<'a> {
80    pub(crate) request: &'a AnalyzeRequest,
81    pub(crate) issues: Vec<Issue>,
82    pub(crate) statement_lineages: Vec<StatementLineage>,
83    /// Schema registry for table/column resolution.
84    pub(crate) schema: SchemaRegistry,
85    /// Cross-statement dependency tracker.
86    pub(crate) tracker: CrossStatementTracker,
87    /// Whether column lineage is enabled.
88    pub(crate) column_lineage_enabled: bool,
89    /// Source slice for the currently analyzed statement (for span lookups).
90    current_statement_source: Option<StatementSourceSlice<'a>>,
91    /// Statements that already emitted a recursion-depth warning.
92    depth_limit_statements: HashSet<usize>,
93}
94
95impl<'a> Analyzer<'a> {
96    fn new(request: &'a AnalyzeRequest) -> Self {
97        // Check if column lineage is enabled (default: true)
98        let column_lineage_enabled = request
99            .options
100            .as_ref()
101            .and_then(|o| o.enable_column_lineage)
102            .unwrap_or(true);
103
104        let (schema, init_issues) = SchemaRegistry::new(request.schema.as_ref(), request.dialect);
105
106        Self {
107            request,
108            issues: init_issues,
109            statement_lineages: Vec::new(),
110            schema,
111            tracker: CrossStatementTracker::new(),
112            column_lineage_enabled,
113            current_statement_source: None,
114            depth_limit_statements: HashSet::new(),
115        }
116    }
117
118    /// Finds the span of an identifier in the SQL text.
119    ///
120    /// This is used to attach source locations to issues for better error reporting.
121    pub(crate) fn find_span(&self, identifier: &str) -> Option<Span> {
122        if let Some(source) = &self.current_statement_source {
123            let statement_sql = &source.sql[source.range.clone()];
124            return find_identifier_span(statement_sql, identifier, 0).map(|span| {
125                Span::new(
126                    source.range.start + span.start,
127                    source.range.start + span.end,
128                )
129            });
130        }
131
132        find_identifier_span(&self.request.sql, identifier, 0)
133    }
134
135    /// Returns the correct node ID and type for a relation (view vs table).
136    pub(crate) fn relation_identity(&self, canonical: &str) -> (Arc<str>, NodeType) {
137        self.tracker.relation_identity(canonical)
138    }
139
140    /// Returns the node ID for a relation.
141    pub(crate) fn relation_node_id(&self, canonical: &str) -> Arc<str> {
142        self.tracker.relation_node_id(canonical)
143    }
144
145    /// Check if implied schema capture is allowed (default: true).
146    pub(crate) fn allow_implied(&self) -> bool {
147        self.schema.allow_implied()
148    }
149
150    /// Canonicalizes a table reference using schema resolution.
151    pub(crate) fn canonicalize_table_reference(&self, name: &str) -> TableResolution {
152        self.schema.canonicalize_table_reference(name)
153    }
154
155    /// Normalizes an identifier according to dialect case sensitivity.
156    pub(crate) fn normalize_identifier(&self, name: &str) -> String {
157        self.schema.normalize_identifier(name)
158    }
159
160    /// Normalizes a qualified table name.
161    pub(crate) fn normalize_table_name(&self, name: &str) -> String {
162        self.schema.normalize_table_name(name)
163    }
164
165    /// Emits a warning when expression traversal exceeds the recursion guard.
166    pub(crate) fn emit_depth_limit_warning(&mut self, statement_index: usize) {
167        if self.depth_limit_statements.insert(statement_index) {
168            self.issues.push(
169                Issue::warning(
170                    issue_codes::APPROXIMATE_LINEAGE,
171                    format!(
172                        "Expression recursion depth exceeded (>{}). Lineage may be incomplete.",
173                        expression::MAX_RECURSION_DEPTH
174                    ),
175                )
176                .with_statement(statement_index),
177            );
178        }
179    }
180
181    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(dialect = ?self.request.dialect, stmt_count)))]
182    fn analyze(&mut self) -> AnalyzeResult {
183        let (all_statements, mut preflight_issues) = collect_statements(self.request);
184        self.issues.append(&mut preflight_issues);
185
186        #[cfg(feature = "tracing")]
187        tracing::Span::current().record("stmt_count", all_statements.len());
188
189        self.precollect_ddl(&all_statements);
190
191        if all_statements.is_empty() {
192            return self.build_result();
193        }
194
195        // Analyze all statements
196        for (
197            index,
198            StatementInput {
199                statement,
200                source_name,
201                source_sql,
202                source_range,
203                templating_applied,
204            },
205        ) in all_statements.into_iter().enumerate()
206        {
207            #[cfg(feature = "tracing")]
208            let _stmt_span = info_span!(
209                "analyze_statement",
210                index,
211                source = source_name.as_deref().map_or("inline", String::as_str),
212                stmt_type = ?statement
213            )
214            .entered();
215
216            // Extract resolved SQL when templating was applied
217            let resolved_sql = if templating_applied {
218                Some(source_sql[source_range.clone()].to_string())
219            } else {
220                None
221            };
222
223            self.current_statement_source = Some(StatementSourceSlice {
224                sql: source_sql,
225                range: source_range.clone(),
226            });
227
228            let source_name_owned = source_name.as_deref().map(String::from);
229            let result = self.analyze_statement(
230                index,
231                &statement,
232                source_name_owned,
233                source_range,
234                resolved_sql,
235            );
236            self.current_statement_source = None;
237
238            match result {
239                Ok(lineage) => {
240                    self.statement_lineages.push(lineage);
241                }
242                Err(e) => {
243                    self.issues.push(
244                        Issue::error(issue_codes::PARSE_ERROR, e.to_string()).with_statement(index),
245                    );
246                }
247            }
248        }
249
250        self.build_result()
251    }
252}
253
/// Source text together with the byte range occupied by the statement currently
/// being analyzed.
struct StatementSourceSlice<'a> {
    /// SQL text that `range` indexes into.
    sql: Cow<'a, str>,
    /// Byte range of the statement within `sql`.
    range: Range<usize>,
}
258
259impl<'a> Analyzer<'a> {
260    /// Pre-registers CREATE TABLE/VIEW targets so earlier statements can resolve them.
261    fn precollect_ddl(&mut self, statements: &[StatementInput]) {
262        for (index, stmt_input) in statements.iter().enumerate() {
263            match &stmt_input.statement {
264                Statement::CreateTable(create) => {
265                    self.precollect_create_table(create, index);
266                }
267                Statement::CreateView { name, .. } => {
268                    self.precollect_create_view(name);
269                }
270                _ => {}
271            }
272        }
273    }
274
275    /// Handles CREATE TABLE statements during DDL pre-collection.
276    fn precollect_create_table(
277        &mut self,
278        create: &sqlparser::ast::CreateTable,
279        statement_index: usize,
280    ) {
281        let canonical = self.normalize_table_name(&create.name.to_string());
282
283        if create.query.is_none() {
284            let (column_schemas, table_constraints) =
285                build_column_schemas_with_constraints(&create.columns, &create.constraints);
286
287            self.schema.seed_implied_schema_with_constraints(
288                &canonical,
289                column_schemas,
290                table_constraints,
291                create.temporary,
292                statement_index,
293            );
294        } else {
295            // This is a CTAS (CREATE TABLE ... AS SELECT ...).
296            // We mark the table as known to prevent UNRESOLVED_REFERENCE
297            // errors, but we don't have column schema yet.
298            self.schema.mark_table_known(&canonical);
299        }
300    }
301
302    /// Handles CREATE VIEW statements during DDL pre-collection.
303    fn precollect_create_view(&mut self, name: &sqlparser::ast::ObjectName) {
304        let canonical = self.normalize_table_name(&name.to_string());
305        self.schema.mark_table_known(&canonical);
306    }
307}
308
309#[cfg(test)]
310mod tests;