rustpbx 0.4.7

A SIP PBX implementation in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
use super::common::{
    create_test_request, create_test_server, create_test_server_with_config, create_transaction,
};
use crate::call::domain::{LegId, MediaCapability, MediaPathMode};
use crate::call::runtime::{AppDescriptor, AppRuntime, AppRuntimeError, AppStatus};
use crate::call::{
    DialDirection, DialStrategy, Dialplan, FailureAction, MediaConfig, QueueFallbackAction,
    QueuePlan, TransactionCookie,
};
use crate::config::{MediaProxyMode, ProxyConfig};
use crate::proxy::proxy_call::sip_session::SipSession;
use crate::proxy::proxy_call::state::CallContext;
use crate::proxy::proxy_call::test_util::tests::MockMediaPeer;
use crate::proxy::routing::{
    RouteQueueConfig, RouteQueueFallbackConfig, RouteQueueStrategyConfig, RouteQueueTargetConfig,
};
use async_trait::async_trait;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

struct AlreadyRunningThenOkRuntime {
    start_calls: AtomicUsize,
    stop_calls: AtomicUsize,
    stop_returns_not_running: bool,
    second_start_should_fail: bool,
}

impl AlreadyRunningThenOkRuntime {
    fn new() -> Self {
        Self {
            start_calls: AtomicUsize::new(0),
            stop_calls: AtomicUsize::new(0),
            stop_returns_not_running: false,
            second_start_should_fail: false,
        }
    }

    fn with_stop_not_running(mut self) -> Self {
        self.stop_returns_not_running = true;
        self
    }

    fn with_second_start_error(mut self) -> Self {
        self.second_start_should_fail = true;
        self
    }
}

#[async_trait]
impl AppRuntime for AlreadyRunningThenOkRuntime {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    async fn start_app(
        &self,
        app_name: &str,
        _params: Option<serde_json::Value>,
        _auto_answer: bool,
    ) -> crate::call::runtime::AppResult<()> {
        let idx = self.start_calls.fetch_add(1, Ordering::SeqCst);
        if idx == 0 {
            return Err(AppRuntimeError::AlreadyRunning(app_name.to_string()));
        }
        if self.second_start_should_fail {
            return Err(AppRuntimeError::UnknownApp(app_name.to_string()));
        }
        Ok(())
    }

    async fn stop_app(&self, _reason: Option<String>) -> crate::call::runtime::AppResult<()> {
        self.stop_calls.fetch_add(1, Ordering::SeqCst);
        if self.stop_returns_not_running {
            return Err(AppRuntimeError::NotRunning);
        }
        Ok(())
    }

    fn inject_event(&self, _event: serde_json::Value) -> crate::call::runtime::AppResult<()> {
        Ok(())
    }

    fn is_running(&self) -> bool {
        false
    }

    fn status(&self) -> AppStatus {
        AppStatus::Idle
    }

    fn current_app(&self) -> Option<String> {
        None
    }

    fn required_capabilities(&self) -> Vec<MediaCapability> {
        vec![]
    }

    fn app_descriptor(&self, _app_name: &str) -> Option<AppDescriptor> {
        None
    }
}

struct AlwaysFailStartRuntime;

struct StartOnlyRuntime {
    start_calls: AtomicUsize,
}

impl StartOnlyRuntime {
    fn new() -> Self {
        Self {
            start_calls: AtomicUsize::new(0),
        }
    }
}

#[async_trait]
impl AppRuntime for AlwaysFailStartRuntime {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    async fn start_app(
        &self,
        app_name: &str,
        _params: Option<serde_json::Value>,
        _auto_answer: bool,
    ) -> crate::call::runtime::AppResult<()> {
        Err(AppRuntimeError::UnknownApp(app_name.to_string()))
    }

    async fn stop_app(&self, _reason: Option<String>) -> crate::call::runtime::AppResult<()> {
        Ok(())
    }

    fn inject_event(&self, _event: serde_json::Value) -> crate::call::runtime::AppResult<()> {
        Ok(())
    }

    fn is_running(&self) -> bool {
        false
    }

