import json
import time
import pytest
import denet
from tests.python.test_helpers import (
extract_metrics_from_sample,
is_valid_metrics_sample,
assert_valid_metrics,
)
class TestLegacyProcessMonitorCompatibility:
    """Exercise ``denet.ProcessMonitor`` through its legacy positional call style.

    Every monitor here is built as ``ProcessMonitor(cmd, 100, 1000)``, optionally
    with ``store_in_memory=True``.
    """

    def test_create_monitor_legacy_style(self):
        """Constructing a monitor with three positional arguments yields an object."""
        monitor = denet.ProcessMonitor(["echo", "hello"], 100, 1000)
        assert monitor is not None

    def test_invalid_command_legacy(self):
        """An empty command list and an unknown executable must both raise."""
        # NOTE(review): the concrete exception type raised by denet is not
        # visible from this file, so the broad ``Exception`` match is kept.
        with pytest.raises(Exception):
            denet.ProcessMonitor([], 100, 1000)
        with pytest.raises(Exception):
            denet.ProcessMonitor(["non_existent_command_123456"], 100, 1000)

    def test_run_short_process_legacy(self):
        """``run()`` comes back without raising for a short-lived child process."""
        import sys

        # Use the interpreter running the test suite: a bare "python" may be
        # missing from PATH or may resolve to a different installation.
        cmd = [sys.executable, "-c", "import time; print('TEST OUTPUT'); time.sleep(0.5)"]
        monitor = denet.ProcessMonitor(cmd, 100, 1000)
        # The test passes when run() returns; there is nothing further to assert.
        monitor.run()

    def test_long_running_process_legacy(self, tmp_path):
        """Sample a ~1 s child script once, then let ``run()`` finish in a thread.

        Args:
            tmp_path: pytest fixture directory in which the child script is written.
        """
        import sys
        import threading

        # Child script: prints five markers, sleeping 0.2 s after each one.
        test_script_content = """
import time
import sys
for i in range(5):
    sys.stdout.write(f"MARKER: {i}\\n")
    sys.stdout.flush()
    time.sleep(0.2)
"""
        test_script = tmp_path / "test_script.py"
        with open(test_script, "w") as f:
            f.write(test_script_content)

        # Same interpreter as the test run, instead of whatever "python" is on PATH.
        monitor = denet.ProcessMonitor([sys.executable, str(test_script)], 100, 1000, store_in_memory=True)
        # Wait briefly before taking the single manual sample.
        time.sleep(0.3)
        sample_json = monitor.sample_once()

        # run() is executed in a daemon thread so that a hang cannot block the
        # test session; the join timeout bounds the wait.
        thread = threading.Thread(target=monitor.run, daemon=True)
        thread.start()
        thread.join(timeout=5)
        assert not thread.is_alive(), "Monitor should have completed"
        assert sample_json is not None, "Should have sample data"

        if not sample_json:
            return
        data = json.loads(sample_json)
        # A record carrying these keys is skipped without metric checks
        # (presumably a process-metadata record -- TODO confirm against denet).
        if all(key in data for key in ("pid", "cmd", "executable", "t0_ms")):
            return
        if is_valid_metrics_sample(data):
            metrics = extract_metrics_from_sample(data)
            assert_valid_metrics(metrics)
            now_ms = int(time.time() * 1000)
            assert abs(now_ms - metrics["ts_ms"]) < 60000, "Timestamp should be recent"
            assert metrics["mem_rss_kb"] > 0, "RSS memory should be positive"

    def test_timestamp_functionality_legacy(self):
        """Sample timestamps are within a minute of now and never go backwards."""
        monitor = denet.ProcessMonitor(["sleep", "2"], 100, 1000, store_in_memory=True)

        # Up to three manual samples, 0.1 s apart; only valid metric samples are kept.
        samples = []
        for _ in range(3):
            result = monitor.sample_once()
            if result:
                data = json.loads(result)
                if is_valid_metrics_sample(data):
                    samples.append(extract_metrics_from_sample(data))
            time.sleep(0.1)

        assert len(samples) > 0, "Should collect at least one sample"
        for sample in samples:
            now_ms = int(time.time() * 1000)
            assert abs(now_ms - sample["ts_ms"]) < 60000, "Timestamp should be recent"
        # zip over neighbours is a no-op for a single sample, so no length guard is needed.
        for earlier, later in zip(samples, samples[1:]):
            assert later["ts_ms"] >= earlier["ts_ms"], "Timestamps should be monotonic"
