import json
import time
import multiprocessing as mp
import denet
def cpu_intensive_task(duration=2):
    """Keep the CPU busy for roughly ``duration`` seconds.

    Args:
        duration: How long to spin, in seconds. Zero or a negative value
            makes the function return immediately.

    Returns:
        None.
    """
    # Use the monotonic clock for the deadline: the wall clock can jump
    # (NTP sync, manual adjustment), which would cut the loop short or make
    # it run far longer than requested.
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        # Throwaway arithmetic whose only purpose is to consume CPU time.
        _ = sum(i * i for i in range(10000))
def analyze_cpu_core_usage(samples):
    """Summarise which CPU cores appear in a sequence of samples.

    Args:
        samples: Iterable of JSON-encoded strings. Each one is decoded with
            ``json.loads``; entries without a ``cpu_core`` key, or whose
            ``cpu_core`` is null, are ignored.

    Returns:
        A ``(core_usage, core_switches)`` tuple. ``core_usage`` maps each
        observed core to the number of samples that reported it, and
        ``core_switches`` counts how many times the reported core differed
        from the one in the previous sample that carried a core.
    """
    seen_counts = {}
    switch_total = 0
    previous = None

    for raw in samples:
        record = json.loads(raw)

        # Skip records that carry no usable core information.
        if 'cpu_core' not in record or record['cpu_core'] is None:
            continue

        current = record['cpu_core']
        seen_counts[current] = seen_counts.get(current, 0) + 1

        # A switch is only counted once there is a previous core to compare.
        if previous is not None and previous != current:
            switch_total += 1
        previous = current

    return seen_counts, switch_total
def test_single_process_core_tracking():
    """Monitor one CPU-bound Python child and report the cores it ran on.

    Launches a short busy-loop in a child interpreter under
    ``denet.ProcessMonitor``, collects the in-memory samples, and prints the
    per-core sample counts and the number of core switches computed by
    ``analyze_cpu_core_usage``.

    Returns:
        The samples returned by ``monitor.get_samples()``.
    """
    import sys

    print("Testing CPU core tracking for single process...")

    # One-liner workload: keeps summing squares for about two seconds.
    workload = (
        "import time; start=time.time(); "
        "[sum(i*i for i in range(10000)) for _ in range(100000) "
        "if time.time()-start < 2]"
    )

    monitor = denet.ProcessMonitor(
        # sys.executable instead of a bare "python": the latter may be
        # missing from PATH (python3-only systems) or resolve to a different
        # interpreter than the one running this script.
        cmd=[sys.executable, "-c", workload],
        # NOTE(review): presumably sampling-interval bounds in milliseconds;
        # confirm against the denet documentation.
        base_interval_ms=50,
        max_interval_ms=100,
        store_in_memory=True,
    )
    monitor.run()

    samples = monitor.get_samples()
    core_usage, core_switches = analyze_cpu_core_usage(samples)

    print(f" Total samples: {len(samples)}")
    print(f" CPU cores used: {sorted(core_usage.keys())}")
    print(f" Samples per core: {dict(sorted(core_usage.items()))}")
    print(f" Core switches detected: {core_switches}")
    return samples
def _tally_core_samples(samples):
    """Aggregate per-core sample counts for a parent and its children.

    Args:
        samples: Iterable of JSON-encoded strings. Decoded values that are
            not JSON objects are skipped.

    Returns:
        A ``(parent_cores, child_cores, child_core_distribution)`` tuple:
        ``parent_cores`` maps core -> sample count taken from the top-level
        ``cpu_core`` field; ``child_cores`` maps child pid -> {core: count};
        ``child_core_distribution`` maps core -> set of child pids seen on it.
    """
    parent_cores = {}
    child_cores = {}
    child_core_distribution = {}

    for sample_str in samples:
        sample = json.loads(sample_str)
        if not isinstance(sample, dict):
            continue

        core = sample.get('cpu_core')
        if core is not None:
            parent_cores[core] = parent_cores.get(core, 0) + 1

        # A missing or null 'children' entry is treated as "no children".
        for child in sample.get('children') or ():
            core = (child.get('metrics') or {}).get('cpu_core')
            if core is None:
                continue
            child_pid = child['pid']
            per_child = child_cores.setdefault(child_pid, {})
            per_child[core] = per_child.get(core, 0) + 1
            child_core_distribution.setdefault(core, set()).add(child_pid)

    return parent_cores, child_cores, child_core_distribution


