flowscope_core/
analyzer.rs1use crate::types::*;
2use sqlparser::ast::Statement;
3use std::collections::HashSet;
4use std::ops::Range;
5use std::sync::Arc;
6#[cfg(feature = "tracing")]
7use tracing::info_span;
8
9const MAX_SQL_LENGTH: usize = 10 * 1024 * 1024;
12
13mod complexity;
14mod context;
15pub(crate) mod cross_statement;
16mod ddl;
17mod diagnostics;
18mod expression;
19mod functions;
20mod global;
21pub mod helpers;
22mod input;
23mod query;
24pub(crate) mod schema_registry;
25mod select_analyzer;
26mod statements;
27mod transform;
28pub mod visitor;
29
30use cross_statement::CrossStatementTracker;
31use helpers::{build_column_schemas_with_constraints, find_identifier_span};
32use input::{collect_statements, StatementInput};
33use schema_registry::SchemaRegistry;
34
35pub(crate) use schema_registry::TableResolution;
37
38#[must_use]
40pub fn analyze(request: &AnalyzeRequest) -> AnalyzeResult {
41 #[cfg(feature = "tracing")]
42 let _span =
43 info_span!("analyze_request", statement_count = %request.sql.matches(';').count() + 1)
44 .entered();
45 let mut analyzer = Analyzer::new(request);
46 analyzer.analyze()
47}
48
49#[must_use]
55pub fn split_statements(request: &StatementSplitRequest) -> StatementSplitResult {
56 if request.sql.len() > MAX_SQL_LENGTH {
58 return StatementSplitResult::from_error(format!(
59 "SQL exceeds maximum length of {} bytes ({} bytes provided)",
60 MAX_SQL_LENGTH,
61 request.sql.len()
62 ));
63 }
64
65 StatementSplitResult {
66 statements: input::split_statement_spans(&request.sql),
67 error: None,
68 }
69}
70
71pub(crate) struct Analyzer<'a> {
79 pub(crate) request: &'a AnalyzeRequest,
80 pub(crate) issues: Vec<Issue>,
81 pub(crate) statement_lineages: Vec<StatementLineage>,
82 pub(crate) schema: SchemaRegistry,
84 pub(crate) tracker: CrossStatementTracker,
86 pub(crate) column_lineage_enabled: bool,
88 current_statement_source: Option<StatementSourceSlice<'a>>,
90 depth_limit_statements: HashSet<usize>,
92}
93
94impl<'a> Analyzer<'a> {
95 fn new(request: &'a AnalyzeRequest) -> Self {
96 let column_lineage_enabled = request
98 .options
99 .as_ref()
100 .and_then(|o| o.enable_column_lineage)
101 .unwrap_or(true);
102
103 let (schema, init_issues) = SchemaRegistry::new(request.schema.as_ref(), request.dialect);
104
105 Self {
106 request,
107 issues: init_issues,
108 statement_lineages: Vec::new(),
109 schema,
110 tracker: CrossStatementTracker::new(),
111 column_lineage_enabled,
112 current_statement_source: None,
113 depth_limit_statements: HashSet::new(),
114 }
115 }
116
117 pub(crate) fn find_span(&self, identifier: &str) -> Option<Span> {
121 if let Some(source) = &self.current_statement_source {
122 let statement_sql = &source.sql[source.range.clone()];
123 return find_identifier_span(statement_sql, identifier, 0).map(|span| {
124 Span::new(
125 source.range.start + span.start,
126 source.range.start + span.end,
127 )
128 });
129 }
130
131 find_identifier_span(&self.request.sql, identifier, 0)
132 }
133
134 pub(crate) fn relation_identity(&self, canonical: &str) -> (Arc<str>, NodeType) {
136 self.tracker.relation_identity(canonical)
137 }
138
139 pub(crate) fn relation_node_id(&self, canonical: &str) -> Arc<str> {
141 self.tracker.relation_node_id(canonical)
142 }
143
144 pub(crate) fn allow_implied(&self) -> bool {
146 self.schema.allow_implied()
147 }
148
149 pub(crate) fn canonicalize_table_reference(&self, name: &str) -> TableResolution {
151 self.schema.canonicalize_table_reference(name)
152 }
153
154 pub(crate) fn normalize_identifier(&self, name: &str) -> String {
156 self.schema.normalize_identifier(name)
157 }
158
159 pub(crate) fn normalize_table_name(&self, name: &str) -> String {
161 self.schema.normalize_table_name(name)
162 }
163
164 pub(crate) fn emit_depth_limit_warning(&mut self, statement_index: usize) {
166 if self.depth_limit_statements.insert(statement_index) {
167 self.issues.push(
168 Issue::warning(
169 issue_codes::APPROXIMATE_LINEAGE,
170 format!(
171 "Expression recursion depth exceeded (>{}). Lineage may be incomplete.",
172 expression::MAX_RECURSION_DEPTH
173 ),
174 )
175 .with_statement(statement_index),
176 );
177 }
178 }
179
180 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(dialect = ?self.request.dialect, stmt_count)))]
181 fn analyze(&mut self) -> AnalyzeResult {
182 let (all_statements, mut preflight_issues) = collect_statements(self.request);
183 self.issues.append(&mut preflight_issues);
184
185 #[cfg(feature = "tracing")]
186 tracing::Span::current().record("stmt_count", all_statements.len());
187
188 self.precollect_ddl(&all_statements);
189
190 if all_statements.is_empty() {
191 return self.build_result();
192 }
193
194 for (
196 index,
197 StatementInput {
198 statement,
199 source_name,
200 source_sql,
201 source_range,
202 },
203 ) in all_statements.into_iter().enumerate()
204 {
205 #[cfg(feature = "tracing")]
206 let _stmt_span = info_span!(
207 "analyze_statement",
208 index,
209 source = source_name.as_deref().map_or("inline", String::as_str),
210 stmt_type = ?statement
211 )
212 .entered();
213 self.current_statement_source = Some(StatementSourceSlice {
214 sql: source_sql,
215 range: source_range,
216 });
217
218 let source_name_owned = source_name.as_deref().map(String::from);
219 let result = self.analyze_statement(index, &statement, source_name_owned);
220 self.current_statement_source = None;
221
222 match result {
223 Ok(lineage) => {
224 self.statement_lineages.push(lineage);
225 }
226 Err(e) => {
227 self.issues.push(
228 Issue::error(issue_codes::PARSE_ERROR, e.to_string()).with_statement(index),
229 );
230 }
231 }
232 }
233
234 self.build_result()
235 }
236}
237
/// Identifies the text of one statement inside a larger SQL source.
struct StatementSourceSlice<'a> {
    /// The source text that contains the statement.
    sql: &'a str,
    /// Byte range of the statement within `sql`.
    range: Range<usize>,
}
242
243impl<'a> Analyzer<'a> {
244 fn precollect_ddl(&mut self, statements: &[StatementInput]) {
246 for (index, stmt_input) in statements.iter().enumerate() {
247 match &stmt_input.statement {
248 Statement::CreateTable(create) => {
249 self.precollect_create_table(create, index);
250 }
251 Statement::CreateView { name, .. } => {
252 self.precollect_create_view(name);
253 }
254 _ => {}
255 }
256 }
257 }
258
259 fn precollect_create_table(
261 &mut self,
262 create: &sqlparser::ast::CreateTable,
263 statement_index: usize,
264 ) {
265 let canonical = self.normalize_table_name(&create.name.to_string());
266
267 if create.query.is_none() {
268 let (column_schemas, table_constraints) =
269 build_column_schemas_with_constraints(&create.columns, &create.constraints);
270
271 self.schema.seed_implied_schema_with_constraints(
272 &canonical,
273 column_schemas,
274 table_constraints,
275 create.temporary,
276 statement_index,
277 );
278 } else {
279 self.schema.mark_table_known(&canonical);
283 }
284 }
285
286 fn precollect_create_view(&mut self, name: &sqlparser::ast::ObjectName) {
288 let canonical = self.normalize_table_name(&name.to_string());
289 self.schema.mark_table_known(&canonical);
290 }
291}
292
293#[cfg(test)]
294mod tests;