import asyncio
import json
import re
import subprocess
import sys
import time
import atexit
import signal
from pathlib import Path
from typing import Optional, Dict, Any, Callable, List, Tuple
from dataclasses import dataclass
# ``websockets`` is an optional third-party dependency. Record whether the
# import succeeded so main() can print an install hint instead of the script
# failing at import time.
try:
    import websockets
    WEBSOCKETS_AVAILABLE = True
except ImportError:
    WEBSOCKETS_AVAILABLE = False
# Host and port of the proxy; used for sipbot ``--register`` and in SIP URIs.
PROXY_HOST = '127.0.0.1'
PROXY_PORT = 15061
# Port used to build the RWI WebSocket URL below.
HTTP_PORT = 8082
RWI_WS_URL = f'ws://{PROXY_HOST}:{HTTP_PORT}/rwi/v1'
# Default token appended to the RWI URL as the ``token`` query parameter.
TEST_TOKEN = 'test-token-rwi'
# Configuration file passed to the rustpbx binary via ``--conf``.
RUSTPBX_CONFIG = 'config.toml.dev'
# Seconds to wait after launching rustpbx before checking it is still running.
RUSTPBX_STARTUP_TIMEOUT = 10
# User part of the SIP URI dialled by the IVR/DTMF test.
IVR_NUMBER = '1100'
# Handle of the rustpbx child process and the log file receiving its output;
# both are assigned by start_rustpbx().
_rustpbx_process: Optional[subprocess.Popen] = None
_rustpbx_output_file = None
@dataclass
class RtpStats:
    """Packet and byte counters for one RTP endpoint, split by direction."""

    rx_packets: int = 0
    rx_bytes: int = 0
    tx_packets: int = 0
    tx_bytes: int = 0

    @property
    def has_rx(self) -> bool:
        """Whether at least one packet has been received."""
        return 0 < self.rx_packets

    @property
    def has_tx(self) -> bool:
        """Whether at least one packet has been transmitted."""
        return 0 < self.tx_packets

    @property
    def is_bidirectional(self) -> bool:
        """Whether packets were seen in both directions."""
        if not self.has_rx:
            return False
        return self.has_tx

    def __str__(self) -> str:
        received = f"RX:{self.rx_packets}p/{self.rx_bytes}b"
        sent = f"TX:{self.tx_packets}p/{self.tx_bytes}b"
        return f"RTP {received} {sent}"
