1use crate::linter::document::{LintDocument, LintStatement};
2use crate::linter::Linter;
3use crate::types::*;
4use sqlparser::ast::Statement;
5use std::borrow::Cow;
6use std::collections::HashSet;
7use std::ops::Range;
8use std::sync::Arc;
9#[cfg(feature = "tracing")]
10use tracing::info_span;
11
12const MAX_SQL_LENGTH: usize = 10 * 1024 * 1024;
15
16mod complexity;
17mod context;
18pub(crate) mod cross_statement;
19mod ddl;
20mod diagnostics;
21mod expression;
22mod functions;
23mod global;
24pub mod helpers;
25mod input;
26mod query;
27pub(crate) mod schema_registry;
28mod select_analyzer;
29mod statements;
30mod transform;
31pub mod visitor;
32
33use cross_statement::CrossStatementTracker;
34use helpers::{build_column_schemas_with_constraints, find_identifier_span};
35use input::{collect_statements, StatementInput};
36use schema_registry::SchemaRegistry;
37
38pub(crate) use schema_registry::TableResolution;
40
41#[must_use]
43pub fn analyze(request: &AnalyzeRequest) -> AnalyzeResult {
44 #[cfg(feature = "tracing")]
45 let _span =
46 info_span!("analyze_request", statement_count = %request.sql.matches(';').count() + 1)
47 .entered();
48 let mut analyzer = Analyzer::new(request);
49 analyzer.analyze()
50}
51
52#[must_use]
54pub fn split_statements(request: &StatementSplitRequest) -> StatementSplitResult {
55 if request.sql.len() > MAX_SQL_LENGTH {
57 return StatementSplitResult::from_error(format!(
58 "SQL exceeds maximum length of {} bytes ({} bytes provided)",
59 MAX_SQL_LENGTH,
60 request.sql.len()
61 ));
62 }
63
64 StatementSplitResult {
65 statements: input::split_statement_spans_with_dialect(&request.sql, request.dialect),
66 error: None,
67 }
68}
69
/// Stateful driver for a single analyze request: walks each parsed
/// statement, accumulating issues, lineage, and schema knowledge.
pub(crate) struct Analyzer<'a> {
    /// The request being analyzed (SQL text, dialect, schema, options).
    pub(crate) request: &'a AnalyzeRequest,
    /// Diagnostics accumulated from schema init, linting, and analysis.
    pub(crate) issues: Vec<Issue>,
    /// Per-statement lineage results, pushed in statement order.
    pub(crate) statement_lineages: Vec<StatementLineage>,
    /// Table/column schema knowledge, seeded from the request schema and
    /// from DDL found in the batch (see `precollect_ddl`).
    pub(crate) schema: SchemaRegistry,
    /// Tracks relation identity/node ids across statements.
    pub(crate) tracker: CrossStatementTracker,
    /// Column-level lineage toggle; defaults to enabled when the option is
    /// absent (see `Analyzer::new`).
    pub(crate) column_lineage_enabled: bool,
    /// Source slice of the statement currently being analyzed; used by
    /// `find_span` to offset identifier spans into full-source coordinates.
    current_statement_source: Option<StatementSourceSlice<'a>>,
    /// Statement indexes that already emitted a recursion-depth warning,
    /// so each statement warns at most once.
    depth_limit_statements: HashSet<usize>,
    /// Configured linter; `None` when linting is disabled or unconfigured.
    linter: Option<Linter>,
}
94
impl<'a> Analyzer<'a> {
    /// Builds an analyzer from the request: resolves options, seeds the
    /// schema registry, and constructs the linter when linting is enabled.
    fn new(request: &'a AnalyzeRequest) -> Self {
        // Column lineage is opt-out: an absent option means enabled.
        let column_lineage_enabled = request
            .options
            .as_ref()
            .and_then(|o| o.enable_column_lineage)
            .unwrap_or(true);

        // Registry construction can itself yield diagnostics (e.g. problems
        // with the provided schema); they seed the issue list.
        let (schema, init_issues) = SchemaRegistry::new(request.schema.as_ref(), request.dialect);

        // A linter is built only when a lint config is present AND enabled.
        let linter = request
            .options
            .as_ref()
            .and_then(|o| o.lint.clone())
            .filter(|c| c.enabled)
            .map(Linter::new);

        Self {
            request,
            issues: init_issues,
            statement_lineages: Vec::new(),
            schema,
            tracker: CrossStatementTracker::new(),
            column_lineage_enabled,
            current_statement_source: None,
            depth_limit_statements: HashSet::new(),
            linter,
        }
    }

