1use crate::types::*;
2use sqlparser::ast::Statement;
3use std::borrow::Cow;
4use std::collections::HashSet;
5use std::ops::Range;
6use std::sync::Arc;
7#[cfg(feature = "tracing")]
8use tracing::info_span;
9
/// Upper bound, in bytes, on the SQL text accepted by [`split_statements`]
/// (10 MiB). Longer input is rejected with an error result instead of being
/// split.
const MAX_SQL_LENGTH: usize = 10 * 1024 * 1024;
13
14mod complexity;
15mod context;
16pub(crate) mod cross_statement;
17mod ddl;
18mod diagnostics;
19mod expression;
20mod functions;
21mod global;
22pub mod helpers;
23mod input;
24mod query;
25pub(crate) mod schema_registry;
26mod select_analyzer;
27mod statements;
28mod transform;
29pub mod visitor;
30
31use cross_statement::CrossStatementTracker;
32use helpers::{build_column_schemas_with_constraints, find_identifier_span};
33use input::{collect_statements, StatementInput};
34use schema_registry::SchemaRegistry;
35
36pub(crate) use schema_registry::TableResolution;
38
39#[must_use]
41pub fn analyze(request: &AnalyzeRequest) -> AnalyzeResult {
42 #[cfg(feature = "tracing")]
43 let _span =
44 info_span!("analyze_request", statement_count = %request.sql.matches(';').count() + 1)
45 .entered();
46 let mut analyzer = Analyzer::new(request);
47 analyzer.analyze()
48}
49
50#[must_use]
56pub fn split_statements(request: &StatementSplitRequest) -> StatementSplitResult {
57 if request.sql.len() > MAX_SQL_LENGTH {
59 return StatementSplitResult::from_error(format!(
60 "SQL exceeds maximum length of {} bytes ({} bytes provided)",
61 MAX_SQL_LENGTH,
62 request.sql.len()
63 ));
64 }
65
66 StatementSplitResult {
67 statements: input::split_statement_spans(&request.sql),
68 error: None,
69 }
70}
71
72pub(crate) struct Analyzer<'a> {
80 pub(crate) request: &'a AnalyzeRequest,
81 pub(crate) issues: Vec<Issue>,
82 pub(crate) statement_lineages: Vec<StatementLineage>,
83 pub(crate) schema: SchemaRegistry,
85 pub(crate) tracker: CrossStatementTracker,
87 pub(crate) column_lineage_enabled: bool,
89 current_statement_source: Option<StatementSourceSlice<'a>>,
91 depth_limit_statements: HashSet<usize>,
93}
94
95impl<'a> Analyzer<'a> {
96 fn new(request: &'a AnalyzeRequest) -> Self {
97 let column_lineage_enabled = request
99 .options
100 .as_ref()
101 .and_then(|o| o.enable_column_lineage)
102 .unwrap_or(true);
103
104 let (schema, init_issues) = SchemaRegistry::new(request.schema.as_ref(), request.dialect);
105
106 Self {
107 request,
108 issues: init_issues,
109 statement_lineages: Vec::new(),
110 schema,
111 tracker: CrossStatementTracker::new(),
112 column_lineage_enabled,
113 current_statement_source: None,
114 depth_limit_statements: HashSet::new(),
115 }
116 }
117
118 pub(crate) fn find_span(&self, identifier: &str) -> Option<Span> {
122 if let Some(source) = &self.current_statement_source {
123 let statement_sql = &source.sql[source.range.clone()];
124 return find_identifier_span(statement_sql, identifier, 0).map(|span| {
125 Span::new(
126 source.range.start + span.start,
127 source.range.start + span.end,
128 )
129 });
130 }
131
132 find_identifier_span(&self.request.sql, identifier, 0)
133 }
134
135 pub(crate) fn relation_identity(&self, canonical: &str) -> (Arc<str>, NodeType) {
137 self.tracker.relation_identity(canonical)
138 }
139
140 pub(crate) fn relation_node_id(&self, canonical: &str) -> Arc<str> {
142 self.tracker.relation_node_id(canonical)
143 }
144
145 pub(crate) fn allow_implied(&self) -> bool {
147 self.schema.allow_implied()
148 }
149
150 pub(crate) fn canonicalize_table_reference(&self, name: &str) -> TableResolution {
152 self.schema.canonicalize_table_reference(name)
153 }
154
155 pub(crate) fn normalize_identifier(&self, name: &str) -> String {
157 self.schema.normalize_identifier(name)
158 }
159
160 pub(crate) fn normalize_table_name(&self, name: &str) -> String {
162 self.schema.normalize_table_name(name)
163 }
164
165 pub(crate) fn emit_depth_limit_warning(&mut self, statement_index: usize) {
167 if self.depth_limit_statements.insert(statement_index) {
168 self.issues.push(
169 Issue::warning(
170 issue_codes::APPROXIMATE_LINEAGE,
171 format!(
172 "Expression recursion depth exceeded (>{}). Lineage may be incomplete.",
173 expression::MAX_RECURSION_DEPTH
174 ),
175 )
176 .with_statement(statement_index),
177 );
178 }
179 }
180
181 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(dialect = ?self.request.dialect, stmt_count)))]
182 fn analyze(&mut self) -> AnalyzeResult {
183 let (all_statements, mut preflight_issues) = collect_statements(self.request);
184 self.issues.append(&mut preflight_issues);
185
186 #[cfg(feature = "tracing")]
187 tracing::Span::current().record("stmt_count", all_statements.len());
188
189 self.precollect_ddl(&all_statements);
190
191 if all_statements.is_empty() {
192 return self.build_result();
193 }
194
195 for (
197 index,
198 StatementInput {
199 statement,
200 source_name,
201 source_sql,
202 source_range,
203 templating_applied,
204 },
205 ) in all_statements.into_iter().enumerate()
206 {
207 #[cfg(feature = "tracing")]
208 let _stmt_span = info_span!(
209 "analyze_statement",
210 index,
211 source = source_name.as_deref().map_or("inline", String::as_str),
212 stmt_type = ?statement
213 )
214 .entered();
215
216 let resolved_sql = if templating_applied {
218 Some(source_sql[source_range.clone()].to_string())
219 } else {
220 None
221 };
222
223 self.current_statement_source = Some(StatementSourceSlice {
224 sql: source_sql,
225 range: source_range.clone(),
226 });
227
228 let source_name_owned = source_name.as_deref().map(String::from);
229 let result = self.analyze_statement(
230 index,
231 &statement,
232 source_name_owned,
233 source_range,
234 resolved_sql,
235 );
236 self.current_statement_source = None;
237
238 match result {
239 Ok(lineage) => {
240 self.statement_lineages.push(lineage);
241 }
242 Err(e) => {
243 self.issues.push(
244 Issue::error(issue_codes::PARSE_ERROR, e.to_string()).with_statement(index),
245 );
246 }
247 }
248 }
249
250 self.build_result()
251 }
252}
253
/// Identifies the text of one statement inside a larger source string.
///
/// Indexing `sql` with `range` yields the statement's own text.
struct StatementSourceSlice<'a> {
    /// Full source text the statement was taken from (borrowed or owned).
    sql: Cow<'a, str>,
    /// Byte range of the statement within `sql`.
    range: Range<usize>,
}
258
259impl<'a> Analyzer<'a> {
260 fn precollect_ddl(&mut self, statements: &[StatementInput]) {
262 for (index, stmt_input) in statements.iter().enumerate() {
263 match &stmt_input.statement {
264 Statement::CreateTable(create) => {
265 self.precollect_create_table(create, index);
266 }
267 Statement::CreateView { name, .. } => {
268 self.precollect_create_view(name);
269 }
270 _ => {}
271 }
272 }
273 }
274
275 fn precollect_create_table(
277 &mut self,
278 create: &sqlparser::ast::CreateTable,
279 statement_index: usize,
280 ) {
281 let canonical = self.normalize_table_name(&create.name.to_string());
282
283 if create.query.is_none() {
284 let (column_schemas, table_constraints) =
285 build_column_schemas_with_constraints(&create.columns, &create.constraints);
286
287 self.schema.seed_implied_schema_with_constraints(
288 &canonical,
289 column_schemas,
290 table_constraints,
291 create.temporary,
292 statement_index,
293 );
294 } else {
295 self.schema.mark_table_known(&canonical);
299 }
300 }
301
302 fn precollect_create_view(&mut self, name: &sqlparser::ast::ObjectName) {
304 let canonical = self.normalize_table_name(&name.to_string());
305 self.schema.mark_table_known(&canonical);
306 }
307}
308
309#[cfg(test)]
310mod tests;