rustpbx 0.4.3

A SIP PBX implementation in Rust
Documentation
//! Test harness for [`CallApp`] implementations.
//!
//! Provides [`MockCallStack`]: a fully assembled in-memory call stack that
//! runs a `CallApp` without any real SIP session, media, or database.
//!
//! # Quick start
//!
//! ```rust,ignore
//! use crate::call::app::testing::MockCallStack;
//! use crate::call::domain::CallCommand;
//!
//! let mut stack = MockCallStack::run(Box::new(MyApp::new()), "1001", "1002");
//!
//! // App answers on enter → assert Accept
//! stack.assert_cmd(50, "answer", |c| matches!(c, CallCommand::Accept)).await;
//!
//! // Simulate audio playback finishing
//! stack.audio_complete("default");
//!
//! // App should now hang up
//! stack.assert_cmd(50, "hangup", |c| matches!(c, CallCommand::Hangup { .. })).await;
//! ```

use super::{AppEventLoop, CallApp, CallController, ControllerEvent, RecordingInfo};
use crate::call::app::{ApplicationContext, CallInfo};
use crate::call::domain::CallCommand;
use crate::config::Config;
use crate::proxy::proxy_call::sip_session::SipSessionHandle;
use chrono::Utc;
use sea_orm::DatabaseConnection;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

/// Type alias for mock call stack command sender.
pub type MockCmdTx = tokio::sync::mpsc::UnboundedSender<CallCommand>;
/// Type alias for mock call stack command receiver.
pub type MockCmdRx = tokio::sync::mpsc::UnboundedReceiver<CallCommand>;

/// Fully assembled, in-memory [`CallApp`] harness — no SIP stack required.
///
/// The harness wires up:
/// - A real [`SipSessionHandle`] backed by an in-memory channel (`cmd_rx`).
/// - A synthetic event channel (`event_tx`) for injecting DTMF, audio events, etc.
/// - A real [`AppEventLoop`] running the app in a background Tokio task.
pub struct MockCallStack {
    /// Inject SIP-originated events into the running app.
    event_tx: mpsc::UnboundedSender<ControllerEvent>,
    /// Observe [`CallCommand`]s emitted by the app toward the SIP layer.
    cmd_rx: MockCmdRx,
    /// Cancel token wired to the AppEventLoop's child token.
    cancel: CancellationToken,
    /// Background task running the AppEventLoop.
    join_handle: tokio::task::JoinHandle<anyhow::Result<()>>,
}

impl MockCallStack {
    /// Build the call stack and start the app in a background task.
    ///
    /// # Arguments
    /// * `app`    – The `CallApp` under test.
    /// * `caller` – Simulated caller-ID (e.g. `"1001"` or `"sip:alice@test"`).
    /// * `callee` – Simulated callee-ID (e.g. `"1002"`).
    pub fn run(app: Box<dyn CallApp>, caller: &str, callee: &str) -> Self {
        // Minimal ApplicationContext (no DB, no storage).
        let ctx = ApplicationContext::new(
            DatabaseConnection::Disconnected,
            CallInfo {
                session_id: "test-session".into(),
                caller: caller.into(),
                callee: callee.into(),
                direction: "inbound".into(),
                started_at: Utc::now(),
            },
            Arc::new(Config::default()),
        );

        Self::run_with_context(app, ctx)
    }

    pub fn run_with_context(app: Box<dyn CallApp>, ctx: ApplicationContext) -> Self {
        // Real SipSessionHandle — backed only by channels, no SIP socket.
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let handle = SipSessionHandle::new_for_test("test-session", cmd_tx);

        // Synthetic event channel: test → controller.
        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<ControllerEvent>();

        // Controller + companion timer-fire channel for AppEventLoop.
        let (controller, timer_rx) = CallController::new(handle, event_rx);

        let cancel = CancellationToken::new();
        let event_loop = AppEventLoop::new(app, controller, ctx, cancel.child_token(), timer_rx);

        let join_handle = tokio::spawn(event_loop.run());

        Self {
            event_tx,
            cmd_rx,
            cancel,
            join_handle,
        }
    }