def test_multiprocess_core_distribution():
    """Monitor a script that spawns four CPU-bound workers and report cores.

    Writes a small multiprocessing script to a temporary file, runs it under
    ``denet.ProcessMonitor``, and prints which cores the parent and each
    child process were sampled on. The temporary file is always removed.

    Returns:
        None.
    """
    import os
    import sys
    import tempfile
    import textwrap

    print("\nTesting CPU core distribution for multiple processes...")

    # The worker script must be valid Python on its own, so it is written
    # with real indentation and dedented before being saved to disk.
    test_script = textwrap.dedent("""
        import multiprocessing as mp
        import time
        def cpu_worker(worker_id):
            print(f"Worker {worker_id} started on PID {mp.current_process().pid}")
            end = time.time() + 1.5
            while time.time() < end:
                _ = sum(i * i for i in range(10000))
        if __name__ == "__main__":
            processes = []
            for i in range(4): # Create 4 worker processes
                p = mp.Process(target=cpu_worker, args=(i,))
                p.start()
                processes.append(p)
            for p in processes:
                p.join()
    """)

    script_file = tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False)
    script_path = script_file.name
    try:
        # Writing inside the try block ensures the file is removed even if
        # the write itself fails.
        with script_file:
            script_file.write(test_script)

        monitor = denet.ProcessMonitor(
            # Run the script with the current interpreter rather than
            # whatever "python" happens to resolve to on PATH (if anything).
            cmd=[sys.executable, script_path],
            base_interval_ms=50,
            max_interval_ms=100,
            store_in_memory=True,
        )
        monitor.run()
        samples = monitor.get_samples()

        parent_cores, child_cores, child_core_distribution = _tally_core_samples(samples)

        print(f" Total samples: {len(samples)}")
        print(f" Parent process cores used: {sorted(parent_cores.keys())}")

        print("\n Child process core distribution:")
        for pid, cores in sorted(child_cores.items()):
            # The core with the highest sample count for this child.
            primary_core = max(cores, key=cores.get)
            print(f" PID {pid}: primarily on core {primary_core} ({cores})")

        print("\n Cores and their processes:")
        for core, pids in sorted(child_core_distribution.items()):
            print(f" Core {core}: PIDs {sorted(pids)}")
    finally:
        os.unlink(script_path)
def demonstrate_core_affinity():
    """Pin a CPU-bound child to core 0 with ``taskset`` and report its cores.

    Does nothing beyond printing a notice when the platform is not Linux or
    when the ``taskset`` command cannot be found. Otherwise runs a one-second
    busy-loop under ``taskset -c 0`` via ``denet.ProcessMonitor`` and prints
    the cores observed in the samples and whether the process stayed on
    core 0.

    Returns:
        None.
    """
    import platform
    import shutil
    import sys

    print("\nDemonstrating CPU affinity tracking...")

    if platform.system() != 'Linux':
        print(" CPU core tracking is only available on Linux")
        return

    if not shutil.which('taskset'):
        print(" 'taskset' command not found. Skipping affinity demonstration.")
        return

    # One-liner workload: keeps summing squares for about one second.
    workload = (
        "import time; start=time.time(); "
        "[sum(i*i for i in range(10000)) for _ in range(50000) "
        "if time.time()-start < 1]"
    )

    monitor = denet.ProcessMonitor(
        # sys.executable instead of a bare "python": the latter may be
        # missing from PATH or resolve to a different interpreter.
        cmd=["taskset", "-c", "0", sys.executable, "-c", workload],
        base_interval_ms=50,
        max_interval_ms=100,
        store_in_memory=True,
    )
    monitor.run()

    samples = monitor.get_samples()
    core_usage, core_switches = analyze_cpu_core_usage(samples)

    print(" Process pinned to core 0")
    print(f" Cores used: {sorted(core_usage.keys())}")
    print(f" Core switches: {core_switches}")

    if not core_usage:
        # Without any core data, nothing can be said about migration, so
        # do not claim the process moved.
        print(" ⚠ No CPU core samples were collected; cannot verify affinity")
    elif set(core_usage) == {0}:
        print(" ✓ Process stayed on core 0 as expected")
    else:
        print(" ⚠ Process may have moved between cores despite affinity setting")
def main():
    """Run every CPU core tracking demonstration in order.

    Prints a banner, then returns early with an explanatory note when the
    platform is not Linux. Otherwise runs the single-process, multi-process
    and affinity demonstrations and prints a closing summary.
    """
    import platform

    print("CPU Core Tracking Demonstration")
    print("=" * 50)

    running_on_linux = platform.system() == 'Linux'
    if not running_on_linux:
        print("Note: CPU core tracking is currently only supported on Linux.")
        print("On other platforms, the cpu_core field will be None.")
        return

    test_single_process_core_tracking()
    test_multiprocess_core_distribution()
    demonstrate_core_affinity()

    closing_lines = (
        "\nCPU core tracking demonstration complete!",
        "\nKey insights:",
        "- Processes may switch between cores due to OS scheduling",
        "- Multiple processes tend to be distributed across available cores",
        "- CPU affinity can be used to pin processes to specific cores",
        "- Core tracking helps identify scheduling patterns and potential bottlenecks",
    )
    for line in closing_lines:
        print(line)
# Run the demonstration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()