pytest_language_server/fixtures/
imports.rs1use super::FixtureDatabase;
11use rustpython_parser::ast::Stmt;
12use std::collections::HashSet;
13use std::path::{Path, PathBuf};
14use tracing::{debug, info};
15
/// A single `from <module> import ...` statement recorded while scanning a
/// Python file for fixtures that are pulled in from other modules.
// `importing_file` and `line` are recorded for every import but are not read
// anywhere in this module, hence the `dead_code` allowance.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FixtureImport {
    /// Dotted module path as written in the import; relative imports keep
    /// their leading dots (e.g. `..pkg.fixtures`).
    pub module_path: String,
    /// `true` for `from module import *`.
    pub is_star_import: bool,
    /// Names bound by the import (the `as` alias when one is given, otherwise
    /// the imported name). Empty for star imports.
    pub imported_names: Vec<String>,
    /// The file that contains the import statement.
    pub importing_file: PathBuf,
    /// Line of the import statement as computed by the database's line index
    /// (NOTE(review): presumably 1-based — confirm against `get_line_from_offset`).
    pub line: usize,
}
31
32impl FixtureDatabase {
33 pub(crate) fn extract_fixture_imports(
36 &self,
37 stmts: &[Stmt],
38 file_path: &Path,
39 line_index: &[usize],
40 ) -> Vec<FixtureImport> {
41 let mut imports = Vec::new();
42
43 for stmt in stmts {
44 if let Stmt::ImportFrom(import_from) = stmt {
45 let mut module = import_from
47 .module
48 .as_ref()
49 .map(|m| m.to_string())
50 .unwrap_or_default();
51
52 if let Some(_level) = import_from.level {
54 module = ".".to_string() + &module;
56 }
57
58 if self.is_standard_library_module(&module) {
60 continue;
61 }
62
63 let line =
64 self.get_line_from_offset(import_from.range.start().to_usize(), line_index);
65
66 let is_star = import_from
68 .names
69 .iter()
70 .any(|alias| alias.name.as_str() == "*");
71
72 if is_star {
73 imports.push(FixtureImport {
74 module_path: module,
75 is_star_import: true,
76 imported_names: Vec::new(),
77 importing_file: file_path.to_path_buf(),
78 line,
79 });
80 } else {
81 let names: Vec<String> = import_from
83 .names
84 .iter()
85 .map(|alias| alias.asname.as_ref().unwrap_or(&alias.name).to_string())
86 .collect();
87
88 if !names.is_empty() {
89 imports.push(FixtureImport {
90 module_path: module,
91 is_star_import: false,
92 imported_names: names,
93 importing_file: file_path.to_path_buf(),
94 line,
95 });
96 }
97 }
98 }
99 }
100
101 imports
102 }
103
104 fn is_standard_library_module(&self, module: &str) -> bool {
106 let stdlib_prefixes = [
107 "os",
108 "sys",
109 "re",
110 "json",
111 "typing",
112 "collections",
113 "functools",
114 "itertools",
115 "pathlib",
116 "datetime",
117 "time",
118 "math",
119 "random",
120 "copy",
121 "io",
122 "abc",
123 "contextlib",
124 "dataclasses",
125 "enum",
126 "logging",
127 "unittest",
128 "asyncio",
129 "concurrent",
130 "multiprocessing",
131 "threading",
132 "subprocess",
133 "shutil",
134 "tempfile",
135 "glob",
136 "fnmatch",
137 "pickle",
138 "sqlite3",
139 "urllib",
140 "http",
141 "email",
142 "html",
143 "xml",
144 "socket",
145 "ssl",
146 "select",
147 "signal",
148 "struct",
149 "codecs",
150 "textwrap",
151 "string",
152 "difflib",
153 "inspect",
154 "dis",
155 "traceback",
156 "warnings",
157 "weakref",
158 "types",
159 "importlib",
160 "pkgutil",
161 "pprint",
162 "reprlib",
163 "numbers",
164 "decimal",
165 "fractions",
166 "statistics",
167 "hashlib",
168 "hmac",
169 "secrets",
170 "base64",
171 "binascii",
172 "zlib",
173 "gzip",
174 "bz2",
175 "lzma",
176 "zipfile",
177 "tarfile",
178 "csv",
179 "configparser",
180 "argparse",
181 "getopt",
182 "getpass",
183 "platform",
184 "errno",
185 "ctypes",
186 "__future__",
187 ];
188
189 let first_part = module.split('.').next().unwrap_or(module);
190 stdlib_prefixes.contains(&first_part)
191 }
192
193 pub(crate) fn resolve_module_to_file(
196 &self,
197 module_path: &str,
198 importing_file: &Path,
199 ) -> Option<PathBuf> {
200 debug!(
201 "Resolving module '{}' from file {:?}",
202 module_path, importing_file
203 );
204
205 let parent_dir = importing_file.parent()?;
206
207 if module_path.starts_with('.') {
208 self.resolve_relative_import(module_path, parent_dir)
210 } else {
211 self.resolve_absolute_import(module_path, parent_dir)
213 }
214 }
215
216 fn resolve_relative_import(&self, module_path: &str, base_dir: &Path) -> Option<PathBuf> {
218 let mut current_dir = base_dir.to_path_buf();
219 let mut chars = module_path.chars().peekable();
220
221 while chars.peek() == Some(&'.') {
223 chars.next();
224 if chars.peek() != Some(&'.') {
225 break;
227 }
228 current_dir = current_dir.parent()?.to_path_buf();
230 }
231
232 let remaining: String = chars.collect();
233 if remaining.is_empty() {
234 let init_path = current_dir.join("__init__.py");
236 if init_path.exists() {
237 return Some(init_path);
238 }
239 return None;
240 }
241
242 self.find_module_file(&remaining, ¤t_dir)
243 }
244
245 fn resolve_absolute_import(&self, module_path: &str, start_dir: &Path) -> Option<PathBuf> {
247 let mut current_dir = start_dir.to_path_buf();
248
249 loop {
250 if let Some(path) = self.find_module_file(module_path, ¤t_dir) {
251 return Some(path);
252 }
253
254 match current_dir.parent() {
256 Some(parent) => current_dir = parent.to_path_buf(),
257 None => break,
258 }
259 }
260
261 None
262 }
263
264 fn find_module_file(&self, module_path: &str, base_dir: &Path) -> Option<PathBuf> {
266 let parts: Vec<&str> = module_path.split('.').collect();
267 let mut current_path = base_dir.to_path_buf();
268
269 for (i, part) in parts.iter().enumerate() {
270 let is_last = i == parts.len() - 1;
271
272 if is_last {
273 let py_file = current_path.join(format!("{}.py", part));
275 if py_file.exists() {
276 return Some(py_file);
277 }
278
279 let canonical_py_file = self.get_canonical_path(py_file.clone());
281 if self.file_cache.contains_key(&canonical_py_file) {
282 return Some(py_file);
283 }
284
285 let package_init = current_path.join(part).join("__init__.py");
287 if package_init.exists() {
288 return Some(package_init);
289 }
290
291 let canonical_package_init = self.get_canonical_path(package_init.clone());
293 if self.file_cache.contains_key(&canonical_package_init) {
294 return Some(package_init);
295 }
296 } else {
297 current_path = current_path.join(part);
299 if !current_path.is_dir() {
300 return None;
301 }
302 }
303 }
304
305 None
306 }
307
308 pub fn get_imported_fixtures(
313 &self,
314 file_path: &Path,
315 visited: &mut HashSet<PathBuf>,
316 ) -> HashSet<String> {
317 let canonical_path = self.get_canonical_path(file_path.to_path_buf());
318
319 if visited.contains(&canonical_path) {
321 debug!("Circular import detected for {:?}, skipping", file_path);
322 return HashSet::new();
323 }
324 visited.insert(canonical_path.clone());
325
326 let mut imported_fixtures = HashSet::new();
327
328 let Some(content) = self.get_file_content(&canonical_path) else {
330 return imported_fixtures;
331 };
332
333 let Some(parsed) = self.get_parsed_ast(&canonical_path, &content) else {
334 return imported_fixtures;
335 };
336
337 let line_index = self.get_line_index(&canonical_path, &content);
338
339 if let rustpython_parser::ast::Mod::Module(module) = parsed.as_ref() {
340 let imports = self.extract_fixture_imports(&module.body, &canonical_path, &line_index);
341
342 for import in imports {
343 let Some(resolved_path) =
345 self.resolve_module_to_file(&import.module_path, &canonical_path)
346 else {
347 debug!(
348 "Could not resolve module '{}' from {:?}",
349 import.module_path, canonical_path
350 );
351 continue;
352 };
353
354 let resolved_canonical = self.get_canonical_path(resolved_path);
355
356 debug!(
357 "Resolved import '{}' to {:?}",
358 import.module_path, resolved_canonical
359 );
360
361 if import.is_star_import {
362 if let Some(file_fixtures) = self.file_definitions.get(&resolved_canonical) {
365 for fixture_name in file_fixtures.iter() {
366 imported_fixtures.insert(fixture_name.clone());
367 }
368 }
369
370 let transitive = self.get_imported_fixtures(&resolved_canonical, visited);
372 imported_fixtures.extend(transitive);
373 } else {
374 for name in &import.imported_names {
376 if self.definitions.contains_key(name) {
377 imported_fixtures.insert(name.clone());
378 }
379 }
380 }
381 }
382 }
383
384 info!(
385 "Found {} imported fixtures for {:?}: {:?}",
386 imported_fixtures.len(),
387 file_path,
388 imported_fixtures
389 );
390
391 imported_fixtures
392 }
393
394 pub fn is_fixture_imported_in_file(&self, fixture_name: &str, file_path: &Path) -> bool {
397 let mut visited = HashSet::new();
398 let imported = self.get_imported_fixtures(file_path, &mut visited);
399 imported.contains(fixture_name)
400 }
401}