import json
import os
import pytest
import subprocess
import sys
from denet import execute_with_monitoring
@pytest.fixture
def temp_output_file(tmp_path):
    """Return, as a string, the path of a metrics file under pytest's ``tmp_path``.

    Only the path is built here; the file itself is not created.
    """
    metrics_path = tmp_path / "test_metrics.jsonl"
    return str(metrics_path)
@pytest.fixture
def temp_stdout_file(tmp_path):
    """Return, as a string, the path of a stdout capture file under ``tmp_path``.

    Only the path is built here; the file itself is not created.
    """
    stdout_path = tmp_path / "stdout.txt"
    return str(stdout_path)
@pytest.fixture
def temp_stderr_file(tmp_path):
    """Return, as a string, the path of a stderr capture file under ``tmp_path``.

    Only the path is built here; the file itself is not created.
    """
    stderr_path = tmp_path / "stderr.txt"
    return str(stderr_path)
class TestExecuteWithMonitoring:
    """Tests for ``execute_with_monitoring``.

    Every test launches a short-lived child process (mostly the current Python
    interpreter, sometimes ``sleep``) and checks the returned exit code, the
    returned monitor object and/or the files written during the run.
    """

    def test_basic_execution(self, temp_output_file):
        """A simple command exits with 0, writes the output file and yields samples."""
        cmd = [sys.executable, "-c", "import time; print('hello'); time.sleep(0.5)"]
        exit_code, monitor = execute_with_monitoring(
            cmd, output_file=temp_output_file, base_interval_ms=50, max_interval_ms=100
        )

        assert exit_code == 0
        assert monitor is not None
        assert os.path.exists(temp_output_file)

        samples = monitor.get_samples()
        assert len(samples) > 0
        assert "cpu_usage" in samples[0]
        assert "mem_rss_kb" in samples[0]

    def test_string_command(self, temp_output_file):
        """A command passed as a single string (not an argument list) runs successfully."""
        cmd = f"{sys.executable} -c print(123)"
        exit_code, monitor = execute_with_monitoring(cmd, output_file=temp_output_file, quiet=True)

        assert exit_code == 0
        assert monitor is not None

    def test_stdout_stderr_redirection(self, temp_output_file, temp_stdout_file, temp_stderr_file):
        """The child's stdout and stderr end up in the requested files."""
        cmd = [sys.executable, "-c", "import sys; print('stdout test'); print('stderr test', file=sys.stderr)"]
        exit_code, _ = execute_with_monitoring(
            cmd, stdout_file=temp_stdout_file, stderr_file=temp_stderr_file, output_file=temp_output_file
        )
        assert exit_code == 0

        with open(temp_stdout_file, "r") as f:
            stdout_content = f.read()
        assert "stdout test" in stdout_content

        with open(temp_stderr_file, "r") as f:
            stderr_content = f.read()
        assert "stderr test" in stderr_content

    def test_timeout(self, temp_output_file):
        """A command that outlives ``timeout`` raises ``subprocess.TimeoutExpired``."""
        cmd = [sys.executable, "-c", "import time; time.sleep(10)"]
        with pytest.raises(subprocess.TimeoutExpired):
            execute_with_monitoring(
                cmd,
                timeout=0.1,
                output_file=temp_output_file,
            )

    def test_without_pausing(self, temp_output_file):
        """With ``pause_for_attachment=False`` the run still succeeds and yields samples."""
        cmd = [sys.executable, "-c", "import time; print('no pause'); time.sleep(0.2)"]
        exit_code, monitor = execute_with_monitoring(cmd, output_file=temp_output_file, pause_for_attachment=False)

        assert exit_code == 0
        assert monitor is not None
        samples = monitor.get_samples()
        assert len(samples) > 0

    def test_since_process_start(self, temp_output_file):
        """With ``since_process_start=True`` both samples and a summary are available."""
        cmd = [sys.executable, "-c", "import time; time.sleep(0.3)"]
        exit_code, monitor = execute_with_monitoring(cmd, output_file=temp_output_file, since_process_start=True)

        assert exit_code == 0
        assert monitor is not None
        samples = monitor.get_samples()
        assert len(samples) > 0
        summary = monitor.get_summary()
        assert summary is not None

    def test_output_formats(self, tmp_path):
        """Both the ``jsonl`` and ``json`` output formats produce a non-empty, brace-containing file."""
        cmd = [sys.executable, "-c", "import time; time.sleep(0.2)"]
        formats = ["jsonl", "json"]
        for fmt in formats:
            # One output file per format so the runs cannot overwrite each other.
            output_file = str(tmp_path / f"metrics.{fmt}")
            exit_code, _ = execute_with_monitoring(cmd, output_file=output_file, output_format=fmt)

            assert exit_code == 0
            assert os.path.exists(output_file)

            with open(output_file, "r") as f:
                content = f.read()
            assert len(content) > 0
            # Loose structural check only: the content must look like JSON objects.
            assert "{" in content
            assert "}" in content

    def test_without_children(self, temp_output_file):
        """With ``include_children=False`` the summary reports no child processes."""
        cmd = [sys.executable, "-c", "import subprocess, time; subprocess.Popen(['sleep', '0.1']); time.sleep(0.2)"]
        exit_code, monitor = execute_with_monitoring(cmd, output_file=temp_output_file, include_children=False)

        assert exit_code == 0
        samples = monitor.get_samples()
        assert len(samples) > 0
        summary = monitor.get_summary()
        # Either the key is absent or it is present but empty/falsy.
        assert "child_processes" not in summary or not summary["child_processes"]

    def test_file_handle_cleanup(self, temp_stdout_file, temp_stderr_file):
        """The redirected stdout file can be reopened and read once the run has finished."""
        cmd = [sys.executable, "-c", "print('test')"]
        exit_code, _ = execute_with_monitoring(
            cmd, stdout_file=temp_stdout_file, stderr_file=temp_stderr_file, quiet=True
        )
        assert exit_code == 0

        with open(temp_stdout_file, "r") as f:
            content = f.read()
        assert "test" in content

    def test_monitoring_exception_handling(self, temp_output_file):
        """Errors raised while sampling do not prevent the command from completing."""
        import unittest.mock

        cmd = [sys.executable, "-c", "import time; time.sleep(0.3)"]
        with unittest.mock.patch("denet.ProcessMonitor.from_pid") as mock_from_pid:
            # Stand-in monitor that reports a running process but fails on every sample.
            mock_monitor = unittest.mock.MagicMock()
            mock_monitor.is_running.return_value = True
            mock_monitor.sample_once.side_effect = RuntimeError("Simulated monitoring error")
            mock_monitor.get_samples.return_value = []
            mock_monitor.get_summary.return_value = {}
            mock_from_pid.return_value = mock_monitor

            exit_code, _ = execute_with_monitoring(cmd, output_file=temp_output_file)

            assert exit_code == 0
            # The failing sampler must actually have been exercised.
            assert mock_monitor.sample_once.called

    def test_timeout_with_process_cleanup(self, temp_output_file):
        """A timeout still raises ``TimeoutExpired`` when the first ``os.killpg`` call fails."""
        import unittest.mock

        cmd = [sys.executable, "-c", "import time; time.sleep(10)"]
        # Capture the real function before patching so later calls can be forwarded to it.
        original_killpg = os.killpg
        call_count = 0

        def mock_killpg(pgid, sig):
            """Fail the first call with ``ProcessLookupError``; delegate every later call."""
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise ProcessLookupError("Process group not found")
            return original_killpg(pgid, sig)

        with unittest.mock.patch("os.killpg", side_effect=mock_killpg):
            with pytest.raises(subprocess.TimeoutExpired):
                execute_with_monitoring(
                    cmd,
                    timeout=0.1,
                    output_file=temp_output_file,
                )

        # The cleanup path must have attempted to signal the process group.
        assert call_count >= 1

    def test_failed_command(self, temp_output_file):
        """A non-zero exit status of the child is returned unchanged."""
        cmd = [sys.executable, "-c", "import sys; sys.exit(42)"]
        exit_code, monitor = execute_with_monitoring(cmd, output_file=temp_output_file)

        assert exit_code == 42
        assert monitor is not None

    def test_invalid_command(self, temp_output_file):
        """A non-existent executable raises ``FileNotFoundError`` or ``OSError``."""
        cmd = ["nonexistent_command_that_should_fail"]
        with pytest.raises((FileNotFoundError, OSError)):
            execute_with_monitoring(cmd, output_file=temp_output_file)

    def test_memory_only_monitoring(self):
        """With ``output_file=None`` and ``store_in_memory=True`` samples are still collected."""
        cmd = [sys.executable, "-c", "import time; print('memory test'); time.sleep(0.2)"]
        exit_code, monitor = execute_with_monitoring(
            cmd,
            store_in_memory=True,
            output_file=None,
        )

        assert exit_code == 0
        samples = monitor.get_samples()
        assert len(samples) > 0
        assert any("cpu_usage" in sample for sample in samples)

    def test_write_metadata_enabled(self, temp_output_file):
        """With ``write_metadata=True`` the first output line is a metadata record."""
        cmd = ["sleep", "1"]
        exit_code, _ = execute_with_monitoring(
            cmd, output_file=temp_output_file, write_metadata=True, base_interval_ms=200
        )
        assert exit_code == 0

        with open(temp_output_file, "r") as f:
            lines = f.readlines()
        # At least one metadata line followed by at least one metrics line.
        assert len(lines) >= 2

        metadata = json.loads(lines[0].strip())
        assert "pid" in metadata
        assert "cmd" in metadata
        assert "executable" in metadata
        assert "t0_ms" in metadata
        assert isinstance(metadata["pid"], int)
        assert isinstance(metadata["cmd"], list)
        assert isinstance(metadata["executable"], str)
        assert isinstance(metadata["t0_ms"], int)

        # The length assertion above guarantees a second line exists.
        metrics = json.loads(lines[1].strip())
        assert "ts_ms" in metrics
        assert any(key in metrics for key in ["parent", "cpu_usage", "mem_rss_kb"])

    def test_write_metadata_disabled(self, temp_output_file):
        """With ``write_metadata=False`` the first output line is a metrics record, not metadata."""
        cmd = ["sleep", "1"]
        exit_code, _ = execute_with_monitoring(
            cmd,
            output_file=temp_output_file,
            write_metadata=False,
            base_interval_ms=200,
        )
        assert exit_code == 0

        with open(temp_output_file, "r") as f:
            lines = f.readlines()
        assert len(lines) >= 1

        first_data = json.loads(lines[0].strip())
        metrics_keys = ["ts_ms", "parent", "cpu_usage", "mem_rss_kb"]
        has_metrics_keys = any(key in first_data for key in metrics_keys)
        assert has_metrics_keys

        # A metadata record would carry ALL of these keys; a metrics record must not.
        metadata_keys = ["pid", "cmd", "executable", "t0_ms"]
        has_metadata_keys = all(key in first_data for key in metadata_keys)
        assert not has_metadata_keys

    def test_write_metadata_default_false(self, temp_output_file):
        """Without an explicit ``write_metadata`` the first output line is a metrics record."""
        cmd = ["sleep", "1"]
        exit_code, _ = execute_with_monitoring(
            cmd,
            output_file=temp_output_file,
            base_interval_ms=200,
        )
        assert exit_code == 0

        with open(temp_output_file, "r") as f:
            lines = f.readlines()
        # Require at least one line so the test cannot pass vacuously on an empty file.
        assert len(lines) >= 1

        first_data = json.loads(lines[0].strip())
        metrics_keys = ["ts_ms", "parent", "cpu_usage", "mem_rss_kb"]
        has_metrics_keys = any(key in first_data for key in metrics_keys)
        assert has_metrics_keys

    def test_write_metadata_no_output_file(self):
        """``write_metadata=True`` with no output file still yields metrics samples in memory."""
        cmd = ["sleep", "1"]
        exit_code, monitor = execute_with_monitoring(
            cmd,
            store_in_memory=True,
            output_file=None,
            write_metadata=True,
            base_interval_ms=200,
        )
        assert exit_code == 0

        samples = monitor.get_samples()
        assert len(samples) > 0

        # Samples are decoded with json.loads here, i.e. each one is a JSON document.
        first_sample = json.loads(samples[0])
        metrics_keys = ["ts_ms", "parent", "cpu_usage", "mem_rss_kb"]
        has_metrics_keys = any(key in first_sample for key in metrics_keys)
        assert has_metrics_keys