Skip to main content

dissolve_python/
mypy_lsp.rs

1use std::process::Command;
2use tracing::{debug, error, info, warn};
3
/// Mypy-based type introspection using the `dmypy` daemon.
///
/// The introspector shells out to the `dmypy` command-line client and keeps
/// a small amount of bookkeeping so that the daemon is started, and each file
/// is checked, at most once until that state is invalidated.
#[derive(Debug)]
pub struct MypyTypeIntrospector {
    /// Directory used as the working directory and `PYTHONPATH` for `dmypy`.
    workspace_root: String,
    /// Whether this instance has already confirmed that a daemon is running.
    daemon_started: bool,
    /// Paths that have already been handed to `dmypy check`.
    checked_files: std::collections::HashSet<String>,
}
10
11impl MypyTypeIntrospector {
12    pub fn new(workspace_root: Option<&str>) -> Result<Self, String> {
13        let workspace_root = workspace_root.map(|s| s.to_string()).unwrap_or_else(|| {
14            std::env::current_dir()
15                .map(|p| p.to_string_lossy().to_string())
16                .unwrap_or_else(|_| ".".to_string())
17        });
18
19        Ok(Self {
20            workspace_root,
21            daemon_started: false,
22            checked_files: std::collections::HashSet::new(),
23        })
24    }
25
26    /// Start the mypy daemon if not already running
27    pub fn ensure_daemon_started(&mut self) -> Result<(), String> {
28        if self.daemon_started {
29            return Ok(());
30        }
31
32        // Check if daemon is already running
33        let status = Command::new("dmypy")
34            .arg("status")
35            .output()
36            .map_err(|e| format!("Failed to check dmypy status: {}", e))?;
37
38        if !status.status.success() {
39            // Start the daemon
40            info!("Starting dmypy daemon...");
41            let output = Command::new("dmypy")
42                .arg("start")
43                .arg("--")
44                .arg("--python-executable")
45                .arg("python3")
46                .env("PYTHONPATH", &self.workspace_root)
47                .current_dir(&self.workspace_root)
48                .output()
49                .map_err(|e| format!("Failed to start dmypy: {}", e))?;
50
51            if !output.status.success() {
52                let stderr = String::from_utf8_lossy(&output.stderr);
53                // Check if daemon is already running - this is fine
54                if stderr.contains("Daemon is still alive") || stderr.contains("already running") {
55                    debug!("dmypy daemon is already running, reusing existing daemon");
56                } else {
57                    return Err(format!("Failed to start dmypy daemon: {}", stderr));
58                }
59            }
60        }
61
62        self.daemon_started = true;
63        Ok(())
64    }
65
66    /// Check a file with mypy if not already checked
67    fn ensure_file_checked(&mut self, file_path: &str) -> Result<(), String> {
68        if self.checked_files.contains(file_path) {
69            return Ok(());
70        }
71
72        let check_output = Command::new("dmypy")
73            .arg("check")
74            .arg(file_path)
75            .env("PYTHONPATH", &self.workspace_root)
76            .current_dir(&self.workspace_root)
77            .output()
78            .map_err(|e| format!("Failed to run dmypy check: {}", e))?;
79
80        if !check_output.status.success() {
81            let stderr = String::from_utf8_lossy(&check_output.stderr);
82
83            // Handle daemon connection issues specially
84            if stderr.contains("Daemon has died") || stderr.contains("Daemon has crashed") {
85                warn!("dmypy daemon died, restarting...");
86                self.daemon_started = false;
87                self.ensure_daemon_started()?;
88                // Retry the check
89                return self.ensure_file_checked(file_path);
90            } else if stderr.contains("Resource temporarily unavailable")
91                || stderr.contains("Daemon may be busy")
92            {
93                warn!("dmypy daemon is busy, skipping check for {}", file_path);
94                self.checked_files.insert(file_path.to_string());
95                return Ok(());
96            }
97
98            warn!("dmypy check had errors for {}: {}", file_path, stderr);
99            // Continue anyway - mypy might still have type info despite errors
100        }
101
102        self.checked_files.insert(file_path.to_string());
103        Ok(())
104    }
105
106    /// Get the type of an expression at a specific location
107    pub fn get_type_at_position(
108        &mut self,
109        file_path: &str,
110        line: usize,
111        column: usize,
112    ) -> Result<Option<String>, String> {
113        self.ensure_daemon_started()?;
114        self.ensure_file_checked(file_path)?;
115
116        // Now inspect the type at the given position
117        let location = format!("{}:{}:{}", file_path, line, column);
118        tracing::debug!("dmypy inspect location: {}", location);
119        let output = Command::new("dmypy")
120            .arg("inspect")
121            .arg("--show")
122            .arg("type")
123            .arg("--verbose")
124            .arg("--verbose") // Double verbose for full type info
125            .arg("--force-reload") // Force reload to get fresh type info
126            .arg("--limit")
127            .arg("1")
128            .arg(&location)
129            .env("PYTHONPATH", &self.workspace_root)
130            .current_dir(&self.workspace_root)
131            .output()
132            .map_err(|e| format!("Failed to run dmypy inspect: {}", e))?;
133
134        if !output.status.success() {
135            let stderr = String::from_utf8_lossy(&output.stderr);
136            let stdout = String::from_utf8_lossy(&output.stdout);
137            tracing::debug!(
138                "dmypy inspect failed - stderr: '{}', stdout: '{}'",
139                stderr,
140                stdout
141            );
142            tracing::debug!("dmypy inspect command failed:");
143            tracing::debug!("  Location: {}", location);
144            tracing::debug!("  Workspace: {}", self.workspace_root);
145            tracing::debug!("  stderr: '{}'", stderr);
146            tracing::debug!("  stdout: '{}'", stdout);
147
148            // Handle daemon connection issues specially
149            if stderr.contains("Daemon has died") || stderr.contains("Daemon has crashed") {
150                warn!("dmypy daemon died during inspect, restarting...");
151                self.daemon_started = false;
152                self.ensure_daemon_started()?;
153                self.ensure_file_checked(file_path)?;
154                // Retry the inspect
155                return self.get_type_at_position(file_path, line, column);
156            } else if stderr.contains("Resource temporarily unavailable")
157                || stderr.contains("Daemon may be busy")
158            {
159                warn!(
160                    "dmypy daemon is busy during inspect at {}:{}:{}",
161                    file_path, line, column
162                );
163                return Ok(None);
164            }
165
166            error!(
167                "dmypy inspect failed at {}:{}:{} - {}",
168                file_path, line, column, stderr
169            );
170            return Err(format!("Type introspection failed: {}", stderr));
171        }
172
173        let stdout = String::from_utf8_lossy(&output.stdout);
174        tracing::debug!("dmypy inspect success - stdout: '{}'", stdout);
175
176        // dmypy inspect returns multiple lines - one type per expression at the position
177        // We want the most specific type that contains our module types
178        let lines: Vec<&str> = stdout.lines().collect();
179
180        if lines.is_empty() {
181            return Ok(None);
182        }
183
184        // Look for a concrete type in the output
185        for line in lines {
186            let trimmed = line.trim();
187            if trimmed.is_empty() || trimmed == "None" {
188                continue;
189            }
190
191            // Remove quotes if present
192            let type_str = trimmed.trim_matches('"');
193
194            // Skip if it's exactly "Any" - we need concrete types
195            if type_str == "Any" {
196                continue;
197            }
198
199            // If it contains a module path, it's likely what we want
200            if type_str.contains('.') && !type_str.contains("builtins.") {
201                // Extract the base type from union types like "dulwich.worktree.WorkTree | None"
202                if let Some(base_type) = type_str.split('|').next() {
203                    let base = base_type.trim();
204                    if base != "Any" {
205                        return Ok(Some(base.to_string()));
206                    }
207                }
208                return Ok(Some(type_str.to_string()));
209            }
210
211            // Return any non-Any type we find
212            return Ok(Some(type_str.to_string()));
213        }
214
215        // If we only found "Any" or nothing, return None
216        warn!("mypy could not determine a concrete type at {}:{}:{} - only found 'Any' or no type info", file_path, line, column);
217        Ok(None)
218    }
219
220    /// Get the fully qualified name of a type
221    pub fn resolve_type_fqn(
222        &mut self,
223        _file_path: &str,
224        type_name: &str,
225    ) -> Result<Option<String>, String> {
226        // For mypy, the type returned is already fully qualified
227        // so we can just return it as-is
228        Ok(Some(type_name.to_string()))
229    }
230
231    /// Invalidate cached type information for a file after modifications
232    pub fn invalidate_file(&mut self, file_path: &str) -> Result<(), String> {
233        tracing::debug!("Invalidating mypy cache for file: {}", file_path);
234
235        // Remove the file from checked files so it will be re-checked next time
236        self.checked_files.remove(file_path);
237
238        // dmypy will automatically detect file changes and re-analyze
239        // when we run check or inspect on it next time
240        Ok(())
241    }
242
243    /// Stop the dmypy daemon
244    pub fn stop_daemon(&mut self) -> Result<(), String> {
245        if !self.daemon_started {
246            return Ok(());
247        }
248
249        debug!("Stopping dmypy daemon...");
250        let output = Command::new("dmypy")
251            .arg("stop")
252            .output()
253            .map_err(|e| format!("Failed to stop dmypy: {}", e))?;
254
255        if !output.status.success() {
256            let stderr = String::from_utf8_lossy(&output.stderr);
257            warn!("Failed to stop dmypy daemon: {}", stderr);
258        } else {
259            debug!("Successfully stopped dmypy daemon");
260        }
261
262        self.daemon_started = false;
263        self.checked_files.clear();
264        Ok(())
265    }
266}
267
268impl Drop for MypyTypeIntrospector {
269    fn drop(&mut self) {
270        // Stop the daemon when this introspector is dropped
271        // This is fine because we'll reuse introspector instances between tests
272        if self.daemon_started {
273            let _ = self.stop_daemon(); // Ignore errors during cleanup
274        }
275    }
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    /// End-to-end lookup against a real dmypy daemon.
    ///
    /// Writes a small Python module into a temporary workspace and asks for
    /// the type at the `result` assignment. The test bails out (without
    /// failing) when the introspector cannot be built or the lookup errors,
    /// so it is effectively skipped on machines without a working mypy.
    #[test]
    fn test_mypy_type_introspection() {
        let workspace = tempdir().unwrap();
        let source_path = workspace.path().join("test.py");

        let python_source = r#"
from typing import List

def test_func() -> List[str]:
    result = ["hello", "world"]
    return result
"#;
        fs::write(&source_path, python_source).unwrap();

        let workspace_root = workspace.path().to_str().unwrap();
        let mut introspector = match MypyTypeIntrospector::new(Some(workspace_root)) {
            Ok(introspector) => introspector,
            Err(reason) => {
                eprintln!(
                    "Skipping test - mypy is not available: {:?}",
                    Some(reason)
                );
                return;
            }
        };

        // Get type of 'result' variable
        let lookup = introspector.get_type_at_position(
            source_path.to_str().unwrap(),
            5, // Line with 'result'
            4, // Column at 'result'
        );

        let type_info = match lookup {
            Ok(found) => found,
            Err(e) => {
                eprintln!("get_type_at_position failed: {}", e);
                eprintln!("Skipping test - mypy introspection not working properly");
                return;
            }
        };

        assert!(type_info.is_some());
        let type_str = type_info.unwrap();
        assert!(type_str.contains("List") || type_str.contains("list"));
    }
}