use super::agent_registry::{AgentRegistry, PresenceState, RoutingStrategy};
use super::{AppAction, ApplicationContext, CallApp, CallAppType, CallController, PlaybackHandle};
use crate::call::{
DialStrategy, FailureAction, Location, QueueFallbackAction, QueueHoldConfig, QueuePlan,
VoicePrompts,
};
use crate::callrecord::CallRecordHangupReason;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
/// Running counters for a single queue, plus derived averages.
///
/// All counters start at zero via `Default`.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Identifier of the queue these counters describe.
    pub queue_id: String,
    /// Number of calls that entered the queue.
    pub calls_offered: u64,
    /// Number of calls that were connected to an agent.
    pub calls_answered: u64,
    /// Number of calls that ended up on the busy / no-answer fallback path.
    pub calls_abandoned: u64,
    /// Sum of the wait time, in seconds, of all answered calls.
    pub total_wait_secs: u64,
    /// Sum of the handling time, in seconds, used by [`QueueStats::avg_handle_secs`].
    pub total_handle_secs: u64,
    /// Number of calls currently waiting in the queue.
    pub current_waiting: u32,
    /// Number of agents currently available.
    pub available_agents: u32,
}

impl QueueStats {
    /// Average wait per answered call in seconds, or `0.0` when no call was answered.
    pub fn avg_wait_secs(&self) -> f64 {
        match self.calls_answered {
            0 => 0.0,
            answered => self.total_wait_secs as f64 / answered as f64,
        }
    }

    /// Average handling time per answered call in seconds, or `0.0` when no call was answered.
    pub fn avg_handle_secs(&self) -> f64 {
        match self.calls_answered {
            0 => 0.0,
            answered => self.total_handle_secs as f64 / answered as f64,
        }
    }

