use crate::{
call::{CalleeDisplayName, TransactionCookie, TrunkContext},
callrecord::{
CallDetails, CallRecord, CallRecordHangupMessage, CallRecordHangupReason,
CallRecordLastError, CallRecordMedia, CallRecordRewrite, CallRecordSender,
},
models::call_record::extract_sip_username,
proxy::{
proxy_call::state::{CallContext, CallSessionRecordSnapshot},
server::SipServerRef,
},
};
use chrono::{Duration, Utc};
use rsipstack::sip::prelude::HeadersExt;
use std::{collections::HashMap, fs};
/// Builds the final [`CallRecord`] for a proxied call from a session snapshot and
/// forwards it to the configured call-record sender.
pub struct CallReporter {
/// Server handle; `report` reads its sip-flow backend and recording/sip-flow configuration.
pub server: SipServerRef,
/// Per-call context; `report` reads the session id, start time, dialplan and cookie from it.
pub context: CallContext,
/// Where finished records are sent. When `None`, `report` builds the record and drops it.
pub call_record_sender: Option<CallRecordSender>,
}
impl CallReporter {
    /// Assembles a [`CallRecord`] from the finished session's `snapshot` and sends it
    /// to `call_record_sender`, if one is configured.
    ///
    /// The record combines:
    /// - wall-clock start/ring/answer/end timestamps derived from the context's start time,
    /// - caller/callee identities before and after routing rewrites,
    /// - the final status code, hangup reason and hangup messages,
    /// - the local recording file (if the call was answered and recording was enabled),
    /// - a derived S3 recording URL when sip-flow is configured with a local backend
    ///   that uploads to S3 and no other recording location is known.
    ///
    /// A failure to deliver the record is logged and otherwise ignored; this function
    /// never panics on a closed channel.
    pub(super) fn report(&self, snapshot: CallSessionRecordSnapshot) {
        let now = Utc::now();
        // `context.start_time` only exposes elapsed durations, so wall-clock timestamps
        // are obtained by offsetting from `now`.
        let start_time =
            now - Duration::from_std(self.context.start_time.elapsed()).unwrap_or_default();
        let ring_time = snapshot.ring_time.map(|rt| {
            start_time
                + Duration::from_std(rt.duration_since(self.context.start_time)).unwrap_or_default()
        });
        let answer_time = snapshot.answer_time.map(|at| {
            start_time
                + Duration::from_std(at.duration_since(self.context.start_time)).unwrap_or_default()
        });
        let call_was_accepted = snapshot.answer_time.is_some();

        // Without a recorded error the call is reported with status code 200.
        let status_code = snapshot
            .last_error
            .as_ref()
            .map(|(code, _)| u16::from(code.clone()))
            .unwrap_or(200);

        // Fallback when the session did not record a reason: only an answered call
        // without any error counts as a system hangup; everything else is a failure.
        let hangup_reason = snapshot.hangup_reason.clone().or_else(|| {
            if call_was_accepted && snapshot.last_error.is_none() {
                Some(CallRecordHangupReason::BySystem)
            } else {
                Some(CallRecordHangupReason::Failed)
            }
        });

        let original_caller = snapshot
            .original_caller
            .clone()
            .or_else(|| self.context.dialplan.caller.as_ref().map(|c| c.to_string()))
            .unwrap_or_default();
        // Callee fallback chain: snapshot -> To header of the original request ->
        // first dialplan target -> the literal "unknown".
        let original_callee = snapshot
            .original_callee
            .clone()
            .or_else(|| {
                self.context
                    .dialplan
                    .original
                    .to_header()
                    .ok()
                    .and_then(|to_header| to_header.uri().ok().map(|uri| uri.to_string()))
            })
            .or_else(|| {
                self.context
                    .dialplan
                    .first_target()
                    .map(|location| location.aor.to_string())
            })
            .unwrap_or_else(|| "unknown".to_string());

        // Routed identities win over the originals; the connected callee is used when
        // no routed callee was recorded.
        let caller = snapshot
            .routed_caller
            .clone()
            .unwrap_or_else(|| original_caller.clone());
        let callee = snapshot
            .routed_callee
            .clone()
            .or_else(|| snapshot.connected_callee.clone())
            .unwrap_or_else(|| original_callee.clone());

        let last_error = snapshot
            .last_error
            .as_ref()
            .map(|(code, reason)| CallRecordLastError {
                code: u16::from(code.clone()),
                reason: reason.clone(),
            });

        // Make sure a failed call carries at least one hangup message.
        let mut hangup_messages = snapshot.hangup_messages.clone();
        if hangup_messages.is_empty()
            && let Some((code, reason)) = snapshot.last_error.as_ref()
        {
            hangup_messages.push(CallRecordHangupMessage {
                code: u16::from(code.clone()),
                reason: reason.clone(),
                target: None,
            });
        }

        // The originals are not needed after this point, so they are moved rather
        // than cloned.
        let rewrite = CallRecordRewrite {
            caller_original: original_caller,
            caller_final: caller.clone(),
            callee_original: original_callee,
            callee_final: callee.clone(),
            contact: snapshot.routed_contact.clone(),
            destination: snapshot.routed_destination.clone(),
        };

        let sip_leg_roles = build_sip_leg_roles(&snapshot);
        let has_sipflow_backend = self.server.sip_flow.as_ref().is_some();
        let direction = self.context.dialplan.direction.to_string();
        let status = if call_was_accepted {
            "completed".to_string()
        } else if snapshot.last_error.is_some() {
            "failed".to_string()
        } else {
            "missed".to_string()
        };

        let (from_number, from_name, department_id, extension_id) =
            resolve_user_info(&self.context.cookie, &caller);
        let to_number = extract_sip_username(&callee);
        let to_name = self
            .context
            .cookie
            .get_extension::<CalleeDisplayName>()
            .map(|e| e.0);
        let trunk_context = self.context.cookie.get_extension::<TrunkContext>();
        let (sip_gateway, sip_trunk_id) = if let Some(ctx) = trunk_context {
            (Some(ctx.name.clone()), ctx.id)
        } else {
            (None, None)
        };

        // A local recording is only reported for answered calls with recording enabled
        // and a non-empty recorder file path.
        let mut recorder = Vec::new();
        if call_was_accepted
            && self.context.dialplan.recording.enabled
            && let Some(recorder_config) = self.context.dialplan.recording.option.as_ref()
            && !recorder_config.recorder_file.is_empty()
        {
            // A missing or unreadable file is reported with size 0 instead of failing.
            let size = fs::metadata(&recorder_config.recorder_file)
                .map(|meta| meta.len())
                .unwrap_or(0);
            recorder.push(CallRecordMedia {
                track_id: "mixed".to_string(),
                path: recorder_config.recorder_file.clone(),
                size,
                extra: None,
            });
        }
        tracing::info!(
            recording = ?self.context.dialplan.recording,
            has_sipflow_backend = ?has_sipflow_backend,
            call_was_accepted,
            "Call recording files collected: {:?}",
            recorder
        );

        let recording_path_for_db = recorder.first().map(|media| media.path.clone());
        let mut details = CallDetails {
            direction,
            status,
            from_number,
            to_number,
            caller_name: from_name,
            agent_name: to_name,
            queue: snapshot.last_queue_name.clone(),
            department_id,
            extension_id,
            sip_trunk_id,
            sip_gateway,
            recording_url: recording_path_for_db,
            rewrite,
            last_error,
            metadata: snapshot
                .extensions
                .get::<HashMap<String, String>>()
                .cloned(),
            ..Default::default()
        };

        // No local recording and no proxy-level recording config: if sip-flow runs a
        // local backend with S3 upload, derive the object URL as
        // `<endpoint>/<bucket>/[<root>/]<YYYYMMDD>/<session_id>.wav`.
        // NOTE(review): presumably this mirrors the key layout used by the sip-flow
        // uploader — confirm against the uploader.
        if call_was_accepted
            && details.recording_url.is_none()
            && self.server.proxy_config.recording.is_none()
            && let Some(crate::config::SipFlowConfig::Local {
                upload:
                    Some(crate::config::SipFlowUploadConfig::S3 {
                        bucket,
                        endpoint,
                        root,
                        ..
                    }),
                ..
            }) = self.server.sipflow_config.as_ref()
        {
            let date_prefix = start_time.format("%Y%m%d").to_string();
            let key = format!("{}/{}.wav", date_prefix, self.context.session_id);
            let full_key = if root.is_empty() {
                key
            } else {
                format!("{}/{}", root.trim_end_matches('/'), key)
            };
            details.recording_url = Some(format!(
                "{}/{}/{}",
                endpoint.trim_end_matches('/'),
                bucket.trim_matches('/'),
                full_key.trim_start_matches('/')
            ));
            // Duration spans the whole call (start to now); saturate instead of
            // silently truncating if it does not fit into an `i32`.
            let elapsed_secs = (now - start_time).num_seconds().max(0);
            details.recording_duration_secs =
                Some(i32::try_from(elapsed_secs).unwrap_or(i32::MAX));
        }

        // All remaining locals are moved into the record; nothing reads them afterwards.
        let record = CallRecord {
            call_id: self.context.session_id.clone(),
            start_time,
            ring_time,
            answer_time,
            end_time: now,
            caller,
            callee,
            status_code,
            hangup_reason,
            hangup_messages,
            recorder,
            sip_leg_roles,
            leg_timeline: crate::callrecord::LegTimeline::default(),
            details,
            extensions: snapshot.extensions,
        };

        // Delivery stays best-effort, but a lost record is now visible in the logs.
        if let Some(sender) = self.call_record_sender.as_ref()
            && sender.send(record).is_err()
        {
            tracing::warn!(
                session_id = %self.context.session_id,
                "failed to send call record"
            );
        }
    }
}
/// Maps every SIP Call-ID involved in the session to its role.
///
/// The server dialog's Call-ID is tagged `"caller"`; every entry of
/// `callee_call_ids` that differs from it is tagged `"callee"`.
fn build_sip_leg_roles(snapshot: &CallSessionRecordSnapshot) -> HashMap<String, String> {
    let caller_leg = &snapshot.server_dialog_id.call_id;

    // Collect the callee legs first, skipping any id that is really the caller leg.
    let mut roles: HashMap<String, String> = snapshot
        .callee_call_ids
        .iter()
        .filter(|leg| *leg != caller_leg)
        .map(|leg| (leg.clone(), "callee".to_string()))
        .collect();

    roles.insert(caller_leg.clone(), "caller".to_string());
    roles
}
/// Derives the caller-side identity fields of a call record.
///
/// Returns `(from_number, display_name, department_id, extension_id)`:
/// - `from_number` is the user part of `caller_uri`, unless the cookie's user has a
///   department entry starting with `"tenant:"`, in which case the user's `username`
///   is used instead;
/// - `display_name` is the cookie user's display name;
/// - `department_id` is the last department entry that parses as an `i64`;
/// - `extension_id` is the user's id when it is greater than zero.
///
/// Without a user in the cookie only `from_number` is populated.
fn resolve_user_info(
    cookie: &TransactionCookie,
    caller_uri: &str,
) -> (Option<String>, Option<String>, Option<i64>, Option<i64>) {
    let uri_username = extract_sip_username(caller_uri);

    let Some(user) = cookie.get_user() else {
        return (uri_username, None, None, None);
    };

    let mut department_id = None;
    let mut wholesale = false;
    for entry in user.departments.iter().flatten() {
        if entry.starts_with("tenant:") {
            wholesale = true;
        } else if let Ok(parsed) = entry.parse::<i64>() {
            department_id = Some(parsed);
        }
    }

    // Wholesale (tenant) users are identified by their account name, not by the URI.
    let from_number = if wholesale {
        Some(user.username.clone())
    } else {
        uri_username
    };
    let extension_id = (user.id > 0).then_some(user.id as i64);

    (from_number, user.display_name, department_id, extension_id)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::call::SipUser;

    /// A user with only a `tenant:` department reports its username as the caller number.
    #[test]
    fn test_resolve_user_info_wholesale() {
        let cookie = TransactionCookie::default();
        let user = SipUser {
            username: "1234".to_string(),
            display_name: Some("alice".to_string()),
            departments: Some(vec!["tenant:100".to_string()]),
            ..Default::default()
        };
        cookie.set_user(user);
        let caller = "sip:mock-uuid@1.2.3.4";
        let (from, from_name, dept, ext) = resolve_user_info(&cookie, caller);
        assert_eq!(from, Some("1234".to_string()));
        assert_eq!(from_name, Some("alice".to_string()));
        assert_eq!(dept, None);
        assert_eq!(ext, None);
    }

    /// A tenant marker and a numeric department can coexist; both are honoured.
    #[test]
    fn test_resolve_user_info_mixed() {
        let cookie = TransactionCookie::default();
        let user = SipUser {
            username: "1234".to_string(),
            display_name: Some("alice".to_string()),
            departments: Some(vec!["tenant:100".to_string(), "5".to_string()]),
            id: 99,
            ..Default::default()
        };
        cookie.set_user(user);
        let caller = "sip:mock-uuid@1.2.3.4";
        let (from, from_name, dept, ext) = resolve_user_info(&cookie, caller);
        assert_eq!(from, Some("1234".to_string()));
        assert_eq!(from_name, Some("alice".to_string()));
        assert_eq!(dept, Some(5));
        assert_eq!(ext, Some(99));
    }

    /// A regular user keeps the number taken from the caller URI.
    #[test]
    fn test_resolve_user_info_normal() {
        let cookie = TransactionCookie::default();
        let user = SipUser {
            username: "1001".to_string(),
            display_name: Some("alice".to_string()),
            departments: Some(vec!["5".to_string()]),
            id: 99,
            ..Default::default()
        };
        cookie.set_user(user);
        let caller = "sip:1001@1.2.3.4";
        let (from, from_name, dept, ext) = resolve_user_info(&cookie, caller);
        assert_eq!(from, Some("1001".to_string()));
        assert_eq!(from_name, Some("alice".to_string()));
        assert_eq!(dept, Some(5));
        assert_eq!(ext, Some(99));
    }

    /// Sending a record after the receiver is gone yields `Err` rather than a panic;
    /// `CallReporter::report` must tolerate exactly this situation.
    #[tokio::test]
    async fn test_call_reporter_handles_closed_channel() {
        use tokio::sync::mpsc;
        let (tx, rx) = mpsc::unbounded_channel::<CallRecord>();
        drop(rx);
        let record: CallRecord = CallRecord {
            call_id: "test-session".to_string(),
            start_time: chrono::Utc::now(),
            ring_time: None,
            answer_time: None,
            end_time: chrono::Utc::now(),
            caller: "caller".to_string(),
            callee: "callee".to_string(),
            status_code: 200,
            hangup_reason: None,
            hangup_messages: vec![],
            recorder: vec![],
            sip_leg_roles: std::collections::HashMap::new(),
            leg_timeline: crate::callrecord::LegTimeline::default(),
            details: crate::callrecord::CallDetails::default(),
            extensions: http::Extensions::new(),
        };
        let result = tx.send(record);
        assert!(
            result.is_err(),
            "Sending to closed channel should return Err"
        );
    }

    /// Without a user in the cookie only the URI-derived number is populated.
    #[test]
    fn test_resolve_user_info_without_user() {
        let cookie = TransactionCookie::default();
        let caller = "sip:anonymous@1.2.3.4";
        let (from, from_name, dept, ext) = resolve_user_info(&cookie, caller);
        assert_eq!(from, Some("anonymous".to_string()));
        assert_eq!(from_name, None);
        assert_eq!(dept, None);
        assert_eq!(ext, None);
    }

    /// Smoke test: a URI with an empty user part must not make `resolve_user_info` panic.
    ///
    /// The value returned for such a URI is decided by `extract_sip_username`, so no
    /// particular result is asserted here (the previous assertion was a tautology
    /// that could never fail).
    #[test]
    fn test_resolve_user_info_with_empty_username() {
        let cookie = TransactionCookie::default();
        let caller = "sip:@1.2.3.4";
        let _ = resolve_user_info(&cookie, caller);
    }

    /// The server dialog's Call-ID is the caller leg; `callee_call_ids` are callee legs.
    #[test]
    fn test_build_sip_leg_roles_uses_callee_call_ids() {
        let snapshot = CallSessionRecordSnapshot {
            ring_time: None,
            answer_time: None,
            last_error: None,
            hangup_reason: None,
            hangup_messages: vec![],
            original_caller: None,
            original_callee: None,
            routed_caller: None,
            routed_callee: None,
            connected_callee: None,
            routed_contact: None,
            routed_destination: None,
            last_queue_name: None,
            callee_call_ids: vec!["callee-call-id".to_string()],
            server_dialog_id: rsipstack::dialog::DialogId {
                call_id: "caller-call-id".to_string(),
                local_tag: "local".to_string(),
                remote_tag: "remote".to_string(),
            },
            extensions: http::Extensions::new(),
        };
        let roles = build_sip_leg_roles(&snapshot);
        assert_eq!(
            roles.get("caller-call-id").map(String::as_str),
            Some("caller")
        );
        assert_eq!(
            roles.get("callee-call-id").map(String::as_str),
            Some("callee")
        );
    }
}