class RwiClient:
    """Minimal asyncio client for the RWI WebSocket interface.

    Requests are JSON objects tagged with a unique ``action_id``; the reply
    carrying the same ``action_id`` resolves the matching future. Every other
    incoming message is passed to the registered event handlers.
    """

    def __init__(self):
        self.ws = None
        self.message_id = 0
        # action_id -> future resolved by _handle_message()
        self.pending_responses: Dict[str, asyncio.Future] = {}
        self.event_handlers: List[Callable[[dict], None]] = []
        self.receive_task = None
        self.connected = False

    async def connect(self, token: str = TEST_TOKEN) -> bool:
        """Open the WebSocket and start the receive loop.

        Returns True on success; on any failure prints the error and returns
        False.
        """
        from urllib.parse import quote

        try:
            # Percent-encode the token so characters such as '&', '#' or
            # spaces cannot corrupt the query string.
            url = f"{RWI_WS_URL}?token={quote(token, safe='')}"
            self.ws = await websockets.connect(url)
            self.connected = True
            self.receive_task = asyncio.create_task(self._receive_loop())
            return True
        except Exception as e:
            print(f"Failed to connect to RWI: {e}")
            return False

    async def disconnect(self):
        """Stop the receive loop and close the WebSocket, if any."""
        self.connected = False
        if self.receive_task:
            self.receive_task.cancel()
            try:
                await self.receive_task
            except asyncio.CancelledError:
                pass
        if self.ws:
            await self.ws.close()

    async def _receive_loop(self):
        """Read messages until the socket closes or the task is cancelled."""
        try:
            async for message in self.ws:
                try:
                    data = json.loads(message)
                except json.JSONDecodeError:
                    print(f"Received non-JSON message: {message}")
                    continue
                await self._handle_message(data)
        except websockets.exceptions.ConnectionClosed:
            print("RWI WebSocket connection closed")
        except asyncio.CancelledError:
            pass
        finally:
            # The loop is the only reader; once it stops the client can no
            # longer receive replies, so reflect that in the public flag.
            self.connected = False

    async def _handle_message(self, data: dict):
        """Resolve a pending request or fan the message out to handlers."""
        if not isinstance(data, dict):
            # Valid JSON that is not an object (list, number, ...) has no
            # action_id; report it instead of letting .get() kill the loop.
            print(f"Received unexpected JSON message: {data}")
            return
        action_id = data.get('action_id')
        if action_id and action_id in self.pending_responses:
            future = self.pending_responses.pop(action_id)
            if not future.done():
                future.set_result(data)
            return
        # Iterate over a snapshot so a handler may add/remove handlers.
        for handler in list(self.event_handlers):
            try:
                handler(data)
            except Exception as e:
                print(f"Event handler error: {e}")

    def add_event_handler(self, handler: Callable[[dict], None]):
        """Register a callback invoked for every non-response message."""
        self.event_handlers.append(handler)

    def remove_event_handler(self, handler: Callable[[dict], None]):
        """Unregister a callback; unknown handlers are ignored."""
        if handler in self.event_handlers:
            self.event_handlers.remove(handler)

    async def send_request(self, action: str, params: Optional[dict] = None,
                           timeout: float = 5.0) -> dict:
        """Send one request and wait for the reply with the same action_id.

        Args:
            action: RWI action name, e.g. ``"call.originate"``.
            params: Action parameters; an empty object is sent when omitted.
            timeout: Seconds to wait for the reply.

        Returns:
            The decoded reply message.

        Raises:
            TimeoutError: No reply arrived within ``timeout`` seconds.
        """
        self.message_id += 1
        action_id = f"py-{self.message_id}"
        request = {
            "rwi": "1.0",
            "action_id": action_id,
            "action": action,
            "params": params or {}
        }
        future = asyncio.get_running_loop().create_future()
        self.pending_responses[action_id] = future
        try:
            await self.ws.send(json.dumps(request))
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"Request {action} timed out")
        finally:
            # Drop the entry on every exit path (timeout, send failure,
            # cancellation) so failed requests do not accumulate.
            self.pending_responses.pop(action_id, None)

    async def subscribe(self, contexts: List[str]) -> dict:
        """Send ``session.subscribe`` for the given contexts."""
        return await self.send_request("session.subscribe", {"contexts": contexts})

    async def list_calls(self) -> dict:
        """Send ``session.list_calls``."""
        return await self.send_request("session.list_calls")

    async def originate(self, call_id: str, destination: str,
                        caller_id: str = None, context: str = "default",
                        timeout_secs: int = 30) -> dict:
        """Send ``call.originate``; ``caller_id`` is included only when set."""
        params = {
            "call_id": call_id,
            "destination": destination,
            "context": context,
            "timeout_secs": timeout_secs
        }
        if caller_id:
            params["caller_id"] = caller_id
        return await self.send_request("call.originate", params)

    async def answer(self, call_id: str) -> dict:
        """Send ``call.answer``."""
        return await self.send_request("call.answer", {"call_id": call_id})

    async def hangup(self, call_id: str, reason: str = None) -> dict:
        """Send ``call.hangup``; ``reason`` is included only when set."""
        params = {"call_id": call_id}
        if reason:
            params["reason"] = reason
        return await self.send_request("call.hangup", params)

    async def reject(self, call_id: str, reason: str = None) -> dict:
        """Send ``call.reject``; ``reason`` is included only when set."""
        params = {"call_id": call_id}
        if reason:
            params["reason"] = reason
        return await self.send_request("call.reject", params)

    async def ring(self, call_id: str) -> dict:
        """Send ``call.ring``."""
        return await self.send_request("call.ring", {"call_id": call_id})

    async def bridge(self, leg_a: str, leg_b: str) -> dict:
        """Send ``call.bridge`` for the two legs."""
        return await self.send_request("call.bridge", {"leg_a": leg_a, "leg_b": leg_b})

    async def unbridge(self, call_id: str) -> dict:
        """Send ``call.unbridge``."""
        return await self.send_request("call.unbridge", {"call_id": call_id})

    async def hold(self, call_id: str) -> dict:
        """Send ``call.hold``."""
        return await self.send_request("call.hold", {"call_id": call_id})

    async def unhold(self, call_id: str) -> dict:
        """Send ``call.unhold``."""
        return await self.send_request("call.unhold", {"call_id": call_id})

    async def transfer(self, call_id: str, target: str, attended: bool = False) -> dict:
        """Send ``call.transfer``; ``attended`` is included only when True."""
        params = {"call_id": call_id, "target": target}
        if attended:
            params["attended"] = True
        return await self.send_request("call.transfer", params)

    async def media_play(self, call_id: str, source_type: str, uri: str,
                         loop: bool = False) -> dict:
        """Send ``media.play`` with a ``{type, uri}`` source."""
        return await self.send_request("media.play", {
            "call_id": call_id,
            "source": {"type": source_type, "uri": uri},
            "loop": loop
        })

    async def media_stop(self, call_id: str) -> dict:
        """Send ``media.stop``."""
        return await self.send_request("media.stop", {"call_id": call_id})

    async def record_start(self, call_id: str, path: str, beep: bool = True) -> dict:
        """Send ``record.start`` with file storage at ``path``."""
        return await self.send_request("record.start", {
            "call_id": call_id,
            "storage": {"type": "file", "path": path},
            "beep": beep
        })

    async def record_stop(self, call_id: str) -> dict:
        """Send ``record.stop``."""
        return await self.send_request("record.stop", {"call_id": call_id})

    async def queue_enqueue(self, call_id: str, queue_id: str, priority: int = None) -> dict:
        """Send ``queue.enqueue``; ``priority`` is included when not None."""
        params = {"call_id": call_id, "queue_id": queue_id}
        if priority is not None:
            params["priority"] = priority
        return await self.send_request("queue.enqueue", params)

    async def queue_dequeue(self, call_id: str) -> dict:
        """Send ``queue.dequeue``."""
        return await self.send_request("queue.dequeue", {"call_id": call_id})

    async def queue_agent_login(self, agent_id: str, queue_id: str) -> dict:
        """Send ``queue.agent_login``."""
        return await self.send_request("queue.agent_login", {
            "agent_id": agent_id,
            "queue_id": queue_id
        })

    async def queue_agent_logout(self, agent_id: str) -> dict:
        """Send ``queue.agent_logout``."""
        return await self.send_request("queue.agent_logout", {"agent_id": agent_id})

    async def queue_agent_ready(self, agent_id: str, ready: bool = True) -> dict:
        """Send ``queue.agent_ready``."""
        return await self.send_request("queue.agent_ready", {
            "agent_id": agent_id,
            "ready": ready
        })

    async def queue_status(self, queue_id: str) -> dict:
        """Send ``queue.status``."""
        return await self.send_request("queue.status", {"queue_id": queue_id})

    async def conference_create(self, conf_id: str, max_members: int = None) -> dict:
        """Send ``conference.create``; ``max_members`` is included when not None."""
        params = {"conference_id": conf_id}
        # ``is not None`` (as in queue_enqueue) so an explicit 0 is not dropped.
        if max_members is not None:
            params["max_members"] = max_members
        return await self.send_request("conference.create", params)

    async def conference_destroy(self, conf_id: str) -> dict:
        """Send ``conference.destroy``."""
        return await self.send_request("conference.destroy", {"conference_id": conf_id})

    async def conference_add(self, conf_id: str, call_id: str) -> dict:
        """Send ``conference.add``."""
        return await self.send_request("conference.add", {
            "conference_id": conf_id,
            "call_id": call_id
        })

    async def conference_remove(self, conf_id: str, call_id: str) -> dict:
        """Send ``conference.remove``."""
        return await self.send_request("conference.remove", {
            "conference_id": conf_id,
            "call_id": call_id
        })

    async def conference_mute(self, conf_id: str, call_id: str) -> dict:
        """Send ``conference.mute``."""
        return await self.send_request("conference.mute", {
            "conference_id": conf_id,
            "call_id": call_id
        })

    async def conference_unmute(self, conf_id: str, call_id: str) -> dict:
        """Send ``conference.unmute``."""
        return await self.send_request("conference.unmute", {
            "conference_id": conf_id,
            "call_id": call_id
        })

    async def get_call_info(self, call_id: str) -> dict:
        """Send ``call.info``."""
        return await self.send_request("call.info", {"call_id": call_id})

    async def send_dtmf(self, call_id: str, digits: str, duration_ms: int = 100) -> dict:
        """Send ``call.send_dtmf`` with the given digits and tone duration."""
        return await self.send_request("call.send_dtmf", {
            "call_id": call_id,
            "digits": digits,
            "duration_ms": duration_ms
        })
