testlint_sdk/runtime_coverage/
python.rs

1use crate::platform::{process_exists, signal_process, ProcessSignal};
2use crate::runtime_coverage::{CoverageSummary, RuntimeCoverageResult};
3use std::fs;
4use std::path::Path;
5use std::process::Command;
6use std::thread;
7use std::time::Duration;
8
/// Runtime coverage collector for already-running Python processes.
///
/// This is a stateless unit struct; all behavior lives in its methods.
pub struct PythonRuntimeCoverage;
10
11impl Default for PythonRuntimeCoverage {
12    fn default() -> Self {
13        Self::new()
14    }
15}
16
17impl PythonRuntimeCoverage {
18    pub fn new() -> Self {
19        PythonRuntimeCoverage
20    }
21
22    /// Attach to a running Python process and collect coverage continuously
23    pub fn attach_and_collect(&self, pid: u32) -> Result<RuntimeCoverageResult, String> {
24        println!("šŸ Attaching coverage to Python process PID: {}", pid);
25        println!("šŸ“Š Collecting coverage continuously (press Ctrl+C to stop)...");
26
27        // Check if process exists
28        self.check_process_exists(pid)?;
29
30        // Create a Python script that will inject coverage into the target process
31        let inject_script = self.create_injection_script(pid)?;
32
33        // Run the injection script
34        println!("šŸ’‰ Injecting coverage.py into process...");
35        let output = Command::new("python3")
36            .arg(&inject_script)
37            .output()
38            .map_err(|e| format!("Failed to run injection script: {}", e))?;
39
40        if !output.status.success() {
41            let stderr = String::from_utf8_lossy(&output.stderr);
42            return Err(format!("Coverage injection failed: {}", stderr));
43        }
44
45        println!("āœ“ Coverage injected successfully");
46        println!("ā³ Collecting coverage... Press Ctrl+C to stop and save");
47
48        // Wait for Ctrl+C
49        use std::sync::atomic::{AtomicBool, Ordering};
50        use std::sync::Arc;
51
52        let running = Arc::new(AtomicBool::new(true));
53        let r = running.clone();
54
55        ctrlc::set_handler(move || {
56            r.store(false, Ordering::SeqCst);
57        })
58        .expect("Error setting Ctrl-C handler");
59
60        let start_time = std::time::Instant::now();
61
62        while running.load(Ordering::SeqCst) {
63            thread::sleep(Duration::from_millis(100));
64        }
65
66        let duration_secs = start_time.elapsed().as_secs();
67
68        // Signal the process to save coverage (send SIGUSR1 on Unix, file trigger on Windows)
69        println!("\nšŸ“ Signaling process to save coverage data...");
70
71        #[cfg(unix)]
72        {
73            signal_process(pid, ProcessSignal::User1)?;
74        }
75
76        #[cfg(windows)]
77        {
78            // On Windows, create a trigger file that the injected code watches for
79            let trigger_file = std::env::temp_dir().join(format!("coverage_trigger_{}.txt", pid));
80            fs::write(&trigger_file, "save")?;
81            println!(
82                "ā„¹ļø  Created trigger file (Windows alternative to SIGUSR1): {}",
83                trigger_file.display()
84            );
85        }
86
87        // Wait a bit for coverage to be saved
88        thread::sleep(Duration::from_secs(2));
89
90        // Find and parse the coverage file
91        let coverage_file = self.find_coverage_file(pid)?;
92        let summary = self.parse_coverage_summary(&coverage_file)?;
93
94        // Clean up injection script
95        let _ = fs::remove_file(&inject_script);
96
97        Ok(RuntimeCoverageResult {
98            language: "Python".to_string(),
99            pid,
100            duration_secs,
101            coverage_file,
102            summary,
103        })
104    }
105
106    /// Check if the process exists
107    fn check_process_exists(&self, pid: u32) -> Result<(), String> {
108        if !process_exists(pid)? {
109            return Err(format!("Process {} not found", pid));
110        }
111        Ok(())
112    }
113
114    /// Create a Python script that injects coverage into the target process
115    fn create_injection_script(&self, pid: u32) -> Result<String, String> {
116        let temp_dir = std::env::temp_dir();
117        let script_path = temp_dir.join(format!("coverage_inject_{}.py", pid));
118        let script_path_str = script_path.to_string_lossy().to_string();
119
120        let script_content = format!(
121            r#"#!/usr/bin/env python3
122"""
123Runtime coverage injection script.
124Injects coverage.py into a running Python process using gdb.
125"""
126import os
127import sys
128import time
129import signal
130import tempfile
131
132PID = {pid}
133
134# Python code to inject into the target process
135COVERAGE_CODE = '''
136import coverage
137import signal
138import tempfile
139import atexit
140
141# Start coverage
142cov = coverage.Coverage(data_file=".coverage.{{}}".format(os.getpid()), branch=True)
143cov.start()
144
145print("[Coverage] Coverage started for PID {{}}".format(os.getpid()), file=sys.stderr)
146
147# Save coverage on SIGUSR1
148def save_coverage(signum, frame):
149    print("[Coverage] Saving coverage data...", file=sys.stderr)
150    cov.stop()
151    cov.save()
152    print("[Coverage] Coverage saved to .coverage.{{}}".format(os.getpid()), file=sys.stderr)
153
154# Also save on exit
155atexit.register(lambda: (cov.stop(), cov.save()))
156signal.signal(signal.SIGUSR1, save_coverage)
157'''
158
159def inject_via_pyrasite():
160    """Try using pyrasite to inject code."""
161    try:
162        import pyrasite
163        print(f"Injecting coverage into PID {{PID}} using pyrasite...")
164
165        # Create temp file with coverage code
166        with open(f'/tmp/cov_inject_{{PID}}.py', 'w') as f:
167            f.write(COVERAGE_CODE)
168
169        # Inject
170        pyrasite.inject(PID, f'/tmp/cov_inject_{{PID}}.py')
171        print("āœ“ Coverage injected successfully")
172        return True
173    except ImportError:
174        print("pyrasite not installed, trying gdb method...")
175        return False
176    except Exception as e:
177        print(f"pyrasite injection failed: {{e}}")
178        return False
179
180def inject_via_gdb():
181    """Fallback: use gdb to inject Python code."""
182    import subprocess
183
184    print(f"Injecting coverage into PID {{PID}} using gdb...")
185
186    gdb_commands = f'''
187set pagination off
188call (void*)PyGILState_Ensure()
189call (int)PyRun_SimpleString("{{COVERAGE_CODE.replace(chr(10), '\\\\n')}}")
190call (void)PyGILState_Release($1)
191detach
192quit
193'''
194
195    with open(f'/tmp/gdb_commands_{{PID}}.txt', 'w') as f:
196        f.write(gdb_commands)
197
198    try:
199        result = subprocess.run(
200            ['gdb', '-p', str(PID), '-batch', '-x', f'/tmp/gdb_commands_{{PID}}.txt'],
201            capture_output=True,
202            text=True,
203            timeout=30
204        )
205
206        if result.returncode == 0:
207            print("āœ“ Coverage injected via gdb")
208            return True
209        else:
210            print(f"gdb injection failed: {{result.stderr}}")
211            return False
212    except subprocess.TimeoutExpired:
213        print("gdb injection timed out")
214        return False
215    except FileNotFoundError:
216        print("gdb not found. Please install gdb.")
217        return False
218    finally:
219        # Cleanup
220        try:
221            os.remove(f'/tmp/gdb_commands_{{PID}}.txt')
222        except:
223            pass
224
225# Try pyrasite first, fall back to gdb
226if not inject_via_pyrasite():
227    if not inject_via_gdb():
228        print("ERROR: Failed to inject coverage. Install pyrasite or gdb:")
229        print("  pip install pyrasite")
230        print("  or: apt-get install gdb / brew install gdb")
231        sys.exit(1)
232
233print("Coverage collection running continuously...")
234print("Process will save coverage when receiving SIGUSR1 signal")
235"#,
236            pid = pid
237        );
238
239        fs::write(&script_path, script_content)
240            .map_err(|e| format!("Failed to create injection script: {}", e))?;
241
242        // Make executable
243        #[cfg(unix)]
244        {
245            Command::new("chmod")
246                .args(["+x", &script_path_str])
247                .output()
248                .map_err(|e| format!("Failed to make script executable: {}", e))?;
249        }
250
251        Ok(script_path_str)
252    }
253
254    /// Find the coverage file generated by the process
255    fn find_coverage_file(&self, pid: u32) -> Result<String, String> {
256        let coverage_file = format!(".coverage.{}", pid);
257
258        if Path::new(&coverage_file).exists() {
259            return Ok(coverage_file);
260        }
261
262        // Try current directory
263        if Path::new(".coverage").exists() {
264            return Ok(".coverage".to_string());
265        }
266
267        Err(format!(
268            "Coverage file not found. Expected .coverage.{} or .coverage",
269            pid
270        ))
271    }
272
273    /// Parse coverage summary from coverage file
274    fn parse_coverage_summary(&self, coverage_file: &str) -> Result<CoverageSummary, String> {
275        println!("šŸ“Š Generating coverage report...");
276
277        // Run coverage report to get summary
278        let output = Command::new("coverage")
279            .args(["report", "--data-file", coverage_file])
280            .output()
281            .map_err(|e| format!("Failed to run coverage report: {}", e))?;
282
283        if !output.status.success() {
284            return Err(format!(
285                "Coverage report failed: {}",
286                String::from_utf8_lossy(&output.stderr)
287            ));
288        }
289
290        let report = String::from_utf8_lossy(&output.stdout);
291
292        // Parse the TOTAL line
293        // Format: TOTAL    1234   123   90%
294        let total_line = report
295            .lines()
296            .find(|line| line.starts_with("TOTAL"))
297            .ok_or("Could not find TOTAL line in coverage report")?;
298
299        let parts: Vec<&str> = total_line.split_whitespace().collect();
300        if parts.len() < 4 {
301            return Err("Invalid coverage report format".to_string());
302        }
303
304        let total_lines = parts[1]
305            .parse::<usize>()
306            .map_err(|_| "Invalid total lines")?;
307        let missed_lines = parts[2]
308            .parse::<usize>()
309            .map_err(|_| "Invalid missed lines")?;
310        let covered_lines = total_lines - missed_lines;
311        let coverage_percentage = parts[3]
312            .trim_end_matches('%')
313            .parse::<f64>()
314            .map_err(|_| "Invalid coverage percentage")?;
315
316        Ok(CoverageSummary {
317            total_lines,
318            covered_lines,
319            coverage_percentage,
320            total_branches: None,
321            covered_branches: None,
322            branch_percentage: None,
323        })
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use std::mem::size_of_val;

    /// The collector built by `new()` is zero-sized, i.e. it carries no state.
    #[test]
    fn test_runtime_coverage_new() {
        let collector = PythonRuntimeCoverage::new();
        assert_eq!(size_of_val(&collector), 0);
    }

    /// The unit-struct literal is likewise zero-sized.
    #[test]
    fn test_runtime_coverage_default() {
        let collector = PythonRuntimeCoverage;
        assert_eq!(size_of_val(&collector), 0);
    }
}
342}