    /// Percentage (0–100) of offered calls that were answered; `100.0` when nothing was offered.
    ///
    /// The `_threshold_secs` argument is currently unused: every answered call counts,
    /// regardless of how long it waited, because no per-call wait data is stored here.
    pub fn service_level(&self, _threshold_secs: u64) -> f64 {
        if self.calls_offered == 0 {
            return 100.0;
        }
        let answered_ratio = self.calls_answered as f64 / self.calls_offered as f64;
        answered_ratio * 100.0
    }
}
/// Action selected by [`QueueConfig::no_answer_action`]; defaults to `Voicemail`.
///
/// NOTE(review): nothing in the visible part of this file reads this value, so the
/// per-variant notes below are inferred from the variant names only — confirm against
/// the code that consumes `QueueConfig`.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum NoAnswerAction {
/// Default variant. Presumably sends the caller to voicemail — TODO confirm.
#[default]
Voicemail,
/// Presumably ends the call — TODO confirm.
Hangup,
/// Presumably schedules a callback to the caller — TODO confirm.
Callback,
/// Presumably re-routes to `QueueConfig::fallback_skill_group` — TODO confirm.
FallbackSkill,
/// Presumably returns the caller to an IVR menu — TODO confirm.
BackToIvr,
}
/// Configuration of a call queue.
///
/// Part of it is converted into a [`QueuePlan`] by `to_plan`; the rest is read directly
/// by `QueueApp`. Fields marked "not read in this file" are not referenced by any code
/// visible here, so their semantics must be confirmed with their consumers.
#[derive(Debug, Clone)]
pub struct QueueConfig {
/// Queue name. Used as the statistics key and as the queue name set on the
/// application context; `to_plan` copies it into the plan's `label` and `queue_name`.
pub name: String,
/// Copied into the plan by `to_plan`; the plan's value decides whether the call is
/// answered as soon as it enters the queue.
pub accept_immediately: bool,
/// Hold configuration, copied into the plan by `to_plan`.
pub hold: Option<QueueHoldConfig>,
/// Fallback action, copied into the plan by `to_plan`.
pub fallback: Option<QueueFallbackAction>,
/// NOTE(review): not read in this file — `to_plan` takes the dial targets from
/// `strategy` instead. Confirm intended use.
pub agents: Vec<Location>,
/// Dial strategy (with its targets), copied into the plan's `dial_strategy`.
pub strategy: DialStrategy,
/// Ring timeout, copied into the plan; also used as the `agent_ring_timeout` timer
/// for autonomous routing (20 seconds when `None`).
pub ring_timeout: Option<Duration>,
/// When true, `on_enter` asks the agent registry for available agents matching
/// `required_skills` and dials those instead of the plan's targets.
pub skill_routing_enabled: bool,
/// Skills passed to the agent registry when looking up or selecting agents.
pub required_skills: Vec<String>,
/// Not read in this file. Presumably the SLA answer threshold in seconds — TODO confirm.
pub sla_threshold_secs: u64,
/// Not read in this file. Presumably the maximum queue wait in seconds — TODO confirm.
pub max_wait_secs: u64,
/// When true, `on_enter` calls the (currently unimplemented) position announcement.
pub announce_position: bool,
/// When true, `on_enter` calls the (currently unimplemented) wait-time announcement.
pub announce_wait_time: bool,
/// Not read in this file. Presumably the delay between dial retries in seconds — TODO confirm.
pub retry_interval_secs: u64,
/// Not read in this file. Presumably the maximum number of dial retries — TODO confirm.
pub max_retries: u32,
/// When true and an agent registry is attached, `on_enter` selects one agent through
/// the registry and originates a call to it.
pub autonomous_routing: bool,
/// Strategy handed to the registry when selecting an agent for autonomous routing.
pub routing_strategy: RoutingStrategy,
/// Optional ACD policy name, copied into the plan and handed to the registry when
/// selecting an agent.
pub acd_policy: Option<String>,
/// Not read in this file — see [`NoAnswerAction`].
pub no_answer_action: NoAnswerAction,
/// Not read in this file. Presumably the target for `NoAnswerAction::FallbackSkill` — TODO confirm.
pub fallback_skill_group: Option<String>,
/// Not read in this file.
pub sla_monitoring: bool,
/// Not read in this file.
pub metrics_enabled: bool,
/// Not read in this file.
pub callback_enabled: bool,
/// Not read in this file. Presumably a callback retry delay in seconds — TODO confirm.
pub callback_retry_secs: u64,
/// Voice prompts, copied into the plan by `to_plan`; `QueueApp` also falls back to
/// these when the plan itself carries none.
pub voice_prompts: Option<VoicePrompts>,
}
impl Default for QueueConfig {
    /// Baseline configuration: an unnamed queue that answers immediately, dials an empty
    /// sequential target list with a 20 second ring timeout, and has every optional
    /// feature (skill routing, announcements, autonomous routing, SLA monitoring,
    /// metrics, callbacks) switched off.
    fn default() -> Self {
        let default_ring_timeout = Duration::from_secs(20);
        Self {
            // Identity and call acceptance.
            name: String::default(),
            accept_immediately: true,
            hold: None,
            fallback: None,
            voice_prompts: None,

            // Dialing.
            agents: vec![],
            strategy: DialStrategy::Sequential(vec![]),
            ring_timeout: Some(default_ring_timeout),
            retry_interval_secs: 5,
            max_retries: 2,

            // Routing.
            skill_routing_enabled: false,
            required_skills: vec![],
            autonomous_routing: false,
            routing_strategy: RoutingStrategy::LongestIdle,
            acd_policy: None,

            // Waiting and announcements.
            sla_threshold_secs: 20,
            max_wait_secs: 300,
            announce_position: false,
            announce_wait_time: false,

            // No-answer handling and callbacks.
            no_answer_action: NoAnswerAction::default(),
            fallback_skill_group: None,
            callback_enabled: false,
            callback_retry_secs: 300,

            // Monitoring.
            sla_monitoring: false,
            metrics_enabled: false,
        }
    }
}
impl QueueConfig {
    /// Builds the [`QueuePlan`] equivalent of this configuration.
    ///
    /// The queue name is used both as the plan's `label` and as its `queue_name`.
    /// `passthrough_ringback` is always `false`, and `retry_codes`, `no_trying_timeout`
    /// and `failure_audio` are always left unset.
    pub fn to_plan(&self) -> QueuePlan {
        let queue_name = self.name.clone();
        let label = Some(queue_name.clone());
        let dial_strategy = Some(self.strategy.clone());

        QueuePlan {
            queue_name,
            label,
            dial_strategy,
            accept_immediately: self.accept_immediately,
            ring_timeout: self.ring_timeout,
            hold: self.hold.clone(),
            fallback: self.fallback.clone(),
            acd_policy: self.acd_policy.clone(),
            voice_prompts: self.voice_prompts.clone(),
            passthrough_ringback: false,
            retry_codes: None,
            no_trying_timeout: None,
            failure_audio: None,
        }
    }
}
/// Lifecycle state of a `QueueApp`.
#[derive(Debug, Clone, PartialEq)]
pub enum QueueState {
/// Initial state of a freshly constructed app.
Init,
/// Set at the start of `on_enter`, before agents are resolved.
Answering,
/// Hold audio is playing; when playback completes in this state the hold music is
/// started again. NOTE(review): no visible code ever enters this state — confirm.
PlayingHold { attempt: u32 },
/// The transfer prompt is playing; when it completes the call is transferred to `agent_uri`.
PlayingTransferPrompt { agent_uri: String },
/// The busy prompt is playing; when it completes the fallback action is executed.
PlayingBusyPrompt,
/// The no-answer prompt is playing; when it completes the fallback action is executed.
PlayingNoAnswerPrompt,
/// Agents are being dialed; `attempt` is the running dial-attempt counter.
DialingAgents { attempt: u32 },
/// The call has been handed to the agent at `agent_uri`.
Connected { agent_uri: String },
/// The fallback action is being executed.
ExecutingFallback,
/// Set by `on_exit`.
Done,
}
/// Why a dialed agent could not take the call; selects which prompt is played once all
/// agents have been tried.
#[derive(Debug, Clone, Copy, PartialEq)]
enum AgentUnavailableReason {
/// The agent reported busy; leads to the busy prompt.
Busy,
/// The agent did not answer (event or ring timeout); leads to the no-answer prompt.
NoAnswer,
}
/// Call application that holds a caller in a queue while agents are dialed, and runs the
/// configured fallback when no agent can take the call.
pub struct QueueApp {
/// Dial plan: source of the dial strategy, hold audio, fallback action and (preferred)
/// voice prompts.
plan: QueuePlan,
/// Queue configuration: queue name, routing options and fallback voice prompts.
config: QueueConfig,
/// Current lifecycle state.
state: QueueState,
/// Handle returned by `play_audio` for the hold music, if it was started.
hold_playback: Option<PlaybackHandle>,
/// Whether this app has answered the call.
answered: bool,
/// Index of the agent currently being tried; advanced on each unavailable agent
/// unless the dial strategy is parallel.
current_agent_idx: usize,
/// Running count of dial attempts.
dial_attempts: u32,
/// Agents resolved from the registry; when set they replace the plan's dial targets.
dynamic_agents: Option<Vec<Location>>,
/// Optional registry used for agent lookup, selection and presence updates.
agent_registry: Option<Arc<dyn AgentRegistry>>,
/// Call identifier passed to `originate_call` and included in emitted events.
call_id: String,
/// Time at which the call entered the queue; used to compute the wait in seconds.
enqueued_at: Option<Instant>,
/// Statistics table keyed by queue id; can be shared between apps via `with_stats`.
stats: Arc<RwLock<HashMap<String, QueueStats>>>,
}
impl QueueApp {
/// Creates a queue application in the `Init` state.
///
/// The app starts without an agent registry, with an empty call id and with its own
/// private, empty statistics table; use the `with_*` builders to replace those.
pub fn new(plan: QueuePlan, config: QueueConfig) -> Self {
    let stats = Arc::new(RwLock::new(HashMap::new()));
    Self {
        plan,
        config,
        stats,
        call_id: String::default(),
        agent_registry: None,
        dynamic_agents: None,
        hold_playback: None,
        enqueued_at: None,
        state: QueueState::Init,
        answered: false,
        current_agent_idx: 0,
        dial_attempts: 0,
    }
}
/// Builder: attaches the agent registry used for agent lookup, selection and presence
/// updates, and returns the app.
pub fn with_agent_registry(mut self, registry: Arc<dyn AgentRegistry>) -> Self {
self.agent_registry = Some(registry);
self
}
/// Builder: sets the call identifier that is passed to `originate_call` and included in
/// emitted events, and returns the app.
pub fn with_call_id(mut self, call_id: String) -> Self {
self.call_id = call_id;
self
}
/// Builder: replaces the app's private statistics table with the given shared one, and
/// returns the app.
pub fn with_stats(mut self, stats: Arc<RwLock<HashMap<String, QueueStats>>>) -> Self {
self.stats = stats;
self
}
async fn update_stats(&self, queue_id: &str, f: impl FnOnce(&mut QueueStats)) {
let mut stats = self.stats.write().await;
let stat = stats
.entry(queue_id.to_string())
.or_insert_with(|| QueueStats {
queue_id: queue_id.to_string(),
..Default::default()
});
f(stat);
}
/// Switches to `ExecutingFallback` and returns the action for the plan's fallback.
///
/// * `Failure` – delegated to `get_fallback_action`.
/// * `Redirect` – transfer to the target.
/// * `Queue` – transfer to `skill-group:<id>` when the name carries that prefix
///   (the id is trimmed), otherwise to `queue:<name>`.
/// * no fallback configured – hang up with `ServerUnavailable` and code 486.
async fn execute_fallback(&mut self) -> anyhow::Result<AppAction> {
    info!("Queue: executing fallback action");
    self.state = QueueState::ExecutingFallback;

    let Some(fallback) = self.plan.fallback.as_ref() else {
        return Ok(AppAction::Hangup {
            reason: Some(CallRecordHangupReason::ServerUnavailable),
            code: Some(486),
        });
    };

    let action = match fallback {
        QueueFallbackAction::Failure(failure_action) => {
            self.get_fallback_action(failure_action)
        }
        QueueFallbackAction::Redirect { target } => {
            info!(target = %target, "Queue: fallback redirect");
            AppAction::Transfer(target.to_string())
        }
        QueueFallbackAction::Queue { name } => match name.strip_prefix("skill-group:") {
            Some(rest) => {
                let skill_group_id = rest.trim();
                info!(skill_group = %skill_group_id, "Queue: fallback to skill group");
                AppAction::Transfer(format!("skill-group:{}", skill_group_id))
            }
            None => {
                info!(queue = %name, "Queue: fallback to another queue");
                AppAction::Transfer(format!("queue:{}", name))
            }
        },
    };
    Ok(action)
}
/// Translates a [`FailureAction`] into the [`AppAction`] the queue should return.
///
/// Any configured hangup reason is reported as `ServerUnavailable`; no reason stays
/// `None`. `PlayThenHangup` currently hangs up with its status code without playing
/// the audio file. Transfers to a queue or IVR are encoded as `queue:<name>` /
/// `ivr:<name>` targets.
fn get_fallback_action(&self, action: &FailureAction) -> AppAction {
    // The concrete reason value is not forwarded; only its presence matters.
    let hangup_reason =
        |configured: bool| configured.then_some(CallRecordHangupReason::ServerUnavailable);

    match action {
        FailureAction::Hangup { code, reason } => {
            info!(?code, ?reason, "Queue: hangup fallback");
            let sip_code = code.as_ref().map(|c| c.code());
            AppAction::Hangup {
                reason: hangup_reason(reason.is_some()),
                code: sip_code,
            }
        }
        FailureAction::PlayThenHangup {
            status_code,
            reason,
            ..
        } => {
            info!("Queue: play then hangup fallback");
            AppAction::Hangup {
                reason: hangup_reason(reason.is_some()),
                code: Some(status_code.code()),
            }
        }
        FailureAction::Transfer(endpoint) => {
            info!(target = ?endpoint, "Queue: transfer fallback");
            let destination = match endpoint {
                crate::call::TransferEndpoint::Uri(uri) => uri.to_string(),
                crate::call::TransferEndpoint::Queue(queue_name) => {
                    format!("queue:{}", queue_name)
                }
                crate::call::TransferEndpoint::Ivr(ivr_name) => format!("ivr:{}", ivr_name),
            };
            AppAction::Transfer(destination)
        }
    }
}
/// Starts the plan's hold audio, if one is configured, and stores the playback handle.
///
/// Does nothing when the plan has no hold configuration or no audio file.
///
/// # Errors
/// Propagates any error returned by `play_audio`.
async fn start_hold_music(&mut self, ctrl: &mut CallController) -> anyhow::Result<()> {
    let hold_audio = self
        .plan
        .hold
        .as_ref()
        .and_then(|hold| hold.audio_file.as_ref());

    if let Some(audio_file) = hold_audio {
        debug!(file = %audio_file, "Queue: starting hold music");
        let handle = ctrl.play_audio(audio_file, true).await?;
        self.hold_playback = Some(handle);
    }
    Ok(())
}
/// Releases the stored hold-music playback handle, if there is one.
// NOTE(review): this only drops the handle; whether dropping a `PlaybackHandle`
// actually stops playback is not visible from here — confirm.
async fn _stop_hold_music(&mut self) {
    if let Some(handle) = self.hold_playback.take() {
        drop(handle);
        debug!("Queue: stopping hold music");
    }
}
fn get_agents(&self) -> Vec<&Location> {
if let Some(ref agents) = self.dynamic_agents {
return agents.iter().collect();
}
match &self.plan.dial_strategy {
Some(DialStrategy::Sequential(locations)) => locations.iter().collect(),
Some(DialStrategy::Parallel(locations)) => locations.iter().collect(),
None => Vec::new(),
}
}
/// Whether the plan dials all of its targets in parallel.
fn is_parallel(&self) -> bool {
    self.plan
        .dial_strategy
        .as_ref()
        .is_some_and(|strategy| matches!(strategy, DialStrategy::Parallel(_)))
}
async fn resolve_agents(&mut self) {
if let Some(ref registry) = self.agent_registry {
let queue_id = self.config.name.as_str();
let skills = &self.config.required_skills;
let agents = registry.find_available_agents(skills).await;
if !agents.is_empty() {
let locations: Vec<Location> = agents
.into_iter()
.map(|agent| Location {
aor: agent.uri.parse().unwrap_or_default(),
contact_raw: Some(agent.uri),
..Default::default()
})
.collect();
info!(
"Queue: resolved {} dynamic agents for queue '{}'",
locations.len(),
queue_id
);
self.dynamic_agents = Some(locations);
}
}
}
/// Placeholder for announcing the caller's queue position.
///
/// Not implemented: it only emits a debug log, ignores `_ctrl` and always returns `Ok(())`.
async fn announce_position(&self, _ctrl: &mut CallController) -> anyhow::Result<()> {
debug!("Queue: position announcement (not implemented)");
Ok(())
}
/// Placeholder for announcing the expected wait time.
///
/// Not implemented: it only emits a debug log, ignores `_ctrl` and always returns `Ok(())`.
async fn announce_wait_time(&self, _ctrl: &mut CallController) -> anyhow::Result<()> {
debug!("Queue: wait time announcement (not implemented)");
Ok(())
}
/// Reacts to an agent that was busy or did not answer.
///
/// Advances to the next agent (sequential dialing only) and bumps the dial-attempt
/// counter. If another agent remains, the app goes back to `DialingAgents` and returns
/// `Continue`; otherwise the prompt matching `reason` is played, followed by the fallback.
async fn handle_agent_unavailable(
    &mut self,
    ctrl: &mut CallController,
    reason: AgentUnavailableReason,
) -> anyhow::Result<AppAction> {
    // Parallel dialing has no "current" agent, so the index only moves for sequential plans.
    let index_step = if self.is_parallel() { 0 } else { 1 };
    self.current_agent_idx += index_step;
    self.dial_attempts += 1;

    let agents_exhausted = self.current_agent_idx >= self.get_agents().len();
    if !agents_exhausted {
        self.state = QueueState::DialingAgents {
            attempt: self.dial_attempts,
        };
        return Ok(AppAction::Continue);
    }

    match reason {
        AgentUnavailableReason::Busy => self.play_busy_and_then_fallback(ctrl).await,
        AgentUnavailableReason::NoAnswer => self.play_no_answer_and_then_fallback(ctrl).await,
    }
}
/// Counts the call as abandoned, then plays the busy prompt or runs the fallback.
///
/// When a busy prompt is configured (the plan's prompts win over the config's), the app
/// enters `PlayingBusyPrompt`, starts playback and returns `Continue`; the fallback then
/// runs once playback completes. Without a prompt the fallback is executed right away.
///
/// # Errors
/// Propagates any error returned by `play_audio`.
async fn play_busy_and_then_fallback(
    &mut self,
    ctrl: &mut CallController,
) -> anyhow::Result<AppAction> {
    let queue_id = self.config.name.clone();
    let wait_secs = self
        .enqueued_at
        .map_or(0, |since| since.elapsed().as_secs());
    self.update_stats(&queue_id, |stats| stats.calls_abandoned += 1)
        .await;
    info!(
        queue = %queue_id,
        wait_secs,
        "Queue: call abandoned, playing busy prompt or fallback"
    );

    let busy_prompt = self
        .plan
        .voice_prompts
        .as_ref()
        .or(self.config.voice_prompts.as_ref())
        .and_then(|prompts| prompts.busy_prompt.as_ref());
    let Some(path) = busy_prompt else {
        return self.execute_fallback().await;
    };

    info!("Queue: playing busy prompt before fallback");
    self.state = QueueState::PlayingBusyPrompt;
    ctrl.play_audio(path.clone(), false).await?;
    Ok(AppAction::Continue)
}
/// Counts the call as abandoned, then plays the no-answer prompt or runs the fallback.
///
/// When a no-answer prompt is configured (the plan's prompts win over the config's), the
/// app enters `PlayingNoAnswerPrompt`, starts playback and returns `Continue`; the
/// fallback then runs once playback completes. Without a prompt the fallback is executed
/// right away.
///
/// # Errors
/// Propagates any error returned by `play_audio`.
async fn play_no_answer_and_then_fallback(
    &mut self,
    ctrl: &mut CallController,
) -> anyhow::Result<AppAction> {
    let queue_id = self.config.name.clone();
    let wait_secs = self
        .enqueued_at
        .map_or(0, |since| since.elapsed().as_secs());
    self.update_stats(&queue_id, |stats| stats.calls_abandoned += 1)
        .await;
    info!(
        queue = %queue_id,
        wait_secs,
        "Queue: call abandoned, playing no-answer prompt or fallback"
    );

    let no_answer_prompt = self
        .plan
        .voice_prompts
        .as_ref()
        .or(self.config.voice_prompts.as_ref())
        .and_then(|prompts| prompts.no_answer_prompt.as_ref());
    let Some(path) = no_answer_prompt else {
        return self.execute_fallback().await;
    };

    info!("Queue: playing no-answer prompt before fallback");
    self.state = QueueState::PlayingNoAnswerPrompt;
    ctrl.play_audio(path.clone(), false).await?;
    Ok(AppAction::Continue)
}
}
#[async_trait]
impl CallApp for QueueApp {
/// Identifies this application as a queue.
fn app_type(&self) -> CallAppType {
CallAppType::Queue
}
/// The plan's label, or `"queue"` when the plan has none.
fn name(&self) -> &str {
    match self.plan.label.as_deref() {
        Some(label) => label,
        None => "queue",
    }
}
/// Entry point of the queue: registers the call, resolves agents and starts dialing.
///
/// Steps, in order:
/// 1. Record the enqueue time, publish the queue name on the context and count the call
///    as offered / waiting.
/// 2. With skill routing enabled, resolve agents from the registry.
/// 3. If there is no agent at all, play the busy prompt (answering first when a prompt
///    is configured) or run the fallback.
/// 4. Answer immediately if the plan says so, start hold music and run the optional
///    announcements.
/// 5. With autonomous routing and a registry, select one agent, mark it `Ringing`,
///    originate a call to it, emit `queue.agent_ringing` and arm `agent_ring_timeout`.
///    When no agent can be selected, take the busy / fallback path.
/// 6. Otherwise enter `DialingAgents` and return `Continue`.
///
/// # Errors
/// Propagates errors from answering, audio playback, call origination and event
/// notification. If origination fails, the selected agent's presence is put back to
/// `Available` before the error is returned, so the agent is not left stuck in `Ringing`.
async fn on_enter(
    &mut self,
    ctrl: &mut CallController,
    ctx: &ApplicationContext,
) -> anyhow::Result<AppAction> {
    let queue_id = self.config.name.clone();
    info!(queue = %queue_id, "Queue: entering queue application");
    self.state = QueueState::Answering;
    self.enqueued_at = Some(Instant::now());
    ctx.set_queue_name(&queue_id).await;
    self.update_stats(&queue_id, |stats| {
        stats.calls_offered += 1;
        stats.current_waiting += 1;
    })
    .await;

    if self.config.skill_routing_enabled {
        self.resolve_agents().await;
    }

    if self.get_agents().is_empty() {
        warn!("Queue: no agents configured, executing fallback");
        if !self.answered {
            let prompts = self
                .plan
                .voice_prompts
                .as_ref()
                .or(self.config.voice_prompts.as_ref());
            // Only answer when there is a prompt to play to the caller.
            if prompts.and_then(|p| p.busy_prompt.as_ref()).is_some() {
                ctrl.answer().await?;
                self.answered = true;
            }
        }
        return self.play_busy_and_then_fallback(ctrl).await;
    }

    if self.plan.accept_immediately {
        info!("Queue: answering call immediately");
        ctrl.answer().await?;
        self.answered = true;
    }
    self.start_hold_music(ctrl).await?;
    if self.config.announce_position {
        self.announce_position(ctrl).await?;
    }
    if self.config.announce_wait_time {
        self.announce_wait_time(ctrl).await?;
    }

    if self.config.autonomous_routing
        && let Some(ref registry) = self.agent_registry
    {
        let skills = &self.config.required_skills;
        let strategy = self.config.routing_strategy;
        if let Some(agent) = registry
            .select_agent_with_policy(skills, strategy, self.config.acd_policy.as_deref())
            .await
        {
            info!(agent_id = %agent.agent_id, uri = %agent.uri, "Queue: auto-selecting agent");
            // Presence updates are best-effort; a failure must not block routing.
            let _ = registry
                .update_presence(&agent.agent_id, PresenceState::Ringing)
                .await;

            let originate_result = ctrl
                .originate_call(&agent.uri, Some(self.call_id.clone()))
                .await;
            if let Err(err) = &originate_result {
                // The agent was never actually rung: undo the `Ringing` mark so the
                // registry can offer the agent to other calls again.
                warn!(
                    agent_id = %agent.agent_id,
                    error = %err,
                    "Queue: failed to originate call to agent, restoring presence"
                );
                let _ = registry
                    .update_presence(&agent.agent_id, PresenceState::Available)
                    .await;
            }
            let _call_id = originate_result?;

            ctrl.notify_event(
                "queue.agent_ringing",
                serde_json::json!({
                    "call_id": self.call_id,
                    "agent_id": agent.agent_id,
                    "agent_uri": agent.uri,
                    "queue_id": queue_id,
                }),
            )
            .await?;
            self.state = QueueState::DialingAgents { attempt: 1 };
            self.dial_attempts = 1;
            let ring_timeout = self.config.ring_timeout.unwrap_or(Duration::from_secs(20));
            ctrl.set_timeout("agent_ring_timeout", ring_timeout);
            return Ok(AppAction::Continue);
        } else {
            warn!("Queue: no available agents for skill routing");
            if !self.answered {
                let prompts = self
                    .plan
                    .voice_prompts
                    .as_ref()
                    .or(self.config.voice_prompts.as_ref());
                // Only answer when there is a prompt to play to the caller.
                if prompts.and_then(|p| p.busy_prompt.as_ref()).is_some() {
                    ctrl.answer().await?;
                    self.answered = true;
                }
            }
            return self.play_busy_and_then_fallback(ctrl).await;
        }
    }

    self.state = QueueState::DialingAgents { attempt: 1 };
    self.dial_attempts = 1;
    Ok(AppAction::Continue)
}
/// Continues the flow once an audio playback has finished, depending on the current state.
///
/// * `PlayingHold` – hold music is started again.
/// * `PlayingTransferPrompt` – the app becomes `Connected` and the call is transferred
///   to the agent.
/// * `PlayingBusyPrompt` / `PlayingNoAnswerPrompt` – the fallback action is executed.
/// * any other state – nothing happens and `Continue` is returned.
async fn on_audio_complete(
    &mut self,
    _track_id: String,
    ctrl: &mut CallController,
    _ctx: &ApplicationContext,
) -> anyhow::Result<AppAction> {
    debug!("Queue: audio playback completed");

    // Every state except the transfer prompt is fully handled inside the match.
    let connected_agent = match &self.state {
        QueueState::PlayingTransferPrompt { agent_uri } => agent_uri.clone(),
        QueueState::PlayingBusyPrompt | QueueState::PlayingNoAnswerPrompt => {
            return self.execute_fallback().await;
        }
        QueueState::PlayingHold { .. } => {
            self.start_hold_music(ctrl).await?;
            return Ok(AppAction::Continue);
        }
        _ => return Ok(AppAction::Continue),
    };

    self.state = QueueState::Connected {
        agent_uri: connected_agent.clone(),
    };
    let queue_id = self.config.name.clone();
    let wait_secs = self
        .enqueued_at
        .map_or(0, |since| since.elapsed().as_secs());
    info!(
        queue = %queue_id,
        agent = %connected_agent,
        wait_secs,
        "Queue: call connected to agent (after prompt)"
    );
    Ok(AppAction::Transfer(connected_agent))
}
async fn on_external_event(
&mut self,
event: super::AppEvent,
ctrl: &mut CallController,
_ctx: &ApplicationContext,
) -> anyhow::Result<AppAction> {
let queue_id = self.config.name.clone();
match event {
super::AppEvent::Custom { name, data } => match name.as_str() {
"agent_connected" => {
if let Some(agent_uri) = data.get("agent_uri").and_then(|v| v.as_str()) {
info!(agent = %agent_uri, "Queue: agent connected");
self._stop_hold_music().await;
let wait_secs =
self.enqueued_at.map(|t| t.elapsed().as_secs()).unwrap_or(0);
self.update_stats(&queue_id, |stats| {
stats.calls_answered += 1;
stats.total_wait_secs += wait_secs;
stats.current_waiting = stats.current_waiting.saturating_sub(1);
})
.await;
if let Some(ref registry) = self.agent_registry {
let agent_id = data
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or(agent_uri);
let _ = registry.start_call(agent_id).await;
}
let prompts = self
.plan
.voice_prompts
.as_ref()
.or(self.config.voice_prompts.as_ref());
if let Some(path) = prompts.and_then(|p| p.transfer_prompt.as_ref()) {
info!("Queue: playing transfer prompt before connecting agent");
self.state = QueueState::PlayingTransferPrompt {
agent_uri: agent_uri.to_string(),
};
ctrl.play_audio(path.clone(), false).await?;
return Ok(AppAction::Continue);
}
self.state = QueueState::Connected {
agent_uri: agent_uri.to_string(),
};
info!(
queue = %queue_id,
agent = %agent_uri,
wait_secs,
"Queue: call connected to agent"
);
return Ok(AppAction::Transfer(agent_uri.to_string()));
}
Ok(AppAction::Continue)
}
"agent_ringing" => {
if let Some(agent_id) = data.get("agent_id").and_then(|v| v.as_str()) {
info!(agent = %agent_id, "Queue: agent ringing");
if let Some(ref registry) = self.agent_registry {
let _ = registry
.update_presence(agent_id, PresenceState::Ringing)
.await;
}
}
Ok(AppAction::Continue)
}
"agent_busy" => {
info!("Queue: agent busy");
if let Some(agent_id) = data.get("agent_id").and_then(|v| v.as_str())
&& let Some(ref registry) = self.agent_registry
{
let _ = registry
.update_presence(agent_id, PresenceState::Busy)
.await;
}
self.handle_agent_unavailable(ctrl, AgentUnavailableReason::Busy)
.await
}
"agent_no_answer" => {
info!("Queue: agent no answer");
if let Some(agent_id) = data.get("agent_id").and_then(|v| v.as_str())
&& let Some(ref registry) = self.agent_registry
{
let _ = registry
.update_presence(agent_id, PresenceState::Available)
.await;
}
self.handle_agent_unavailable(ctrl, AgentUnavailableReason::NoAnswer)
.await
}
"all_agents_busy" => {
warn!("Queue: all agents busy");
self.play_busy_and_then_fallback(ctrl).await
}
_ => Ok(AppAction::Continue),
},
_ => Ok(AppAction::Continue),
}
}
async fn on_timeout(
&mut self,
id: String,
ctrl: &mut CallController,
_ctx: &ApplicationContext,
) -> anyhow::Result<AppAction> {
match id.as_str() {
"agent_ring_timeout" => {
info!("Queue: agent ring timeout, handling no-answer");
if let Some(ref registry) = self.agent_registry {
let agents = registry.list_agents().await;
for agent in agents {
if matches!(agent.presence, PresenceState::Ringing) {
let _ = registry
.update_presence(&agent.agent_id, PresenceState::Available)
.await;
ctrl.notify_event(
"queue.agent_no_answer",
serde_json::json!({
"call_id": self.call_id,
"agent_id": agent.agent_id,
"queue_id": self.config.name,
}),
)
.await?;
break;
}
}
}
self.handle_agent_unavailable(ctrl, AgentUnavailableReason::NoAnswer)
.await
}
"max_wait_timeout" => {
info!("Queue: max wait timeout, executing fallback");
ctrl.notify_event(
"queue.timeout",
serde_json::json!({
"call_id": self.call_id,
"queue_id": self.config.name,
"wait_secs": self.enqueued_at.map(|t| t.elapsed().as_secs()).unwrap_or(0),
}),
)
.await?;
self.play_busy_and_then_fallback(ctrl).await
}
_ => Ok(AppAction::Continue),
}
}
/// Final bookkeeping when the application exits; always ends in the `Done` state.
///
/// The waiting counter is decremented here unless the call already reached an agent
/// (`Connected` or `PlayingTransferPrompt`), in which case it was decremented when the
/// agent connected.
async fn on_exit(&mut self, reason: super::ExitReason) -> anyhow::Result<()> {
    info!(?reason, "Queue: exiting queue application");

    let reached_agent = matches!(
        self.state,
        QueueState::Connected { .. } | QueueState::PlayingTransferPrompt { .. }
    );
    if !reached_agent {
        let queue_id = self.config.name.clone();
        self.update_stats(&queue_id, |stats| {
            stats.current_waiting = stats.current_waiting.saturating_sub(1)
        })
        .await;
    }

    self.state = QueueState::Done;
    Ok(())
}
}
/// Convenience constructor: builds a [`QueueApp`] from a plan and a configuration by
/// delegating to `QueueApp::new`.
pub fn build_queue_app(plan: QueuePlan, config: QueueConfig) -> QueueApp {
QueueApp::new(plan, config)
}