1use super::Command;
2use crate::{
3 CallOption, ReferOption,
4 event::{EventReceiver, EventSender, SessionEvent},
5 media::{
6 TrackId,
7 ambiance::AmbianceProcessor,
8 engine::StreamEngine,
9 negotiate::strip_ipv6_candidates,
10 processor::SubscribeProcessor,
11 recorder::RecorderOption,
12 stream::{MediaStream, MediaStreamBuilder},
13 track::{
14 Track, TrackConfig,
15 file::FileTrack,
16 media_pass::MediaPassTrack,
17 rtc::{RtcTrack, RtcTrackConfig},
18 tts::SynthesisHandle,
19 websocket::{WebsocketBytesReceiver, WebsocketTrack},
20 },
21 },
22 synthesis::{SynthesisCommand, SynthesisOption},
23 transcription::TranscriptionOption,
24};
25use crate::{
26 app::AppState,
27 call::{
28 CommandReceiver, CommandSender,
29 sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
30 },
31 callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
32 useragent::{
33 invitation::PendingDialog,
34 public_address::{
35 build_public_contact_uri, contact_needs_public_resolution, find_local_addr_for_uri,
36 },
37 },
38};
39use anyhow::Result;
40use audio_codec::CodecType;
41use chrono::{DateTime, Utc};
42use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
43use serde::{Deserialize, Serialize};
44use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
45use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
46use tokio_util::sync::CancellationToken;
47use tracing::{debug, info, warn};
48
49#[cfg(test)]
50mod tests {
51 use super::*;
52 use crate::app::AppStateBuilder;
53 use crate::callrecord::CallRecordHangupReason;
54 use crate::config::Config;
55 use crate::media::track::tts::SynthesisHandle;
56 use crate::synthesis::SynthesisCommand;
57 use tokio::sync::mpsc;
58
59 #[tokio::test]
60 async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
61 let mut config = Config::default();
62 config.udp_port = 0; config.media_cache_path = "/tmp/mediacache".to_string();
64 let stream_engine = Arc::new(StreamEngine::default());
65 let app_state = AppStateBuilder::new()
66 .with_config(config)
67 .with_stream_engine(stream_engine)
68 .build()
69 .await?;
70
71 let cancel_token = CancellationToken::new();
72 let session_id = "test-session".to_string();
73 let track_config = TrackConfig::default();
74
75 let mut option = crate::CallOption::default();
76 option.tts = Some(crate::synthesis::SynthesisOption::default());
77
78 let active_call = Arc::new(ActiveCall::new(
79 ActiveCallType::Sip,
80 cancel_token.clone(),
81 session_id.clone(),
82 app_state.invitation.clone(),
83 app_state.clone(),
84 track_config,
85 None,
86 false,
87 None,
88 None,
89 None,
90 ));
91
92 {
93 let mut state = active_call.call_state.write().await;
94 state.option = Some(option);
95 }
96
97 let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
98 let initial_ssrc = 12345;
99 let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
100
101 {
103 let mut state = active_call.call_state.write().await;
104 state.tts_handle = Some(handle);
105 state.current_play_id = Some("play_1".to_string());
106 }
107
108 active_call
110 .do_tts(
111 "hangup now".to_string(),
112 None,
113 Some("play_1".to_string()),
114 Some(true),
115 false,
116 true,
117 None,
118 None,
119 false,
120 None,
121 )
122 .await?;
123
124 {
126 let state = active_call.call_state.read().await;
127 assert!(state.auto_hangup.is_some());
128 let (h_ssrc, reason) = state.auto_hangup.clone().unwrap();
129 assert_eq!(
130 h_ssrc, initial_ssrc,
131 "SSRC should be reused from existing handle"
132 );
133 assert_eq!(reason, CallRecordHangupReason::BySystem);
134 }
135
136 let cmd = rx.try_recv().expect("Should have received tts command");
138 assert_eq!(cmd.text, "hangup now");
139
140 Ok(())
141 }
142
143 #[tokio::test]
144 async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
145 let mut config = Config::default();
146 config.udp_port = 0; config.media_cache_path = "/tmp/mediacache".to_string();
148 let stream_engine = Arc::new(StreamEngine::default());
149 let app_state = AppStateBuilder::new()
150 .with_config(config)
151 .with_stream_engine(stream_engine)
152 .build()
153 .await?;
154
155 let active_call = Arc::new(ActiveCall::new(
156 ActiveCallType::Sip,
157 CancellationToken::new(),
158 "test-session".to_string(),
159 app_state.invitation.clone(),
160 app_state.clone(),
161 TrackConfig::default(),
162 None,
163 false,
164 None,
165 None,
166 None,
167 ));
168
169 let mut tts_opt = crate::synthesis::SynthesisOption::default();
170 tts_opt.provider = Some(crate::synthesis::SynthesisType::Aliyun);
171 let mut option = crate::CallOption::default();
172 option.tts = Some(tts_opt);
173 {
174 let mut state = active_call.call_state.write().await;
175 state.option = Some(option);
176 }
177
178 let (tx, _rx) = mpsc::unbounded_channel();
179 let initial_ssrc = 111;
180 let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
181
182 {
183 let mut state = active_call.call_state.write().await;
184 state.tts_handle = Some(handle);
185 state.current_play_id = Some("play_1".to_string());
186 }
187
188 active_call
190 .do_tts(
191 "new play".to_string(),
192 None,
193 Some("play_2".to_string()),
194 Some(true),
195 false,
196 true,
197 None,
198 None,
199 false,
200 None,
201 )
202 .await?;
203
204 {
206 let state = active_call.call_state.read().await;
207 let (h_ssrc, _) = state.auto_hangup.clone().unwrap();
208 assert_ne!(
209 h_ssrc, initial_ssrc,
210 "Should use a new SSRC for different play_id"
211 );
212 }
213
214 Ok(())
215 }
216}
217
218#[derive(Deserialize)]
219#[serde(rename_all = "camelCase")]
220pub struct CallParams {
221 pub id: Option<String>,
222 #[serde(rename = "dump")]
223 pub dump_events: Option<bool>,
224 #[serde(rename = "ping")]
225 pub ping_interval: Option<u32>,
226 pub server_side_track: Option<String>,
227}
228
229#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
230#[serde(rename_all = "camelCase")]
231pub enum ActiveCallType {
232 Webrtc,
233 B2bua,
234 WebSocket,
235 #[default]
236 Sip,
237}
238
239#[derive(Default)]
240pub struct ActiveCallState {
241 pub session_id: String,
242 pub start_time: DateTime<Utc>,
243 pub ring_time: Option<DateTime<Utc>>,
244 pub answer_time: Option<DateTime<Utc>>,
245 pub hangup_reason: Option<CallRecordHangupReason>,
246 pub last_status_code: u16,
247 pub option: Option<CallOption>,
248 pub answer: Option<String>,
249 pub ssrc: u32,
250 pub refer_callstate: Option<ActiveCallStateRef>,
251 pub extras: Option<HashMap<String, serde_json::Value>>,
252 pub is_refer: bool,
253 pub sip_hangup_headers_template: Option<HashMap<String, String>>,
254
255 pub tts_handle: Option<SynthesisHandle>,
257 pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
258 pub wait_input_timeout: Option<u32>,
259 pub moh: Option<String>,
260 pub current_play_id: Option<String>,
261 pub audio_receiver: Option<WebsocketBytesReceiver>,
262 pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
263 pub pending_asr_resume: Option<(u32, TranscriptionOption)>,
264}
265
266pub type ActiveCallRef = Arc<ActiveCall>;
267pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
268
269pub struct ActiveCall {
270 pub call_state: ActiveCallStateRef,
271 pub cancel_token: CancellationToken,
272 pub call_type: ActiveCallType,
273 pub session_id: String,
274 pub media_stream: Arc<MediaStream>,
275 pub track_config: TrackConfig,
276 pub event_sender: EventSender,
277 pub app_state: AppState,
278 pub invitation: Invitation,
279 pub cmd_sender: CommandSender,
280 pub dump_events: bool,
281 pub server_side_track_id: TrackId,
282}
283
284pub struct ActiveCallGuard {
285 pub call: ActiveCallRef,
286 pub active_calls: usize,
287}
288
289impl ActiveCallGuard {
290 pub fn new(call: ActiveCallRef) -> Self {
291 let active_calls = {
292 call.app_state
293 .total_calls
294 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
295 let mut calls = call.app_state.active_calls.lock().unwrap();
296 calls.insert(call.session_id.clone(), call.clone());
297 calls.len()
298 };
299 Self { call, active_calls }
300 }
301}
302
303impl Drop for ActiveCallGuard {
304 fn drop(&mut self) {
305 self.call
306 .app_state
307 .active_calls
308 .lock()
309 .unwrap()
310 .remove(&self.call.session_id);
311 }
312}
313
314pub struct ActiveCallReceiver {
315 pub cmd_receiver: CommandReceiver,
316 pub dump_cmd_receiver: CommandReceiver,
317 pub dump_event_receiver: EventReceiver,
318}
319
320impl ActiveCall {
321 pub fn new(
322 call_type: ActiveCallType,
323 cancel_token: CancellationToken,
324 session_id: String,
325 invitation: Invitation,
326 app_state: AppState,
327 track_config: TrackConfig,
328 audio_receiver: Option<WebsocketBytesReceiver>,
329 dump_events: bool,
330 server_side_track_id: Option<TrackId>,
331 extras: Option<HashMap<String, serde_json::Value>>,
332 sip_hangup_headers_template: Option<HashMap<String, String>>,
333 ) -> Self {
334 let event_sender = crate::event::create_event_sender();
335 let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
336 let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
337 .with_id(session_id.clone())
338 .with_cancel_token(cancel_token.child_token());
339 let media_stream = Arc::new(media_stream_builder.build());
340 let start_time = Utc::now();
341 let call_type_str = match &call_type {
343 ActiveCallType::Sip => "sip",
344 ActiveCallType::WebSocket => "websocket",
345 ActiveCallType::Webrtc => "webrtc",
346 ActiveCallType::B2bua => "b2bua",
347 };
348 let extras = {
349 let mut e = extras.unwrap_or_default();
350 e.entry(crate::playbook::BUILTIN_SESSION_ID.to_string())
351 .or_insert_with(|| serde_json::Value::String(session_id.clone()));
352 e.entry(crate::playbook::BUILTIN_CALL_TYPE.to_string())
353 .or_insert_with(|| serde_json::Value::String(call_type_str.to_string()));
354 e.entry(crate::playbook::BUILTIN_START_TIME.to_string())
355 .or_insert_with(|| serde_json::Value::String(start_time.to_rfc3339()));
356 Some(e)
357 };
358 let call_state = Arc::new(RwLock::new(ActiveCallState {
359 session_id: session_id.clone(),
360 start_time,
361 ssrc: rand::random::<u32>(),
362 extras,
363 audio_receiver,
364 sip_hangup_headers_template,
365 ..Default::default()
366 }));
367 Self {
368 cancel_token,
369 call_type,
370 session_id,
371 call_state,
372 media_stream,
373 track_config,
374 event_sender,
375 app_state,
376 invitation,
377 cmd_sender,
378 dump_events,
379 server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
380 }
381 }
382
383 pub async fn enqueue_command(&self, command: Command) -> Result<()> {
384 self.cmd_sender
385 .send(command)
386 .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
387 Ok(())
388 }
389
390 pub fn new_receiver(&self) -> ActiveCallReceiver {
394 ActiveCallReceiver {
395 cmd_receiver: self.cmd_sender.subscribe(),
396 dump_cmd_receiver: self.cmd_sender.subscribe(),
397 dump_event_receiver: self.event_sender.subscribe(),
398 }
399 }
400
401 pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
402 let ActiveCallReceiver {
403 mut cmd_receiver,
404 dump_cmd_receiver,
405 dump_event_receiver,
406 } = receiver;
407
408 let process_command_loop = async move {
409 while let Ok(command) = cmd_receiver.recv().await {
410 match self.dispatch(command).await {
411 Ok(_) => (),
412 Err(e) => {
413 warn!(session_id = self.session_id, "{}", e);
414 self.event_sender
415 .send(SessionEvent::Error {
416 track_id: self.session_id.clone(),
417 timestamp: crate::media::get_timestamp(),
418 sender: "command".to_string(),
419 error: e.to_string(),
420 code: None,
421 })
422 .ok();
423 }
424 }
425 }
426 };
427 self.app_state
428 .total_calls
429 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
430
431 tokio::join!(
432 self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
433 async {
434 select! {
435 _ = process_command_loop => {
436 info!(session_id = self.session_id, "command loop done");
437 }
438 _ = self.process() => {
439 info!(session_id = self.session_id, "call serve done");
440 }
441 _ = self.cancel_token.cancelled() => {
442 info!(session_id = self.session_id, "call cancelled - cleaning up resources");
443 }
444 }
445 self.cancel_token.cancel();
446 }
447 );
448 Ok(())
449 }
450
451 async fn process(&self) -> Result<()> {
452 let mut event_receiver = self.event_sender.subscribe();
453
454 let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
455 let input_timeout_expire_ref = input_timeout_expire.clone();
456 let event_sender = self.event_sender.clone();
457 let wait_input_timeout_loop = async {
458 loop {
459 let (start_time, expire) = { *input_timeout_expire.lock().await };
460 if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
461 info!(session_id = self.session_id, "wait input timeout reached");
462 *input_timeout_expire.lock().await = (0, 0);
463 event_sender
464 .send(SessionEvent::Silence {
465 track_id: self.server_side_track_id.clone(),
466 timestamp: crate::media::get_timestamp(),
467 start_time,
468 duration: expire as u64,
469 samples: None,
470 })
471 .ok();
472 }
473 sleep(Duration::from_millis(100)).await;
474 }
475 };
476 let server_side_track_id = self.server_side_track_id.clone();
477 let event_hook_loop = async move {
478 while let Ok(event) = event_receiver.recv().await {
479 match event {
480 SessionEvent::Speaking { .. }
481 | SessionEvent::Dtmf { .. }
482 | SessionEvent::AsrDelta { .. }
483 | SessionEvent::AsrFinal { .. }
484 | SessionEvent::TrackStart { .. } => {
485 *input_timeout_expire_ref.lock().await = (0, 0);
486 }
487 SessionEvent::TrackEnd {
488 track_id,
489 play_id,
490 ssrc,
491 ..
492 } => {
493 if track_id != server_side_track_id {
494 continue;
495 }
496
497 let (moh_path, auto_hangup, wait_timeout_val) = {
498 let mut state = self.call_state.write().await;
499 if play_id != state.current_play_id {
500 debug!(
501 session_id = self.session_id,
502 ?play_id,
503 current = ?state.current_play_id,
504 "ignoring interrupted track end"
505 );
506 continue;
507 }
508 state.current_play_id = None;
509 (
510 state.moh.clone(),
511 state.auto_hangup.clone(),
512 state.wait_input_timeout.take(),
513 )
514 };
515
516 if let Some(path) = moh_path {
517 info!(session_id = self.session_id, "looping moh: {}", path);
518 let ssrc = rand::random::<u32>();
519 let file_track = FileTrack::new(self.server_side_track_id.clone())
520 .with_play_id(Some(path.clone()))
521 .with_ssrc(ssrc)
522 .with_path(path.clone())
523 .with_cancel_token(self.cancel_token.child_token());
524 self.update_track_wrapper(Box::new(file_track), Some(path))
525 .await;
526 continue;
527 }
528
529 if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
530 if hangup_ssrc == ssrc {
531 info!(
532 session_id = self.session_id,
533 ssrc, "auto hangup when track end track_id:{}", track_id
534 );
535 self.do_hangup(Some(hangup_reason), None, None).await.ok();
536 }
537 }
538
539 if let Some(timeout) = wait_timeout_val {
540 let expire = if timeout > 0 {
541 (crate::media::get_timestamp(), timeout)
542 } else {
543 (0, 0)
544 };
545 *input_timeout_expire_ref.lock().await = expire;
546 }
547 }
548 SessionEvent::Interrupt { receiver } => {
549 let track_id =
550 receiver.unwrap_or_else(|| self.server_side_track_id.clone());
551 if track_id == self.server_side_track_id {
552 debug!(
553 session_id = self.session_id,
554 "received interrupt event, stopping playback"
555 );
556 self.do_interrupt(true).await.ok();
557 }
558 }
559 SessionEvent::Inactivity { track_id, .. } => {
560 info!(
561 session_id = self.session_id,
562 track_id, "inactivity timeout reached, hanging up"
563 );
564 self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None, None)
565 .await
566 .ok();
567 }
568 SessionEvent::Hangup { refer, .. } => {
569 if refer == Some(true) {
571 let mut cs = self.call_state.write().await;
572 if let Some((refer_ssrc, asr_option)) = cs.pending_asr_resume.take() {
573 let is_refer_hangup = cs
575 .refer_callstate
576 .as_ref()
577 .map(|rcs| {
578 rcs.try_read()
579 .map(|g| g.ssrc == refer_ssrc)
580 .unwrap_or(false)
581 })
582 .unwrap_or(false);
583
584 if is_refer_hangup {
585 drop(cs); info!(
587 session_id = self.session_id,
588 "Refer call ended, resuming parent ASR"
589 );
590
591 match self
593 .app_state
594 .stream_engine
595 .create_asr_processor(
596 self.server_side_track_id.clone(),
597 self.cancel_token.child_token(),
598 asr_option,
599 self.event_sender.clone(),
600 )
601 .await
602 {
603 Ok(asr_processor) => {
604 if let Err(e) = self
605 .media_stream
606 .append_processor(
607 &self.server_side_track_id,
608 asr_processor,
609 )
610 .await
611 {
612 warn!(
613 session_id = self.session_id,
614 "Failed to resume ASR after refer: {}", e
615 );
616 }
617 }
618 Err(e) => {
619 warn!(
620 session_id = self.session_id,
621 "Failed to create ASR processor for resume: {}", e
622 );
623 }
624 }
625 }
626 }
627 }
628 }
629 SessionEvent::Error { track_id, .. } => {
630 if track_id != server_side_track_id {
631 continue;
632 }
633
634 let moh_info = {
635 let mut state = self.call_state.write().await;
636 if let Some(path) = state.moh.clone() {
637 let fallback = "./config/sounds/refer_moh.wav".to_string();
638 let next_path = if path != fallback
639 && std::path::Path::new(&fallback).exists()
640 {
641 info!(
642 session_id = self.session_id,
643 "moh error, switching to fallback: {}", fallback
644 );
645 state.moh = Some(fallback.clone());
646 fallback
647 } else {
648 info!(
649 session_id = self.session_id,
650 "looping moh on error: {}", path
651 );
652 path
653 };
654 Some(next_path)
655 } else {
656 None
657 }
658 };
659
660 if let Some(next_path) = moh_info {
661 let ssrc = rand::random::<u32>();
662 let file_track = FileTrack::new(self.server_side_track_id.clone())
663 .with_play_id(Some(next_path.clone()))
664 .with_ssrc(ssrc)
665 .with_path(next_path.clone())
666 .with_cancel_token(self.cancel_token.child_token());
667 self.update_track_wrapper(Box::new(file_track), Some(next_path))
668 .await;
669 continue;
670 }
671 }
672 _ => {}
673 }
674 }
675 };
676
677 select! {
678 _ = wait_input_timeout_loop=>{
679 info!(session_id = self.session_id, "wait input timeout loop done");
680 }
681 _ = self.media_stream.serve() => {
682 info!(session_id = self.session_id, "media stream loop done");
683 }
684 _ = event_hook_loop => {
685 info!(session_id = self.session_id, "event loop done");
686 }
687 }
688 Ok(())
689 }
690
691 async fn dispatch(&self, command: Command) -> Result<()> {
692 match command {
693 Command::Invite { option } => self.do_invite(option).await,
694 Command::Accept { option } => self.do_accept(option).await,
695 Command::Reject { reason, code } => {
696 self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
697 .await
698 }
699 Command::Ringing {
700 ringtone,
701 recorder,
702 early_media,
703 } => self.do_ringing(ringtone, recorder, early_media).await,
704 Command::Tts {
705 text,
706 speaker,
707 play_id,
708 auto_hangup,
709 streaming,
710 end_of_stream,
711 option,
712 wait_input_timeout,
713 base64,
714 cache_key,
715 } => {
716 self.do_tts(
717 text,
718 speaker,
719 play_id,
720 auto_hangup,
721 streaming.unwrap_or_default(),
722 end_of_stream.unwrap_or_default(),
723 option,
724 wait_input_timeout,
725 base64.unwrap_or_default(),
726 cache_key,
727 )
728 .await
729 }
730 Command::Play {
731 url,
732 play_id,
733 auto_hangup,
734 wait_input_timeout,
735 } => {
736 self.do_play(url, play_id, auto_hangup, wait_input_timeout)
737 .await
738 }
739 Command::Hangup {
740 reason,
741 initiator,
742 headers,
743 } => {
744 let reason = reason.map(|r| {
745 r.parse::<CallRecordHangupReason>()
746 .unwrap_or(CallRecordHangupReason::BySystem)
747 });
748 self.do_hangup(reason, initiator, headers).await
749 }
750 Command::Refer {
751 caller,
752 callee,
753 options,
754 } => self.do_refer(caller, callee, options).await,
755 Command::Mute { track_id } => self.do_mute(track_id).await,
756 Command::Unmute { track_id } => self.do_unmute(track_id).await,
757 Command::Pause {} => self.do_pause().await,
758 Command::Resume {} => self.do_resume().await,
759 Command::Interrupt {
760 graceful: passage,
761 fade_out_ms: _,
762 } => self.do_interrupt(passage.unwrap_or_default()).await,
763 Command::History { speaker, text } => self.do_history(speaker, text).await,
764 }
765 }
766
767 fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
768 if let Some(recorder_option) = &option.recorder {
769 let mut recorder_file = recorder_option.recorder_file.clone();
770 if recorder_file.contains("{id}") {
771 recorder_file = recorder_file.replace("{id}", &self.session_id);
772 }
773
774 let recorder_file = if recorder_file.is_empty() {
775 self.app_state.get_recorder_file(&self.session_id)
776 } else {
777 let p = Path::new(&recorder_file);
778 p.is_absolute()
779 .then(|| recorder_file.clone())
780 .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
781 };
782 info!(
783 session_id = self.session_id,
784 recorder_file, "created recording file"
785 );
786
787 let track_samplerate = self.track_config.samplerate;
788 let recorder_samplerate = if track_samplerate > 0 {
789 track_samplerate
790 } else {
791 recorder_option.samplerate
792 };
793 let recorder_ptime = if recorder_option.ptime == 0 {
794 200
795 } else {
796 recorder_option.ptime
797 };
798 let requested_format = recorder_option
799 .format
800 .unwrap_or(self.app_state.config.recorder_format());
801 let format = requested_format.effective();
802 if requested_format != format {
803 warn!(
804 session_id = self.session_id,
805 requested = requested_format.extension(),
806 "Recorder format fallback to wav due to unsupported feature"
807 );
808 }
809 let mut recorder_config = RecorderOption {
810 recorder_file,
811 samplerate: recorder_samplerate,
812 ptime: recorder_ptime,
813 format: Some(format),
814 };
815 recorder_config.ensure_path_extension(format);
816 Some(recorder_config)
817 } else {
818 None
819 }
820 }
821
822 async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
823 {
825 let state = self.call_state.read().await;
826 option = state.merge_option(option);
827 }
828
829 option.check_default();
830 if let Some(opt) = self.build_record_option(&option) {
831 self.media_stream.update_recorder_option(opt).await;
832 }
833
834 if let Some(opt) = &option.media_pass {
835 let track_id = self.server_side_track_id.clone();
836 let cancel_token = self.cancel_token.child_token();
837 let ssrc = rand::random::<u32>();
838 let media_pass_track = MediaPassTrack::new(
839 self.session_id.clone(),
840 ssrc,
841 track_id,
842 cancel_token,
843 opt.clone(),
844 );
845 self.update_track_wrapper(Box::new(media_pass_track), None)
846 .await;
847 }
848
849 info!(
850 session_id = self.session_id,
851 call_type = ?self.call_type,
852 sender,
853 ?option,
854 "caller with option"
855 );
856
857 match self.setup_caller_track(&option).await {
858 Ok(_) => return Ok(option),
859 Err(e) => {
860 self.app_state
861 .total_failed_calls
862 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
863 let error_event = crate::event::SessionEvent::Error {
864 track_id: self.session_id.clone(),
865 timestamp: crate::media::get_timestamp(),
866 sender,
867 error: e.to_string(),
868 code: None,
869 };
870 self.event_sender.send(error_event).ok();
871 self.do_hangup(Some(CallRecordHangupReason::BySystem), None, None)
872 .await
873 .ok();
874 return Err(e);
875 }
876 }
877 }
878
879 async fn do_invite(&self, option: CallOption) -> Result<()> {
880 self.invite_or_accept(option, "invite".to_string())
881 .await
882 .map(|_| ())
883 }
884
885 async fn do_accept(&self, mut option: CallOption) -> Result<()> {
886 let has_pending = self
887 .invitation
888 .find_dialog_id_by_session_id(&self.session_id)
889 .is_some();
890 let ready_to_answer_val = {
891 let state = self.call_state.read().await;
892 state.ready_to_answer.is_none()
893 };
894
895 if ready_to_answer_val {
896 if !has_pending {
897 warn!(session_id = self.session_id, "no pending call to accept");
899 let rejet_event = crate::event::SessionEvent::Reject {
900 track_id: self.session_id.clone(),
901 timestamp: crate::media::get_timestamp(),
902 reason: "no pending call".to_string(),
903 refer: None,
904 code: Some(486),
905 };
906 self.event_sender.send(rejet_event).ok();
907 self.do_hangup(Some(CallRecordHangupReason::BySystem), None, None)
908 .await
909 .ok();
910 return Err(anyhow::anyhow!("no pending call to accept"));
911 }
912 option = self.invite_or_accept(option, "accept".to_string()).await?;
913 } else {
914 option.check_default();
915 self.call_state.write().await.option = Some(option.clone());
916 }
917 info!(session_id = self.session_id, ?option, "accepting call");
918 let ready = self.call_state.write().await.ready_to_answer.take();
919 if let Some((answer, track, dialog)) = ready {
920 info!(
921 session_id = self.session_id,
922 track_id = track.as_ref().map(|t| t.id()),
923 "ready to answer with track"
924 );
925
926 let headers = vec![rsip::Header::ContentType(
927 "application/sdp".to_string().into(),
928 )];
929
930 match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
931 Ok(_) => {
932 {
933 let mut state = self.call_state.write().await;
934 state.answer = Some(answer);
935 state.answer_time = Some(Utc::now());
936 }
937 self.finish_caller_stack(&option, track).await?;
938 }
939 Err(e) => {
940 warn!(session_id = self.session_id, "failed to accept call: {}", e);
941 return Err(anyhow::anyhow!("failed to accept call"));
942 }
943 }
944 }
945 return Ok(());
946 }
947
948 async fn do_reject(
949 &self,
950 code: Option<rsip::StatusCode>,
951 reason: Option<String>,
952 ) -> Result<()> {
953 match self
954 .invitation
955 .find_dialog_id_by_session_id(&self.session_id)
956 {
957 Some(id) => {
958 info!(
959 session_id = self.session_id,
960 ?reason,
961 ?code,
962 "rejecting call"
963 );
964 self.invitation.hangup(id, code, reason).await
965 }
966 None => Ok(()),
967 }
968 }
969
970 async fn do_ringing(
971 &self,
972 ringtone: Option<String>,
973 recorder: Option<RecorderOption>,
974 early_media: Option<bool>,
975 ) -> Result<()> {
976 let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
977 if ready_to_answer_val {
978 let option = CallOption {
979 recorder,
980 ..Default::default()
981 };
982 let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
983 }
984
985 let state = self.call_state.read().await;
986 if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
987 let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
988 let headers = vec![rsip::Header::ContentType(
989 "application/sdp".to_string().into(),
990 )];
991 (Some(headers), Some(answer.as_bytes().to_vec()))
992 } else {
993 (None, None)
994 };
995
996 dialog.ringing(headers, body).ok();
997 info!(
998 session_id = self.session_id,
999 ringtone, early_media, "playing ringtone"
1000 );
1001 if let Some(ringtone_url) = ringtone {
1002 drop(state);
1003 self.do_play(ringtone_url, None, None, None).await.ok();
1004 } else {
1005 info!(session_id = self.session_id, "no ringtone to play");
1006 }
1007 }
1008 Ok(())
1009 }
1010
1011 async fn do_tts(
1012 &self,
1013 text: String,
1014 speaker: Option<String>,
1015 play_id: Option<String>,
1016 auto_hangup: Option<bool>,
1017 streaming: bool,
1018 end_of_stream: bool,
1019 option: Option<SynthesisOption>,
1020 wait_input_timeout: Option<u32>,
1021 base64: bool,
1022 cache_key: Option<String>,
1023 ) -> Result<()> {
1024 let tts_option = {
1025 let call_state = self.call_state.read().await;
1026 match call_state.option.clone().unwrap_or_default().tts {
1027 Some(opt) => opt.merge_with(option),
1028 None => {
1029 if let Some(opt) = option {
1030 opt
1031 } else {
1032 return Err(anyhow::anyhow!("no tts option available"));
1033 }
1034 }
1035 }
1036 };
1037 let speaker = match speaker {
1038 Some(s) => Some(s),
1039 None => tts_option.speaker.clone(),
1040 };
1041
1042 let mut play_command = SynthesisCommand {
1043 text,
1044 speaker,
1045 play_id: play_id.clone(),
1046 streaming,
1047 end_of_stream,
1048 option: tts_option,
1049 base64,
1050 cache_key,
1051 };
1052 info!(
1053 session_id = self.session_id,
1054 provider = ?play_command.option.provider,
1055 text = %play_command.text.chars().take(10).collect::<String>(),
1056 speaker = play_command.speaker.as_deref(),
1057 auto_hangup = auto_hangup.unwrap_or_default(),
1058 play_id = play_command.play_id.as_deref(),
1059 streaming = play_command.streaming,
1060 end_of_stream = play_command.end_of_stream,
1061 wait_input_timeout = wait_input_timeout.unwrap_or_default(),
1062 is_base64 = play_command.base64,
1063 cache_key = play_command.cache_key.as_deref(),
1064 "new synthesis"
1065 );
1066
1067 let ssrc = rand::random::<u32>();
1068 let (should_interrupt, picked_ssrc) = {
1069 let mut state = self.call_state.write().await;
1070
1071 let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
1072 if play_id.is_some() && state.current_play_id != play_id {
1073 (ssrc, true)
1074 } else {
1075 (handle.ssrc, false)
1076 }
1077 } else {
1078 (ssrc, false)
1079 };
1080
1081 state.auto_hangup = match auto_hangup {
1082 Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
1083 _ => state.auto_hangup.clone(),
1084 };
1085 state.wait_input_timeout = wait_input_timeout;
1086
1087 state.current_play_id = play_id.clone();
1088 (changed, target_ssrc)
1089 };
1090
1091 if should_interrupt {
1092 let _ = self.do_interrupt(false).await;
1093 }
1094
1095 let existing_handle = self.call_state.read().await.tts_handle.clone();
1096 if let Some(tts_handle) = existing_handle {
1097 match tts_handle.try_send(play_command) {
1098 Ok(_) => return Ok(()),
1099 Err(e) => {
1100 play_command = e.0;
1101 }
1102 }
1103 }
1104
1105 let (new_handle, tts_track) = StreamEngine::create_tts_track(
1106 self.app_state.stream_engine.clone(),
1107 self.cancel_token.child_token(),
1108 self.session_id.clone(),
1109 self.server_side_track_id.clone(),
1110 picked_ssrc,
1111 play_id.clone(),
1112 streaming,
1113 &play_command.option,
1114 )
1115 .await?;
1116
1117 new_handle.try_send(play_command)?;
1118 self.call_state.write().await.tts_handle = Some(new_handle);
1119 self.update_track_wrapper(tts_track, play_id).await;
1120 Ok(())
1121 }
1122
1123 async fn do_play(
1124 &self,
1125 url: String,
1126 play_id: Option<String>,
1127 auto_hangup: Option<bool>,
1128 wait_input_timeout: Option<u32>,
1129 ) -> Result<()> {
1130 let ssrc = rand::random::<u32>();
1131 info!(
1132 session_id = self.session_id,
1133 ssrc, url, play_id, auto_hangup, "play file track"
1134 );
1135
1136 let play_id = play_id.or(Some(url.clone()));
1137
1138 let file_track = FileTrack::new(self.server_side_track_id.clone())
1139 .with_play_id(play_id.clone())
1140 .with_ssrc(ssrc)
1141 .with_path(url)
1142 .with_cancel_token(self.cancel_token.child_token());
1143
1144 {
1145 let mut state = self.call_state.write().await;
1146 state.tts_handle = None;
1147 state.auto_hangup = match auto_hangup {
1148 Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1149 _ => None,
1150 };
1151 state.wait_input_timeout = wait_input_timeout;
1152 }
1153
1154 self.update_track_wrapper(Box::new(file_track), play_id)
1155 .await;
1156 Ok(())
1157 }
1158
1159 async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1160 self.event_sender
1161 .send(SessionEvent::AddHistory {
1162 sender: Some(self.session_id.clone()),
1163 timestamp: crate::media::get_timestamp(),
1164 speaker,
1165 text,
1166 })
1167 .map(|_| ())
1168 .map_err(Into::into)
1169 }
1170
1171 async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1172 {
1173 let mut state = self.call_state.write().await;
1174 state.tts_handle = None;
1175 state.moh = None;
1176 }
1177 self.media_stream
1178 .remove_track(&self.server_side_track_id, graceful)
1179 .await;
1180 Ok(())
1181 }
1182 async fn do_pause(&self) -> Result<()> {
1183 Ok(())
1184 }
1185 async fn do_resume(&self) -> Result<()> {
1186 Ok(())
1187 }
1188 async fn do_hangup(
1189 &self,
1190 reason: Option<CallRecordHangupReason>,
1191 initiator: Option<String>,
1192 headers: Option<HashMap<String, String>>,
1193 ) -> Result<()> {
1194 info!(
1195 session_id = self.session_id,
1196 ?reason,
1197 ?initiator,
1198 ?headers,
1199 "do_hangup"
1200 );
1201
1202 if let Some(headers) = headers {
1204 let h_val = serde_json::to_value(&headers).unwrap_or_default();
1205
1206 {
1207 let mut state = self.call_state.write().await;
1208 let mut extras = state.extras.take().unwrap_or_default();
1209 extras.insert("_hangup_headers".to_string(), h_val.clone());
1210 state.extras = Some(extras);
1211 }
1212 } else {
1213 }
1214
1215 let hangup_reason = match initiator.as_deref() {
1216 Some("caller") => CallRecordHangupReason::ByCaller,
1217 Some("callee") => CallRecordHangupReason::ByCallee,
1218 Some("system") => CallRecordHangupReason::Autohangup,
1219 _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1220 };
1221
1222 self.media_stream
1223 .stop(Some(hangup_reason.to_string()), initiator);
1224
1225 self.call_state
1226 .write()
1227 .await
1228 .set_hangup_reason(hangup_reason);
1229 Ok(())
1230 }
1231
1232 async fn do_refer(
1233 &self,
1234 caller: String,
1235 callee: String,
1236 refer_option: Option<ReferOption>,
1237 ) -> Result<()> {
1238 self.do_interrupt(false).await.ok();
1239
1240 let pause_parent_asr = refer_option
1242 .as_ref()
1243 .and_then(|o| o.pause_parent_asr)
1244 .unwrap_or(false);
1245
1246 let original_asr_option = if pause_parent_asr {
1248 let cs = self.call_state.read().await;
1249 cs.option.as_ref().and_then(|o| o.asr.clone())
1250 } else {
1251 None
1252 };
1253
1254 if pause_parent_asr {
1256 info!(
1257 session_id = self.session_id,
1258 "Pausing parent call ASR during refer"
1259 );
1260 self.media_stream
1261 .remove_processor::<crate::media::asr_processor::AsrProcessor>(
1262 &self.server_side_track_id,
1263 )
1264 .await
1265 .ok();
1266 }
1267
1268 let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1269 if let Some(ref path) = moh {
1270 if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1271 let fallback = "./config/sounds/refer_moh.wav";
1272 if std::path::Path::new(fallback).exists() {
1273 info!(
1274 session_id = self.session_id,
1275 "moh {} not found, using fallback {}", path, fallback
1276 );
1277 moh = Some(fallback.to_string());
1278 }
1279 }
1280 }
1281 let ref_call_id = refer_option
1282 .as_ref()
1283 .and_then(|o| o.call_id.clone())
1284 .unwrap_or_else(|| format!("ref-{}-{}", rand::random::<u32>(), self.session_id));
1285
1286 let session_id = self.session_id.clone();
1287 let track_id = self.server_side_track_id.clone();
1288
1289 let recorder = {
1290 let cs = self.call_state.read().await;
1291 cs.option
1292 .as_ref()
1293 .map(|o| o.recorder.clone())
1294 .unwrap_or_default()
1295 };
1296
1297 let call_option = CallOption {
1298 caller: Some(caller),
1299 callee: Some(callee.clone()),
1300 sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1301 asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1302 denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1303 recorder,
1304 ..Default::default()
1305 };
1306
1307 let mut invite_option = call_option.build_invite_option()?;
1308 invite_option.call_id = Some(ref_call_id);
1309
1310 let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1311
1312 {
1313 let cs = self.call_state.read().await;
1314 if let Some(opt) = cs.option.as_ref() {
1315 if let Some(callee) = opt.callee.as_ref() {
1316 headers.push(rsip::Header::Other(
1317 "X-Referred-To".to_string(),
1318 callee.clone(),
1319 ));
1320 }
1321 if let Some(caller) = opt.caller.as_ref() {
1322 headers.push(rsip::Header::Other(
1323 "X-Referred-From".to_string(),
1324 caller.clone(),
1325 ));
1326 }
1327 }
1328 }
1329
1330 headers.push(rsip::Header::Other(
1331 "X-Referred-Id".to_string(),
1332 self.session_id.clone(),
1333 ));
1334
1335 let ssrc = rand::random::<u32>();
1336 let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1337 start_time: Utc::now(),
1338 ssrc,
1339 option: Some(call_option.clone()),
1340 is_refer: true,
1341 ..Default::default()
1342 }));
1343
1344 {
1345 let mut cs = self.call_state.write().await;
1346 cs.refer_callstate.replace(refer_call_state.clone());
1347 }
1348
1349 let auto_hangup_requested = refer_option
1350 .as_ref()
1351 .and_then(|o| o.auto_hangup)
1352 .unwrap_or(true);
1353
1354 if auto_hangup_requested {
1355 self.call_state.write().await.auto_hangup =
1356 Some((ssrc, CallRecordHangupReason::ByRefer));
1357 } else {
1358 self.call_state.write().await.auto_hangup = None;
1359 }
1360
1361 if !auto_hangup_requested && pause_parent_asr && original_asr_option.is_some() {
1363 let asr_option = original_asr_option.unwrap();
1364 self.call_state.write().await.pending_asr_resume = Some((ssrc, asr_option));
1365 }
1366
1367 let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1368
1369 info!(
1370 session_id = self.session_id,
1371 ssrc,
1372 auto_hangup = auto_hangup_requested,
1373 callee,
1374 timeout_secs,
1375 "do_refer"
1376 );
1377
1378 let r = tokio::time::timeout(
1379 Duration::from_secs(timeout_secs as u64),
1380 self.create_outgoing_sip_track(
1381 self.cancel_token.child_token(),
1382 refer_call_state.clone(),
1383 &track_id,
1384 invite_option,
1385 &call_option,
1386 moh,
1387 auto_hangup_requested,
1388 ),
1389 )
1390 .await;
1391
1392 {
1393 self.call_state.write().await.moh = None;
1394 }
1395
1396 let result = match r {
1397 Ok(res) => res,
1398 Err(_) => {
1399 warn!(
1400 session_id = session_id,
1401 "refer sip track creation timed out after {} seconds", timeout_secs
1402 );
1403 self.event_sender
1404 .send(SessionEvent::Reject {
1405 track_id,
1406 timestamp: crate::media::get_timestamp(),
1407 reason: "Timeout when refer".into(),
1408 code: Some(408),
1409 refer: Some(true),
1410 })
1411 .ok();
1412 return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1413 }
1414 };
1415
1416 match result {
1417 Ok(answer) => {
1418 self.event_sender
1419 .send(SessionEvent::Answer {
1420 timestamp: crate::media::get_timestamp(),
1421 track_id,
1422 sdp: answer,
1423 refer: Some(true),
1424 })
1425 .ok();
1426 }
1427 Err(e) => {
1428 warn!(
1429 session_id = session_id,
1430 "failed to create refer sip track: {}", e
1431 );
1432 match &e {
1433 rsipstack::Error::DialogError(reason, _, code) => {
1434 self.event_sender
1435 .send(SessionEvent::Reject {
1436 track_id,
1437 timestamp: crate::media::get_timestamp(),
1438 reason: reason.clone(),
1439 code: Some(code.code() as u32),
1440 refer: Some(true),
1441 })
1442 .ok();
1443 }
1444 _ => {}
1445 }
1446 return Err(e.into());
1447 }
1448 }
1449 Ok(())
1450 }
1451
1452 async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1453 self.media_stream.mute_track(track_id).await;
1454 Ok(())
1455 }
1456
1457 async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1458 self.media_stream.unmute_track(track_id).await;
1459 Ok(())
1460 }
1461
1462 pub async fn cleanup(&self) -> Result<()> {
1463 self.call_state.write().await.tts_handle = None;
1464 self.media_stream.cleanup().await.ok();
1465 Ok(())
1466 }
1467
1468 pub fn get_callrecord(&self) -> Option<CallRecord> {
1469 self.call_state.try_read().ok().map(|call_state| {
1470 call_state.build_callrecord(
1471 self.app_state.clone(),
1472 self.session_id.clone(),
1473 self.call_type.clone(),
1474 )
1475 })
1476 }
1477
1478 async fn dump_to_file(
1479 &self,
1480 dump_file: &mut File,
1481 cmd_receiver: &mut CommandReceiver,
1482 event_receiver: &mut EventReceiver,
1483 ) {
1484 loop {
1485 select! {
1486 _ = self.cancel_token.cancelled() => {
1487 break;
1488 }
1489 Ok(cmd) = cmd_receiver.recv() => {
1490 CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1491 .await;
1492 }
1493 Ok(event) = event_receiver.recv() => {
1494 if matches!(event, SessionEvent::Binary{..}) {
1495 continue;
1496 }
1497 CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1498 .await;
1499 }
1500 };
1501 }
1502 }
1503
1504 async fn dump_loop(
1505 &self,
1506 dump_events: bool,
1507 mut dump_cmd_receiver: CommandReceiver,
1508 mut dump_event_receiver: EventReceiver,
1509 ) {
1510 if !dump_events {
1511 return;
1512 }
1513
1514 let file_name = self.app_state.get_dump_events_file(&self.session_id);
1515 let mut dump_file = match File::options()
1516 .create(true)
1517 .append(true)
1518 .open(&file_name)
1519 .await
1520 {
1521 Ok(file) => file,
1522 Err(e) => {
1523 warn!(
1524 session_id = self.session_id,
1525 file_name, "failed to open dump events file: {}", e
1526 );
1527 return;
1528 }
1529 };
1530 self.dump_to_file(
1531 &mut dump_file,
1532 &mut dump_cmd_receiver,
1533 &mut dump_event_receiver,
1534 )
1535 .await;
1536
1537 while let Ok(event) = dump_event_receiver.try_recv() {
1538 if matches!(event, SessionEvent::Binary { .. }) {
1539 continue;
1540 }
1541 CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1542 }
1543 }
1544
1545 pub async fn create_rtp_track(
1546 &self,
1547 track_id: TrackId,
1548 ssrc: u32,
1549 enable_srtp: Option<bool>,
1550 ) -> Result<RtcTrack> {
1551 let mut rtc_config = RtcTrackConfig::default();
1552 let use_srtp = enable_srtp
1554 .or(self.app_state.config.enable_srtp)
1555 .unwrap_or(false);
1556 rtc_config.mode = if use_srtp {
1557 rustrtc::TransportMode::Srtp
1558 } else {
1559 rustrtc::TransportMode::Rtp
1560 };
1561
1562 if let Some(codecs) = &self.app_state.config.codecs {
1563 let mut codec_types = Vec::new();
1564 for c in codecs {
1565 match c.to_lowercase().as_str() {
1566 "pcmu" => codec_types.push(CodecType::PCMU),
1567 "pcma" => codec_types.push(CodecType::PCMA),
1568 "g722" => codec_types.push(CodecType::G722),
1569 "g729" => codec_types.push(CodecType::G729),
1570 #[cfg(feature = "opus")]
1571 "opus" => codec_types.push(CodecType::Opus),
1572 "dtmf" | "2833" | "telephone_event" => {
1573 codec_types.push(CodecType::TelephoneEvent)
1574 }
1575 _ => {}
1576 }
1577 }
1578 if !codec_types.is_empty() {
1579 rtc_config.preferred_codec = Some(codec_types[0].clone());
1580 rtc_config.codecs = codec_types;
1581 }
1582 }
1583
1584 if rtc_config.preferred_codec.is_none() {
1585 rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1586 }
1587
1588 rtc_config.rtp_port_range = self
1589 .app_state
1590 .config
1591 .rtp_start_port
1592 .zip(self.app_state.config.rtp_end_port);
1593
1594 if let Some(ref external_ip) = self.app_state.config.external_ip {
1595 rtc_config.external_ip = Some(external_ip.clone());
1596 }
1597 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1598 rtc_config.bind_ip = Some(bind_ip.clone());
1599 }
1600
1601 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1602 rtc_config.enable_ice_lite = self.app_state.config.enable_ice_lite;
1603
1604 let mut track = RtcTrack::new(
1605 self.cancel_token.child_token(),
1606 track_id,
1607 self.track_config.clone(),
1608 rtc_config,
1609 )
1610 .with_ssrc(ssrc);
1611
1612 track.create().await?;
1613
1614 Ok(track)
1615 }
1616
1617 async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1618 let hangup_headers = option
1619 .sip
1620 .as_ref()
1621 .and_then(|s| s.hangup_headers.as_ref())
1622 .map(|headers_map| {
1623 headers_map
1624 .iter()
1625 .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1626 .collect::<Vec<rsip::Header>>()
1627 });
1628 self.call_state.write().await.option = Some(option.clone());
1629 info!(
1630 session_id = self.session_id,
1631 call_type = ?self.call_type,
1632 "setup caller track"
1633 );
1634
1635 let track = match self.call_type {
1636 ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1637 ActiveCallType::WebSocket => {
1638 let audio_receiver = self.call_state.write().await.audio_receiver.take();
1639 if let Some(receiver) = audio_receiver {
1640 Some(self.create_websocket_track(receiver).await?)
1641 } else {
1642 None
1643 }
1644 }
1645 ActiveCallType::Sip => {
1646 if let Some(dialog_id) = self
1647 .invitation
1648 .find_dialog_id_by_session_id(&self.session_id)
1649 {
1650 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1651 return self
1652 .prepare_incoming_sip_track(
1653 self.cancel_token.clone(),
1654 self.call_state.clone(),
1655 &self.session_id,
1656 pending_dialog,
1657 hangup_headers,
1658 )
1659 .await;
1660 }
1661 }
1662
1663 let mut option = option.clone();
1665 if option.sip.is_none()
1666 || option
1667 .sip
1668 .as_ref()
1669 .and_then(|s| s.username.as_ref())
1670 .is_none()
1671 {
1672 if let Some(callee) = &option.callee {
1673 if let Some(cred) = self.app_state.find_credentials_for_callee(callee) {
1674 if option.sip.is_none() {
1675 option.sip = Some(crate::SipOption {
1676 username: Some(cred.username.clone()),
1677 password: Some(cred.password.clone()),
1678 realm: cred.realm.clone(),
1679 ..Default::default()
1680 });
1681 }
1682 }
1683 }
1684 }
1685
1686 let mut invite_option = option.build_invite_option()?;
1687 invite_option.call_id = Some(self.session_id.clone());
1688
1689 match self
1690 .create_outgoing_sip_track(
1691 self.cancel_token.clone(),
1692 self.call_state.clone(),
1693 &self.session_id,
1694 invite_option,
1695 &option,
1696 None,
1697 false,
1698 )
1699 .await
1700 {
1701 Ok(answer) => {
1702 self.event_sender
1703 .send(SessionEvent::Answer {
1704 timestamp: crate::media::get_timestamp(),
1705 track_id: self.session_id.clone(),
1706 sdp: answer,
1707 refer: Some(false),
1708 })
1709 .ok();
1710 return Ok(());
1711 }
1712 Err(e) => {
1713 warn!(
1714 session_id = self.session_id,
1715 "failed to create sip track: {}", e
1716 );
1717 match &e {
1718 rsipstack::Error::DialogError(reason, _, code) => {
1719 self.event_sender
1720 .send(SessionEvent::Reject {
1721 track_id: self.session_id.clone(),
1722 timestamp: crate::media::get_timestamp(),
1723 reason: reason.clone(),
1724 code: Some(code.code() as u32),
1725 refer: Some(false),
1726 })
1727 .ok();
1728 }
1729 _ => {}
1730 }
1731 return Err(e.into());
1732 }
1733 }
1734 }
1735 ActiveCallType::B2bua => {
1736 if let Some(dialog_id) = self
1737 .invitation
1738 .find_dialog_id_by_session_id(&self.session_id)
1739 {
1740 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1741 return self
1742 .prepare_incoming_sip_track(
1743 self.cancel_token.clone(),
1744 self.call_state.clone(),
1745 &self.session_id,
1746 pending_dialog,
1747 hangup_headers,
1748 )
1749 .await;
1750 }
1751 }
1752
1753 warn!(
1754 session_id = self.session_id,
1755 "no pending dialog found for B2BUA call"
1756 );
1757 return Err(anyhow::anyhow!(
1758 "no pending dialog found for session_id: {}",
1759 self.session_id
1760 ));
1761 }
1762 };
1763 match track {
1764 Some(track) => {
1765 self.finish_caller_stack(&option, Some(track)).await?;
1766 }
1767 None => {
1768 warn!(session_id = self.session_id, "no track created for caller");
1769 return Err(anyhow::anyhow!("no track created for caller"));
1770 }
1771 }
1772 Ok(())
1773 }
1774
1775 async fn finish_caller_stack(
1776 &self,
1777 option: &CallOption,
1778 track: Option<Box<dyn Track>>,
1779 ) -> Result<()> {
1780 if let Some(track) = track {
1781 self.setup_track_with_stream(&option, track).await?;
1782 }
1783
1784 {
1785 let call_state = self.call_state.read().await;
1786 if let Some(ref answer) = call_state.answer {
1787 info!(
1788 session_id = self.session_id,
1789 "sending answer event: {}", answer,
1790 );
1791 self.event_sender
1792 .send(SessionEvent::Answer {
1793 timestamp: crate::media::get_timestamp(),
1794 track_id: self.session_id.clone(),
1795 sdp: answer.clone(),
1796 refer: Some(false),
1797 })
1798 .ok();
1799 } else {
1800 warn!(
1801 session_id = self.session_id,
1802 "no answer in state to send event"
1803 );
1804 }
1805 }
1806 Ok(())
1807 }
1808
1809 pub async fn setup_track_with_stream(
1810 &self,
1811 option: &CallOption,
1812 mut track: Box<dyn Track>,
1813 ) -> Result<()> {
1814 let processors = match StreamEngine::create_processors(
1815 self.app_state.stream_engine.clone(),
1816 track.as_ref(),
1817 self.cancel_token.child_token(),
1818 self.event_sender.clone(),
1819 self.media_stream.packet_sender.clone(),
1820 option,
1821 )
1822 .await
1823 {
1824 Ok(processors) => processors,
1825 Err(e) => {
1826 warn!(
1827 session_id = self.session_id,
1828 "failed to prepare stream processors: {}", e
1829 );
1830 vec![]
1831 }
1832 };
1833
1834 for processor in processors {
1836 track.append_processor(processor);
1837 }
1838
1839 self.update_track_wrapper(track, None).await;
1840 Ok(())
1841 }
1842
1843 pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1844 let (ambiance_opt, subscribe) = {
1845 let state = self.call_state.read().await;
1846 let mut opt = state
1847 .option
1848 .as_ref()
1849 .and_then(|o| o.ambiance.clone())
1850 .unwrap_or_default();
1851
1852 if let Some(global) = &self.app_state.config.ambiance {
1853 opt.merge(global);
1854 }
1855
1856 let subscribe = state
1857 .option
1858 .as_ref()
1859 .and_then(|o| o.subscribe)
1860 .unwrap_or_default();
1861
1862 (opt, subscribe)
1863 };
1864 if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1865 match AmbianceProcessor::new(ambiance_opt).await {
1866 Ok(ambiance) => {
1867 info!(session_id = self.session_id, "loaded ambiance processor");
1868 track.append_processor(Box::new(ambiance));
1869 }
1870 Err(e) => {
1871 tracing::error!("failed to load ambiance wav {}", e);
1872 }
1873 }
1874 }
1875
1876 if subscribe && self.call_type != ActiveCallType::WebSocket {
1877 let (track_index, sub_track_id) = if track.id() == &self.server_side_track_id {
1878 (0, self.server_side_track_id.clone())
1879 } else {
1880 (1, self.session_id.clone())
1881 };
1882 let sub_processor =
1883 SubscribeProcessor::new(self.event_sender.clone(), sub_track_id, track_index);
1884 track.append_processor(Box::new(sub_processor));
1885 }
1886
1887 self.call_state.write().await.current_play_id = play_id.clone();
1888 self.media_stream.update_track(track, play_id).await;
1889 }
1890
1891 pub async fn create_websocket_track(
1892 &self,
1893 audio_receiver: WebsocketBytesReceiver,
1894 ) -> Result<Box<dyn Track>> {
1895 let (ssrc, codec) = {
1896 let call_state = self.call_state.read().await;
1897 (
1898 call_state.ssrc,
1899 call_state
1900 .option
1901 .as_ref()
1902 .map(|o| o.codec.clone())
1903 .unwrap_or_default(),
1904 )
1905 };
1906
1907 let ws_track = WebsocketTrack::new(
1908 self.cancel_token.child_token(),
1909 self.session_id.clone(),
1910 self.track_config.clone(),
1911 self.event_sender.clone(),
1912 audio_receiver,
1913 codec,
1914 ssrc,
1915 );
1916
1917 {
1918 let mut call_state = self.call_state.write().await;
1919 call_state.answer_time = Some(Utc::now());
1920 call_state.answer = Some("".to_string());
1921 call_state.last_status_code = 200;
1922 }
1923
1924 Ok(Box::new(ws_track))
1925 }
1926
1927 pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1928 let (ssrc, option) = {
1929 let call_state = self.call_state.read().await;
1930 (
1931 call_state.ssrc,
1932 call_state.option.clone().unwrap_or_default(),
1933 )
1934 };
1935
1936 let mut rtc_config = RtcTrackConfig::default();
1937 rtc_config.mode = rustrtc::TransportMode::WebRtc; rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1939
1940 if let Some(codecs) = &self.app_state.config.codecs {
1941 let mut codec_types = Vec::new();
1942 for c in codecs {
1943 match c.to_lowercase().as_str() {
1944 "pcmu" => codec_types.push(CodecType::PCMU),
1945 "pcma" => codec_types.push(CodecType::PCMA),
1946 "g722" => codec_types.push(CodecType::G722),
1947 "g729" => codec_types.push(CodecType::G729),
1948 #[cfg(feature = "opus")]
1949 "opus" => codec_types.push(CodecType::Opus),
1950 "dtmf" | "2833" | "telephone_event" => {
1951 codec_types.push(CodecType::TelephoneEvent)
1952 }
1953 _ => {}
1954 }
1955 }
1956 if !codec_types.is_empty() {
1957 rtc_config.preferred_codec = Some(codec_types[0].clone());
1958 rtc_config.codecs = codec_types;
1959 }
1960 }
1961
1962 if let Some(ref external_ip) = self.app_state.config.external_ip {
1963 rtc_config.external_ip = Some(external_ip.clone());
1964 }
1965 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1966 rtc_config.bind_ip = Some(bind_ip.clone());
1967 }
1968
1969 let mut webrtc_track = RtcTrack::new(
1970 self.cancel_token.child_token(),
1971 self.session_id.clone(),
1972 self.track_config.clone(),
1973 rtc_config,
1974 )
1975 .with_ssrc(ssrc);
1976
1977 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1978 let offer = match option.enable_ipv6 {
1979 Some(false) | None => {
1980 strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1981 }
1982 _ => option.offer.clone().unwrap_or("".to_string()),
1983 };
1984 let answer: Option<String>;
1985 match webrtc_track.handshake(offer, timeout).await {
1986 Ok(answer_sdp) => {
1987 answer = match option.enable_ipv6 {
1988 Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1989 Some(true) => Some(answer_sdp.to_string()),
1990 };
1991 }
1992 Err(e) => {
1993 warn!(session_id = self.session_id, "failed to setup track: {}", e);
1994 return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1995 }
1996 }
1997
1998 {
1999 let mut call_state = self.call_state.write().await;
2000 call_state.answer_time = Some(Utc::now());
2001 call_state.answer = answer;
2002 call_state.last_status_code = 200;
2003 }
2004 Ok(Box::new(webrtc_track))
2005 }
2006
2007 async fn create_outgoing_sip_track(
2008 &self,
2009 cancel_token: CancellationToken,
2010 call_state_ref: ActiveCallStateRef,
2011 track_id: &String,
2012 mut invite_option: InviteOption,
2013 call_option: &CallOption,
2014 moh: Option<String>,
2015 auto_hangup: bool,
2016 ) -> Result<String, rsipstack::Error> {
2017 let ssrc = call_state_ref.read().await.ssrc;
2018 let per_call_srtp = call_option.sip.as_ref().and_then(|s| s.enable_srtp);
2019 let rtp_track = self
2020 .create_rtp_track(track_id.clone(), ssrc, per_call_srtp)
2021 .await
2022 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
2023
2024 let offer = Some(
2025 rtp_track
2026 .local_description()
2027 .await
2028 .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
2029 );
2030
2031 {
2032 let mut cs = call_state_ref.write().await;
2033 if let Some(o) = cs.option.as_mut() {
2034 o.offer = offer.clone();
2035 }
2036 cs.start_time = Utc::now();
2037 };
2038
2039 invite_option.offer = offer.clone().map(|s| s.into());
2040
2041 let needs_contact = contact_needs_public_resolution(&invite_option.contact);
2044
2045 if needs_contact {
2046 let addrs = self.invitation.dialog_layer.endpoint.get_addrs();
2047 if let Some(addr) = find_local_addr_for_uri(&addrs, &invite_option.callee) {
2048 let contact_username = invite_option
2049 .contact
2050 .auth
2051 .as_ref()
2052 .map(|auth| auth.user.as_str())
2053 .or_else(|| {
2054 invite_option
2055 .caller
2056 .auth
2057 .as_ref()
2058 .map(|auth| auth.user.as_str())
2059 });
2060 invite_option.contact = build_public_contact_uri(
2061 &self.app_state.learned_public_address,
2062 self.app_state.auto_learn_public_address_enabled(),
2063 &addr,
2064 contact_username,
2065 Some(&invite_option.contact),
2066 );
2067 } else {
2068 return Err(rsipstack::Error::Error(format!(
2069 "missing local SIP address for callee transport: {}",
2070 invite_option.callee
2071 )));
2072 }
2073 }
2074
2075 let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
2076
2077 if let Some(moh) = moh {
2078 let ssrc_and_moh = {
2079 let mut state = call_state_ref.write().await;
2080 state.moh = Some(moh.clone());
2081 if state.current_play_id.is_none() {
2082 let ssrc = rand::random::<u32>();
2083 Some((ssrc, moh.clone()))
2084 } else {
2085 info!(
2086 session_id = self.session_id,
2087 "Something is playing, MOH will start after it ends"
2088 );
2089 None
2090 }
2091 };
2092
2093 if let Some((ssrc, moh_path)) = ssrc_and_moh {
2094 let file_track = FileTrack::new(self.server_side_track_id.clone())
2095 .with_play_id(Some(moh_path.clone()))
2096 .with_ssrc(ssrc)
2097 .with_path(moh_path.clone())
2098 .with_cancel_token(self.cancel_token.child_token());
2099 self.update_track_wrapper(Box::new(file_track), Some(moh_path))
2100 .await;
2101 }
2102 } else {
2103 let track = rtp_track_to_setup.take().unwrap();
2104 self.setup_track_with_stream(&call_option, track)
2105 .await
2106 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
2107 }
2108
2109 info!(
2110 session_id = self.session_id,
2111 track_id,
2112 contact = %invite_option.contact,
2113 "invite {} -> {} offer: \n{}",
2114 invite_option.caller,
2115 invite_option.callee,
2116 offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
2117 );
2118
2119 let (dlg_state_sender, dlg_state_receiver) =
2120 self.invitation.dialog_layer.new_dialog_state_channel();
2121
2122 let states = InviteDialogStates {
2123 is_client: true,
2124 session_id: self.session_id.clone(),
2125 track_id: track_id.clone(),
2126 event_sender: self.event_sender.clone(),
2127 media_stream: self.media_stream.clone(),
2128 call_state: call_state_ref.clone(),
2129 cancel_token,
2130 terminated_reason: None,
2131 has_early_media: false,
2132 };
2133
2134 let hangup_headers = call_option
2135 .sip
2136 .as_ref()
2137 .and_then(|s| s.hangup_headers.as_ref())
2138 .map(|headers_map| {
2139 headers_map
2140 .iter()
2141 .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
2142 .collect::<Vec<rsip::Header>>()
2143 });
2144
2145 let mut client_dialog_handler = DialogStateReceiverGuard::new(
2146 self.invitation.dialog_layer.clone(),
2147 dlg_state_receiver,
2148 hangup_headers,
2149 );
2150
2151 crate::spawn(async move {
2152 client_dialog_handler.process_dialog(states).await;
2153 });
2154
2155 let (dialog_id, answer) = self
2156 .invitation
2157 .invite(invite_option, dlg_state_sender)
2158 .await?;
2159
2160 self.call_state.write().await.moh = None;
2161
2162 if let Some(track) = rtp_track_to_setup {
2163 info!(
2164 session_id = self.session_id,
2165 track_id, "Stopping MOH and setting up RTP track"
2166 );
2167 self.media_stream
2168 .remove_track(&self.server_side_track_id, false)
2169 .await;
2170
2171 self.setup_track_with_stream(&call_option, track)
2172 .await
2173 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
2174 }
2175
2176 let answer = match answer {
2177 Some(answer) => {
2178 let s = String::from_utf8_lossy(&answer).to_string();
2179 if s.trim().is_empty() {
2180 let cs = call_state_ref.read().await;
2184 match cs.answer.clone() {
2185 Some(early_sdp) if !early_sdp.is_empty() => {
2186 info!(
2187 session_id = self.session_id,
2188 "200 OK has empty body; using early-media SDP from 183"
2189 );
2190 (early_sdp, true )
2191 }
2192 _ => {
2193 warn!(
2194 session_id = self.session_id,
2195 "200 OK has empty body and no early-media SDP available"
2196 );
2197 (s, false)
2198 }
2199 }
2200 } else {
2201 (s, false)
2202 }
2203 }
2204 None => {
2205 let cs = call_state_ref.read().await;
2207 match cs.answer.clone() {
2208 Some(early_sdp) if !early_sdp.is_empty() => {
2209 info!(
2210 session_id = self.session_id,
2211 "200 OK had no answer; using early-media SDP from 183"
2212 );
2213 (early_sdp, true )
2214 }
2215 _ => {
2216 warn!(session_id = self.session_id, "no answer received");
2217 return Err(rsipstack::Error::DialogError(
2218 "No answer received".to_string(),
2219 dialog_id,
2220 rsip::StatusCode::NotAcceptableHere,
2221 ));
2222 }
2223 }
2224 }
2225 };
2226 let (answer, remote_description_already_applied) = answer;
2227
2228 {
2229 let mut cs = call_state_ref.write().await;
2230 if cs.answer.is_none() {
2231 cs.answer = Some(answer.clone());
2232 }
2233 if auto_hangup {
2234 cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
2235 }
2236 }
2237 if !remote_description_already_applied {
2238 self.media_stream
2239 .update_remote_description(&track_id, &answer)
2240 .await
2241 .ok();
2242 }
2243
2244 Ok(answer)
2245 }
2246
2247 pub fn is_webrtc_sdp(sdp: &str) -> bool {
2249 (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
2250 && sdp.contains("a=fingerprint:")
2251 }
2252
2253 pub async fn setup_answer_track(
2254 &self,
2255 ssrc: u32,
2256 option: &CallOption,
2257 offer: String,
2258 ) -> Result<(String, Box<dyn Track>)> {
2259 let offer = match option.enable_ipv6 {
2260 Some(false) | None => strip_ipv6_candidates(&offer),
2261 _ => offer.clone(),
2262 };
2263
2264 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
2265
2266 let mut media_track = if Self::is_webrtc_sdp(&offer) {
2267 let mut rtc_config = RtcTrackConfig::default();
2268 rtc_config.mode = rustrtc::TransportMode::WebRtc;
2269 rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
2270 if let Some(ref external_ip) = self.app_state.config.external_ip {
2271 rtc_config.external_ip = Some(external_ip.clone());
2272 }
2273 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
2274 rtc_config.bind_ip = Some(bind_ip.clone());
2275 }
2276 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
2277 rtc_config.enable_ice_lite = self.app_state.config.enable_ice_lite;
2278
2279 let webrtc_track = RtcTrack::new(
2280 self.cancel_token.child_token(),
2281 self.session_id.clone(),
2282 self.track_config.clone(),
2283 rtc_config,
2284 )
2285 .with_ssrc(ssrc);
2286
2287 Box::new(webrtc_track) as Box<dyn Track>
2288 } else {
2289 let per_call_srtp = option.sip.as_ref().and_then(|s| s.enable_srtp);
2290 let rtp_track = self
2291 .create_rtp_track(self.session_id.clone(), ssrc, per_call_srtp)
2292 .await?;
2293 Box::new(rtp_track) as Box<dyn Track>
2294 };
2295
2296 let answer = match media_track.handshake(offer.clone(), timeout).await {
2297 Ok(answer) => answer,
2298 Err(e) => {
2299 return Err(anyhow::anyhow!("handshake failed: {e}"));
2300 }
2301 };
2302
2303 return Ok((answer, media_track));
2304 }
2305
2306 pub async fn prepare_incoming_sip_track(
2307 &self,
2308 cancel_token: CancellationToken,
2309 call_state_ref: ActiveCallStateRef,
2310 track_id: &String,
2311 pending_dialog: PendingDialog,
2312 hangup_headers: Option<Vec<rsip::Header>>,
2313 ) -> Result<()> {
2314 let state_receiver = pending_dialog.state_receiver;
2315 let states = InviteDialogStates {
2318 is_client: false,
2319 session_id: self.session_id.clone(),
2320 track_id: track_id.clone(),
2321 event_sender: self.event_sender.clone(),
2322 media_stream: self.media_stream.clone(),
2323 call_state: self.call_state.clone(),
2324 cancel_token,
2325 terminated_reason: None,
2326 has_early_media: false,
2327 };
2328
2329 let initial_request = pending_dialog.dialog.initial_request();
2330 let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2331
2332 let (ssrc, option) = {
2333 let call_state = call_state_ref.read().await;
2334 (
2335 call_state.ssrc,
2336 call_state.option.clone().unwrap_or_default(),
2337 )
2338 };
2339
2340 match self.setup_answer_track(ssrc, &option, offer).await {
2341 Ok((offer, track)) => {
2342 self.setup_track_with_stream(&option, track).await?;
2343 {
2344 let mut state = self.call_state.write().await;
2345 state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2346 }
2347 }
2348 Err(e) => {
2349 return Err(anyhow::anyhow!("error creating track: {}", e));
2350 }
2351 }
2352
2353 let mut client_dialog_handler = DialogStateReceiverGuard::new(
2354 self.invitation.dialog_layer.clone(),
2355 state_receiver,
2356 hangup_headers,
2357 );
2358
2359 crate::spawn(async move {
2360 client_dialog_handler.process_dialog(states).await;
2361 });
2362 Ok(())
2363 }
2364}
2365
2366impl Drop for ActiveCall {
2367 fn drop(&mut self) {
2368 info!(session_id = self.session_id, "dropping active call");
2369 if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2370 if let Some(record) = self.get_callrecord() {
2371 if let Err(e) = sender.send(record) {
2372 warn!(
2373 session_id = self.session_id,
2374 "failed to send call record: {}", e
2375 );
2376 }
2377 }
2378 }
2379 }
2380}
2381
2382impl ActiveCallState {
2383 pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2384 if let Some(existing) = &self.option {
2385 if option.asr.is_none() {
2386 option.asr = existing.asr.clone();
2387 }
2388 if option.tts.is_none() {
2389 option.tts = existing.tts.clone();
2390 }
2391 if option.vad.is_none() {
2392 option.vad = existing.vad.clone();
2393 }
2394 if option.denoise.is_none() {
2395 option.denoise = existing.denoise;
2396 }
2397 if option.recorder.is_none() {
2398 option.recorder = existing.recorder.clone();
2399 }
2400 if option.eou.is_none() {
2401 option.eou = existing.eou.clone();
2402 }
2403 if option.extra.is_none() {
2404 option.extra = existing.extra.clone();
2405 }
2406 if option.ambiance.is_none() {
2407 option.ambiance = existing.ambiance.clone();
2408 }
2409 }
2410 option
2411 }
2412
2413 pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2414 if self.hangup_reason.is_none() {
2415 self.hangup_reason = Some(reason);
2416 }
2417 }
2418
2419 pub fn build_hangup_event(
2420 &self,
2421 track_id: TrackId,
2422 initiator: Option<String>,
2423 ) -> crate::event::SessionEvent {
2424 let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2425 let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2426 let extra = self.extras.clone();
2427
2428 crate::event::SessionEvent::Hangup {
2429 track_id,
2430 timestamp: crate::media::get_timestamp(),
2431 reason: Some(format!("{:?}", self.hangup_reason)),
2432 initiator,
2433 start_time: self.start_time.to_rfc3339(),
2434 answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2435 ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2436 hangup_time: Utc::now().to_rfc3339(),
2437 extra,
2438 from: from.map(|f| f.into()),
2439 to: to.map(|f| f.into()),
2440 refer: Some(self.is_refer),
2441 }
2442 }
2443
2444 pub fn build_callrecord(
2445 &self,
2446 app_state: AppState,
2447 session_id: String,
2448 call_type: ActiveCallType,
2449 ) -> CallRecord {
2450 let option = self.option.clone().unwrap_or_default();
2451 let recorder = if option.recorder.is_some() {
2452 let recorder_file = app_state.get_recorder_file(&session_id);
2453 if std::path::Path::new(&recorder_file).exists() {
2454 let file_size = std::fs::metadata(&recorder_file)
2455 .map(|m| m.len())
2456 .unwrap_or(0);
2457 vec![crate::callrecord::CallRecordMedia {
2458 track_id: session_id.clone(),
2459 path: recorder_file,
2460 size: file_size,
2461 extra: None,
2462 }]
2463 } else {
2464 vec![]
2465 }
2466 } else {
2467 vec![]
2468 };
2469
2470 let dump_event_file = app_state.get_dump_events_file(&session_id);
2471 let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2472 Some(dump_event_file)
2473 } else {
2474 None
2475 };
2476
2477 let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2478 if let Ok(rc) = rc.try_read() {
2479 Some(Box::new(rc.build_callrecord(
2480 app_state.clone(),
2481 rc.session_id.clone(),
2482 ActiveCallType::B2bua,
2483 )))
2484 } else {
2485 None
2486 }
2487 });
2488
2489 let caller = option.caller.clone().unwrap_or_default();
2490 let callee = option.callee.clone().unwrap_or_default();
2491
2492 CallRecord {
2493 option: Some(option),
2494 call_id: session_id,
2495 call_type,
2496 start_time: self.start_time,
2497 ring_time: self.ring_time.clone(),
2498 answer_time: self.answer_time.clone(),
2499 end_time: Utc::now(),
2500 caller,
2501 callee,
2502 hangup_reason: self.hangup_reason.clone(),
2503 hangup_messages: Vec::new(),
2504 status_code: self.last_status_code,
2505 extras: self.extras.clone(),
2506 dump_event_file,
2507 recorder,
2508 refer_callrecord,
2509 }
2510 }
2511}