import os
import subprocess
import time
import json
import glob
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Guards the print() calls in run_compression() and process_file(), which run
# on ThreadPoolExecutor worker threads, so only one thread prints at a time.
print_lock = threading.Lock()
def get_file_size(filepath):
    """Return the size of the file at ``filepath`` in bytes.

    Args:
        filepath: Path of the file to measure.

    Returns:
        The file size in bytes as an ``int``.

    Raises:
        OSError: If the file does not exist or cannot be accessed.
    """
    file_stat = os.stat(filepath)
    return file_stat.st_size
def run_compression(input_file, converter_path):
    """Run the external converter on one file and time the invocation.

    The converter is launched as ``converter_path -c -y -f input_file`` with
    its stdout/stderr captured. Progress and error messages are printed
    under ``print_lock`` because this function is called from worker threads.

    Args:
        input_file: Path of the file handed to the converter via ``-f``.
        converter_path: Path of the converter executable.

    Returns:
        The elapsed wall-clock time in seconds (``float``) when the converter
        exits with status 0, or ``None`` when it could not be launched or
        exited with a non-zero status.
    """
    filename = os.path.basename(input_file)
    with print_lock:
        print(f"Processing {filename}...")

    cmd = [converter_path, "-c", "-y", "-f", input_file]

    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) if the system clock is adjusted during the run.
    start_time = time.perf_counter()
    try:
        # errors="replace" keeps a stray non-decodable byte in the tool's
        # output from aborting an otherwise successful run.
        result = subprocess.run(
            cmd, capture_output=True, text=True, errors="replace"
        )
    except OSError as exc:
        # Executable missing, not executable, etc.: report it through the
        # same "return None" failure path as a non-zero exit status.
        with print_lock:
            print(f"Error processing {filename}: {exc}")
        return None
    execution_time = time.perf_counter() - start_time

    if result.returncode != 0:
        with print_lock:
            print(f"Error processing {filename}: {result.stderr}")
        return None

    with print_lock:
        print(f"Completed {filename} in {execution_time:.3f} seconds")
    return execution_time
def process_file(input_file, converter_path):
    """Compress one file and collect its size and timing figures.

    Args:
        input_file: Path of the file to compress.
        converter_path: Path of the converter executable, forwarded to
            ``run_compression``.

    Returns:
        ``None`` when ``run_compression`` reports a failure. Otherwise a dict
        with the keys ``filename``, ``original_size_mb``,
        ``compressed_size_mb``, ``execution_time_seconds`` and
        ``compression_ratio`` (compressed size divided by original size).
        ``compressed_size_mb`` is ``None`` when the expected output file is
        missing; ``compression_ratio`` is ``None`` in that case and also when
        the input file is empty.
    """
    bytes_per_mb = 1024 * 1024

    filename = os.path.basename(input_file)
    original_size = get_file_size(input_file)

    execution_time = run_compression(input_file, converter_path)
    if execution_time is None:
        return None

    base_name = os.path.splitext(filename)[0]
    # NOTE(review): this is a bare file name, so it is looked up in the
    # current working directory rather than next to input_file -- confirm
    # that is where the converter writes its output.
    output_file = f"{base_name}_encoded.mcap"

    compressed_size = None
    compression_ratio = None
    if os.path.exists(output_file):
        compressed_size = get_file_size(output_file)
        # An empty input has no meaningful ratio; avoid ZeroDivisionError.
        if original_size > 0:
            compression_ratio = compressed_size / original_size
    else:
        with print_lock:
            print(f"Warning: Expected output file {output_file} not found")

    # Compare against None explicitly: a size of 0 bytes or a ratio of 0.0 is
    # a real measurement and must not be reported as "missing".
    return {
        "filename": filename,
        "original_size_mb": round(original_size / bytes_per_mb, 2),
        "compressed_size_mb": (
            round(compressed_size / bytes_per_mb, 2)
            if compressed_size is not None
            else None
        ),
        "execution_time_seconds": round(execution_time, 3),
        "compression_ratio": (
            round(compression_ratio, 4)
            if compression_ratio is not None
            else None
        ),
    }
