pytest_language_server/fixtures/
imports.rs1use super::FixtureDatabase;
11use once_cell::sync::Lazy;
12use rustpython_parser::ast::Stmt;
13use std::collections::HashSet;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use tracing::{debug, info};
17
18static STDLIB_MODULES: Lazy<HashSet<&'static str>> = Lazy::new(|| {
20 [
21 "os",
22 "sys",
23 "re",
24 "json",
25 "typing",
26 "collections",
27 "functools",
28 "itertools",
29 "pathlib",
30 "datetime",
31 "time",
32 "math",
33 "random",
34 "copy",
35 "io",
36 "abc",
37 "contextlib",
38 "dataclasses",
39 "enum",
40 "logging",
41 "unittest",
42 "asyncio",
43 "concurrent",
44 "multiprocessing",
45 "threading",
46 "subprocess",
47 "shutil",
48 "tempfile",
49 "glob",
50 "fnmatch",
51 "pickle",
52 "sqlite3",
53 "urllib",
54 "http",
55 "email",
56 "html",
57 "xml",
58 "socket",
59 "ssl",
60 "select",
61 "signal",
62 "struct",
63 "codecs",
64 "textwrap",
65 "string",
66 "difflib",
67 "inspect",
68 "dis",
69 "traceback",
70 "warnings",
71 "weakref",
72 "types",
73 "importlib",
74 "pkgutil",
75 "pprint",
76 "reprlib",
77 "numbers",
78 "decimal",
79 "fractions",
80 "statistics",
81 "hashlib",
82 "hmac",
83 "secrets",
84 "base64",
85 "binascii",
86 "zlib",
87 "gzip",
88 "bz2",
89 "lzma",
90 "zipfile",
91 "tarfile",
92 "csv",
93 "configparser",
94 "argparse",
95 "getopt",
96 "getpass",
97 "platform",
98 "errno",
99 "ctypes",
100 "__future__",
101 ]
102 .into_iter()
103 .collect()
104});
105
/// One `from <module> import …` statement that may bring fixtures into a file.
// NOTE(review): `dead_code` is allowed presumably because some fields are not
// read anywhere yet — confirm before removing the attribute.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct FixtureImport {
    /// Dotted module path of the import; leading dot(s) mark a relative import.
    pub module_path: String,
    /// `true` for `from module import *`.
    pub is_star_import: bool,
    /// Names bound by the import (the `as` alias when one is given);
    /// empty for star imports.
    pub imported_names: Vec<String>,
    /// File that contains the import statement.
    pub importing_file: PathBuf,
    /// Line of the import statement, as computed from the statement's start offset.
    pub line: usize,
}
121
122impl FixtureDatabase {
123 pub(crate) fn extract_fixture_imports(
126 &self,
127 stmts: &[Stmt],
128 file_path: &Path,
129 line_index: &[usize],
130 ) -> Vec<FixtureImport> {
131 let mut imports = Vec::new();
132
133 for stmt in stmts {
134 if let Stmt::ImportFrom(import_from) = stmt {
135 let mut module = import_from
137 .module
138 .as_ref()
139 .map(|m| m.to_string())
140 .unwrap_or_default();
141
142 if let Some(_level) = import_from.level {
144 module = ".".to_string() + &module;
146 }
147
148 if self.is_standard_library_module(&module) {
150 continue;
151 }
152
153 let line =
154 self.get_line_from_offset(import_from.range.start().to_usize(), line_index);
155
156 let is_star = import_from
158 .names
159 .iter()
160 .any(|alias| alias.name.as_str() == "*");
161
162 if is_star {
163 imports.push(FixtureImport {
164 module_path: module,
165 is_star_import: true,
166 imported_names: Vec::new(),
167 importing_file: file_path.to_path_buf(),
168 line,
169 });
170 } else {
171 let names: Vec<String> = import_from
173 .names
174 .iter()
175 .map(|alias| alias.asname.as_ref().unwrap_or(&alias.name).to_string())
176 .collect();
177
178 if !names.is_empty() {
179 imports.push(FixtureImport {
180 module_path: module,
181 is_star_import: false,
182 imported_names: names,
183 importing_file: file_path.to_path_buf(),
184 line,
185 });
186 }
187 }
188 }
189 }
190
191 imports
192 }
193
194 fn is_standard_library_module(&self, module: &str) -> bool {
197 let first_part = module.split('.').next().unwrap_or(module);
198 STDLIB_MODULES.contains(first_part)
199 }
200
201 pub(crate) fn resolve_module_to_file(
204 &self,
205 module_path: &str,
206 importing_file: &Path,
207 ) -> Option<PathBuf> {
208 debug!(
209 "Resolving module '{}' from file {:?}",
210 module_path, importing_file
211 );
212
213 let parent_dir = importing_file.parent()?;
214
215 if module_path.starts_with('.') {
216 self.resolve_relative_import(module_path, parent_dir)
218 } else {
219 self.resolve_absolute_import(module_path, parent_dir)
221 }
222 }
223
224 fn resolve_relative_import(&self, module_path: &str, base_dir: &Path) -> Option<PathBuf> {
226 let mut current_dir = base_dir.to_path_buf();
227 let mut chars = module_path.chars().peekable();
228
229 while chars.peek() == Some(&'.') {
231 chars.next();
232 if chars.peek() != Some(&'.') {
233 break;
235 }
236 current_dir = current_dir.parent()?.to_path_buf();
238 }
239
240 let remaining: String = chars.collect();
241 if remaining.is_empty() {
242 let init_path = current_dir.join("__init__.py");
244 if init_path.exists() {
245 return Some(init_path);
246 }
247 return None;
248 }
249
250 self.find_module_file(&remaining, ¤t_dir)
251 }
252
253 fn resolve_absolute_import(&self, module_path: &str, start_dir: &Path) -> Option<PathBuf> {
255 let mut current_dir = start_dir.to_path_buf();
256
257 loop {
258 if let Some(path) = self.find_module_file(module_path, ¤t_dir) {
259 return Some(path);
260 }
261
262 match current_dir.parent() {
264 Some(parent) => current_dir = parent.to_path_buf(),
265 None => break,
266 }
267 }
268
269 None
270 }
271
272 fn find_module_file(&self, module_path: &str, base_dir: &Path) -> Option<PathBuf> {
274 let parts: Vec<&str> = module_path.split('.').collect();
275 let mut current_path = base_dir.to_path_buf();
276
277 for (i, part) in parts.iter().enumerate() {
278 let is_last = i == parts.len() - 1;
279
280 if is_last {
281 let py_file = current_path.join(format!("{}.py", part));
283 if py_file.exists() {
284 return Some(py_file);
285 }
286
287 let canonical_py_file = self.get_canonical_path(py_file.clone());
289 if self.file_cache.contains_key(&canonical_py_file) {
290 return Some(py_file);
291 }
292
293 let package_init = current_path.join(part).join("__init__.py");
295 if package_init.exists() {
296 return Some(package_init);
297 }
298
299 let canonical_package_init = self.get_canonical_path(package_init.clone());
301 if self.file_cache.contains_key(&canonical_package_init) {
302 return Some(package_init);
303 }
304 } else {
305 current_path = current_path.join(part);
307 if !current_path.is_dir() {
308 return None;
309 }
310 }
311 }
312
313 None
314 }
315
316 pub fn get_imported_fixtures(
322 &self,
323 file_path: &Path,
324 visited: &mut HashSet<PathBuf>,
325 ) -> HashSet<String> {
326 let canonical_path = self.get_canonical_path(file_path.to_path_buf());
327
328 if visited.contains(&canonical_path) {
330 debug!("Circular import detected for {:?}, skipping", file_path);
331 return HashSet::new();
332 }
333 visited.insert(canonical_path.clone());
334
335 let Some(content) = self.get_file_content(&canonical_path) else {
337 return HashSet::new();
338 };
339
340 let content_hash = Self::hash_content(&content);
341 let current_version = self
342 .definitions_version
343 .load(std::sync::atomic::Ordering::SeqCst);
344
345 if let Some(cached) = self.imported_fixtures_cache.get(&canonical_path) {
347 let (cached_content_hash, cached_version, cached_fixtures) = cached.value();
348 if *cached_content_hash == content_hash && *cached_version == current_version {
349 debug!("Cache hit for imported fixtures in {:?}", canonical_path);
350 return cached_fixtures.as_ref().clone();
351 }
352 }
353
354 let imported_fixtures = self.compute_imported_fixtures(&canonical_path, &content, visited);
356
357 self.imported_fixtures_cache.insert(
359 canonical_path.clone(),
360 (
361 content_hash,
362 current_version,
363 Arc::new(imported_fixtures.clone()),
364 ),
365 );
366
367 info!(
368 "Found {} imported fixtures for {:?}: {:?}",
369 imported_fixtures.len(),
370 file_path,
371 imported_fixtures
372 );
373
374 imported_fixtures
375 }
376
377 fn compute_imported_fixtures(
379 &self,
380 canonical_path: &Path,
381 content: &str,
382 visited: &mut HashSet<PathBuf>,
383 ) -> HashSet<String> {
384 let mut imported_fixtures = HashSet::new();
385
386 let Some(parsed) = self.get_parsed_ast(canonical_path, content) else {
387 return imported_fixtures;
388 };
389
390 let line_index = self.get_line_index(canonical_path, content);
391
392 if let rustpython_parser::ast::Mod::Module(module) = parsed.as_ref() {
393 let imports = self.extract_fixture_imports(&module.body, canonical_path, &line_index);
394
395 for import in imports {
396 let Some(resolved_path) =
398 self.resolve_module_to_file(&import.module_path, canonical_path)
399 else {
400 debug!(
401 "Could not resolve module '{}' from {:?}",
402 import.module_path, canonical_path
403 );
404 continue;
405 };
406
407 let resolved_canonical = self.get_canonical_path(resolved_path);
408
409 debug!(
410 "Resolved import '{}' to {:?}",
411 import.module_path, resolved_canonical
412 );
413
414 if import.is_star_import {
415 if let Some(file_fixtures) = self.file_definitions.get(&resolved_canonical) {
418 for fixture_name in file_fixtures.iter() {
419 imported_fixtures.insert(fixture_name.clone());
420 }
421 }
422
423 let transitive = self.get_imported_fixtures(&resolved_canonical, visited);
425 imported_fixtures.extend(transitive);
426 } else {
427 for name in &import.imported_names {
429 if self.definitions.contains_key(name) {
430 imported_fixtures.insert(name.clone());
431 }
432 }
433 }
434 }
435 }
436
437 imported_fixtures
438 }
439
440 pub fn is_fixture_imported_in_file(&self, fixture_name: &str, file_path: &Path) -> bool {
443 let mut visited = HashSet::new();
444 let imported = self.get_imported_fixtures(file_path, &mut visited);
445 imported.contains(fixture_name)
446 }
447}