    /// Locates `identifier` in the source and returns its span in
    /// full-source byte coordinates. When a current-statement slice is set,
    /// only that statement's text is searched and the match is offset by the
    /// statement's start; otherwise the whole request SQL is searched.
    pub(crate) fn find_span(&self, identifier: &str) -> Option<Span> {
        if let Some(source) = &self.current_statement_source {
            let statement_sql = &source.sql[source.range.clone()];
            return find_identifier_span(statement_sql, identifier, 0).map(|span| {
                // Translate statement-relative offsets back to full-source.
                Span::new(
                    source.range.start + span.start,
                    source.range.start + span.end,
                )
            });
        }

        find_identifier_span(&self.request.sql, identifier, 0)
    }

    /// Delegates to the cross-statement tracker for a relation's node id
    /// and node type, keyed by canonical name.
    pub(crate) fn relation_identity(&self, canonical: &str) -> (Arc<str>, NodeType) {
        self.tracker.relation_identity(canonical)
    }

    /// Delegates to the cross-statement tracker for a relation's node id.
    pub(crate) fn relation_node_id(&self, canonical: &str) -> Arc<str> {
        self.tracker.relation_node_id(canonical)
    }

    /// Whether the schema registry permits implied (not explicitly
    /// declared) tables.
    pub(crate) fn allow_implied(&self) -> bool {
        self.schema.allow_implied()
    }

    /// Resolves a table reference to its canonical form via the registry.
    pub(crate) fn canonicalize_table_reference(&self, name: &str) -> TableResolution {
        self.schema.canonicalize_table_reference(name)
    }

    /// Normalizes an identifier according to dialect rules (registry
    /// delegate).
    pub(crate) fn normalize_identifier(&self, name: &str) -> String {
        self.schema.normalize_identifier(name)
    }

    /// Normalizes a (possibly qualified) table name (registry delegate).
    pub(crate) fn normalize_table_name(&self, name: &str) -> String {
        self.schema.normalize_table_name(name)
    }

    /// Emits an approximate-lineage warning for `statement_index`, at most
    /// once per statement.
    pub(crate) fn emit_depth_limit_warning(&mut self, statement_index: usize) {
        // `HashSet::insert` returns false when already present, giving the
        // once-per-statement deduplication.
        if self.depth_limit_statements.insert(statement_index) {
            self.issues.push(
                Issue::warning(
                    issue_codes::APPROXIMATE_LINEAGE,
                    format!(
                        "Expression recursion depth exceeded (>{}). Lineage may be incomplete.",
                        expression::MAX_RECURSION_DEPTH
                    ),
                )
                .with_statement(statement_index),
            );
        }
    }