class SipBotProcess:
    """Run a ``sipbot`` child process and capture its output.

    The child's stdout and stderr are merged and read line by line on a
    daemon thread; each line is stored and echoed with a ``[name]`` prefix.
    Usable as a context manager, which terminates the child on exit.
    """

    def __init__(self, name: str):
        import threading

        self.name = name
        self.process: Optional[subprocess.Popen] = None
        self.output = []
        # Created eagerly so get_output()/get_rtp_stats() are safe to call
        # before a start_* method has run.
        self.output_lock = threading.Lock()
        self.reader_thread = None
        self._rtp_stats: Optional["RtpStats"] = None

    def _init_lock(self):
        """Ensure the output lock exists (kept for existing callers)."""
        if self.output_lock is None:
            import threading
            self.output_lock = threading.Lock()

    def start_callee(self, port: int, ring_secs: int = 2, answer_mode: str = "echo"):
        """Start ``sipbot wait`` as user ``bob`` listening on ``port``.

        ``answer_mode`` selects ``--echo``, ``--reject 486`` (for
        ``"reject"``), or is passed through as the ``--answer`` argument.
        """
        self._init_lock()
        cmd = [
            'sipbot', 'wait',
            '--username', 'bob',
            '--password', '123456',
            '--register', f'{PROXY_HOST}:{PROXY_PORT}',
            '-a', f'{PROXY_HOST}:{port}',
            '--codecs', 'pcmu',
            '--ring-duration', str(ring_secs),
            '-v'
        ]
        if answer_mode == "echo":
            cmd.extend(['--echo'])
        elif answer_mode == "reject":
            cmd.extend(['--reject', '486'])
        else:
            cmd.extend(['--answer', answer_mode])
        print(f"[{self.name}] Starting callee: {' '.join(cmd)}")
        self._start(cmd)

    def start_caller(self, target: str, hangup: int = 10, wait_time: int = None):
        """Start ``sipbot call`` as user ``alice`` towards ``target``."""
        self._init_lock()
        cmd = [
            'sipbot', 'call',
            '-t', target,
            '--username', 'alice',
            '--password', '123456',
            '--codecs', 'pcmu',
            '--hangup', str(hangup),
            '-v'
        ]
        if wait_time:
            cmd.extend(['--wait', str(wait_time)])
        print(f"[{self.name}] Starting caller: {' '.join(cmd)}")
        self._start(cmd)

    def _start(self, cmd: list):
        """Spawn ``cmd`` and start the daemon thread that reads its output."""
        import threading

        self.process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1
        )
        self.reader_thread = threading.Thread(target=self._read_output, daemon=True)
        self.reader_thread.start()

    def _read_output(self):
        """Thread body: store and echo every non-empty output line."""
        try:
            for line in self.process.stdout:
                line = line.rstrip()
                if line:
                    with self.output_lock:
                        self.output.append(line)
                    print(f"[{self.name}] {line}")
        except Exception as e:
            print(f"[{self.name}] Reader error: {e}")

    def get_output(self) -> str:
        """Return all captured lines joined with newlines."""
        with self.output_lock:
            return '\n'.join(self.output)

    def get_rtp_stats(self) -> "RtpStats":
        """Parse ``RX:<n>p/<n>b`` and ``TX:<n>p/<n>b`` from the captured output.

        When the pattern appears several times the last occurrence is used,
        so the result reflects the most recent report rather than the first.
        NOTE(review): assumes sipbot's counters are cumulative — confirm.
        Directions that never appear stay at zero.
        """
        output = self.get_output()
        stats = RtpStats()
        rx_matches = re.findall(r'RX:\s*(\d+)p/(\d+)b', output)
        tx_matches = re.findall(r'TX:\s*(\d+)p/(\d+)b', output)
        if rx_matches:
            packets, size = rx_matches[-1]
            stats.rx_packets = int(packets)
            stats.rx_bytes = int(size)
        if tx_matches:
            packets, size = tx_matches[-1]
            stats.tx_packets = int(packets)
            stats.tx_bytes = int(size)
        return stats

    def wait(self, timeout: int = 60) -> int:
        """Wait for the child to exit and return its exit code.

        Returns 0 when no process was started and -1 when the wait timed out
        (the child is terminated in that case).
        """
        if self.process:
            try:
                return self.process.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                print(f"[{self.name}] Timeout, terminating...")
                self.terminate()
                return -1
        return 0

    def terminate(self):
        """Stop the child if it is still running; no-op otherwise."""
        if self.process and self.process.poll() is None:
            print(f"[{self.name}] Terminating...")
            self.process.terminate()
            try:
                self.process.wait(timeout=3)
            except subprocess.TimeoutExpired:
                print(f"[{self.name}] Force killing...")
                self.process.kill()
                # Reap the killed child so it does not linger as a zombie.
                self.process.wait()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