    fn status(&self) -> AppStatus {
        AppStatus::Idle
    }

    fn current_app(&self) -> Option<String> {
        None
    }

    fn required_capabilities(&self) -> Vec<MediaCapability> {
        vec![]
    }

    fn app_descriptor(&self, _app_name: &str) -> Option<AppDescriptor> {
        None
    }
}

#[async_trait]
impl AppRuntime for StartOnlyRuntime {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    async fn start_app(
        &self,
        _app_name: &str,
        _params: Option<serde_json::Value>,
        _auto_answer: bool,
    ) -> crate::call::runtime::AppResult<()> {
        self.start_calls.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    async fn stop_app(&self, _reason: Option<String>) -> crate::call::runtime::AppResult<()> {
        Ok(())
    }

    fn inject_event(&self, _event: serde_json::Value) -> crate::call::runtime::AppResult<()> {
        Ok(())
    }

    fn is_running(&self) -> bool {
        false
    }

    fn status(&self) -> AppStatus {
        AppStatus::Idle
    }

    fn current_app(&self) -> Option<String> {
        None
    }

    fn required_capabilities(&self) -> Vec<MediaCapability> {
        vec![]
    }

    fn app_descriptor(&self, _app_name: &str) -> Option<AppDescriptor> {
        None
    }
}

async fn build_session(dialplan: Dialplan) -> SipSession {
    let (server, _) = create_test_server().await;
    build_session_on_server(server, dialplan).await
}

async fn build_session_with_config(dialplan: Dialplan, config: ProxyConfig) -> SipSession {
    let (server, _) = create_test_server_with_config(config).await;
    build_session_on_server(server, dialplan).await
}

async fn build_session_on_server(
    server: Arc<crate::proxy::server::SipServerInner>,
    dialplan: Dialplan,
) -> SipSession {
    let request = create_test_request(
        rsipstack::sip::Method::Invite,
        "alice",
        None,
        "rustpbx.com",
        None,
    );
    let (tx, _) = create_transaction(request).await;
    let (state_tx, _state_rx) = mpsc::unbounded_channel();
    let server_dialog = server
        .dialog_layer
        .get_or_create_server_invite(&tx, state_tx, None, None)
        .expect("failed to create server dialog");

    let context = CallContext {
        session_id: "test-session".to_string(),
        dialplan: Arc::new(dialplan),
        cookie: TransactionCookie::default(),
        start_time: Instant::now(),
        original_caller: "sip:alice@rustpbx.com".to_string(),
        original_callee: "sip:ivr@rustpbx.com".to_string(),
        max_forwards: 70,
        created_at: chrono::Utc::now().to_rfc3339(),
        metadata: None,
    };

    let caller_peer = Arc::new(MockMediaPeer::new());
    let callee_peer = Arc::new(MockMediaPeer::new());
    let use_media_proxy =
        SipSession::check_media_proxy(&context, &context.dialplan.media.proxy_mode);
    let (session, _handle, _cmd_rx) = SipSession::new(
        server,
        CancellationToken::new(),
        None,
        context,
        server_dialog,
        use_media_proxy,
        caller_peer,
        callee_peer,
    );
    session
}

fn build_dialplan_with_mode(mode: MediaProxyMode) -> Dialplan {
    let request = create_test_request(
        rsipstack::sip::Method::Invite,
        "alice",
        None,
        "rustpbx.com",
        None,
    );
    Dialplan::new("test-session".to_string(), request, DialDirection::Inbound)
        .with_media(MediaConfig::new().with_proxy_mode(mode))
}

fn make_queue_hangup_config(queue_name: &str) -> ProxyConfig {
    let mut config = ProxyConfig::default();
    config.queues.insert(
        queue_name.to_string(),
        RouteQueueConfig {
            name: Some(queue_name.to_string()),
            strategy: RouteQueueStrategyConfig {
                targets: vec![RouteQueueTargetConfig {
                    uri: "skill-group:missing".to_string(),
                    label: Some("missing-skill-group".to_string()),
                }],
                ..Default::default()
            },
            fallback: Some(RouteQueueFallbackConfig {
                failure_code: Some(486),
                failure_reason: Some("All agents unavailable".to_string()),
                ..Default::default()
            }),
            ..Default::default()
        },
    );
    config
}

#[tokio::test]
async fn test_media_proxy_auto_anchors_application_flow() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto).with_application(
        "ivr".to_string(),
        None,
        true,
    );

    let session = build_session(dialplan).await;
    assert_eq!(session.media_profile.path, MediaPathMode::Anchored);
}