    /// Main driver: collects statements (plus any preflight issues),
    /// pre-registers DDL targets, runs lint documents, then analyzes each
    /// statement in order, turning per-statement failures into issues
    /// instead of aborting the batch.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(dialect = ?self.request.dialect, stmt_count)))]
    fn analyze(&mut self) -> AnalyzeResult {
        let (all_statements, mut preflight_issues) = collect_statements(self.request);
        self.issues.append(&mut preflight_issues);

        #[cfg(feature = "tracing")]
        tracing::Span::current().record("stmt_count", all_statements.len());

        // Seed table knowledge from CREATE TABLE/VIEW before per-statement
        // analysis, so references resolve regardless of statement order.
        self.precollect_ddl(&all_statements);

        if all_statements.is_empty() {
            // Nothing parsed: still lint the raw source text (files or sql).
            self.run_lint_documents_without_statements();
            return self.build_result();
        }

        self.run_lint_documents(&all_statements);

        for (
            index,
            StatementInput {
                statement,
                source_name,
                source_sql,
                source_range,
                templating_applied,
                ..
            },
        ) in all_statements.into_iter().enumerate()
        {
            #[cfg(feature = "tracing")]
            let _stmt_span = info_span!(
                "analyze_statement",
                index,
                source = source_name.as_deref().map_or("inline", String::as_str),
                stmt_type = ?statement
            )
            .entered();

            // Keep the post-templating SQL only when templating changed it.
            let resolved_sql = if templating_applied {
                Some(source_sql[source_range.clone()].to_string())
            } else {
                None
            };
            // Expose this statement's source slice so `find_span` searches
            // the right text for the duration of this statement.
            self.current_statement_source = Some(StatementSourceSlice {
                sql: source_sql,
                range: source_range.clone(),
            });

            let source_name_owned = source_name.as_deref().map(String::from);
            let result = self.analyze_statement(
                index,
                &statement,
                source_name_owned,
                source_range,
                resolved_sql,
            );
            // Always clear the slice, success or failure.
            self.current_statement_source = None;

            match result {
                Ok(lineage) => {
                    self.statement_lineages.push(lineage);
                }
                Err(e) => {
                    // Per-statement failure becomes an issue on that
                    // statement; remaining statements still run.
                    self.issues.push(
                        Issue::error(issue_codes::PARSE_ERROR, e.to_string()).with_statement(index),
                    );
                }
            }
        }

        self.build_result()
    }
}
265
/// SQL text (borrowed or owned via `Cow`) for one source, plus the byte
/// range of a single statement within `sql`.
struct StatementSourceSlice<'a> {
    sql: Cow<'a, str>,
    range: Range<usize>,
}
270
impl<'a> Analyzer<'a> {
    /// Groups consecutive statements into per-source "documents" and lints
    /// each document. Statements belong to the same document only when
    /// their source name, templated SQL, and untemplated SQL all match.
    fn run_lint_documents(&mut self, statements: &[StatementInput<'a>]) {
        let Some(linter) = self.linter.as_ref() else {
            return;
        };

        // Two-pointer sweep: `start..end` is the current document's run of
        // statements sharing identical source keys.
        let mut start = 0usize;
        while start < statements.len() {
            let source_name_key = statements[start]
                .source_name
                .as_deref()
                .map(|name| name.as_str());
            let source_sql_key = statements[start].source_sql.as_ref();
            let source_untemplated_sql_key = statements[start].source_sql_untemplated.as_deref();

            // Extend `end` while all three keys still match `start`'s.
            let mut end = start + 1;
            while end < statements.len()
                && statements[end]
                    .source_name
                    .as_deref()
                    .map(|name| name.as_str())
                    == source_name_key
                && statements[end].source_sql.as_ref() == source_sql_key
                && statements[end].source_sql_untemplated.as_deref() == source_untemplated_sql_key
            {
                end += 1;
            }

            // Statement indexes inside a document are document-relative
            // (`offset`), not batch-relative.
            let mut lint_statements = Vec::with_capacity(end - start);
            let mut source_statement_ranges = Vec::with_capacity(end - start);
            for (offset, statement_input) in statements[start..end].iter().enumerate() {
                lint_statements.push(LintStatement {
                    statement: &statement_input.statement,
                    statement_index: offset,
                    statement_range: statement_input.source_range.clone(),
                });
                source_statement_ranges.push(statement_input.source_range_untemplated.clone());
            }

            // One fallback anywhere in the run marks the whole document.
            let parser_fallback_used = statements[start..end]
                .iter()
                .any(|statement_input| statement_input.parser_fallback_used);
            let document = LintDocument::new_with_parser_fallback_and_source(
                source_sql_key,
                source_untemplated_sql_key,
                self.request.dialect,
                lint_statements,
                parser_fallback_used,
                Some(source_statement_ranges),
            );
            self.issues.extend(linter.check_document(&document));

            start = end;
        }
    }

    /// Lints raw source text when no statements were parsed: each request
    /// file becomes its own statement-less document, or the inline SQL does
    /// when no files are present.
    fn run_lint_documents_without_statements(&mut self) {
        let Some(linter) = self.linter.as_ref() else {
            return;
        };

        if let Some(files) = &self.request.files {
            if files.is_empty() {
                return;
            }
            for file in files {
                let document = LintDocument::new(&file.content, self.request.dialect, Vec::new());
                self.issues.extend(linter.check_document(&document));
            }
            // Files take precedence: inline SQL is not linted in this case.
            return;
        }

        if !self.request.sql.is_empty() {
            let document = LintDocument::new(&self.request.sql, self.request.dialect, Vec::new());
            self.issues.extend(linter.check_document(&document));
        }
    }

    /// First pass over the batch: registers tables/views created by DDL so
    /// later statements can resolve references to them.
    fn precollect_ddl(&mut self, statements: &[StatementInput]) {
        for (index, stmt_input) in statements.iter().enumerate() {
            match &stmt_input.statement {
                Statement::CreateTable(create) => {
                    self.precollect_create_table(create, index);
                }
                Statement::CreateView { name, .. } => {
                    self.precollect_create_view(name);
                }
                _ => {}
            }
        }
    }

    /// Registers a CREATE TABLE: plain DDL seeds an implied schema from its
    /// column/constraint definitions; CREATE TABLE AS only marks the table
    /// as known (its columns presumably come from analyzing the query —
    /// TODO confirm against `analyze_statement`).
    fn precollect_create_table(
        &mut self,
        create: &sqlparser::ast::CreateTable,
        statement_index: usize,
    ) {
        let canonical = self.normalize_table_name(&create.name.to_string());

        if create.query.is_none() {
            let (column_schemas, table_constraints) =
                build_column_schemas_with_constraints(&create.columns, &create.constraints);

            self.schema.seed_implied_schema_with_constraints(
                &canonical,
                column_schemas,
                table_constraints,
                create.temporary,
                statement_index,
            );
        } else {
            self.schema.mark_table_known(&canonical);
        }
    }

    /// Registers a CREATE VIEW target as a known table name.
    fn precollect_create_view(&mut self, name: &sqlparser::ast::ObjectName) {
        let canonical = self.normalize_table_name(&name.to_string());
        self.schema.mark_table_known(&canonical);
    }
}
397
398#[cfg(test)]
399mod tests;