use crate::config::{OutputConfig, OutputFormat};
use crate::core::process_monitor::ProcessMonitor;
use crate::error::DenetError;
use crate::monitor::{Summary, SummaryGenerator};
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
use std::time::Duration;
/// Converts a [`std::io::Error`] into a Python `RuntimeError` whose message is
/// the error's display text prefixed with `"IO Error: "`.
fn map_io_error(err: std::io::Error) -> pyo3::PyErr {
    let message = format!("IO Error: {err}");
    pyo3::exceptions::PyRuntimeError::new_err(message)
}
/// Python-facing wrapper around [`ProcessMonitor`], exported to Python under
/// the class name `ProcessMonitor`.
#[pyclass(name = "ProcessMonitor")]
struct PyProcessMonitor {
    // The wrapped Rust monitor that all sampling calls are delegated to.
    inner: ProcessMonitor,
    // JSON-serialized samples kept in memory (only filled when
    // `output_config.store_in_memory` is set).
    samples: Vec<String>,
    // Output settings built from the constructor's keyword arguments.
    output_config: OutputConfig,
    // Set to `true` once `sample_once` has written the metadata line to the
    // output file.
    metadata_written: bool,
}
/// Assembles an [`OutputConfig`] from the keyword arguments shared by the
/// Python-facing constructors.
///
/// `output_file` is only applied to the builder when it is `Some`.
///
/// # Errors
/// Propagates the error returned by the builder's `format_str` when
/// `output_format` is rejected.
fn build_output_config(
    output_file: Option<String>,
    output_format: &str,
    store_in_memory: bool,
    quiet: bool,
    write_metadata: bool,
) -> PyResult<OutputConfig> {
    let base = OutputConfig::builder()
        .format_str(output_format)?
        .store_in_memory(store_in_memory)
        .quiet(quiet)
        .write_metadata(write_metadata);
    let configured = match output_file {
        Some(path) => base.output_file(path),
        None => base,
    };
    Ok(configured.build())
}
#[pymethods]
impl PyProcessMonitor {
#[new]
#[pyo3(signature = (cmd, base_interval_ms, max_interval_ms, since_process_start=false, output_file=None, output_format="jsonl", store_in_memory=true, quiet=false, include_children=true, write_metadata=false))]
#[allow(clippy::too_many_arguments)]
fn new(
cmd: Vec<String>,
base_interval_ms: u64,
max_interval_ms: u64,
since_process_start: bool,
output_file: Option<String>,
output_format: &str,
store_in_memory: bool,
quiet: bool,
include_children: bool,
write_metadata: bool,
) -> PyResult<Self> {
let output_config = build_output_config(
output_file,
output_format,
store_in_memory,
quiet,
write_metadata,
)?;
let mut inner = ProcessMonitor::new_with_options(
cmd,
Duration::from_millis(base_interval_ms),
Duration::from_millis(max_interval_ms),
since_process_start,
)
.map_err(map_io_error)?;
inner.set_include_children(include_children);
Ok(PyProcessMonitor {
inner,
samples: Vec::new(),
output_config,
metadata_written: false,
})
}
#[staticmethod]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (pid, base_interval_ms, max_interval_ms, since_process_start=false, output_file=None, output_format="jsonl", store_in_memory=true, quiet=false, include_children=true, write_metadata=false))]
fn from_pid(
pid: usize,
base_interval_ms: u64,
max_interval_ms: u64,
since_process_start: bool,
output_file: Option<String>,
output_format: &str,
store_in_memory: bool,
quiet: bool,
include_children: bool,
write_metadata: Option<bool>,
) -> PyResult<Self> {
let output_config = build_output_config(
output_file,
output_format,
store_in_memory,
quiet,
write_metadata.unwrap_or(false),
)?;
let mut inner = ProcessMonitor::from_pid_with_options(
pid,
Duration::from_millis(base_interval_ms),
Duration::from_millis(max_interval_ms),
since_process_start,
)
.map_err(map_io_error)?;
inner.set_include_children(include_children);
Ok(PyProcessMonitor {
inner,
samples: Vec::new(),
output_config,
metadata_written: false,
})
}
#[staticmethod]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (cmd, stdout_file=None, stderr_file=None, timeout=None, base_interval_ms=100, max_interval_ms=1000, store_in_memory=true, output_file=None, output_format="jsonl", since_process_start=false, pause_for_attachment=true, quiet=false, include_children=true))]
fn execute_with_monitoring(
py: Python,
cmd: Vec<String>,
stdout_file: Option<String>,
stderr_file: Option<String>,
timeout: Option<f64>,
base_interval_ms: u64,
max_interval_ms: u64,
store_in_memory: bool,
output_file: Option<String>,
output_format: &str,
since_process_start: bool,
pause_for_attachment: bool,
quiet: bool,
include_children: bool,
) -> PyResult<(i32, PyProcessMonitor)> {
use std::fs::OpenOptions;
use std::time::Duration;
let subprocess = py.import_bound("subprocess")?;
let os = py.import_bound("os")?;
let signal = py.import_bound("signal")?;
let _time = py.import_bound("time")?;
let stdout_arg = if let Some(path) = &stdout_file {
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)
.map_err(map_io_error)?;
Some(file)
} else {
None
};
let stderr_arg = if let Some(path) = &stderr_file {
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)
.map_err(map_io_error)?;
Some(file)
} else {
None
};
let popen_kwargs = pyo3::types::PyDict::new_bound(py);
popen_kwargs.set_item("start_new_session", true)?;
if stdout_arg.is_some() {
popen_kwargs.set_item("stdout", stdout_file.as_ref().unwrap())?;
}
if stderr_arg.is_some() {
popen_kwargs.set_item("stderr", stderr_file.as_ref().unwrap())?;
}
let process = subprocess.call_method("Popen", (cmd.clone(),), Some(&popen_kwargs))?;
let pid: i32 = process.getattr("pid")?.extract()?;
if pause_for_attachment {
let sigstop = signal.getattr("SIGSTOP")?;
os.call_method("kill", (pid, sigstop), None)?;
}
let output_config = if let Some(path) = output_file {
OutputConfig::builder()
.output_file(path)
.format_str(output_format)?
.store_in_memory(store_in_memory)
.quiet(quiet)
.build()
} else {
OutputConfig::builder()
.format_str(output_format)?
.store_in_memory(store_in_memory)
.quiet(quiet)
.build()
};
let mut inner = ProcessMonitor::from_pid_with_options(
pid as usize,
Duration::from_millis(base_interval_ms),
Duration::from_millis(max_interval_ms),
since_process_start,
)
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("IO Error: {e}")))?;
inner.set_include_children(include_children);
let monitor = PyProcessMonitor {
inner,
samples: Vec::new(),
output_config,
metadata_written: false,
};
if pause_for_attachment {
let sigcont = signal.getattr("SIGCONT")?;
os.call_method("kill", (pid, sigcont), None)?;
}
let exit_code = if let Some(timeout_secs) = timeout {
let timeout_dict = pyo3::types::PyDict::new_bound(py);
timeout_dict.set_item("timeout", timeout_secs)?;
match process.call_method("wait", (), Some(&timeout_dict)) {
Ok(code) => code.extract::<i32>()?,
Err(_e) => {
let _ = process.call_method("kill", (), None);
return Err(pyo3::exceptions::PyTimeoutError::new_err(format!(
"Process timed out after {timeout_secs}s"
)));
}
}
} else {
process.call_method("wait", (), None)?.extract::<i32>()?
};
Ok((exit_code, monitor))
}
fn run(&mut self) -> PyResult<()> {
use std::fs::OpenOptions;
use std::io::Write;
use std::thread::sleep;
let mut file_handle = if let Some(path) = &self.output_config.output_file {
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)
.map_err(map_io_error)?;
Some(file)
} else {
None
};
while self.inner.is_running() {
let json = if self.inner.get_include_children() {
let tree_metrics = self.inner.sample_tree_metrics();
serde_json::to_string(&tree_metrics)
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?
} else {
match self.inner.sample_metrics() {
Some(metrics) => serde_json::to_string(&metrics)
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?,
None => {
sleep(self.inner.adaptive_interval());
continue;
}
}
};
if self.output_config.store_in_memory {
self.samples.push(json.clone());
}
if let Some(file) = &mut file_handle {
writeln!(file, "{json}").map_err(map_io_error)?;
} else if !self.output_config.quiet {
println!("{json}");
}
sleep(self.inner.adaptive_interval());
}
Ok(())
}
/// Takes a single sample and returns it as a JSON string.
///
/// Returns `None` when the process is no longer running, or when child
/// processes are excluded and no metrics could be sampled. The sample is
/// stored in memory when `store_in_memory` is set and appended to the output
/// file when one is configured. If `write_metadata` is set and no metadata
/// line has been written yet, the file is truncated and the metadata line is
/// written before the sample.
///
/// # Errors
/// `RuntimeError` on I/O or serialization failures.
fn sample_once(&mut self) -> PyResult<Option<String>> {
    use std::fs::OpenOptions;
    use std::io::Write;

    if !self.inner.is_running() {
        return Ok(None);
    }

    let serialized = if self.inner.get_include_children() {
        let tree_metrics = self.inner.sample_tree_metrics();
        serde_json::to_string(&tree_metrics)
    } else {
        match self.inner.sample_metrics() {
            Some(metrics) => serde_json::to_string(&metrics),
            None => return Ok(None),
        }
    };
    let metrics_json =
        serialized.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;

    if self.output_config.store_in_memory {
        self.samples.push(metrics_json.clone());
    }

    let path = match &self.output_config.output_file {
        Some(path) => path,
        None => return Ok(Some(metrics_json)),
    };

    let metadata_pending = self.output_config.write_metadata && !self.metadata_written;
    if metadata_pending {
        // NOTE(review): if `get_metadata` can return `None` on early calls
        // and `Some` later, this truncation discards samples appended in
        // between — confirm against `ProcessMonitor::get_metadata`.
        if let Some(metadata) = self.inner.get_metadata() {
            let metadata_json = serde_json::to_string(&metadata)
                .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
            let mut fresh = OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(path)
                .map_err(map_io_error)?;
            writeln!(fresh, "{metadata_json}").map_err(map_io_error)?;
            self.metadata_written = true;
        }
    }

    let mut appender = OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .map_err(map_io_error)?;
    writeln!(appender, "{metrics_json}").map_err(map_io_error)?;

    Ok(Some(metrics_json))
}
/// Reports whether the wrapped monitor considers the process still running.
fn is_running(&mut self) -> PyResult<bool> {
    let running = self.inner.is_running();
    Ok(running)
}
/// Returns the process id reported by the wrapped monitor.
fn get_pid(&self) -> PyResult<usize> {
    let pid = self.inner.get_pid();
    Ok(pid)
}
fn get_metadata(&mut self) -> PyResult<Option<String>> {
Ok(self
.inner
.get_metadata()
.and_then(|metadata| serde_json::to_string(&metadata).ok()))
}
/// Returns a copy of the JSON samples currently held in memory.
fn get_samples(&self) -> Vec<String> {
    self.samples.to_vec()
}
/// Discards every sample currently held in memory.
fn clear_samples(&mut self) {
    self.samples.truncate(0);
}
/// Writes the in-memory samples to `path`, creating or truncating the file.
///
/// `format` is parsed into an [`OutputFormat`] and defaults to `"jsonl"`.
/// The JSON format writes a single array (`[s1,s2,...]`); the JSON Lines
/// format writes one sample per line.
///
/// # Errors
/// `ValueError` when `format` is not recognised (checked before the file is
/// created); `RuntimeError` ("IO Error: ...") on I/O failures.
fn save_samples(&self, path: String, format: Option<String>) -> PyResult<()> {
    use std::fs::File;
    use std::io::{BufWriter, Write};

    let output_format: OutputFormat = format
        .as_deref()
        .unwrap_or("jsonl")
        .parse()
        .map_err(|e: DenetError| pyo3::exceptions::PyValueError::new_err(e.to_string()))?;

    // Buffer the output so saving many samples does not cost one write
    // syscall per sample.
    let mut writer = BufWriter::new(File::create(&path).map_err(map_io_error)?);
    match output_format {
        OutputFormat::Json => {
            // Stream the array instead of building one large joined string.
            writer.write_all(b"[").map_err(map_io_error)?;
            for (index, sample) in self.samples.iter().enumerate() {
                if index > 0 {
                    writer.write_all(b",").map_err(map_io_error)?;
                }
                writer.write_all(sample.as_bytes()).map_err(map_io_error)?;
            }
            writer.write_all(b"]").map_err(map_io_error)?;
        }
        OutputFormat::JsonLines => {
            for sample in &self.samples {
                writeln!(writer, "{sample}").map_err(map_io_error)?;
            }
        }
    }
    // Flush explicitly: errors during the implicit flush on drop are lost.
    writer.flush().map_err(map_io_error)?;
    Ok(())
}
fn get_summary(&mut self) -> PyResult<String> {
if self.samples.is_empty() {
return serde_json::to_string(&Summary::new())
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()));
}
let elapsed = if self.samples.len() > 1 {
let first: serde_json::Value = serde_json::from_str(&self.samples[0])
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
let last: serde_json::Value =
serde_json::from_str(&self.samples[self.samples.len() - 1])
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
let first_ts = first.get("ts_ms").and_then(|v| v.as_u64()).unwrap_or(0);
let last_ts = last.get("ts_ms").and_then(|v| v.as_u64()).unwrap_or(0);
(last_ts as f64 - first_ts as f64) / 1000.0
} else {
0.0
};
let result = generate_summary_from_metrics_json(self.samples.clone(), elapsed)
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
Ok(result)
}
/// Reports whether the wrapped monitor has GPU monitoring enabled
/// (build with the `gpu` feature).
#[cfg(feature = "gpu")]
fn is_gpu_enabled(&self) -> PyResult<bool> {
    let enabled = self.inner.is_gpu_enabled();
    Ok(enabled)
}
/// Returns the GPU device count reported by the wrapped monitor
/// (build with the `gpu` feature).
#[cfg(feature = "gpu")]
fn gpu_device_count(&self) -> PyResult<u32> {
    let count = self.inner.gpu_device_count();
    Ok(count)
}
/// Returns the wrapped monitor's GPU summary serialized as JSON
/// (build with the `gpu` feature).
///
/// # Errors
/// `RuntimeError` when the summary cannot be serialized.
#[cfg(feature = "gpu")]
fn get_gpu_summary(&self) -> PyResult<String> {
    match serde_json::to_string(&self.inner.get_gpu_summary()) {
        Ok(json) => Ok(json),
        Err(e) => Err(pyo3::exceptions::PyRuntimeError::new_err(e.to_string())),
    }
}
/// Always `false`: this build was compiled without the `gpu` feature.
#[cfg(not(feature = "gpu"))]
fn is_gpu_enabled(&self) -> PyResult<bool> {
    let enabled = false;
    Ok(enabled)
}
/// Always `0`: this build was compiled without the `gpu` feature.
#[cfg(not(feature = "gpu"))]
fn gpu_device_count(&self) -> PyResult<u32> {
    let count: u32 = 0;
    Ok(count)
}
/// Returns a fixed "GPU disabled" summary as JSON, used when the crate is
/// built without the `gpu` feature.
///
/// # Errors
/// `RuntimeError` when the placeholder cannot be serialized.
#[cfg(not(feature = "gpu"))]
fn get_gpu_summary(&self) -> PyResult<String> {
    let placeholder = serde_json::json!({
        "enabled": false,
        "device_count": 0,
        "total_memory_gb": 0.0,
        "total_memory_used_gb": 0.0,
        "max_gpu_utilization": 0,
        "max_memory_utilization": 0
    });
    match serde_json::to_string(&placeholder) {
        Ok(json) => Ok(json),
        Err(e) => Err(pyo3::exceptions::PyRuntimeError::new_err(e.to_string())),
    }
}
}
/// Builds a summary from the metrics file at `path` and returns it as JSON.
///
/// # Errors
/// `IOError` when `SummaryGenerator::from_json_file` fails; `RuntimeError`
/// when the resulting summary cannot be serialized.
#[pyfunction]
fn generate_summary_from_file(path: String) -> PyResult<String> {
    let summary = SummaryGenerator::from_json_file(&path)
        .map_err(|e| pyo3::exceptions::PyIOError::new_err(e.to_string()))?;
    let json = serde_json::to_string(&summary)
        .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
    Ok(json)
}
/// Builds a summary from already-serialized metric samples and returns it as
/// JSON. `elapsed_time` is forwarded unchanged to
/// `SummaryGenerator::from_json_strings`.
///
/// # Errors
/// `RuntimeError` when the summary cannot be generated or serialized.
#[pyfunction]
fn generate_summary_from_metrics_json(
    metrics_json: Vec<String>,
    elapsed_time: f64,
) -> PyResult<String> {
    let summary = SummaryGenerator::from_json_strings(&metrics_json, elapsed_time)
        .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
    let json = serde_json::to_string(&summary)
        .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
    Ok(json)
}
/// Registers the `ProcessMonitor` class and the two summary helper functions
/// on the Python module `m`.
///
/// # Errors
/// Propagates any error raised while adding the class or wrapping/adding a
/// function.
pub fn register_python_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyProcessMonitor>()?;

    let summary_from_file = wrap_pyfunction!(generate_summary_from_file, m)?;
    m.add_function(summary_from_file)?;

    let summary_from_metrics = wrap_pyfunction!(generate_summary_from_metrics_json, m)?;
    m.add_function(summary_from_metrics)?;

    Ok(())
}