mod helpers;
use std::collections::HashMap;
use std::time::Duration;
use helpers::sipbot_helper::TestUa;
use helpers::test_server::{TestPbx, TestPbxInject};
use rustpbx::config::ProxyConfig;
use rustpbx::proxy::routing::{
MatchConditions, QueueDialMode, RouteAction, RouteQueueConfig, RouteQueueStrategyConfig,
RouteQueueTargetConfig, RouteRule,
};
/// End-to-end test of a queue configured with `QueueDialMode::Parallel`.
///
/// A test PBX is started with one route that sends calls addressed to user
/// `sales` into a queue named `sales` holding two agent targets. Two callee
/// user agents and one caller are then started, the test waits a fixed
/// period, and finally asserts on the RTP counters each user agent reports:
///
/// * agent2 must show RTP activity in at least one direction,
/// * agent1 must not show RTP in both directions,
/// * the caller must show RTP activity in at least one direction.
#[tokio::test]
async fn test_parallel_queue_fork_first_answer_wins() {
    // Results are deliberately discarded — presumably because the crypto
    // provider / subscriber may already be installed by another test.
    let _ = rustls::crypto::ring::default_provider().install_default();
    let _ = tracing_subscriber::fmt()
        .with_env_filter("rustpbx=debug,sipbot=debug")
        .try_init();

    // --- Ports -----------------------------------------------------------
    let sip_port = portpicker::pick_unused_port().expect("no free SIP port");
    let caller_port = portpicker::pick_unused_port().expect("no free caller port");
    let agent1_port = portpicker::pick_unused_port().expect("no free agent1 port");
    let agent2_port = portpicker::pick_unused_port().expect("no free agent2 port");

    // --- Queue: both agents are dialled in parallel ------------------------
    let queue_target = |uri: String, label: &str| RouteQueueTargetConfig {
        uri,
        label: Some(String::from(label)),
    };
    let sales_queue = RouteQueueConfig {
        name: Some(String::from("sales")),
        accept_immediately: true,
        strategy: RouteQueueStrategyConfig {
            mode: QueueDialMode::Parallel,
            wait_timeout_secs: Some(15),
            targets: vec![
                queue_target(format!("sip:agent1@127.0.0.1:{}", agent1_port), "Agent 1"),
                queue_target(format!("sip:agent2@127.0.0.1:{}", agent2_port), "Agent 2"),
            ],
        },
        ..Default::default()
    };

    // --- Route: calls to user "sales" go to the "sales" queue --------------
    let sales_route = RouteRule {
        name: String::from("to-sales-queue"),
        priority: 10,
        match_conditions: MatchConditions {
            to_user: Some(String::from("sales")),
            ..Default::default()
        },
        action: RouteAction {
            queue: Some(String::from("sales")),
            ..Default::default()
        },
        ..Default::default()
    };

    // --- PBX start-up ------------------------------------------------------
    let injection = TestPbxInject {
        proxy_config: Some(ProxyConfig {
            modules: Some(vec![String::from("registrar"), String::from("call")]),
            acl_rules: Some(vec![String::from("allow all")]),
            ensure_user: Some(false),
            ..Default::default()
        }),
        routes: Some(vec![sales_route]),
        queues: Some(HashMap::from([(String::from("sales"), sales_queue)])),
        ..Default::default()
    };
    let server = TestPbx::start_with_inject(sip_port, injection).await;
    tracing::info!(
        sip_port,
        caller_port,
        agent1_port,
        agent2_port,
        "TestPbx started"
    );

    // --- User agents -------------------------------------------------------
    // NOTE(review): the second argument (2 vs 0) presumably controls how long
    // each callee waits before answering — confirm against `TestUa`.
    let first_agent = TestUa::callee_with_username(agent1_port, 2, "agent1").await;
    let second_agent = TestUa::callee_with_username(agent2_port, 0, "agent2").await;
    let dial_target = format!("sip:sales@127.0.0.1:{}", sip_port);
    let calling_ua = TestUa::caller_with_target(caller_port, "caller", dial_target).await;
    tracing::info!("Caller placed call, waiting for queue to fork and agent to answer…");

    // Fixed wait before sampling RTP counters; shorter than the queue's
    // 15-second `wait_timeout_secs` configured above.
    tokio::time::sleep(Duration::from_secs(12)).await;

    // --- Agent 2: expected to carry media ------------------------------------
    let agent2_rx = second_agent.has_rtp_rx();
    let agent2_tx = second_agent.has_rtp_tx();
    let agent2_quality = second_agent.audio_quality_summary();
    tracing::info!(
        agent2_rx,
        agent2_tx,
        "Agent2 RTP stats: {}, quality: total={} silence={}",
        second_agent.rtp_stats_summary(),
        agent2_quality.total_frames,
        agent2_quality.silence_frames
    );
    assert!(
        agent2_rx || agent2_tx,
        "Agent2 should have RTP activity (answered the parallel fork)"
    );

    // --- Agent 1: must not have a two-way media path -------------------------
    let agent1_rx = first_agent.has_rtp_rx();
    let agent1_tx = first_agent.has_rtp_tx();
    tracing::info!(
        agent1_rx,
        agent1_tx,
        "Agent1 RTP stats: {}",
        first_agent.rtp_stats_summary()
    );
    assert!(
        !agent1_rx || !agent1_tx,
        "Agent1 should NOT have bidirectional RTP (its fork should have been cancelled)"
    );

    // --- Caller: expected to carry media -------------------------------------
    let caller_rx = calling_ua.has_rtp_rx();
    let caller_tx = calling_ua.has_rtp_tx();
    tracing::info!(
        caller_rx,
        caller_tx,
        "Caller RTP stats: {}",
        calling_ua.rtp_stats_summary()
    );
    assert!(
        caller_rx || caller_tx,
        "Caller should have RTP activity (bridged with agent)"
    );

    // --- Teardown ------------------------------------------------------------
    first_agent.stop();
    second_agent.stop();
    calling_ua.stop();
    server.stop();
    tracing::info!("Parallel queue E2E test completed!");
}