def main():
    """Benchmark the converter on every ``.mcap`` file found in ``DATA``.

    Files are processed in parallel through a ``ThreadPoolExecutor`` that
    calls ``process_file`` for each input. Per-file results and overall
    benchmark information are written as JSON to
    ``DATA/compression_results.json``, and summary statistics are printed.

    Returns early (printing a message, returning ``None``) when the converter
    executable does not exist or when there are no input files.
    """
    data_dir = "DATA"
    converter_path = "./build_release/tools/cloudini_rosbag_converter"
    results_file = os.path.join(data_dir, "compression_results.json")

    if not os.path.exists(converter_path):
        print(f"Error: {converter_path} not found. Please build the project first.")
        return

    mcap_files = glob.glob(os.path.join(data_dir, "*.mcap"))
    # Exclude files carrying the "_encoded.mcap" suffix that process_file()
    # expects for converter output, so they are not compressed again.
    original_files = [f for f in mcap_files if not f.endswith("_encoded.mcap")]
    if not original_files:
        print(f"No .mcap files found in {data_dir}")
        return

    print(f"Found {len(original_files)} files to process")
    print("Using ThreadPoolExecutor for parallel processing...")

    results = []
    max_workers = min(len(original_files), os.cpu_count() or 4)

    # perf_counter() is monotonic, so the wall time is immune to system
    # clock adjustments made while the benchmark runs.
    start_total_time = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {
            executor.submit(process_file, input_file, converter_path): input_file
            for input_file in sorted(original_files)
        }
        for future in as_completed(future_to_file):
            input_file = future_to_file[future]
            filename = os.path.basename(input_file)
            try:
                result_entry = future.result()
            except Exception as exc:
                # One failing file must not abort the whole benchmark.
                print(f"File {filename} generated an exception: {exc}")
                continue

            if result_entry is None:
                print(f"Failed to process: {filename}")
                continue

            results.append(result_entry)
            print(f"\n{'='*50}")
            print(f"Completed: {filename}")
            print(f"{'='*50}")
            print(f"Original size: {result_entry['original_size_mb']} MB")
            print(f"Compressed size: {result_entry['compressed_size_mb']} MB")
            print(f"Compression ratio: {result_entry['compression_ratio']}")
            print(f"Execution time: {result_entry['execution_time_seconds']} seconds")
    total_processing_time = time.perf_counter() - start_total_time

    with open(results_file, "w", encoding="utf-8") as f:
        json.dump({
            "benchmark_info": {
                "tool": "cloudini_rosbag_converter",
                "compression_method": "zstd",
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "total_files_processed": len(results),
                "max_workers": max_workers,
                "total_wall_time_seconds": round(total_processing_time, 3)
            },
            "results": results
        }, f, indent=2)

    print(f"\n{'='*50}")
    print(f"Results saved to: {results_file}")
    print(f"{'='*50}")

    if results:
        # Entries whose output file was missing carry a ratio of None; only
        # real measurements (including 0.0) take part in the average.
        ratios = [
            r["compression_ratio"]
            for r in results
            if r["compression_ratio"] is not None
        ]
        # Sum of the per-file durations reported by process_file().
        total_cpu_time = sum(r["execution_time_seconds"] for r in results)

        print("\nSUMMARY STATISTICS:")
        print(f"Files processed: {len(results)}")
        print(f"Max workers used: {max_workers}")
        if ratios:
            avg_compression = sum(ratios) / len(ratios)
            print(f"Average compression ratio: {avg_compression:.4f}")
        else:
            # Every output file was missing: there is nothing to average, and
            # dividing by len(ratios) would raise ZeroDivisionError.
            print("Average compression ratio: n/a")
        print(f"Total CPU time: {total_cpu_time:.3f} seconds")
        print(f"Total wall time: {total_processing_time:.3f} seconds")
        if total_processing_time > 0:
            print(f"Speedup factor: {total_cpu_time/total_processing_time:.2f}x")
        else:
            print("Speedup factor: n/a")
# Run the benchmark only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()