mod helpers;
use futures::{SinkExt, StreamExt};
use helpers::sipbot_helper::TestUa;
use helpers::test_server::{TEST_TOKEN, TestPbx};
use std::time::Duration;
use tokio::time::timeout;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use uuid::Uuid;
/// Client-side WebSocket stream type used by the helpers in this file
/// (the stream half of what `connect_async` yields in `ws_connect`).
type WsStream =
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
/// Opens a WebSocket connection to `rwi_url`, appending `TEST_TOKEN` as the
/// `token` query parameter.
///
/// # Panics
///
/// Panics with `"connect timeout"` if the handshake does not finish within
/// five seconds, or with `"connect error"` if the handshake itself fails.
async fn ws_connect(rwi_url: &str) -> WsStream {
    let url = format!("{}?token={}", rwi_url, TEST_TOKEN);

    // The outer `Result` is the timeout, the inner one is the handshake.
    let handshake = timeout(Duration::from_secs(5), connect_async(&url)).await;
    let (stream, _response) = handshake
        .expect("connect timeout")
        .expect("connect error");

    stream
}
/// Sends the request `json` over `ws` and waits for the matching reply.
///
/// The request must be a JSON document carrying a string `action_id`. Incoming
/// frames are read until a text frame arrives whose `type` is
/// `command_completed` or `command_failed` and whose `action_id` equals the
/// request's; that frame is returned as parsed JSON. Non-text frames and
/// non-matching text frames are skipped.
///
/// # Panics
///
/// Panics if `json` is not valid JSON or has no string `action_id`, if the send
/// fails, if no matching reply arrives within ten seconds, if the stream ends
/// or errors, or if a text frame is not valid JSON.
async fn ws_send_recv(ws: &mut WsStream, json: &str) -> serde_json::Value {
    let request: serde_json::Value = serde_json::from_str(json).expect("invalid JSON");
    let action_id = request["action_id"]
        .as_str()
        .expect("missing action_id")
        .to_owned();

    ws.send(Message::Text(json.into())).await.unwrap();

    // One overall deadline shared by every read, so skipped frames cannot
    // extend the total wait.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
    loop {
        let time_left = deadline.saturating_duration_since(tokio::time::Instant::now());
        if time_left.is_zero() {
            panic!("ws_send_recv: timed out waiting for response");
        }

        let frame = timeout(time_left, ws.next())
            .await
            .expect("recv timeout")
            .expect("stream ended")
            .expect("ws error");

        // Only text frames can carry a command reply.
        let text = match frame {
            Message::Text(text) => text,
            _ => continue,
        };

        let reply: serde_json::Value = serde_json::from_str(&text).expect("not JSON");
        let is_terminal = matches!(
            reply["type"].as_str(),
            Some("command_completed" | "command_failed")
        );
        if is_terminal && reply["action_id"].as_str() == Some(action_id.as_str()) {
            return reply;
        }
    }
}
/// Reads frames from `ws` until one satisfies `predicate`, then returns it.
///
/// Text frames are parsed as JSON and handed to `predicate`; ping and pong
/// frames are skipped.
///
/// # Panics
///
/// Panics if no matching frame arrives within `timeout_secs` seconds, if the
/// stream ends or errors, if a text frame is not valid JSON, or if any other
/// kind of frame is received.
// Not called by the test visible in this file; the `allow` keeps the helper
// available without a warning.
#[allow(dead_code)]
async fn recv_until(
    ws: &mut WsStream,
    timeout_secs: u64,
    predicate: impl Fn(&serde_json::Value) -> bool,
) -> serde_json::Value {
    let deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
    loop {
        let time_left = deadline.saturating_duration_since(tokio::time::Instant::now());
        if time_left.is_zero() {
            panic!("recv_until: timed out waiting for matching frame");
        }

        let frame = timeout(time_left, ws.next())
            .await
            .expect("recv_until timeout")
            .expect("stream ended")
            .expect("ws error");

        match frame {
            // Keep-alive traffic carries no payload to test.
            Message::Ping(_) | Message::Pong(_) => {}
            Message::Text(text) => {
                let value: serde_json::Value = serde_json::from_str(&text).expect("not JSON");
                if predicate(&value) {
                    return value;
                }
            }
            other => panic!("unexpected frame: {other:?}"),
        }
    }
}
/// Builds an RWI request envelope for `action` with the given `params`.
///
/// Returns `(action_id, json)`, where `action_id` is a freshly generated
/// UUID v4 string and `json` is the serialized envelope containing `rwi`
/// (`"1.0"`), `action_id`, `action` and `params`.
fn rwi_req(action: &str, params: serde_json::Value) -> (String, String) {
    let action_id = Uuid::new_v4().to_string();

    let envelope = serde_json::json!({
        "rwi": "1.0",
        "action_id": action_id,
        "action": action,
        "params": params,
    });
    let json = serde_json::to_string(&envelope).unwrap();

    (action_id, json)
}
/// End-to-end flow driven over the RWI WebSocket: subscribe, originate a call
/// to the `ivr` destination, start and stop the `queue` app on it, then add an
/// agent SIP leg to the same call.
///
/// Every command except `call.app_stop` must answer `command_completed`; the
/// `call.app_stop` reply is only logged.
#[tokio::test]
async fn test_ivr_queue_agent_flow() {
    // Ignore the error when a subscriber has already been installed.
    let _ = tracing_subscriber::fmt::try_init();

    // --- Fixtures: PBX, agent user agent, RWI socket -----------------------
    let sip_port = portpicker::pick_unused_port().expect("no free SIP port");
    // NOTE(review): this port is picked but never used below.
    let _caller_port = portpicker::pick_unused_port().expect("no free caller port");
    let agent_port = portpicker::pick_unused_port().expect("no free agent port");

    let pbx = TestPbx::start(sip_port).await;
    let agent = TestUa::callee_with_username(agent_port, 1, "agent1").await;
    let mut ws = ws_connect(&pbx.rwi_url).await;

    // --- Subscribe to the "default" context --------------------------------
    let subscribe_params = serde_json::json!({"contexts": ["default"]});
    let (_, subscribe_req) = rwi_req("session.subscribe", subscribe_params);
    let v = ws_send_recv(&mut ws, &subscribe_req).await;
    assert_eq!(v["type"], "command_completed", "subscribe failed: {v}");

    // --- Originate the call towards the IVR --------------------------------
    let call_id = format!("e2e-ivr-{}", Uuid::new_v4());
    let originate_params = serde_json::json!({
        "call_id": call_id,
        "destination": format!("sip:ivr@127.0.0.1:{}", sip_port),
        "caller_id": format!("sip:rwi@{}", pbx.sip_host()),
        "context": "default",
        "timeout_secs": 15,
    });
    let (_, originate_req) = rwi_req("call.originate", originate_params);
    let v = ws_send_recv(&mut ws, &originate_req).await;
    assert_eq!(v["type"], "command_completed", "originate failed: {v}");

    // Fixed pause before the next command; presumably lets the call settle.
    tokio::time::sleep(Duration::from_secs(2)).await;
    tracing::info!("Found call_id: {}", call_id);

    // --- Start the queue app on the call -----------------------------------
    tracing::info!("Starting Queue app on session {}", call_id);
    let app_start_params = serde_json::json!({
        "call_id": call_id,
        "app_name": "queue",
        "params": {"name": "sales"},
    });
    let (_, app_start_req) = rwi_req("call.app_start", app_start_params);
    let v = ws_send_recv(&mut ws, &app_start_req).await;
    assert_eq!(v["type"], "command_completed", "app_start failed: {v}");
    tokio::time::sleep(Duration::from_secs(1)).await;

    // --- Stop the queue app (reply is logged, not asserted) ----------------
    tracing::info!("Stopping queue app on session {}", call_id);
    let app_stop_params = serde_json::json!({"call_id": call_id});
    let (_, app_stop_req) = rwi_req("call.app_stop", app_stop_params);
    let v = ws_send_recv(&mut ws, &app_stop_req).await;
    tracing::info!("app_stop response: {:?}", v);
    tokio::time::sleep(Duration::from_secs(1)).await;

    // --- Add the agent leg to the call -------------------------------------
    tracing::info!("Adding Agent SIP leg to session {}", call_id);
    let leg_add_params = serde_json::json!({
        "call_id": call_id,
        "target": agent.sip_uri("agent1"),
        "leg_id": "agent-1",
    });
    let (_, leg_add_req) = rwi_req("call.leg_add", leg_add_params);
    let v = ws_send_recv(&mut ws, &leg_add_req).await;
    assert_eq!(v["type"], "command_completed", "agent leg_add failed: {v}");
    tokio::time::sleep(Duration::from_secs(2)).await;

    // RTP statistics are reported for diagnostics only; nothing is asserted.
    let agent_stats = agent.rtp_stats_summary();
    tracing::info!("Agent RTP stats: {}", agent_stats);

    // --- Teardown -----------------------------------------------------------
    ws.close(None).await.unwrap();
    agent.stop();
    pbx.stop();
    tracing::info!("IVR -> Queue -> Agent flow test completed!");
}