class TestResult:
    """Outcome of one test case: verdict, error messages, timing, RTP stats."""

    def __init__(self, name: str):
        self.name = name
        self.passed = False
        self.errors = []
        self.rtp_stats: Dict[str, RtpStats] = {}
        self.end_time = None
        # Timing starts when the result object is created.
        self.start_time = time.time()

    def add_error(self, msg: str):
        """Record one failure message."""
        self.errors.append(msg)

    def add_rtp_stats(self, name: str, stats: RtpStats):
        """Store the RTP counters under the given label."""
        self.rtp_stats[name] = stats

    def finish(self, passed: bool):
        """Set the verdict and stop the clock."""
        self.passed = passed
        self.end_time = time.time()

    def duration(self) -> float:
        """Seconds from creation to finish(), or to now if still running."""
        stop = self.end_time or time.time()
        return stop - self.start_time

    def print_summary(self):
        """Print the verdict, then any errors and RTP statistics."""
        rule = '=' * 60
        verdict = "✅ PASS" if self.passed else "❌ FAIL"
        print(f"\n{rule}")
        print(f"{verdict} {self.name} ({self.duration():.1f}s)")
        if self.errors:
            print("Errors:")
            for message in self.errors:
                print(f" - {message}")
        if self.rtp_stats:
            print("RTP Statistics:")
            for label in self.rtp_stats:
                print(f" {label}: {self.rtp_stats[label]}")
        print(rule)
def cleanup_rustpbx():
    """Stop the rustpbx child process, if running, and close its log file.

    Safe to call more than once (it is invoked explicitly and via atexit):
    a process that already exited is left alone and the log handle is reset
    to None after closing.
    """
    global _rustpbx_process, _rustpbx_output_file
    if _rustpbx_process and _rustpbx_process.poll() is None:
        print("\nCleaning up rustpbx...")
        try:
            _rustpbx_process.terminate()
            _rustpbx_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Did not exit on terminate(); force it and reap the child.
            _rustpbx_process.kill()
            _rustpbx_process.wait()
    if _rustpbx_output_file is not None:
        # The handle opened by start_rustpbx() was previously never closed.
        try:
            _rustpbx_output_file.close()
        except OSError as e:
            print(f"Failed to close rustpbx log file: {e}")
        _rustpbx_output_file = None
def start_rustpbx() -> bool:
    """Launch ``target/debug/rustpbx`` with its output redirected to a log file.

    Waits RUSTPBX_STARTUP_TIMEOUT seconds for the server to come up, checking
    during the wait whether the process has already exited so a crash at
    startup is reported immediately.

    Returns:
        True if the process is still running after the startup wait; False if
        the binary is missing, the launch failed, or the process exited.
    """
    global _rustpbx_process, _rustpbx_output_file
    rustpbx_bin = Path('target/debug/rustpbx')
    if not rustpbx_bin.exists():
        print(f"rustpbx binary not found: {rustpbx_bin}")
        return False
    log_dir = Path('tests/logs')
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / 'rustpbx_rwi_e2e.log'
    started = False
    try:
        _rustpbx_output_file = open(log_file, 'w')
        _rustpbx_process = subprocess.Popen(
            [str(rustpbx_bin), '--conf', RUSTPBX_CONFIG],
            stdout=_rustpbx_output_file,
            stderr=subprocess.STDOUT,
            # Make the child ignore SIGINT (POSIX only; preexec_fn is not
            # available on Windows).
            preexec_fn=None if sys.platform == 'win32' else lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)
        )
        print(f"rustpbx started (PID: {_rustpbx_process.pid})")
        # Same total wait as before, but poll so an early exit is noticed
        # right away instead of after the full timeout.
        deadline = time.monotonic() + RUSTPBX_STARTUP_TIMEOUT
        while time.monotonic() < deadline:
            if _rustpbx_process.poll() is not None:
                break
            time.sleep(0.1)
        if _rustpbx_process.poll() is not None:
            print("rustpbx exited unexpectedly")
            return False
        started = True
        return True
    except Exception as e:
        print(f"Failed to start rustpbx: {e}")
        return False
    finally:
        # On any failure path release the log handle opened above.
        if not started and _rustpbx_output_file is not None:
            _rustpbx_output_file.close()
            _rustpbx_output_file = None
def verify_rtp_bidirectional(stats: RtpStats, name: str, min_packets: int = 50) -> List[str]:
    """Check that RTP flowed in both directions with enough packets.

    Returns a list of error messages, receive direction first; an empty list
    means both directions reached ``min_packets``.
    """
    directions = (
        (stats.has_rx, stats.rx_packets, "received", "RX"),
        (stats.has_tx, stats.tx_packets, "transmitted", "TX"),
    )
    problems = []
    for seen, count, verb, tag in directions:
        if not seen:
            problems.append(f"{name}: No RTP {verb} ({tag}=0)")
        elif count < min_packets:
            problems.append(f"{name}: Too few RTP packets {verb} ({count} < {min_packets})")
    return problems
def verify_rtp_rx_only(stats: RtpStats, name: str, min_packets: int = 50) -> List[str]:
    """Check that RTP was received but (almost) nothing was transmitted.

    Returns a list of error messages; more than 10 transmitted packets is
    reported as unexpected.
    """
    problems = []
    if stats.has_rx:
        if stats.rx_packets < min_packets:
            problems.append(f"{name}: Too few RTP packets received ({stats.rx_packets} < {min_packets})")
    else:
        problems.append(f"{name}: No RTP received (RX=0)")
    if stats.tx_packets > 10:
        problems.append(f"{name}: Unexpected TX packets while on hold ({stats.tx_packets})")
    return problems
async def wait_for_event(events: List[dict], event_type: str, timeout: float = 5.0) -> Optional[dict]:
    """Poll ``events`` until one has ``event == event_type`` or time runs out.

    The list is expected to be appended to by an event handler while this
    coroutine sleeps between scans.

    Args:
        events: List of event dicts to scan.
        event_type: Value to match against each dict's ``'event'`` key.
        timeout: Maximum seconds to wait; the list is scanned at least once
            even when this is zero or negative.

    Returns:
        The first matching event, or None if none appeared in time.
    """
    # Monotonic clock: unaffected by wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout
    while True:
        for evt in events:
            if evt.get('event') == event_type:
                return evt
        # Deadline is checked after the scan so events that arrived during
        # the final sleep are still seen.
        if time.monotonic() >= deadline:
            return None
        await asyncio.sleep(0.1)
