use crate::proxy::proxy_call::sip_session::SipSessionHandle;
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use serde::Serialize;
use std::collections::HashMap;
use tokio::sync::Notify;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize)]
pub enum ActiveProxyCallStatus {
Ringing,
Talking,
}
impl std::fmt::Display for ActiveProxyCallStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ActiveProxyCallStatus::Ringing => write!(f, "ringing"),
ActiveProxyCallStatus::Talking => write!(f, "talking"),
}
}
}
#[derive(Clone, Debug, Serialize)]
pub struct ActiveProxyCallEntry {
pub session_id: String,
pub caller: Option<String>,
pub callee: Option<String>,
pub direction: String,
pub started_at: DateTime<Utc>,
pub answered_at: Option<DateTime<Utc>>,
pub status: ActiveProxyCallStatus,
}
#[derive(Default)]
struct RegistryState {
entries: HashMap<String, ActiveProxyCallEntry>,
handles: HashMap<String, SipSessionHandle>,
handles_by_dialog: HashMap<String, SipSessionHandle>,
dialog_by_session: HashMap<String, Vec<String>>,
}
pub struct ActiveProxyCallRegistry {
inner: Mutex<RegistryState>,
change_notify: Notify,
}
impl Default for ActiveProxyCallRegistry {
fn default() -> Self {
Self::new()
}
}
impl ActiveProxyCallRegistry {
pub fn new() -> Self {
Self {
inner: Mutex::new(RegistryState::default()),
change_notify: Notify::new(),
}
}
fn notify_waiters(&self) {
self.change_notify.notify_waiters();
}
pub fn upsert(&self, entry: ActiveProxyCallEntry, handle: SipSessionHandle) {
let mut guard = self.inner.lock();
guard.entries.insert(entry.session_id.clone(), entry);
guard
.handles
.insert(handle.session_id().to_string(), handle);
drop(guard);
self.notify_waiters();
}
pub fn register_dialog(&self, dialog_id: String, handle: SipSessionHandle) {
let mut guard = self.inner.lock();
guard
.dialog_by_session
.entry(handle.session_id().to_string())
.or_default()
.push(dialog_id.clone());
guard.handles_by_dialog.insert(dialog_id, handle);
}
pub fn unregister_dialog(&self, dialog_id: &str) {
let mut guard = self.inner.lock();
if let Some(handle) = guard.handles_by_dialog.remove(dialog_id)
&& let Some(dialogs) = guard.dialog_by_session.get_mut(handle.session_id())
{
dialogs.retain(|d| d != dialog_id);
if dialogs.is_empty() {
guard.dialog_by_session.remove(handle.session_id());
}
}
}
pub fn get_handle_by_dialog(&self, dialog_id: &str) -> Option<SipSessionHandle> {
let guard = self.inner.lock();
guard.handles_by_dialog.get(dialog_id).cloned()
}
pub fn update<F>(&self, session_id: &str, updater: F)
where
F: FnOnce(&mut ActiveProxyCallEntry),
{
if let Some(entry) = self.inner.lock().entries.get_mut(session_id) {
updater(entry);
}
self.notify_waiters();
}
pub fn remove(&self, session_id: &str) {
let mut guard = self.inner.lock();
guard.entries.remove(session_id);
guard.handles.remove(session_id);
if let Some(dialog_ids) = guard.dialog_by_session.remove(session_id) {
for dialog_id in dialog_ids {
guard.handles_by_dialog.remove(&dialog_id);
}
}
}
pub fn count(&self) -> usize {
self.inner.lock().entries.len()
}
pub fn list_recent(&self, limit: usize) -> Vec<ActiveProxyCallEntry> {
let mut entries: Vec<_> = self.inner.lock().entries.values().cloned().collect();
entries.sort_by_key(|b| std::cmp::Reverse(b.started_at));
if entries.len() > limit {
entries.truncate(limit);
}
entries
}
pub fn get(&self, session_id: &str) -> Option<ActiveProxyCallEntry> {
self.inner.lock().entries.get(session_id).cloned()
}
pub fn get_handle(&self, session_id: &str) -> Option<SipSessionHandle> {
self.inner.lock().handles.get(session_id).cloned()
}
pub fn session_ids(&self) -> Vec<String> {
self.inner.lock().entries.keys().cloned().collect()
}
pub fn len(&self) -> usize {
self.count()
}
pub async fn wait_for_status(
&self,
session_id: &str,
target: ActiveProxyCallStatus,
timeout: std::time::Duration,
) -> bool {
let deadline = tokio::time::Instant::now() + timeout;
loop {
if let Some(entry) = self.get(session_id) {
if entry.status == target {
return true;
}
}
if tokio::time::Instant::now() >= deadline {
return false;
}
tokio::select! {
_ = self.change_notify.notified() => {}
_ = tokio::time::sleep_until(deadline) => return false,
}
}
}
pub fn is_empty(&self) -> bool {
self.count() == 0
}
pub fn register_handle(&self, session_id: String, handle: SipSessionHandle) {
let entry = ActiveProxyCallEntry {
session_id: session_id.clone(),
caller: None,
callee: None,
direction: "inbound".to_string(),
started_at: Utc::now(),
answered_at: None,
status: ActiveProxyCallStatus::Ringing,
};
self.upsert(entry, handle);
}
#[cfg(test)]
pub fn handles_by_dialog_count(&self) -> usize {
self.inner.lock().handles_by_dialog.len()
}
#[cfg(test)]
pub fn dialog_by_session_count(&self) -> usize {
self.inner.lock().dialog_by_session.len()
}
pub fn cleanup_stale(&self, max_age: std::time::Duration) -> usize {
let cutoff = Utc::now()
- chrono::Duration::from_std(max_age).unwrap_or_else(|_| chrono::Duration::hours(1));
let mut guard = self.inner.lock();
let stale_ids: Vec<String> = guard
.entries
.iter()
.filter(|(_, entry)| {
let last_activity = entry.answered_at.unwrap_or(entry.started_at);
last_activity < cutoff
})
.map(|(id, _)| id.clone())
.collect();
let count = stale_ids.len();
for id in stale_ids {
guard.entries.remove(&id);
guard.handles.remove(&id);
if let Some(dialog_ids) = guard.dialog_by_session.remove(&id) {
for dialog_id in dialog_ids {
guard.handles_by_dialog.remove(&dialog_id);
}
}
}
count
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::proxy::proxy_call::sip_session::SipSession;
fn make_handle(session_id: &str) -> SipSessionHandle {
use crate::call::runtime::SessionId;
let id = SessionId::from(session_id);
let (handle, _cmd_rx) = SipSession::with_handle(id);
handle
}
fn make_entry(session_id: &str) -> ActiveProxyCallEntry {
ActiveProxyCallEntry {
session_id: session_id.to_string(),
caller: None,
callee: None,
direction: "outbound".to_string(),
started_at: chrono::Utc::now(),
answered_at: None,
status: ActiveProxyCallStatus::Ringing,
}
}
#[test]
fn test_remove_cleans_all_dialog_handles() {
let registry = ActiveProxyCallRegistry::new();
let session = "session-1";
let handle = make_handle(session);
let entry = make_entry(session);
registry.upsert(entry, handle.clone());
registry.register_dialog("server-dialog".to_string(), handle.clone());
assert_eq!(registry.handles_by_dialog_count(), 1);
registry.register_dialog("callee-dialog-1".to_string(), handle.clone());
assert_eq!(registry.handles_by_dialog_count(), 2);
registry.register_dialog("callee-dialog-2".to_string(), handle.clone());
assert_eq!(registry.handles_by_dialog_count(), 3);
assert_eq!(registry.inner.lock().dialog_by_session[session].len(), 3);
registry.remove(session);
assert_eq!(registry.count(), 0, "entry should be gone");
assert_eq!(
registry.handles_by_dialog_count(),
0,
"all dialog handles must be cleaned up (was leaking before fix)"
);
assert_eq!(
registry.dialog_by_session_count(),
0,
"dialog_by_session must be empty"
);
}
#[test]
fn test_single_trunk_call_no_leak() {
let registry = ActiveProxyCallRegistry::new();
let session = "session-single";
let handle = make_handle(session);
registry.upsert(make_entry(session), handle.clone());
registry.register_dialog("server-dlg".to_string(), handle.clone());
registry.register_dialog("callee-dlg".to_string(), handle.clone());
assert_eq!(registry.handles_by_dialog_count(), 2);
registry.remove(session);
assert_eq!(registry.handles_by_dialog_count(), 0);
assert_eq!(registry.dialog_by_session_count(), 0);
}
#[test]
fn test_unregister_dialog_partial() {
let registry = ActiveProxyCallRegistry::new();
let session = "session-partial";
let handle = make_handle(session);
registry.upsert(make_entry(session), handle.clone());
registry.register_dialog("dlg-a".to_string(), handle.clone());
registry.register_dialog("dlg-b".to_string(), handle.clone());
registry.unregister_dialog("dlg-a");
assert_eq!(registry.handles_by_dialog_count(), 1, "dlg-b should remain");
assert_eq!(registry.inner.lock().dialog_by_session[session].len(), 1);
registry.unregister_dialog("dlg-b");
assert_eq!(registry.handles_by_dialog_count(), 0);
assert_eq!(registry.dialog_by_session_count(), 0);
}
#[test]
fn test_multiple_sessions_independent() {
let registry = ActiveProxyCallRegistry::new();
let h1 = make_handle("s1");
let h2 = make_handle("s2");
registry.upsert(make_entry("s1"), h1.clone());
registry.upsert(make_entry("s2"), h2.clone());
registry.register_dialog("s1-server".to_string(), h1.clone());
registry.register_dialog("s1-callee".to_string(), h1.clone());
registry.register_dialog("s2-server".to_string(), h2.clone());
registry.register_dialog("s2-callee".to_string(), h2.clone());
assert_eq!(registry.handles_by_dialog_count(), 4);
registry.remove("s1");
assert_eq!(registry.count(), 1, "s2 still active");
assert_eq!(
registry.handles_by_dialog_count(),
2,
"only s2 dialogs remain"
);
registry.remove("s2");
assert_eq!(registry.count(), 0);
assert_eq!(registry.handles_by_dialog_count(), 0);
}
}