import argparse, socket, time, struct, sys, threading
def main():
p = argparse.ArgumentParser()
p.add_argument("--role", required=True, choices=["sender", "receiver"])
p.add_argument("--rate-mbps", type=float, default=0, help="0 = full blast")
p.add_argument("--duration", type=float, default=10)
p.add_argument("--chunk-size", type=int, default=1024)
p.add_argument("--port", type=int, default=8443)
args = p.parse_args()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.connect(("127.0.0.1", args.port))
sock.settimeout(30)
print(f"[{args.role}] connected to 127.0.0.1:{args.port}")
settle = 3
print(f"[{args.role}] waiting {settle}s for both sides to connect...")
time.sleep(settle)
sock.settimeout(2)
buf = b""
for attempt in range(15):
sock.sendall(b"READY\n")
if attempt == 0:
print(f"[{args.role}] sent READY, waiting for peer...")
try:
while True:
chunk = sock.recv(4096)
if not chunk:
print(f"[{args.role}] connection closed during handshake")
return
buf += chunk
if b"READY\n" in buf:
break
except socket.timeout:
pass
if b"READY\n" in buf:
break
else:
print(f"[{args.role}] TIMEOUT waiting for peer READY after 15 attempts")
return
sock.settimeout(30)
print(f"[{args.role}] peer READY received - starting benchmark")
leftover = buf.split(b"READY\n", 1)[1]
total_sent = 0
total_recv = 0
recv_count = 0
chunks_sent = 0
payload = bytes(range(256)) * (args.chunk_size // 256 + 1)
payload = payload[:args.chunk_size]
if args.rate_mbps > 0:
bytes_per_sec = args.rate_mbps * 1_000_000 / 8
delay = args.chunk_size / bytes_per_sec
else:
delay = 0
done = threading.Event()
errors = []
def recv_thread():
nonlocal total_recv, recv_count, leftover
sock_r = sock if leftover:
total_recv += len(leftover)
recv_count += 1
while not done.is_set():
try:
sock_r.settimeout(1.0)
data = sock_r.recv(65536)
if not data:
break
total_recv += len(data)
recv_count += 1
except socket.timeout:
continue
except Exception as e:
errors.append(str(e))
break
rt = threading.Thread(target=recv_thread, daemon=True)
rt.start()
start = time.monotonic()
try:
while time.monotonic() - start < args.duration:
sock.sendall(payload)
total_sent += len(payload)
chunks_sent += 1
if delay > 0:
time.sleep(delay)
except Exception as e:
errors.append(f"send error: {e}")
elapsed_send = time.monotonic() - start
print(f"[{args.role}] send done: {total_sent:,} bytes ({chunks_sent} chunks) in {elapsed_send:.1f}s")
time.sleep(2)
done.set()
rt.join(timeout=3)
total_elapsed = time.monotonic() - start
send_mbps = (total_sent * 8 / elapsed_send / 1_000_000) if elapsed_send > 0 else 0
recv_mbps = (total_recv * 8 / total_elapsed / 1_000_000) if total_elapsed > 0 else 0
loss_pct = ((total_sent - total_recv) / total_sent * 100) if total_sent > 0 else 0
print(f"[{args.role}] RESULTS:")
print(f" TX: {total_sent:>12,} bytes ({send_mbps:.1f} Mbps)")
print(f" RX: {total_recv:>12,} bytes ({recv_mbps:.1f} Mbps, {recv_count} recvs)")
if errors:
print(f" ERRORS: {errors}")
print(f"SUMMARY|{args.role}|tx={total_sent}|rx={total_recv}|send_mbps={send_mbps:.1f}|recv_mbps={recv_mbps:.1f}|chunks={chunks_sent}|duration={total_elapsed:.1f}")
sock.close()
if __name__ == "__main__":
main()