class TestLegacySummaryCompatibility:
    """Check denet's summary helpers against hand-built flat and process-tree samples."""

    @staticmethod
    def _flat_samples():
        """Return the two flat metric samples used by the list-based and file-based tests."""
        first = {
            "ts_ms": 1000,
            "cpu_usage": 5.0,
            "mem_rss_kb": 1024,
            "mem_vms_kb": 2048,
            "disk_read_bytes": 1000,
            "disk_write_bytes": 2000,
            "net_rx_bytes": 300,
            "net_tx_bytes": 400,
            "thread_count": 2,
            "uptime_secs": 10,
        }
        second = {
            "ts_ms": 2000,
            "cpu_usage": 15.0,
            "mem_rss_kb": 2048,
            "mem_vms_kb": 4096,
            "disk_read_bytes": 2500,
            "disk_write_bytes": 3000,
            "net_rx_bytes": 800,
            "net_tx_bytes": 900,
            "thread_count": 3,
            "uptime_secs": 20,
        }
        return first, second

    def test_generate_summary_from_metrics_json_legacy(self):
        """Summarise two flat JSON samples passed as a list of strings."""
        metrics = [json.dumps(sample) for sample in self._flat_samples()]
        # The two samples carry ts_ms 1000 and 2000, i.e. one second apart.
        elapsed_time = (2000 - 1000) / 1000.0

        summary = json.loads(denet.generate_summary_from_metrics_json(metrics, elapsed_time))

        # Checked in insertion order, one field at a time.
        expected = {
            "total_time_secs": elapsed_time,
            "sample_count": 2,
            "max_processes": 1,
            "max_threads": 3,
            "total_disk_read_bytes": 2500,
            "total_disk_write_bytes": 3000,
            "total_sys_net_rx_bytes": 800,
            "total_sys_net_tx_bytes": 900,
            "peak_mem_rss_kb": 2048,
            "avg_cpu_usage": 10.0,
        }
        for field, value in expected.items():
            assert summary[field] == value

    def test_generate_summary_from_tree_metrics_json_legacy(self):
        """Summarise two tree-shaped samples (parent / children / aggregated)."""
        first_tree = {
            "ts_ms": 1000,
            "parent": {
                "ts_ms": 1000,
                "cpu_usage": 2.5,
                "mem_rss_kb": 1024,
                "mem_vms_kb": 2048,
                "disk_read_bytes": 500,
                "disk_write_bytes": 1000,
                "net_rx_bytes": 200,
                "net_tx_bytes": 300,
                "thread_count": 1,
                "uptime_secs": 5,
            },
            "children": [],
            "aggregated": {
                "ts_ms": 1000,
                "cpu_usage": 4.0,
                "mem_rss_kb": 1536,
                "mem_vms_kb": 3072,
                "disk_read_bytes": 600,
                "disk_write_bytes": 1200,
                "net_rx_bytes": 250,
                "net_tx_bytes": 360,
                "thread_count": 2,
                "process_count": 2,
                "uptime_secs": 5,
            },
        }
        second_tree = {
            "ts_ms": 2000,
            "parent": {
                "ts_ms": 2000,
                "cpu_usage": 3.0,
                "mem_rss_kb": 1536,
                "mem_vms_kb": 3072,
                "disk_read_bytes": 600,
                "disk_write_bytes": 1200,
                "net_rx_bytes": 250,
                "net_tx_bytes": 360,
                "thread_count": 1,
                "uptime_secs": 6,
            },
            "children": [],
            "aggregated": {
                "ts_ms": 2000,
                "cpu_usage": 6.0,
                "mem_rss_kb": 2048,
                "mem_vms_kb": 4096,
                "disk_read_bytes": 800,
                "disk_write_bytes": 1500,
                "net_rx_bytes": 350,
                "net_tx_bytes": 460,
                "thread_count": 3,
                "process_count": 3,
                "uptime_secs": 6,
            },
        }
        metrics = [json.dumps(first_tree), json.dumps(second_tree)]
        elapsed_time = 1.0

        summary = json.loads(denet.generate_summary_from_metrics_json(metrics, elapsed_time))

        # The expected figures equal the values in the second sample's
        # "aggregated" section.
        expected = {
            "sample_count": 2,
            "max_processes": 3,
            "max_threads": 3,
            "peak_mem_rss_kb": 2048,
            "total_disk_read_bytes": 800,
            "total_disk_write_bytes": 1500,
        }
        for field, value in expected.items():
            assert summary[field] == value

    def test_generate_summary_from_file_legacy(self, tmp_path):
        """Summarise a file holding one JSON sample per line.

        Args:
            tmp_path: pytest fixture directory in which the metrics file is written.
        """
        temp_file = tmp_path / "test_metrics.json"
        with open(temp_file, "w") as f:
            # One newline-terminated JSON document per sample.
            f.writelines(json.dumps(sample) + "\n" for sample in self._flat_samples())

        summary = json.loads(denet.generate_summary_from_file(str(temp_file)))

        expected = {
            "sample_count": 2,
            "total_time_secs": 1.0,
            "max_threads": 3,
            "peak_mem_rss_kb": 2048,
            "avg_cpu_usage": 10.0,
        }
        for field, value in expected.items():
            assert summary[field] == value
class TestBackwardsCompatibilityAPI:
    """Guard the older constructor forms and method names of ``denet.ProcessMonitor``."""

    def test_old_constructor_signature(self):
        """Both the positional and the keyword constructor forms return an object."""
        mon = denet.ProcessMonitor(["echo", "test"], 100, 1000)
        assert mon is not None

        # Same values, spelled out with the keyword names.
        mon = denet.ProcessMonitor(cmd=["echo", "test"], base_interval_ms=100, max_interval_ms=1000)
        assert mon is not None

    def test_old_method_names_work(self):
        """The older accessor methods exist and return the expected Python types."""
        mon = denet.ProcessMonitor(["sleep", "0.1"], 100, 1000, store_in_memory=True)

        assert mon.sample_once() is not None
        assert isinstance(mon.get_pid(), int)
        assert isinstance(mon.is_running(), bool)
        assert isinstance(mon.get_samples(), list)
        assert isinstance(mon.get_summary(), str)

    def test_legacy_file_operations(self, tmp_path):
        """``save_samples`` writes a non-empty file for both the jsonl and json formats.

        Args:
            tmp_path: pytest fixture directory that receives the output files.
        """
        mon = denet.ProcessMonitor(["sleep", "0.1"], 100, 1000, store_in_memory=True)
        mon.sample_once()

        for fmt in ("jsonl", "json"):
            out_path = tmp_path / f"test_output.{fmt}"
            mon.save_samples(str(out_path), fmt)
            assert out_path.exists()
            written = out_path.read_text()
            assert len(written.strip()) > 0