#[tokio::test]
async fn test_media_proxy_auto_anchors_queue_flow() {
    let queue_plan = QueuePlan {
        queue_name: "support".to_string(),
        ..Default::default()
    };
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto).with_queue(queue_plan);

    let session = build_session(dialplan).await;
    assert_eq!(session.media_profile.path, MediaPathMode::Anchored);
}

#[tokio::test]
async fn test_media_proxy_auto_keeps_plain_targets_bypass_without_recording() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto)
        .with_targets(DialStrategy::Sequential(vec![]));

    let session = build_session(dialplan).await;
    assert_eq!(session.media_profile.path, MediaPathMode::Bypass);
}

#[tokio::test]
async fn test_start_ivr_app_restarts_after_already_running() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto).with_application(
        "ivr".to_string(),
        None,
        true,
    );
    let mut session = build_session(dialplan).await;

    let runtime = Arc::new(AlreadyRunningThenOkRuntime::new());
    session.app_runtime = runtime.clone();

    session
        .start_ivr_app("hello")
        .await
        .expect("start_ivr_app should recover from AlreadyRunning");

    assert_eq!(runtime.start_calls.load(Ordering::SeqCst), 2);
    assert_eq!(runtime.stop_calls.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn test_start_ivr_app_restarts_even_if_stop_reports_not_running() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto).with_application(
        "ivr".to_string(),
        None,
        true,
    );
    let mut session = build_session(dialplan).await;

    let runtime = Arc::new(AlreadyRunningThenOkRuntime::new().with_stop_not_running());
    session.app_runtime = runtime.clone();

    session
        .start_ivr_app("hello")
        .await
        .expect("restart should continue when stop_app returns NotRunning");

    assert_eq!(runtime.start_calls.load(Ordering::SeqCst), 2);
    assert_eq!(runtime.stop_calls.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn test_start_ivr_app_propagates_non_retryable_start_error() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto).with_application(
        "ivr".to_string(),
        None,
        true,
    );
    let mut session = build_session(dialplan).await;

    session.app_runtime = Arc::new(AlwaysFailStartRuntime);

    let err = session
        .start_ivr_app("hello")
        .await
        .expect_err("non-AlreadyRunning error should be returned");
    assert!(
        err.to_string().contains("Failed to start IVR 'hello'"),
        "unexpected error: {}",
        err
    );
}

#[tokio::test]
async fn test_start_ivr_app_reports_restart_failure_when_second_start_fails() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto).with_application(
        "ivr".to_string(),
        None,
        true,
    );
    let mut session = build_session(dialplan).await;

    let runtime = Arc::new(AlreadyRunningThenOkRuntime::new().with_second_start_error());
    session.app_runtime = runtime.clone();

    let err = session
        .start_ivr_app("hello")
        .await
        .expect_err("second start failure should be surfaced");
    assert!(
        err.to_string().contains("Failed to restart IVR 'hello'"),
        "unexpected error: {}",
        err
    );
    assert_eq!(runtime.start_calls.load(Ordering::SeqCst), 2);
    assert_eq!(runtime.stop_calls.load(Ordering::SeqCst), 1);
}

#[test]
fn test_queue_fallback_without_prompt_maps_to_hangup() {
    let queue = RouteQueueConfig {
        name: Some("support".to_string()),
        fallback: Some(RouteQueueFallbackConfig {
            failure_code: Some(486),
            failure_reason: Some("All agents unavailable".to_string()),
            failure_prompt: None,
            ..Default::default()
        }),
        ..Default::default()
    };

    let plan = queue.to_queue_plan().expect("queue plan should build");
    match plan.fallback {
        Some(QueueFallbackAction::Failure(FailureAction::Hangup { .. })) => {}
        other => panic!("expected Hangup fallback, got {:?}", other),
    }
}

