testlint_sdk/runtime_coverage/
python.rs1use crate::platform::{process_exists, signal_process, ProcessSignal};
2use crate::runtime_coverage::{CoverageSummary, RuntimeCoverageResult};
3use std::fs;
4use std::path::Path;
5use std::process::Command;
6use std::thread;
7use std::time::Duration;
8
9pub struct PythonRuntimeCoverage;
10
11impl Default for PythonRuntimeCoverage {
12 fn default() -> Self {
13 Self::new()
14 }
15}
16
17impl PythonRuntimeCoverage {
18 pub fn new() -> Self {
19 PythonRuntimeCoverage
20 }
21
22 pub fn attach_and_collect(&self, pid: u32) -> Result<RuntimeCoverageResult, String> {
24 println!("š Attaching coverage to Python process PID: {}", pid);
25 println!("š Collecting coverage continuously (press Ctrl+C to stop)...");
26
27 self.check_process_exists(pid)?;
29
30 let inject_script = self.create_injection_script(pid)?;
32
33 println!("š Injecting coverage.py into process...");
35 let output = Command::new("python3")
36 .arg(&inject_script)
37 .output()
38 .map_err(|e| format!("Failed to run injection script: {}", e))?;
39
40 if !output.status.success() {
41 let stderr = String::from_utf8_lossy(&output.stderr);
42 return Err(format!("Coverage injection failed: {}", stderr));
43 }
44
45 println!("ā Coverage injected successfully");
46 println!("ā³ Collecting coverage... Press Ctrl+C to stop and save");
47
48 use std::sync::atomic::{AtomicBool, Ordering};
50 use std::sync::Arc;
51
52 let running = Arc::new(AtomicBool::new(true));
53 let r = running.clone();
54
55 ctrlc::set_handler(move || {
56 r.store(false, Ordering::SeqCst);
57 })
58 .expect("Error setting Ctrl-C handler");
59
60 let start_time = std::time::Instant::now();
61
62 while running.load(Ordering::SeqCst) {
63 thread::sleep(Duration::from_millis(100));
64 }
65
66 let duration_secs = start_time.elapsed().as_secs();
67
68 println!("\nš Signaling process to save coverage data...");
70
71 #[cfg(unix)]
72 {
73 signal_process(pid, ProcessSignal::User1)?;
74 }
75
76 #[cfg(windows)]
77 {
78 let trigger_file = std::env::temp_dir().join(format!("coverage_trigger_{}.txt", pid));
80 fs::write(&trigger_file, "save")?;
81 println!(
82 "ā¹ļø Created trigger file (Windows alternative to SIGUSR1): {}",
83 trigger_file.display()
84 );
85 }
86
87 thread::sleep(Duration::from_secs(2));
89
90 let coverage_file = self.find_coverage_file(pid)?;
92 let summary = self.parse_coverage_summary(&coverage_file)?;
93
94 let _ = fs::remove_file(&inject_script);
96
97 Ok(RuntimeCoverageResult {
98 language: "Python".to_string(),
99 pid,
100 duration_secs,
101 coverage_file,
102 summary,
103 })
104 }
105
106 fn check_process_exists(&self, pid: u32) -> Result<(), String> {
108 if !process_exists(pid)? {
109 return Err(format!("Process {} not found", pid));
110 }
111 Ok(())
112 }
113
114 fn create_injection_script(&self, pid: u32) -> Result<String, String> {
116 let temp_dir = std::env::temp_dir();
117 let script_path = temp_dir.join(format!("coverage_inject_{}.py", pid));
118 let script_path_str = script_path.to_string_lossy().to_string();
119
120 let script_content = format!(
121 r#"#!/usr/bin/env python3
122"""
123Runtime coverage injection script.
124Injects coverage.py into a running Python process using gdb.
125"""
126import os
127import sys
128import time
129import signal
130import tempfile
131
132PID = {pid}
133
134# Python code to inject into the target process
135COVERAGE_CODE = '''
136import coverage
137import signal
138import tempfile
139import atexit
140
141# Start coverage
142cov = coverage.Coverage(data_file=".coverage.{{}}".format(os.getpid()), branch=True)
143cov.start()
144
145print("[Coverage] Coverage started for PID {{}}".format(os.getpid()), file=sys.stderr)
146
147# Save coverage on SIGUSR1
148def save_coverage(signum, frame):
149 print("[Coverage] Saving coverage data...", file=sys.stderr)
150 cov.stop()
151 cov.save()
152 print("[Coverage] Coverage saved to .coverage.{{}}".format(os.getpid()), file=sys.stderr)
153
154# Also save on exit
155atexit.register(lambda: (cov.stop(), cov.save()))
156signal.signal(signal.SIGUSR1, save_coverage)
157'''
158
159def inject_via_pyrasite():
160 """Try using pyrasite to inject code."""
161 try:
162 import pyrasite
163 print(f"Injecting coverage into PID {{PID}} using pyrasite...")
164
165 # Create temp file with coverage code
166 with open(f'/tmp/cov_inject_{{PID}}.py', 'w') as f:
167 f.write(COVERAGE_CODE)
168
169 # Inject
170 pyrasite.inject(PID, f'/tmp/cov_inject_{{PID}}.py')
171 print("ā Coverage injected successfully")
172 return True
173 except ImportError:
174 print("pyrasite not installed, trying gdb method...")
175 return False
176 except Exception as e:
177 print(f"pyrasite injection failed: {{e}}")
178 return False
179
180def inject_via_gdb():
181 """Fallback: use gdb to inject Python code."""
182 import subprocess
183
184 print(f"Injecting coverage into PID {{PID}} using gdb...")
185
186 gdb_commands = f'''
187set pagination off
188call (void*)PyGILState_Ensure()
189call (int)PyRun_SimpleString("{{COVERAGE_CODE.replace(chr(10), '\\\\n')}}")
190call (void)PyGILState_Release($1)
191detach
192quit
193'''
194
195 with open(f'/tmp/gdb_commands_{{PID}}.txt', 'w') as f:
196 f.write(gdb_commands)
197
198 try:
199 result = subprocess.run(
200 ['gdb', '-p', str(PID), '-batch', '-x', f'/tmp/gdb_commands_{{PID}}.txt'],
201 capture_output=True,
202 text=True,
203 timeout=30
204 )
205
206 if result.returncode == 0:
207 print("ā Coverage injected via gdb")
208 return True
209 else:
210 print(f"gdb injection failed: {{result.stderr}}")
211 return False
212 except subprocess.TimeoutExpired:
213 print("gdb injection timed out")
214 return False
215 except FileNotFoundError:
216 print("gdb not found. Please install gdb.")
217 return False
218 finally:
219 # Cleanup
220 try:
221 os.remove(f'/tmp/gdb_commands_{{PID}}.txt')
222 except:
223 pass
224
225# Try pyrasite first, fall back to gdb
226if not inject_via_pyrasite():
227 if not inject_via_gdb():
228 print("ERROR: Failed to inject coverage. Install pyrasite or gdb:")
229 print(" pip install pyrasite")
230 print(" or: apt-get install gdb / brew install gdb")
231 sys.exit(1)
232
233print("Coverage collection running continuously...")
234print("Process will save coverage when receiving SIGUSR1 signal")
235"#,
236 pid = pid
237 );
238
239 fs::write(&script_path, script_content)
240 .map_err(|e| format!("Failed to create injection script: {}", e))?;
241
242 #[cfg(unix)]
244 {
245 Command::new("chmod")
246 .args(["+x", &script_path_str])
247 .output()
248 .map_err(|e| format!("Failed to make script executable: {}", e))?;
249 }
250
251 Ok(script_path_str)
252 }
253
254 fn find_coverage_file(&self, pid: u32) -> Result<String, String> {
256 let coverage_file = format!(".coverage.{}", pid);
257
258 if Path::new(&coverage_file).exists() {
259 return Ok(coverage_file);
260 }
261
262 if Path::new(".coverage").exists() {
264 return Ok(".coverage".to_string());
265 }
266
267 Err(format!(
268 "Coverage file not found. Expected .coverage.{} or .coverage",
269 pid
270 ))
271 }
272
273 fn parse_coverage_summary(&self, coverage_file: &str) -> Result<CoverageSummary, String> {
275 println!("š Generating coverage report...");
276
277 let output = Command::new("coverage")
279 .args(["report", "--data-file", coverage_file])
280 .output()
281 .map_err(|e| format!("Failed to run coverage report: {}", e))?;
282
283 if !output.status.success() {
284 return Err(format!(
285 "Coverage report failed: {}",
286 String::from_utf8_lossy(&output.stderr)
287 ));
288 }
289
290 let report = String::from_utf8_lossy(&output.stdout);
291
292 let total_line = report
295 .lines()
296 .find(|line| line.starts_with("TOTAL"))
297 .ok_or("Could not find TOTAL line in coverage report")?;
298
299 let parts: Vec<&str> = total_line.split_whitespace().collect();
300 if parts.len() < 4 {
301 return Err("Invalid coverage report format".to_string());
302 }
303
304 let total_lines = parts[1]
305 .parse::<usize>()
306 .map_err(|_| "Invalid total lines")?;
307 let missed_lines = parts[2]
308 .parse::<usize>()
309 .map_err(|_| "Invalid missed lines")?;
310 let covered_lines = total_lines - missed_lines;
311 let coverage_percentage = parts[3]
312 .trim_end_matches('%')
313 .parse::<f64>()
314 .map_err(|_| "Invalid coverage percentage")?;
315
316 Ok(CoverageSummary {
317 total_lines,
318 covered_lines,
319 coverage_percentage,
320 total_branches: None,
321 covered_branches: None,
322 branch_percentage: None,
323 })
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use super::*;
330
331 #[test]
332 fn test_runtime_coverage_new() {
333 let coverage = PythonRuntimeCoverage::new();
334 assert_eq!(std::mem::size_of_val(&coverage), 0);
335 }
336
337 #[test]
338 fn test_runtime_coverage_default() {
339 let coverage = PythonRuntimeCoverage;
340 assert_eq!(std::mem::size_of_val(&coverage), 0);
341 }
342}