    // ── Event injection ───────────────────────────────────────────────────────

    /// Inject a DTMF digit received from the remote party.
    pub fn dtmf(&self, digit: impl Into<String>) -> &Self {
        let _ = self
            .event_tx
            .send(ControllerEvent::DtmfReceived(digit.into()));
        self
    }

    /// Inject an audio-playback-complete event.
    pub fn audio_complete(&self, track_id: impl Into<String>) -> &Self {
        let _ = self.event_tx.send(ControllerEvent::AudioComplete {
            track_id: track_id.into(),
            interrupted: false,
        });
        self
    }

    /// Inject an interrupted audio event (e.g. barge-in).
    pub fn audio_interrupted(&self, track_id: impl Into<String>) -> &Self {
        let _ = self.event_tx.send(ControllerEvent::AudioComplete {
            track_id: track_id.into(),
            interrupted: true,
        });
        self
    }

    /// Inject a recording-complete event.
    pub fn record_complete(
        &self,
        path: impl Into<String>,
        duration: Duration,
        size_bytes: u64,
    ) -> &Self {
        let _ = self
            .event_tx
            .send(ControllerEvent::RecordingComplete(RecordingInfo {
                path: path.into(),
                duration,
                size_bytes,
            }));
        self
    }

    /// Inject a remote-hangup event (no specific reason).
    pub fn remote_hangup(&self) -> &Self {
        let _ = self.event_tx.send(ControllerEvent::Hangup(None));
        self
    }

    /// Inject a custom event.
    pub fn custom(&self, name: impl Into<String>, data: serde_json::Value) -> &Self {
        let _ = self
            .event_tx
            .send(ControllerEvent::Custom(name.into(), data));
        self
    }

    /// Return a clone of the event sender so callers can inject events
    /// asynchronously (e.g. from a background task wired to a real FileTrack
    /// completion_notify).
    pub fn event_sender(&self) -> mpsc::UnboundedSender<ControllerEvent> {
        self.event_tx.clone()
    }

    // ── Command observation ───────────────────────────────────────────────────

    /// Wait up to `timeout_ms` milliseconds for the next [`CallCommand`] sent by the app.
    ///
    /// Returns `None` on timeout.
    pub async fn next_cmd(&mut self, timeout_ms: u64) -> Option<CallCommand> {
        tokio::time::timeout(Duration::from_millis(timeout_ms), self.cmd_rx.recv())
            .await
            .ok()
            .flatten()
    }

    /// Assert the next command satisfies `matcher` within `timeout_ms` ms.
    ///
    /// # Panics
    /// If the timeout elapses or the matcher returns `false`.
    pub async fn assert_cmd<F>(&mut self, timeout_ms: u64, label: &str, matcher: F)
    where
        F: FnOnce(&CallCommand) -> bool,
    {
        let cmd = self
            .next_cmd(timeout_ms)
            .await
            .unwrap_or_else(|| panic!("timed out waiting for command '{label}'"));
        assert!(matcher(&cmd), "command mismatch for '{label}': got {cmd:?}");
    }

    /// Drain all immediately-available commands without blocking.
    pub fn drain_cmds(&mut self) -> Vec<CallCommand> {
        let mut out = Vec::new();
        while let Ok(cmd) = self.cmd_rx.try_recv() {
            out.push(cmd);
        }
        out
    }

    // ── Lifecycle ─────────────────────────────────────────────────────────────

    /// Cancel the running event loop (simulate system shutdown).
    pub fn cancel(&self) {
        self.cancel.cancel();
    }

    /// Wait for the event loop task to complete and surface any error.
    pub async fn join(self) -> anyhow::Result<()> {
        self.join_handle.await.expect("MockCallStack task panicked")
    }

    /// Inject a timeout event.
    pub fn timeout(&self, id: impl Into<String>) -> &Self {
        let _ = self.event_tx.send(ControllerEvent::Timeout(id.into()));
        self
    }

    /// Simulate entering the app (calls on_enter and processes initial commands).
    pub async fn enter(&mut self) {
        // Wait for initial commands from on_enter
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}