#[test]
fn test_queue_fallback_with_prompt_maps_to_play_then_hangup() {
    let queue = RouteQueueConfig {
        name: Some("support".to_string()),
        fallback: Some(RouteQueueFallbackConfig {
            failure_code: Some(486),
            failure_reason: Some("All agents unavailable".to_string()),
            failure_prompt: Some("sounds/queue-fallback.wav".to_string()),
            ..Default::default()
        }),
        ..Default::default()
    };

    let plan = queue.to_queue_plan().expect("queue plan should build");
    match plan.fallback {
        Some(QueueFallbackAction::Failure(FailureAction::PlayThenHangup {
            audio_file, ..
        })) => assert_eq!(audio_file, "sounds/queue-fallback.wav"),
        other => panic!("expected PlayThenHangup fallback, got {:?}", other),
    }
}

#[tokio::test]
async fn test_queue_transfer_without_return_ivr_keeps_hangup_fallback_when_no_agents() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto).with_application(
        "ivr".to_string(),
        None,
        true,
    );
    let config = make_queue_hangup_config("support");
    let mut session = build_session_with_config(dialplan, config).await;
    let (callee_tx, mut callee_rx) = mpsc::unbounded_channel();
    session.callee_event_tx = Some(callee_tx);

    let err = session
        .handle_queue_transfer(
            LegId::from("caller"),
            "support",
            None,
            Vec::new(),
            &mut callee_rx,
        )
        .await
        .expect_err("without return_ivr, hangup fallback should surface failure");
    assert!(
        err.to_string().contains("Queue transfer failed"),
        "unexpected error: {}",
        err
    );
}

#[tokio::test]
async fn test_queue_transfer_return_ivr_overrides_hangup_fallback_when_no_agents() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto).with_application(
        "ivr".to_string(),
        None,
        true,
    );
    let config = make_queue_hangup_config("support");
    let mut session = build_session_with_config(dialplan, config).await;
    let (callee_tx, mut callee_rx) = mpsc::unbounded_channel();
    session.callee_event_tx = Some(callee_tx);

    let runtime = Arc::new(StartOnlyRuntime::new());
    session.app_runtime = runtime.clone();

    session
        .handle_queue_transfer(
            LegId::from("caller"),
            "support",
            Some("hello".to_string()),
            Vec::new(),
            &mut callee_rx,
        )
        .await
        .expect("return_ivr should override hangup fallback and start IVR");

    assert_eq!(runtime.start_calls.load(Ordering::SeqCst), 1);
}

// ─── DTMF fix regression tests ───────────────────────────────────────────────
//
// These tests guard against regressions introduced by the wholesale DTMF fix:
// "accept_call() must call start_caller_ingress_monitor_if_needed() BEFORE
// setting connected_callee, but only for bridge-based calls."

/// accept_call must set connected_callee for a plain P2P (Targets, no bridge) call.
///
/// Regression: the fix must not prevent connected_callee from being assigned.
#[tokio::test]
async fn test_accept_call_sets_connected_callee_for_p2p_targets_flow() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto)
        .with_targets(crate::call::DialStrategy::Sequential(vec![]));
    let mut session = build_session(dialplan).await;

    // Simulate a P2P RTP↔RTP call: no bridge created, default false.
    assert!(!session.has_active_caller_ingress_monitor());

    session
        .accept_call(Some("sip:bob@rustpbx.com".to_string()), None, None)
        .await
        .expect("accept_call should succeed for P2P call");

    assert_eq!(
        session.meta.connected_callee,
        Some("sip:bob@rustpbx.com".to_string()),
        "connected_callee must be set after accept_call"
    );
    // No ingress monitor task should be started for non-bridge P2P calls.
    assert!(
        !session.has_active_caller_ingress_monitor(),
        "ingress monitor must not be started for non-bridge P2P call"
    );
}

