dissolve_python/
mypy_lsp.rs1use std::process::Command;
2use tracing::{debug, error, info, warn};
3
/// Client for querying Python expression types through the mypy daemon (`dmypy`).
///
/// The introspector shells out to the `dmypy` command-line tool and remembers
/// which files it has already submitted for checking so they are not
/// re-submitted on every query.
#[derive(Debug)]
pub struct MypyTypeIntrospector {
    /// Directory the daemon is started in; also exported as `PYTHONPATH`.
    workspace_root: String,
    /// `true` once this instance has confirmed that a daemon is available.
    daemon_started: bool,
    /// Paths already handed to `dmypy check` (or deliberately skipped).
    checked_files: std::collections::HashSet<String>,
}
10
11impl MypyTypeIntrospector {
12 pub fn new(workspace_root: Option<&str>) -> Result<Self, String> {
13 let workspace_root = workspace_root.map(|s| s.to_string()).unwrap_or_else(|| {
14 std::env::current_dir()
15 .map(|p| p.to_string_lossy().to_string())
16 .unwrap_or_else(|_| ".".to_string())
17 });
18
19 Ok(Self {
20 workspace_root,
21 daemon_started: false,
22 checked_files: std::collections::HashSet::new(),
23 })
24 }
25
26 pub fn ensure_daemon_started(&mut self) -> Result<(), String> {
28 if self.daemon_started {
29 return Ok(());
30 }
31
32 let status = Command::new("dmypy")
34 .arg("status")
35 .output()
36 .map_err(|e| format!("Failed to check dmypy status: {}", e))?;
37
38 if !status.status.success() {
39 info!("Starting dmypy daemon...");
41 let output = Command::new("dmypy")
42 .arg("start")
43 .arg("--")
44 .arg("--python-executable")
45 .arg("python3")
46 .env("PYTHONPATH", &self.workspace_root)
47 .current_dir(&self.workspace_root)
48 .output()
49 .map_err(|e| format!("Failed to start dmypy: {}", e))?;
50
51 if !output.status.success() {
52 let stderr = String::from_utf8_lossy(&output.stderr);
53 if stderr.contains("Daemon is still alive") || stderr.contains("already running") {
55 debug!("dmypy daemon is already running, reusing existing daemon");
56 } else {
57 return Err(format!("Failed to start dmypy daemon: {}", stderr));
58 }
59 }
60 }
61
62 self.daemon_started = true;
63 Ok(())
64 }
65
66 fn ensure_file_checked(&mut self, file_path: &str) -> Result<(), String> {
68 if self.checked_files.contains(file_path) {
69 return Ok(());
70 }
71
72 let check_output = Command::new("dmypy")
73 .arg("check")
74 .arg(file_path)
75 .env("PYTHONPATH", &self.workspace_root)
76 .current_dir(&self.workspace_root)
77 .output()
78 .map_err(|e| format!("Failed to run dmypy check: {}", e))?;
79
80 if !check_output.status.success() {
81 let stderr = String::from_utf8_lossy(&check_output.stderr);
82
83 if stderr.contains("Daemon has died") || stderr.contains("Daemon has crashed") {
85 warn!("dmypy daemon died, restarting...");
86 self.daemon_started = false;
87 self.ensure_daemon_started()?;
88 return self.ensure_file_checked(file_path);
90 } else if stderr.contains("Resource temporarily unavailable")
91 || stderr.contains("Daemon may be busy")
92 {
93 warn!("dmypy daemon is busy, skipping check for {}", file_path);
94 self.checked_files.insert(file_path.to_string());
95 return Ok(());
96 }
97
98 warn!("dmypy check had errors for {}: {}", file_path, stderr);
99 }
101
102 self.checked_files.insert(file_path.to_string());
103 Ok(())
104 }
105
106 pub fn get_type_at_position(
108 &mut self,
109 file_path: &str,
110 line: usize,
111 column: usize,
112 ) -> Result<Option<String>, String> {
113 self.ensure_daemon_started()?;
114 self.ensure_file_checked(file_path)?;
115
116 let location = format!("{}:{}:{}", file_path, line, column);
118 tracing::debug!("dmypy inspect location: {}", location);
119 let output = Command::new("dmypy")
120 .arg("inspect")
121 .arg("--show")
122 .arg("type")
123 .arg("--verbose")
124 .arg("--verbose") .arg("--force-reload") .arg("--limit")
127 .arg("1")
128 .arg(&location)
129 .env("PYTHONPATH", &self.workspace_root)
130 .current_dir(&self.workspace_root)
131 .output()
132 .map_err(|e| format!("Failed to run dmypy inspect: {}", e))?;
133
134 if !output.status.success() {
135 let stderr = String::from_utf8_lossy(&output.stderr);
136 let stdout = String::from_utf8_lossy(&output.stdout);
137 tracing::debug!(
138 "dmypy inspect failed - stderr: '{}', stdout: '{}'",
139 stderr,
140 stdout
141 );
142 tracing::debug!("dmypy inspect command failed:");
143 tracing::debug!(" Location: {}", location);
144 tracing::debug!(" Workspace: {}", self.workspace_root);
145 tracing::debug!(" stderr: '{}'", stderr);
146 tracing::debug!(" stdout: '{}'", stdout);
147
148 if stderr.contains("Daemon has died") || stderr.contains("Daemon has crashed") {
150 warn!("dmypy daemon died during inspect, restarting...");
151 self.daemon_started = false;
152 self.ensure_daemon_started()?;
153 self.ensure_file_checked(file_path)?;
154 return self.get_type_at_position(file_path, line, column);
156 } else if stderr.contains("Resource temporarily unavailable")
157 || stderr.contains("Daemon may be busy")
158 {
159 warn!(
160 "dmypy daemon is busy during inspect at {}:{}:{}",
161 file_path, line, column
162 );
163 return Ok(None);
164 }
165
166 error!(
167 "dmypy inspect failed at {}:{}:{} - {}",
168 file_path, line, column, stderr
169 );
170 return Err(format!("Type introspection failed: {}", stderr));
171 }
172
173 let stdout = String::from_utf8_lossy(&output.stdout);
174 tracing::debug!("dmypy inspect success - stdout: '{}'", stdout);
175
176 let lines: Vec<&str> = stdout.lines().collect();
179
180 if lines.is_empty() {
181 return Ok(None);
182 }
183
184 for line in lines {
186 let trimmed = line.trim();
187 if trimmed.is_empty() || trimmed == "None" {
188 continue;
189 }
190
191 let type_str = trimmed.trim_matches('"');
193
194 if type_str == "Any" {
196 continue;
197 }
198
199 if type_str.contains('.') && !type_str.contains("builtins.") {
201 if let Some(base_type) = type_str.split('|').next() {
203 let base = base_type.trim();
204 if base != "Any" {
205 return Ok(Some(base.to_string()));
206 }
207 }
208 return Ok(Some(type_str.to_string()));
209 }
210
211 return Ok(Some(type_str.to_string()));
213 }
214
215 warn!("mypy could not determine a concrete type at {}:{}:{} - only found 'Any' or no type info", file_path, line, column);
217 Ok(None)
218 }
219
220 pub fn resolve_type_fqn(
222 &mut self,
223 _file_path: &str,
224 type_name: &str,
225 ) -> Result<Option<String>, String> {
226 Ok(Some(type_name.to_string()))
229 }
230
231 pub fn invalidate_file(&mut self, file_path: &str) -> Result<(), String> {
233 tracing::debug!("Invalidating mypy cache for file: {}", file_path);
234
235 self.checked_files.remove(file_path);
237
238 Ok(())
241 }
242
243 pub fn stop_daemon(&mut self) -> Result<(), String> {
245 if !self.daemon_started {
246 return Ok(());
247 }
248
249 debug!("Stopping dmypy daemon...");
250 let output = Command::new("dmypy")
251 .arg("stop")
252 .output()
253 .map_err(|e| format!("Failed to stop dmypy: {}", e))?;
254
255 if !output.status.success() {
256 let stderr = String::from_utf8_lossy(&output.stderr);
257 warn!("Failed to stop dmypy daemon: {}", stderr);
258 } else {
259 debug!("Successfully stopped dmypy daemon");
260 }
261
262 self.daemon_started = false;
263 self.checked_files.clear();
264 Ok(())
265 }
266}
267
268impl Drop for MypyTypeIntrospector {
269 fn drop(&mut self) {
270 if self.daemon_started {
273 let _ = self.stop_daemon(); }
275 }
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    /// Writes a small Python module to a temporary directory and queries the
    /// type at line 5, column 4 (the `result = ...` line of the fixture).
    ///
    /// The test returns early, without failing, when the introspector cannot
    /// be created or the query itself returns an error.
    #[test]
    fn test_mypy_type_introspection() {
        let dir = tempdir().unwrap();
        let test_file = dir.path().join("test.py");

        let python_source = r#"
from typing import List

def test_func() -> List[str]:
    result = ["hello", "world"]
    return result
"#;
        fs::write(&test_file, python_source).unwrap();

        let mut introspector = match MypyTypeIntrospector::new(Some(dir.path().to_str().unwrap()))
        {
            Ok(created) => created,
            Err(reason) => {
                eprintln!(
                    "Skipping test - mypy is not available: {:?}",
                    Some(reason)
                );
                return;
            }
        };

        let query = introspector.get_type_at_position(test_file.to_str().unwrap(), 5, 4);
        let type_info = match query {
            Ok(found) => found,
            Err(e) => {
                eprintln!("get_type_at_position failed: {}", e);
                eprintln!("Skipping test - mypy introspection not working properly");
                return;
            }
        };

        assert!(type_info.is_some());
        let type_str = type_info.unwrap();
        assert!(type_str.contains("List") || type_str.contains("list"));
    }
}