use super::{AppEventLoop, CallApp, CallController, ControllerEvent, RecordingInfo};
use crate::call::app::{ApplicationContext, CallInfo};
use crate::call::domain::CallCommand;
use crate::config::Config;
use crate::proxy::proxy_call::sip_session::SipSessionHandle;
use chrono::Utc;
use sea_orm::DatabaseConnection;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
pub type MockCmdTx = tokio::sync::mpsc::UnboundedSender<CallCommand>;
pub type MockCmdRx = tokio::sync::mpsc::Receiver<CallCommand>;
/// Test harness that runs a [`CallApp`] inside an [`AppEventLoop`] on a spawned
/// Tokio task, letting a test inject [`ControllerEvent`]s and observe the
/// [`CallCommand`]s that arrive on the command channel.
pub struct MockCallStack {
/// Sending half of the event channel whose receiver is handed to the
/// `CallController`; used to inject events into the running app.
event_tx: mpsc::UnboundedSender<ControllerEvent>,
/// Receiving half of the command channel whose sender is handed to the
/// test `SipSessionHandle`.
cmd_rx: MockCmdRx,
/// Parent cancellation token; the event loop receives a child of it.
cancel: CancellationToken,
/// Handle of the spawned task running the event loop's `run` future.
join_handle: tokio::task::JoinHandle<anyhow::Result<()>>,
}
impl MockCallStack {
pub fn run(app: Box<dyn CallApp>, caller: &str, callee: &str) -> Self {
let ctx = ApplicationContext::new(
DatabaseConnection::Disconnected,
CallInfo {
session_id: "test-session".into(),
caller: caller.into(),
callee: callee.into(),
direction: "inbound".into(),
started_at: Utc::now(),
},
Arc::new(Config::default()),
);
Self::run_with_context(app, ctx)
}
pub fn run_with_context(app: Box<dyn CallApp>, ctx: ApplicationContext) -> Self {
let (cmd_tx, cmd_rx) = mpsc::channel(256);
let handle = SipSessionHandle::new_for_test("test-session", cmd_tx);
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<ControllerEvent>();
let (controller, timer_rx) = CallController::new(handle, event_rx);
let cancel = CancellationToken::new();
let event_loop = AppEventLoop::new(app, controller, ctx, cancel.child_token(), timer_rx);
let join_handle = tokio::spawn(event_loop.run());
Self {
event_tx,
cmd_rx,
cancel,
join_handle,
}
}
pub fn dtmf(&self, digit: impl Into<String>) -> &Self {
let _ = self
.event_tx
.send(ControllerEvent::DtmfReceived(digit.into()));
self
}
pub fn audio_complete(&self, track_id: impl Into<String>) -> &Self {
let _ = self.event_tx.send(ControllerEvent::AudioComplete {
track_id: track_id.into(),
interrupted: false,
});
self
}
pub fn audio_interrupted(&self, track_id: impl Into<String>) -> &Self {
let _ = self.event_tx.send(ControllerEvent::AudioComplete {
track_id: track_id.into(),
interrupted: true,
});
self
}
pub fn record_complete(
&self,
path: impl Into<String>,
duration: Duration,
size_bytes: u64,
) -> &Self {
let _ = self
.event_tx
.send(ControllerEvent::RecordingComplete(RecordingInfo {
path: path.into(),
duration,
size_bytes,
}));
self
}
pub fn remote_hangup(&self) -> &Self {
let _ = self.event_tx.send(ControllerEvent::Hangup(None));
self
}
pub fn custom(&self, name: impl Into<String>, data: serde_json::Value) -> &Self {
let _ = self
.event_tx
.send(ControllerEvent::Custom(name.into(), data));
self
}
pub fn event_sender(&self) -> mpsc::UnboundedSender<ControllerEvent> {
self.event_tx.clone()
}
pub async fn next_cmd(&mut self, timeout_ms: u64) -> Option<CallCommand> {
tokio::time::timeout(Duration::from_millis(timeout_ms), self.cmd_rx.recv())
.await
.ok()
.flatten()
}
pub async fn assert_cmd<F>(&mut self, timeout_ms: u64, label: &str, matcher: F)
where
F: FnOnce(&CallCommand) -> bool,
{
let cmd = self
.next_cmd(timeout_ms)
.await
.unwrap_or_else(|| panic!("timed out waiting for command '{label}'"));
assert!(matcher(&cmd), "command mismatch for '{label}': got {cmd:?}");
}
pub fn drain_cmds(&mut self) -> Vec<CallCommand> {
let mut out = Vec::new();
while let Ok(cmd) = self.cmd_rx.try_recv() {
out.push(cmd);
}
out
}
pub fn cancel(&self) {
self.cancel.cancel();
}
pub async fn join(self) -> anyhow::Result<()> {
self.join_handle.await.expect("MockCallStack task panicked")
}
pub fn timeout(&self, id: impl Into<String>) -> &Self {
let _ = self.event_tx.send(ControllerEvent::Timeout(id.into()));
self
}
pub async fn enter(&mut self) {
tokio::time::sleep(Duration::from_millis(50)).await;
}
}