/// accept_call must set connected_callee for an Application-flow (IVR/Queue) call.
///
/// In the Application flow the ingress monitor is set up by execute_flow() before
/// the callee connects.  accept_call() must not disrupt that nor prevent the
/// connected_callee assignment.
#[tokio::test]
async fn test_accept_call_sets_connected_callee_for_application_ivr_flow() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto).with_application(
        "ivr".to_string(),
        None,
        true,
    );
    let mut session = build_session(dialplan).await;

    session
        .accept_call(Some("sip:agent@rustpbx.com".to_string()), None, None)
        .await
        .expect("accept_call should succeed for IVR flow");

    assert_eq!(
        session.meta.connected_callee,
        Some("sip:agent@rustpbx.com".to_string()),
        "connected_callee must be set after accept_call in IVR/Application flow"
    );
}

/// For bridge-based calls (Targets flow, e.g. wholesale with WebRTC caller),
/// accept_call() must attempt to install the DTMF sink before setting
/// connected_callee, so that the connected_callee guard inside
/// start_caller_ingress_monitor_if_needed() does not short-circuit setup.
///
/// In unit tests there is no real BridgePeer, so the bridge path exits early
/// after the "no bridge" check.  What we CAN assert is:
/// 1. accept_call() completes without panic.
/// 2. connected_callee is correctly set.
/// 3. A subsequent call to start_caller_ingress_monitor_if_needed() returns
///    immediately because connected_callee is now Some (guard fires) —
///    verifiable via the absence of log-level side effects and by calling
///    accept_call again idempotently.
#[tokio::test]
async fn test_accept_call_for_bridge_wholesale_flow_sets_connected_callee() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto)
        .with_targets(crate::call::DialStrategy::Sequential(vec![]));
    let mut session = build_session(dialplan).await;

    // Simulate bridge-based wholesale call (WebRTC→bridge→RTP).
    // We only set the flag; we do NOT set session.media.answer because accept_call
    // would try to send a real 200 OK on the SIP dialog (unit tests have no
    // transport), which would fail at the send step, not at the DTMF-setup step.
    // The intent here is to verify that accept_call does not panic and that
    // connected_callee is assigned correctly even when caller_uses_bridge = true.
    session.set_caller_uses_bridge_for_test(true);

    session
        .accept_call(Some("sip:trunk@wholesale.example".to_string()), None, None)
        .await
        .expect("accept_call should succeed for bridge-based wholesale call");

    assert_eq!(
        session.meta.connected_callee,
        Some("sip:trunk@wholesale.example".to_string()),
        "connected_callee must be set after accept_call for bridge-based call"
    );
}

/// The guard in start_caller_ingress_monitor_if_needed must fire when
/// connected_callee is already set — preventing duplicate DTMF sink installation.
///
/// We verify this by calling accept_call twice: the second call should succeed
/// without panic, and connected_callee must be updated to the new value.
/// If the guard were broken, a double-bridge-start or double-task-spawn would
/// occur (causing a race or panic in production but likely just a silent no-op
/// in unit tests).
#[tokio::test]
async fn test_accept_call_guard_prevents_duplicate_dtmf_setup() {
    let dialplan = build_dialplan_with_mode(MediaProxyMode::Auto)
        .with_targets(crate::call::DialStrategy::Sequential(vec![]));
    let mut session = build_session(dialplan).await;

    // First accept — callee A
    session
        .accept_call(Some("sip:a@example.com".to_string()), None, None)
        .await
        .expect("first accept_call should succeed");
    assert_eq!(
        session.meta.connected_callee,
        Some("sip:a@example.com".to_string())
    );

    // Second accept — callee B (re-INVITE / transfer scenario).
    // The guard inside start_caller_ingress_monitor_if_needed must see
    // connected_callee = Some and skip re-setup.
    session
        .accept_call(Some("sip:b@example.com".to_string()), None, None)
        .await
        .expect("second accept_call should not panic or fail");
    assert_eq!(
        session.meta.connected_callee,
        Some("sip:b@example.com".to_string())
    );
}