async def test_list_calls(rwi: RwiClient) -> TestResult:
    """Issue ``list_calls`` on an already connected client and record the verdict."""
    outcome = TestResult("List Calls")
    try:
        reply = await rwi.list_calls()
        succeeded = reply.get('status') == 'success'
        if not succeeded:
            outcome.add_error(f"List calls failed: {reply.get('error')}")
        outcome.finish(succeeded)
    except Exception as exc:
        # Any failure (timeout, transport error, malformed reply) fails the test.
        outcome.add_error(f"Exception: {exc}")
        outcome.finish(False)
    return outcome
async def test_originate_and_answer() -> TestResult:
    """Originate a call to an echo callee and verify events and RTP flow.

    Fails if originate is rejected, if no ``call_ringing`` / ``call_answered``
    key shows up in the received events, or if Bob's RTP counters are not
    bidirectional with at least 30 packets each way.
    """
    result = TestResult("Originate & Answer with RTP")
    with SipBotProcess("Bob") as bob:
        bob.start_callee(port=5070, ring_secs=1)
        # Give sipbot a moment to start; use the async sleep so the event
        # loop is not blocked inside a coroutine.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            events = []

            def on_event(evt):
                events.append(evt)

            rwi.add_event_handler(on_event)
            call_id = f"test-call-{int(time.time())}"
            destination = f"sip:bob@{PROXY_HOST}:5070"
            print(f"Originating call {call_id} to {destination}")
            orig_resp = await rwi.originate(
                call_id=call_id,
                destination=destination,
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            if orig_resp.get('status') != 'success':
                result.add_error(f"Originate failed: {orig_resp.get('error')}")
                result.finish(False)
                return result
            await asyncio.sleep(3)
            # Event names are looked up among the keys of each event dict.
            event_types = []
            for e in events:
                event_types.extend(e.keys())
            if 'call_ringing' not in event_types:
                result.add_error(f"Expected call_ringing event not received. Got: {event_types}")
            if 'call_answered' not in event_types:
                result.add_error(f"Expected call_answered event not received. Got: {event_types}")
            bob_stats = bob.get_rtp_stats()
            result.add_rtp_stats("Bob", bob_stats)
            rtp_errors = verify_rtp_bidirectional(bob_stats, "Bob", min_packets=30)
            for err in rtp_errors:
                result.add_error(err)
            await rwi.hangup(call_id)
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
    return result
async def test_hold_unhold() -> TestResult:
    """Put an established call on hold, resume it, and verify RTP afterwards.

    Fails if hold or unhold is not acknowledged with ``success`` or if Bob's
    RTP counters after resuming are not bidirectional (>= 50 packets each way).
    """
    result = TestResult("Hold/Unhold with RTP")
    with SipBotProcess("Bob") as bob:
        bob.start_callee(port=5070, ring_secs=1)
        # Async sleep: do not block the event loop while sipbot starts.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            call_id = f"test-hold-{int(time.time())}"
            await rwi.originate(
                call_id=call_id,
                destination=f"sip:bob@{PROXY_HOST}:5070",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            await asyncio.sleep(2)
            # Snapshot before hold is reported only; it is not asserted on.
            stats_before = bob.get_rtp_stats()
            result.add_rtp_stats("Before Hold", stats_before)
            print("Putting call on hold...")
            hold_resp = await rwi.hold(call_id)
            if hold_resp.get('status') != 'success':
                result.add_error(f"Hold failed: {hold_resp.get('error')}")
            await asyncio.sleep(2)
            print("Resuming call from hold...")
            unhold_resp = await rwi.unhold(call_id)
            if unhold_resp.get('status') != 'success':
                result.add_error(f"Unhold failed: {unhold_resp.get('error')}")
            await asyncio.sleep(2)
            stats_after = bob.get_rtp_stats()
            result.add_rtp_stats("After Unhold", stats_after)
            rtp_errors = verify_rtp_bidirectional(stats_after, "Bob", min_packets=50)
            for err in rtp_errors:
                result.add_error(err)
            await rwi.hangup(call_id)
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
    return result
async def test_bridge() -> TestResult:
    """Originate two legs, bridge them, and verify RTP on both callees.

    Fails if the bridge request is not acknowledged with ``success`` or if
    either callee's RTP counters are not bidirectional (>= 50 packets).
    """
    result = TestResult("Bridge Two Calls with RTP")
    with SipBotProcess("Bob") as bob, SipBotProcess("Alice") as alice:
        bob.start_callee(port=5070, ring_secs=1)
        alice.start_callee(port=5071, ring_secs=1)
        # Async sleep: do not block the event loop while the bots start.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            leg_a = f"test-leg-a-{int(time.time())}"
            leg_b = f"test-leg-b-{int(time.time())}"
            print("Originating leg A to Bob...")
            await rwi.originate(
                call_id=leg_a,
                destination=f"sip:bob@{PROXY_HOST}:5070",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            await asyncio.sleep(1)
            print("Originating leg B to Alice...")
            await rwi.originate(
                call_id=leg_b,
                destination=f"sip:alice@{PROXY_HOST}:5071",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            await asyncio.sleep(2)
            print(f"Bridging {leg_a} <-> {leg_b}...")
            bridge_resp = await rwi.bridge(leg_a, leg_b)
            if bridge_resp.get('status') != 'success':
                result.add_error(f"Bridge failed: {bridge_resp.get('error')}")
            await asyncio.sleep(3)
            bob_stats = bob.get_rtp_stats()
            alice_stats = alice.get_rtp_stats()
            result.add_rtp_stats("Bob", bob_stats)
            result.add_rtp_stats("Alice", alice_stats)
            for name, stats in [("Bob", bob_stats), ("Alice", alice_stats)]:
                rtp_errors = verify_rtp_bidirectional(stats, name, min_packets=50)
                for err in rtp_errors:
                    result.add_error(err)
            await rwi.hangup(leg_a)
            await rwi.hangup(leg_b)
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
            alice.terminate()
    return result
async def test_conference() -> TestResult:
    """Create a conference, add three calls, and verify RTP on every callee.

    Fails if the conference cannot be created, if adding any member is not
    acknowledged with ``success``, or if any callee's RTP counters are not
    bidirectional (>= 30 packets). Mute/unmute of the first member is
    exercised but its responses are not checked here.
    """
    result = TestResult("Three-Party Conference with RTP")
    conf_id = f"test-conf-{int(time.time())}"
    with SipBotProcess("Bob") as bob, SipBotProcess("Alice") as alice, SipBotProcess("Charlie") as charlie:
        bob.start_callee(port=5070, ring_secs=1)
        alice.start_callee(port=5071, ring_secs=1)
        charlie.start_callee(port=5072, ring_secs=1)
        # Async sleep: do not block the event loop while the bots start.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            print(f"Creating conference {conf_id}...")
            create_resp = await rwi.conference_create(conf_id)
            if create_resp.get('status') != 'success':
                result.add_error(f"Conference create failed: {create_resp.get('error')}")
                result.finish(False)
                return result
            calls = []
            for name, port in [("Bob", 5070), ("Alice", 5071), ("Charlie", 5072)]:
                call_id = f"test-{name.lower()}-{int(time.time())}"
                print(f"Originating to {name}...")
                await rwi.originate(
                    call_id=call_id,
                    destination=f"sip:{name.lower()}@{PROXY_HOST}:{port}",
                    caller_id=f"sip:rwi@{PROXY_HOST}",
                    timeout_secs=15
                )
                calls.append((name, call_id))
                await asyncio.sleep(0.5)
            await asyncio.sleep(2)
            for name, call_id in calls:
                print(f"Adding {name} to conference...")
                add_resp = await rwi.conference_add(conf_id, call_id)
                if add_resp.get('status') != 'success':
                    result.add_error(f"Failed to add {name}: {add_resp.get('error')}")
            await asyncio.sleep(3)
            print("Testing mute/unmute...")
            # calls[0] is the first originated member (Bob).
            await rwi.conference_mute(conf_id, calls[0][1])
            await asyncio.sleep(1)
            await rwi.conference_unmute(conf_id, calls[0][1])
            await asyncio.sleep(1)
            bob_stats = bob.get_rtp_stats()
            alice_stats = alice.get_rtp_stats()
            charlie_stats = charlie.get_rtp_stats()
            result.add_rtp_stats("Bob", bob_stats)
            result.add_rtp_stats("Alice", alice_stats)
            result.add_rtp_stats("Charlie", charlie_stats)
            for name, stats in [("Bob", bob_stats), ("Alice", alice_stats), ("Charlie", charlie_stats)]:
                rtp_errors = verify_rtp_bidirectional(stats, name, min_packets=30)
                for err in rtp_errors:
                    result.add_error(err)
            for name, call_id in calls:
                await rwi.hangup(call_id)
            await rwi.conference_destroy(conf_id)
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
            alice.terminate()
            charlie.terminate()
    return result
async def test_media_play() -> TestResult:
    """Play a file into an established call and verify RTP on the callee.

    A non-success ``media.play`` response is only logged (the audio file may
    be missing); the verdict depends on Bob's RTP counters being
    bidirectional with at least 30 packets each way.
    """
    result = TestResult("Media Play with RTP")
    with SipBotProcess("Bob") as bob:
        bob.start_callee(port=5070, ring_secs=1)
        # Async sleep: do not block the event loop while sipbot starts.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            call_id = f"test-media-{int(time.time())}"
            await rwi.originate(
                call_id=call_id,
                destination=f"sip:bob@{PROXY_HOST}:5070",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            await asyncio.sleep(2)
            audio_file = "/tmp/test_audio.wav"
            print(f"Playing media: {audio_file}")
            play_resp = await rwi.media_play(call_id, "file", audio_file)
            if play_resp.get('status') != 'success':
                print(f"Media play returned: {play_resp.get('status')} (may be expected if file missing)")
            await asyncio.sleep(3)
            await rwi.media_stop(call_id)
            bob_stats = bob.get_rtp_stats()
            result.add_rtp_stats("Bob", bob_stats)
            rtp_errors = verify_rtp_bidirectional(bob_stats, "Bob", min_packets=30)
            for err in rtp_errors:
                result.add_error(err)
            await rwi.hangup(call_id)
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
    return result
async def test_queue() -> TestResult:
    """Exercise agent login/ready, enqueue, status, dequeue and agent logout.

    Every step except ``queue.status`` must be acknowledged with ``success``;
    the status response is only printed. RTP counters are recorded for the
    report but not asserted on.
    """
    result = TestResult("Queue Operations (Enqueue/Dequeue/Agent)")
    with SipBotProcess("Bob") as bob:
        bob.start_callee(port=5070, ring_secs=1)
        # Async sleep: do not block the event loop while sipbot starts.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            call_id = f"test-queue-{int(time.time())}"
            queue_id = "test-queue"
            agent_id = "bob-agent"
            print(f"Agent {agent_id} logging in...")
            login_resp = await rwi.queue_agent_login(agent_id, queue_id)
            if login_resp.get('status') != 'success':
                result.add_error(f"Agent login failed: {login_resp.get('error')}")
            else:
                print(f"✓ Agent {agent_id} logged in to queue {queue_id}")
            ready_resp = await rwi.queue_agent_ready(agent_id, True)
            if ready_resp.get('status') != 'success':
                result.add_error(f"Agent ready failed: {ready_resp.get('error')}")
            await rwi.originate(
                call_id=call_id,
                destination=f"sip:bob@{PROXY_HOST}:5070",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            await asyncio.sleep(1)
            print(f"Enqueueing call to {queue_id}...")
            enqueue_resp = await rwi.queue_enqueue(call_id, queue_id, priority=1)
            if enqueue_resp.get('status') != 'success':
                result.add_error(f"Enqueue failed: {enqueue_resp.get('error')}")
            await asyncio.sleep(2)
            status_resp = await rwi.queue_status(queue_id)
            if status_resp.get('status') == 'success':
                print(f"Queue status: {status_resp.get('result', {})}")
            print("Dequeueing call...")
            dequeue_resp = await rwi.queue_dequeue(call_id)
            if dequeue_resp.get('status') != 'success':
                result.add_error(f"Dequeue failed: {dequeue_resp.get('error')}")
            logout_resp = await rwi.queue_agent_logout(agent_id)
            if logout_resp.get('status') != 'success':
                result.add_error(f"Agent logout failed: {logout_resp.get('error')}")
            bob_stats = bob.get_rtp_stats()
            result.add_rtp_stats("Bob", bob_stats)
            await rwi.hangup(call_id)
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
    return result
async def test_ivr_dtmf() -> TestResult:
    """Call the IVR number and send DTMF digits ``1`` then ``#`` via RWI.

    Fails if originate or the first DTMF request is not acknowledged with
    ``success``; the response to ``#`` is not checked.
    """
    outcome = TestResult("IVR Call with DTMF via RWI")
    client = RwiClient()
    connected = await client.connect()
    if not connected:
        outcome.add_error("Failed to connect to RWI")
        outcome.finish(False)
        return outcome
    try:
        await client.subscribe(["default"])
        ivr_call = f"test-ivr-{int(time.time())}"
        print(f"Originating call to IVR {IVR_NUMBER}...")
        originate_reply = await client.originate(
            call_id=ivr_call,
            destination=f"sip:{IVR_NUMBER}@{PROXY_HOST}:{PROXY_PORT}",
            caller_id=f"sip:rwi@{PROXY_HOST}",
            timeout_secs=15,
        )
        if originate_reply.get('status') != 'success':
            outcome.add_error(f"Originate to IVR failed: {originate_reply.get('error')}")
            outcome.finish(False)
            return outcome
        await asyncio.sleep(2)

        print("Sending DTMF '1'...")
        first_reply = await client.send_dtmf(ivr_call, "1")
        if first_reply.get('status') != 'success':
            outcome.add_error(f"DTMF send failed: {first_reply.get('error')}")
        await asyncio.sleep(2)

        # The reply to '#' is intentionally left unchecked, as before.
        print("Sending DTMF '#'...")
        await client.send_dtmf(ivr_call, "#")
        await asyncio.sleep(1)

        await client.hangup(ivr_call)
        outcome.finish(not outcome.errors)
    finally:
        await client.disconnect()
    return outcome
async def test_call_routing_dialplan() -> TestResult:
    """Originate to ``1000`` on the proxy and print the resulting call info.

    Fails only if originate is not acknowledged with ``success``; the
    ``call.info`` response is printed, not asserted on. The Alice bot is
    created but never started in this test.
    """
    result = TestResult("Call Routing via Dialplan")
    with SipBotProcess("Alice") as alice, SipBotProcess("Bob") as bob:
        bob.start_callee(port=5070, ring_secs=1)
        # Async sleep: do not block the event loop while sipbot starts.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            call_id = f"test-routing-{int(time.time())}"
            print("Originating to dialplan route 1000...")
            orig_resp = await rwi.originate(
                call_id=call_id,
                destination=f"sip:1000@{PROXY_HOST}:{PROXY_PORT}",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            if orig_resp.get('status') != 'success':
                result.add_error(f"Originate failed: {orig_resp.get('error')}")
                result.finish(False)
                return result
            await asyncio.sleep(3)
            info_resp = await rwi.get_call_info(call_id)
            if info_resp.get('status') == 'success':
                print(f"Call info: {info_resp.get('result', {})}")
            await rwi.hangup(call_id)
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
            alice.terminate()
    return result
async def test_call_reject() -> TestResult:
    """Originate to a rejecting callee, then send ``call.reject`` via RWI.

    Fails if the reject request is not acknowledged with ``success``. The
    originate response is not inspected.
    """
    result = TestResult("Call Reject via RWI")
    with SipBotProcess("Bob") as bob:
        bob.start_callee(port=5070, ring_secs=0, answer_mode="reject")
        # Async sleep: do not block the event loop while sipbot starts.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            call_id = f"test-reject-{int(time.time())}"
            await rwi.originate(
                call_id=call_id,
                destination=f"sip:bob@{PROXY_HOST}:5070",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=10
            )
            await asyncio.sleep(1)
            print("Rejecting call...")
            reject_resp = await rwi.reject(call_id, reason="busy")
            if reject_resp.get('status') != 'success':
                result.add_error(f"Reject failed: {reject_resp.get('error')}")
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
    return result
async def test_call_ring() -> TestResult:
    """Originate to a slow-answering callee and send ``call.ring`` via RWI.

    Fails if the ring request is not acknowledged with ``success``.
    """
    result = TestResult("Send Ringing via RWI")
    with SipBotProcess("Bob") as bob:
        bob.start_callee(port=5070, ring_secs=5)
        # Async sleep: do not block the event loop while sipbot starts.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            call_id = f"test-ring-{int(time.time())}"
            await rwi.originate(
                call_id=call_id,
                destination=f"sip:bob@{PROXY_HOST}:5070",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            await asyncio.sleep(1)
            print("Sending ringing...")
            ring_resp = await rwi.ring(call_id)
            if ring_resp.get('status') != 'success':
                result.add_error(f"Ring failed: {ring_resp.get('error')}")
            await asyncio.sleep(2)
            await rwi.hangup(call_id)
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
    return result
async def test_conference_mute_unmute() -> TestResult:
    """Create a two-party conference and mute, then unmute, one member.

    Fails if the conference cannot be created or if mute/unmute is not
    acknowledged with ``success``. RTP counters are recorded for the report
    but not asserted on.
    """
    result = TestResult("Conference Mute/Unmute")
    conf_id = f"test-conf-mute-{int(time.time())}"
    with SipBotProcess("Bob") as bob, SipBotProcess("Alice") as alice:
        bob.start_callee(port=5070, ring_secs=1)
        alice.start_callee(port=5071, ring_secs=1)
        # Async sleep: do not block the event loop while the bots start.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            print(f"Creating conference {conf_id}...")
            create_resp = await rwi.conference_create(conf_id, max_members=5)
            if create_resp.get('status') != 'success':
                result.add_error(f"Conference create failed: {create_resp.get('error')}")
                result.finish(False)
                return result
            bob_call = f"test-bob-{int(time.time())}"
            alice_call = f"test-alice-{int(time.time())}"
            await rwi.originate(
                call_id=bob_call,
                destination=f"sip:bob@{PROXY_HOST}:5070",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            await asyncio.sleep(1)
            await rwi.originate(
                call_id=alice_call,
                destination=f"sip:alice@{PROXY_HOST}:5071",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            await asyncio.sleep(2)
            await rwi.conference_add(conf_id, bob_call)
            await rwi.conference_add(conf_id, alice_call)
            await asyncio.sleep(2)
            print("Muting Bob...")
            mute_resp = await rwi.conference_mute(conf_id, bob_call)
            if mute_resp.get('status') != 'success':
                result.add_error(f"Mute failed: {mute_resp.get('error')}")
            await asyncio.sleep(2)
            print("Unmuting Bob...")
            unmute_resp = await rwi.conference_unmute(conf_id, bob_call)
            if unmute_resp.get('status') != 'success':
                result.add_error(f"Unmute failed: {unmute_resp.get('error')}")
            await asyncio.sleep(2)
            bob_stats = bob.get_rtp_stats()
            alice_stats = alice.get_rtp_stats()
            result.add_rtp_stats("Bob", bob_stats)
            result.add_rtp_stats("Alice", alice_stats)
            await rwi.hangup(bob_call)
            await rwi.hangup(alice_call)
            await rwi.conference_destroy(conf_id)
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
            alice.terminate()
    return result
async def test_transfer() -> TestResult:
    """Originate a call to Bob and request a transfer to Charlie.

    The transfer response status is printed and RTP counters of both bots are
    recorded, but neither is asserted on; the test passes as long as no error
    was recorded.
    """
    result = TestResult("Call Transfer")
    with SipBotProcess("Bob") as bob, SipBotProcess("Charlie") as charlie:
        bob.start_callee(port=5070, ring_secs=1)
        charlie.start_callee(port=5072, ring_secs=1)
        # Async sleep: do not block the event loop while the bots start.
        await asyncio.sleep(1)
        rwi = RwiClient()
        if not await rwi.connect():
            result.add_error("Failed to connect to RWI")
            result.finish(False)
            return result
        try:
            await rwi.subscribe(["default"])
            call_id = f"test-transfer-{int(time.time())}"
            await rwi.originate(
                call_id=call_id,
                destination=f"sip:bob@{PROXY_HOST}:5070",
                caller_id=f"sip:rwi@{PROXY_HOST}",
                timeout_secs=15
            )
            await asyncio.sleep(2)
            target = f"sip:charlie@{PROXY_HOST}:5072"
            print(f"Transferring call to {target}...")
            transfer_resp = await rwi.transfer(call_id, target)
            print(f"Transfer response: {transfer_resp.get('status')}")
            await asyncio.sleep(3)
            bob_stats = bob.get_rtp_stats()
            charlie_stats = charlie.get_rtp_stats()
            result.add_rtp_stats("Bob", bob_stats)
            result.add_rtp_stats("Charlie", charlie_stats)
            await rwi.hangup(call_id)
            result.finish(len(result.errors) == 0)
        finally:
            await rwi.disconnect()
            bob.terminate()
            charlie.terminate()
    return result
async def main():
    """Run the whole RWI end-to-end suite and return a process exit code.

    Checks prerequisites (``websockets`` module, ``sipbot`` binary), starts
    rustpbx, runs every test in sequence, prints a summary and stops rustpbx.

    Returns:
        0 when every test passed, 1 otherwise or when a prerequisite failed.
    """
    print("\n" + "="*60)
    print("RustPBX RWI E2E Tests with RTP Validation")
    print("="*60)
    if not WEBSOCKETS_AVAILABLE:
        print("❌ websockets module not found. Install: pip install websockets")
        return 1
    try:
        subprocess.run(['sipbot', '--version'], capture_output=True, check=True, timeout=5)
        print("✓ sipbot is available")
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("❌ sipbot not found. Install: cargo install sipbot")
        return 1
    except subprocess.TimeoutExpired:
        # timeout=5 above raises this; report it instead of crashing.
        print("❌ sipbot --version timed out")
        return 1
    # Registered before starting so rustpbx is stopped on any exit path.
    atexit.register(cleanup_rustpbx)
    if not start_rustpbx():
        return 1
    rwi = RwiClient()
    if not await rwi.connect():
        print("❌ Failed to connect to RWI")
        return 1
    results = []
    result = await test_list_calls(rwi)
    results.append(result)
    result.print_summary()
    await rwi.disconnect()
    # Each of these opens its own RWI connection.
    test_funcs = [
        test_originate_and_answer,
        test_hold_unhold,
        test_bridge,
        test_transfer,
        test_call_reject,
        test_call_ring,
        test_call_routing_dialplan,
        test_ivr_dtmf,
        test_queue,
        test_conference,
        test_conference_mute_unmute,
        test_media_play,
    ]
    for test_func in test_funcs:
        try:
            result = await test_func()
            results.append(result)
            result.print_summary()
        except Exception as e:
            # An uncaught exception fails that test but not the whole run.
            print(f"\n❌ Test {test_func.__name__} failed with exception: {e}")
            import traceback
            traceback.print_exc()
            result = TestResult(test_func.__name__.replace('test_', '').replace('_', ' ').title())
            result.add_error(f"Exception: {e}")
            result.finish(False)
            results.append(result)
            result.print_summary()
        # Short pause between tests.
        await asyncio.sleep(1)
    print("\n" + "="*60)
    print("TEST SUMMARY")
    print("="*60)
    passed = sum(1 for r in results if r.passed)
    total = len(results)
    for r in results:
        status = "✅" if r.passed else "❌"
        print(f"{status} {r.name}")
    print(f"\nResult: {passed}/{total} tests passed")
    print("="*60)
    cleanup_rustpbx()
    return 0 if passed == total else 1
if __name__ == '__main__':
    # Script entry point: run the suite and use its result as the exit status.
    try:
        status = asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C: stop rustpbx ourselves and exit with status 130.
        print("\n\nTest interrupted")
        cleanup_rustpbx()
        status = 130
    sys.exit(status)