1use super::Command;
2use crate::{
3 CallOption, ReferOption,
4 event::{EventReceiver, EventSender, SessionEvent},
5 media::{
6 TrackId,
7 ambiance::AmbianceProcessor,
8 engine::StreamEngine,
9 negotiate::strip_ipv6_candidates,
10 processor::SubscribeProcessor,
11 recorder::RecorderOption,
12 stream::{MediaStream, MediaStreamBuilder},
13 track::{
14 Track, TrackConfig,
15 file::FileTrack,
16 media_pass::MediaPassTrack,
17 rtc::{RtcTrack, RtcTrackConfig},
18 tts::SynthesisHandle,
19 websocket::{WebsocketBytesReceiver, WebsocketTrack},
20 },
21 },
22 synthesis::{SynthesisCommand, SynthesisOption},
23 transcription::TranscriptionOption,
24};
25use crate::{
26 app::AppState,
27 call::{
28 CommandReceiver, CommandSender,
29 sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
30 },
31 callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
32 useragent::invitation::PendingDialog,
33};
34use anyhow::Result;
35use audio_codec::CodecType;
36use chrono::{DateTime, Utc};
37use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
38use serde::{Deserialize, Serialize};
39use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
40use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
41use tokio_util::sync::CancellationToken;
42use tracing::{debug, info, warn};
43
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app::AppStateBuilder;
    use crate::callrecord::CallRecordHangupReason;
    use crate::config::Config;
    use crate::media::track::tts::SynthesisHandle;
    use crate::synthesis::SynthesisCommand;
    use tokio::sync::mpsc;

    /// Builds a SIP `ActiveCall` backed by a default stream engine and stores
    /// `option` as the call's current option.
    async fn new_sip_call(option: crate::CallOption) -> Result<Arc<ActiveCall>> {
        let mut config = Config::default();
        config.udp_port = 0;
        config.media_cache_path = "/tmp/mediacache".to_string();

        let app_state = AppStateBuilder::new()
            .with_config(config)
            .with_stream_engine(Arc::new(StreamEngine::default()))
            .build()
            .await?;

        let call = Arc::new(ActiveCall::new(
            ActiveCallType::Sip,
            CancellationToken::new(),
            "test-session".to_string(),
            app_state.invitation.clone(),
            app_state.clone(),
            TrackConfig::default(),
            None,
            false,
            None,
            None,
            None,
        ));
        call.call_state.write().await.option = Some(option);
        Ok(call)
    }

    /// Installs `handle` as the call's live TTS handle and marks `play_id` as
    /// the play currently in progress.
    async fn attach_tts_handle(call: &ActiveCall, handle: SynthesisHandle, play_id: &str) {
        let mut state = call.call_state.write().await;
        state.tts_handle = Some(handle);
        state.current_play_id = Some(play_id.to_string());
    }

    /// A TTS command for the play that is already running must reuse the
    /// existing handle's SSRC when scheduling the auto hangup.
    #[tokio::test]
    async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
        let mut option = crate::CallOption::default();
        option.tts = Some(crate::synthesis::SynthesisOption::default());
        let active_call = new_sip_call(option).await?;

        let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
        let initial_ssrc = 12345;
        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
        attach_tts_handle(&active_call, handle, "play_1").await;

        active_call
            .do_tts(
                "hangup now".to_string(),
                None,
                Some("play_1".to_string()),
                Some(true),
                false,
                true,
                None,
                None,
                false,
                None,
            )
            .await?;

        let scheduled = active_call.call_state.read().await.auto_hangup.clone();
        assert!(scheduled.is_some());
        let (h_ssrc, reason) = scheduled.unwrap();
        assert_eq!(
            h_ssrc, initial_ssrc,
            "SSRC should be reused from existing handle"
        );
        assert_eq!(reason, CallRecordHangupReason::BySystem);

        // The command must have been forwarded through the existing handle.
        let cmd = rx.try_recv().expect("Should have received tts command");
        assert_eq!(cmd.text, "hangup now");

        Ok(())
    }

    /// A TTS command carrying a different play id must not inherit the SSRC of
    /// the handle that belongs to the previous play.
    #[tokio::test]
    async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
        let mut tts_opt = crate::synthesis::SynthesisOption::default();
        tts_opt.provider = Some(crate::synthesis::SynthesisType::Aliyun);
        let mut option = crate::CallOption::default();
        option.tts = Some(tts_opt);
        let active_call = new_sip_call(option).await?;

        // `_rx` is kept alive for the whole test, as in the direct setup.
        let (tx, _rx) = mpsc::unbounded_channel();
        let initial_ssrc = 111;
        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
        attach_tts_handle(&active_call, handle, "play_1").await;

        active_call
            .do_tts(
                "new play".to_string(),
                None,
                Some("play_2".to_string()),
                Some(true),
                false,
                true,
                None,
                None,
                false,
                None,
            )
            .await?;

        let scheduled = active_call.call_state.read().await.auto_hangup.clone();
        let (h_ssrc, _) = scheduled.unwrap();
        assert_ne!(
            h_ssrc, initial_ssrc,
            "Should use a new SSRC for different play_id"
        );

        Ok(())
    }
}
212
213#[derive(Deserialize)]
214#[serde(rename_all = "camelCase")]
215pub struct CallParams {
216 pub id: Option<String>,
217 #[serde(rename = "dump")]
218 pub dump_events: Option<bool>,
219 #[serde(rename = "ping")]
220 pub ping_interval: Option<u32>,
221 pub server_side_track: Option<String>,
222}
223
224#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
225#[serde(rename_all = "camelCase")]
226pub enum ActiveCallType {
227 Webrtc,
228 B2bua,
229 WebSocket,
230 #[default]
231 Sip,
232}
233
234#[derive(Default)]
235pub struct ActiveCallState {
236 pub session_id: String,
237 pub start_time: DateTime<Utc>,
238 pub ring_time: Option<DateTime<Utc>>,
239 pub answer_time: Option<DateTime<Utc>>,
240 pub hangup_reason: Option<CallRecordHangupReason>,
241 pub last_status_code: u16,
242 pub option: Option<CallOption>,
243 pub answer: Option<String>,
244 pub ssrc: u32,
245 pub refer_callstate: Option<ActiveCallStateRef>,
246 pub extras: Option<HashMap<String, serde_json::Value>>,
247 pub is_refer: bool,
248 pub sip_hangup_headers_template: Option<HashMap<String, String>>,
249
250 pub tts_handle: Option<SynthesisHandle>,
252 pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
253 pub wait_input_timeout: Option<u32>,
254 pub moh: Option<String>,
255 pub current_play_id: Option<String>,
256 pub audio_receiver: Option<WebsocketBytesReceiver>,
257 pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
258 pub pending_asr_resume: Option<(u32, TranscriptionOption)>,
259}
260
261pub type ActiveCallRef = Arc<ActiveCall>;
262pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
263
264pub struct ActiveCall {
265 pub call_state: ActiveCallStateRef,
266 pub cancel_token: CancellationToken,
267 pub call_type: ActiveCallType,
268 pub session_id: String,
269 pub media_stream: Arc<MediaStream>,
270 pub track_config: TrackConfig,
271 pub event_sender: EventSender,
272 pub app_state: AppState,
273 pub invitation: Invitation,
274 pub cmd_sender: CommandSender,
275 pub dump_events: bool,
276 pub server_side_track_id: TrackId,
277}
278
279pub struct ActiveCallGuard {
280 pub call: ActiveCallRef,
281 pub active_calls: usize,
282}
283
284impl ActiveCallGuard {
285 pub fn new(call: ActiveCallRef) -> Self {
286 let active_calls = {
287 call.app_state
288 .total_calls
289 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
290 let mut calls = call.app_state.active_calls.lock().unwrap();
291 calls.insert(call.session_id.clone(), call.clone());
292 calls.len()
293 };
294 Self { call, active_calls }
295 }
296}
297
298impl Drop for ActiveCallGuard {
299 fn drop(&mut self) {
300 self.call
301 .app_state
302 .active_calls
303 .lock()
304 .unwrap()
305 .remove(&self.call.session_id);
306 }
307}
308
309pub struct ActiveCallReceiver {
310 pub cmd_receiver: CommandReceiver,
311 pub dump_cmd_receiver: CommandReceiver,
312 pub dump_event_receiver: EventReceiver,
313}
314
315impl ActiveCall {
316 pub fn new(
317 call_type: ActiveCallType,
318 cancel_token: CancellationToken,
319 session_id: String,
320 invitation: Invitation,
321 app_state: AppState,
322 track_config: TrackConfig,
323 audio_receiver: Option<WebsocketBytesReceiver>,
324 dump_events: bool,
325 server_side_track_id: Option<TrackId>,
326 extras: Option<HashMap<String, serde_json::Value>>,
327 sip_hangup_headers_template: Option<HashMap<String, String>>,
328 ) -> Self {
329 let event_sender = crate::event::create_event_sender();
330 let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
331 let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
332 .with_id(session_id.clone())
333 .with_cancel_token(cancel_token.child_token());
334 let media_stream = Arc::new(media_stream_builder.build());
335 let start_time = Utc::now();
336 let call_type_str = match &call_type {
338 ActiveCallType::Sip => "sip",
339 ActiveCallType::WebSocket => "websocket",
340 ActiveCallType::Webrtc => "webrtc",
341 ActiveCallType::B2bua => "b2bua",
342 };
343 let extras = {
344 let mut e = extras.unwrap_or_default();
345 e.entry(crate::playbook::BUILTIN_SESSION_ID.to_string())
346 .or_insert_with(|| serde_json::Value::String(session_id.clone()));
347 e.entry(crate::playbook::BUILTIN_CALL_TYPE.to_string())
348 .or_insert_with(|| serde_json::Value::String(call_type_str.to_string()));
349 e.entry(crate::playbook::BUILTIN_START_TIME.to_string())
350 .or_insert_with(|| serde_json::Value::String(start_time.to_rfc3339()));
351 Some(e)
352 };
353 let call_state = Arc::new(RwLock::new(ActiveCallState {
354 session_id: session_id.clone(),
355 start_time,
356 ssrc: rand::random::<u32>(),
357 extras,
358 audio_receiver,
359 sip_hangup_headers_template,
360 ..Default::default()
361 }));
362 Self {
363 cancel_token,
364 call_type,
365 session_id,
366 call_state,
367 media_stream,
368 track_config,
369 event_sender,
370 app_state,
371 invitation,
372 cmd_sender,
373 dump_events,
374 server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
375 }
376 }
377
378 pub async fn enqueue_command(&self, command: Command) -> Result<()> {
379 self.cmd_sender
380 .send(command)
381 .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
382 Ok(())
383 }
384
385 pub fn new_receiver(&self) -> ActiveCallReceiver {
389 ActiveCallReceiver {
390 cmd_receiver: self.cmd_sender.subscribe(),
391 dump_cmd_receiver: self.cmd_sender.subscribe(),
392 dump_event_receiver: self.event_sender.subscribe(),
393 }
394 }
395
396 pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
397 let ActiveCallReceiver {
398 mut cmd_receiver,
399 dump_cmd_receiver,
400 dump_event_receiver,
401 } = receiver;
402
403 let process_command_loop = async move {
404 while let Ok(command) = cmd_receiver.recv().await {
405 match self.dispatch(command).await {
406 Ok(_) => (),
407 Err(e) => {
408 warn!(session_id = self.session_id, "{}", e);
409 self.event_sender
410 .send(SessionEvent::Error {
411 track_id: self.session_id.clone(),
412 timestamp: crate::media::get_timestamp(),
413 sender: "command".to_string(),
414 error: e.to_string(),
415 code: None,
416 })
417 .ok();
418 }
419 }
420 }
421 };
422 self.app_state
423 .total_calls
424 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
425
426 tokio::join!(
427 self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
428 async {
429 select! {
430 _ = process_command_loop => {
431 info!(session_id = self.session_id, "command loop done");
432 }
433 _ = self.process() => {
434 info!(session_id = self.session_id, "call serve done");
435 }
436 _ = self.cancel_token.cancelled() => {
437 info!(session_id = self.session_id, "call cancelled - cleaning up resources");
438 }
439 }
440 self.cancel_token.cancel();
441 }
442 );
443 Ok(())
444 }
445
446 async fn process(&self) -> Result<()> {
447 let mut event_receiver = self.event_sender.subscribe();
448
449 let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
450 let input_timeout_expire_ref = input_timeout_expire.clone();
451 let event_sender = self.event_sender.clone();
452 let wait_input_timeout_loop = async {
453 loop {
454 let (start_time, expire) = { *input_timeout_expire.lock().await };
455 if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
456 info!(session_id = self.session_id, "wait input timeout reached");
457 *input_timeout_expire.lock().await = (0, 0);
458 event_sender
459 .send(SessionEvent::Silence {
460 track_id: self.server_side_track_id.clone(),
461 timestamp: crate::media::get_timestamp(),
462 start_time,
463 duration: expire as u64,
464 samples: None,
465 })
466 .ok();
467 }
468 sleep(Duration::from_millis(100)).await;
469 }
470 };
471 let server_side_track_id = self.server_side_track_id.clone();
472 let event_hook_loop = async move {
473 while let Ok(event) = event_receiver.recv().await {
474 match event {
475 SessionEvent::Speaking { .. }
476 | SessionEvent::Dtmf { .. }
477 | SessionEvent::AsrDelta { .. }
478 | SessionEvent::AsrFinal { .. }
479 | SessionEvent::TrackStart { .. } => {
480 *input_timeout_expire_ref.lock().await = (0, 0);
481 }
482 SessionEvent::TrackEnd {
483 track_id,
484 play_id,
485 ssrc,
486 ..
487 } => {
488 if track_id != server_side_track_id {
489 continue;
490 }
491
492 let (moh_path, auto_hangup, wait_timeout_val) = {
493 let mut state = self.call_state.write().await;
494 if play_id != state.current_play_id {
495 debug!(
496 session_id = self.session_id,
497 ?play_id,
498 current = ?state.current_play_id,
499 "ignoring interrupted track end"
500 );
501 continue;
502 }
503 state.current_play_id = None;
504 (
505 state.moh.clone(),
506 state.auto_hangup.clone(),
507 state.wait_input_timeout.take(),
508 )
509 };
510
511 if let Some(path) = moh_path {
512 info!(session_id = self.session_id, "looping moh: {}", path);
513 let ssrc = rand::random::<u32>();
514 let file_track = FileTrack::new(self.server_side_track_id.clone())
515 .with_play_id(Some(path.clone()))
516 .with_ssrc(ssrc)
517 .with_path(path.clone())
518 .with_cancel_token(self.cancel_token.child_token());
519 self.update_track_wrapper(Box::new(file_track), Some(path))
520 .await;
521 continue;
522 }
523
524 if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
525 if hangup_ssrc == ssrc {
526 info!(
527 session_id = self.session_id,
528 ssrc, "auto hangup when track end track_id:{}", track_id
529 );
530 self.do_hangup(Some(hangup_reason), None, None).await.ok();
531 }
532 }
533
534 if let Some(timeout) = wait_timeout_val {
535 let expire = if timeout > 0 {
536 (crate::media::get_timestamp(), timeout)
537 } else {
538 (0, 0)
539 };
540 *input_timeout_expire_ref.lock().await = expire;
541 }
542 }
543 SessionEvent::Interrupt { receiver } => {
544 let track_id =
545 receiver.unwrap_or_else(|| self.server_side_track_id.clone());
546 if track_id == self.server_side_track_id {
547 debug!(
548 session_id = self.session_id,
549 "received interrupt event, stopping playback"
550 );
551 self.do_interrupt(true).await.ok();
552 }
553 }
554 SessionEvent::Inactivity { track_id, .. } => {
555 info!(
556 session_id = self.session_id,
557 track_id, "inactivity timeout reached, hanging up"
558 );
559 self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None, None)
560 .await
561 .ok();
562 }
563 SessionEvent::Hangup { refer, .. } => {
564 if refer == Some(true) {
566 let mut cs = self.call_state.write().await;
567 if let Some((refer_ssrc, asr_option)) = cs.pending_asr_resume.take() {
568 let is_refer_hangup = cs
570 .refer_callstate
571 .as_ref()
572 .map(|rcs| {
573 rcs.try_read()
574 .map(|g| g.ssrc == refer_ssrc)
575 .unwrap_or(false)
576 })
577 .unwrap_or(false);
578
579 if is_refer_hangup {
580 drop(cs); info!(
582 session_id = self.session_id,
583 "Refer call ended, resuming parent ASR"
584 );
585
586 match self
588 .app_state
589 .stream_engine
590 .create_asr_processor(
591 self.server_side_track_id.clone(),
592 self.cancel_token.child_token(),
593 asr_option,
594 self.event_sender.clone(),
595 )
596 .await
597 {
598 Ok(asr_processor) => {
599 if let Err(e) = self
600 .media_stream
601 .append_processor(
602 &self.server_side_track_id,
603 asr_processor,
604 )
605 .await
606 {
607 warn!(
608 session_id = self.session_id,
609 "Failed to resume ASR after refer: {}", e
610 );
611 }
612 }
613 Err(e) => {
614 warn!(
615 session_id = self.session_id,
616 "Failed to create ASR processor for resume: {}", e
617 );
618 }
619 }
620 }
621 }
622 }
623 }
624 SessionEvent::Error { track_id, .. } => {
625 if track_id != server_side_track_id {
626 continue;
627 }
628
629 let moh_info = {
630 let mut state = self.call_state.write().await;
631 if let Some(path) = state.moh.clone() {
632 let fallback = "./config/sounds/refer_moh.wav".to_string();
633 let next_path = if path != fallback
634 && std::path::Path::new(&fallback).exists()
635 {
636 info!(
637 session_id = self.session_id,
638 "moh error, switching to fallback: {}", fallback
639 );
640 state.moh = Some(fallback.clone());
641 fallback
642 } else {
643 info!(
644 session_id = self.session_id,
645 "looping moh on error: {}", path
646 );
647 path
648 };
649 Some(next_path)
650 } else {
651 None
652 }
653 };
654
655 if let Some(next_path) = moh_info {
656 let ssrc = rand::random::<u32>();
657 let file_track = FileTrack::new(self.server_side_track_id.clone())
658 .with_play_id(Some(next_path.clone()))
659 .with_ssrc(ssrc)
660 .with_path(next_path.clone())
661 .with_cancel_token(self.cancel_token.child_token());
662 self.update_track_wrapper(Box::new(file_track), Some(next_path))
663 .await;
664 continue;
665 }
666 }
667 _ => {}
668 }
669 }
670 };
671
672 select! {
673 _ = wait_input_timeout_loop=>{
674 info!(session_id = self.session_id, "wait input timeout loop done");
675 }
676 _ = self.media_stream.serve() => {
677 info!(session_id = self.session_id, "media stream loop done");
678 }
679 _ = event_hook_loop => {
680 info!(session_id = self.session_id, "event loop done");
681 }
682 }
683 Ok(())
684 }
685
686 async fn dispatch(&self, command: Command) -> Result<()> {
687 match command {
688 Command::Invite { option } => self.do_invite(option).await,
689 Command::Accept { option } => self.do_accept(option).await,
690 Command::Reject { reason, code } => {
691 self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
692 .await
693 }
694 Command::Ringing {
695 ringtone,
696 recorder,
697 early_media,
698 } => self.do_ringing(ringtone, recorder, early_media).await,
699 Command::Tts {
700 text,
701 speaker,
702 play_id,
703 auto_hangup,
704 streaming,
705 end_of_stream,
706 option,
707 wait_input_timeout,
708 base64,
709 cache_key,
710 } => {
711 self.do_tts(
712 text,
713 speaker,
714 play_id,
715 auto_hangup,
716 streaming.unwrap_or_default(),
717 end_of_stream.unwrap_or_default(),
718 option,
719 wait_input_timeout,
720 base64.unwrap_or_default(),
721 cache_key,
722 )
723 .await
724 }
725 Command::Play {
726 url,
727 play_id,
728 auto_hangup,
729 wait_input_timeout,
730 } => {
731 self.do_play(url, play_id, auto_hangup, wait_input_timeout)
732 .await
733 }
734 Command::Hangup {
735 reason,
736 initiator,
737 headers,
738 } => {
739 let reason = reason.map(|r| {
740 r.parse::<CallRecordHangupReason>()
741 .unwrap_or(CallRecordHangupReason::BySystem)
742 });
743 self.do_hangup(reason, initiator, headers).await
744 }
745 Command::Refer {
746 caller,
747 callee,
748 options,
749 } => self.do_refer(caller, callee, options).await,
750 Command::Mute { track_id } => self.do_mute(track_id).await,
751 Command::Unmute { track_id } => self.do_unmute(track_id).await,
752 Command::Pause {} => self.do_pause().await,
753 Command::Resume {} => self.do_resume().await,
754 Command::Interrupt {
755 graceful: passage,
756 fade_out_ms: _,
757 } => self.do_interrupt(passage.unwrap_or_default()).await,
758 Command::History { speaker, text } => self.do_history(speaker, text).await,
759 }
760 }
761
762 fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
763 if let Some(recorder_option) = &option.recorder {
764 let mut recorder_file = recorder_option.recorder_file.clone();
765 if recorder_file.contains("{id}") {
766 recorder_file = recorder_file.replace("{id}", &self.session_id);
767 }
768
769 let recorder_file = if recorder_file.is_empty() {
770 self.app_state.get_recorder_file(&self.session_id)
771 } else {
772 let p = Path::new(&recorder_file);
773 p.is_absolute()
774 .then(|| recorder_file.clone())
775 .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
776 };
777 info!(
778 session_id = self.session_id,
779 recorder_file, "created recording file"
780 );
781
782 let track_samplerate = self.track_config.samplerate;
783 let recorder_samplerate = if track_samplerate > 0 {
784 track_samplerate
785 } else {
786 recorder_option.samplerate
787 };
788 let recorder_ptime = if recorder_option.ptime == 0 {
789 200
790 } else {
791 recorder_option.ptime
792 };
793 let requested_format = recorder_option
794 .format
795 .unwrap_or(self.app_state.config.recorder_format());
796 let format = requested_format.effective();
797 if requested_format != format {
798 warn!(
799 session_id = self.session_id,
800 requested = requested_format.extension(),
801 "Recorder format fallback to wav due to unsupported feature"
802 );
803 }
804 let mut recorder_config = RecorderOption {
805 recorder_file,
806 samplerate: recorder_samplerate,
807 ptime: recorder_ptime,
808 format: Some(format),
809 };
810 recorder_config.ensure_path_extension(format);
811 Some(recorder_config)
812 } else {
813 None
814 }
815 }
816
817 async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
818 {
820 let state = self.call_state.read().await;
821 option = state.merge_option(option);
822 }
823
824 option.check_default();
825 if let Some(opt) = self.build_record_option(&option) {
826 self.media_stream.update_recorder_option(opt).await;
827 }
828
829 if let Some(opt) = &option.media_pass {
830 let track_id = self.server_side_track_id.clone();
831 let cancel_token = self.cancel_token.child_token();
832 let ssrc = rand::random::<u32>();
833 let media_pass_track = MediaPassTrack::new(
834 self.session_id.clone(),
835 ssrc,
836 track_id,
837 cancel_token,
838 opt.clone(),
839 );
840 self.update_track_wrapper(Box::new(media_pass_track), None)
841 .await;
842 }
843
844 info!(
845 session_id = self.session_id,
846 call_type = ?self.call_type,
847 sender,
848 ?option,
849 "caller with option"
850 );
851
852 match self.setup_caller_track(&option).await {
853 Ok(_) => return Ok(option),
854 Err(e) => {
855 self.app_state
856 .total_failed_calls
857 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
858 let error_event = crate::event::SessionEvent::Error {
859 track_id: self.session_id.clone(),
860 timestamp: crate::media::get_timestamp(),
861 sender,
862 error: e.to_string(),
863 code: None,
864 };
865 self.event_sender.send(error_event).ok();
866 self.do_hangup(Some(CallRecordHangupReason::BySystem), None, None)
867 .await
868 .ok();
869 return Err(e);
870 }
871 }
872 }
873
874 async fn do_invite(&self, option: CallOption) -> Result<()> {
875 self.invite_or_accept(option, "invite".to_string())
876 .await
877 .map(|_| ())
878 }
879
880 async fn do_accept(&self, mut option: CallOption) -> Result<()> {
881 let has_pending = self
882 .invitation
883 .find_dialog_id_by_session_id(&self.session_id)
884 .is_some();
885 let ready_to_answer_val = {
886 let state = self.call_state.read().await;
887 state.ready_to_answer.is_none()
888 };
889
890 if ready_to_answer_val {
891 if !has_pending {
892 warn!(session_id = self.session_id, "no pending call to accept");
894 let rejet_event = crate::event::SessionEvent::Reject {
895 track_id: self.session_id.clone(),
896 timestamp: crate::media::get_timestamp(),
897 reason: "no pending call".to_string(),
898 refer: None,
899 code: Some(486),
900 };
901 self.event_sender.send(rejet_event).ok();
902 self.do_hangup(Some(CallRecordHangupReason::BySystem), None, None)
903 .await
904 .ok();
905 return Err(anyhow::anyhow!("no pending call to accept"));
906 }
907 option = self.invite_or_accept(option, "accept".to_string()).await?;
908 } else {
909 option.check_default();
910 self.call_state.write().await.option = Some(option.clone());
911 }
912 info!(session_id = self.session_id, ?option, "accepting call");
913 let ready = self.call_state.write().await.ready_to_answer.take();
914 if let Some((answer, track, dialog)) = ready {
915 info!(
916 session_id = self.session_id,
917 track_id = track.as_ref().map(|t| t.id()),
918 "ready to answer with track"
919 );
920
921 let headers = vec![rsip::Header::ContentType(
922 "application/sdp".to_string().into(),
923 )];
924
925 match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
926 Ok(_) => {
927 {
928 let mut state = self.call_state.write().await;
929 state.answer = Some(answer);
930 state.answer_time = Some(Utc::now());
931 }
932 self.finish_caller_stack(&option, track).await?;
933 }
934 Err(e) => {
935 warn!(session_id = self.session_id, "failed to accept call: {}", e);
936 return Err(anyhow::anyhow!("failed to accept call"));
937 }
938 }
939 }
940 return Ok(());
941 }
942
943 async fn do_reject(
944 &self,
945 code: Option<rsip::StatusCode>,
946 reason: Option<String>,
947 ) -> Result<()> {
948 match self
949 .invitation
950 .find_dialog_id_by_session_id(&self.session_id)
951 {
952 Some(id) => {
953 info!(
954 session_id = self.session_id,
955 ?reason,
956 ?code,
957 "rejecting call"
958 );
959 self.invitation.hangup(id, code, reason).await
960 }
961 None => Ok(()),
962 }
963 }
964
965 async fn do_ringing(
966 &self,
967 ringtone: Option<String>,
968 recorder: Option<RecorderOption>,
969 early_media: Option<bool>,
970 ) -> Result<()> {
971 let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
972 if ready_to_answer_val {
973 let option = CallOption {
974 recorder,
975 ..Default::default()
976 };
977 let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
978 }
979
980 let state = self.call_state.read().await;
981 if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
982 let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
983 let headers = vec![rsip::Header::ContentType(
984 "application/sdp".to_string().into(),
985 )];
986 (Some(headers), Some(answer.as_bytes().to_vec()))
987 } else {
988 (None, None)
989 };
990
991 dialog.ringing(headers, body).ok();
992 info!(
993 session_id = self.session_id,
994 ringtone, early_media, "playing ringtone"
995 );
996 if let Some(ringtone_url) = ringtone {
997 drop(state);
998 self.do_play(ringtone_url, None, None, None).await.ok();
999 } else {
1000 info!(session_id = self.session_id, "no ringtone to play");
1001 }
1002 }
1003 Ok(())
1004 }
1005
1006 async fn do_tts(
1007 &self,
1008 text: String,
1009 speaker: Option<String>,
1010 play_id: Option<String>,
1011 auto_hangup: Option<bool>,
1012 streaming: bool,
1013 end_of_stream: bool,
1014 option: Option<SynthesisOption>,
1015 wait_input_timeout: Option<u32>,
1016 base64: bool,
1017 cache_key: Option<String>,
1018 ) -> Result<()> {
1019 let tts_option = {
1020 let call_state = self.call_state.read().await;
1021 match call_state.option.clone().unwrap_or_default().tts {
1022 Some(opt) => opt.merge_with(option),
1023 None => {
1024 if let Some(opt) = option {
1025 opt
1026 } else {
1027 return Err(anyhow::anyhow!("no tts option available"));
1028 }
1029 }
1030 }
1031 };
1032 let speaker = match speaker {
1033 Some(s) => Some(s),
1034 None => tts_option.speaker.clone(),
1035 };
1036
1037 let mut play_command = SynthesisCommand {
1038 text,
1039 speaker,
1040 play_id: play_id.clone(),
1041 streaming,
1042 end_of_stream,
1043 option: tts_option,
1044 base64,
1045 cache_key,
1046 };
1047 info!(
1048 session_id = self.session_id,
1049 provider = ?play_command.option.provider,
1050 text = %play_command.text.chars().take(10).collect::<String>(),
1051 speaker = play_command.speaker.as_deref(),
1052 auto_hangup = auto_hangup.unwrap_or_default(),
1053 play_id = play_command.play_id.as_deref(),
1054 streaming = play_command.streaming,
1055 end_of_stream = play_command.end_of_stream,
1056 wait_input_timeout = wait_input_timeout.unwrap_or_default(),
1057 is_base64 = play_command.base64,
1058 cache_key = play_command.cache_key.as_deref(),
1059 "new synthesis"
1060 );
1061
1062 let ssrc = rand::random::<u32>();
1063 let (should_interrupt, picked_ssrc) = {
1064 let mut state = self.call_state.write().await;
1065
1066 let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
1067 if play_id.is_some() && state.current_play_id != play_id {
1068 (ssrc, true)
1069 } else {
1070 (handle.ssrc, false)
1071 }
1072 } else {
1073 (ssrc, false)
1074 };
1075
1076 state.auto_hangup = match auto_hangup {
1077 Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
1078 _ => state.auto_hangup.clone(),
1079 };
1080 state.wait_input_timeout = wait_input_timeout;
1081
1082 state.current_play_id = play_id.clone();
1083 (changed, target_ssrc)
1084 };
1085
1086 if should_interrupt {
1087 let _ = self.do_interrupt(false).await;
1088 }
1089
1090 let existing_handle = self.call_state.read().await.tts_handle.clone();
1091 if let Some(tts_handle) = existing_handle {
1092 match tts_handle.try_send(play_command) {
1093 Ok(_) => return Ok(()),
1094 Err(e) => {
1095 play_command = e.0;
1096 }
1097 }
1098 }
1099
1100 let (new_handle, tts_track) = StreamEngine::create_tts_track(
1101 self.app_state.stream_engine.clone(),
1102 self.cancel_token.child_token(),
1103 self.session_id.clone(),
1104 self.server_side_track_id.clone(),
1105 picked_ssrc,
1106 play_id.clone(),
1107 streaming,
1108 &play_command.option,
1109 )
1110 .await?;
1111
1112 new_handle.try_send(play_command)?;
1113 self.call_state.write().await.tts_handle = Some(new_handle);
1114 self.update_track_wrapper(tts_track, play_id).await;
1115 Ok(())
1116 }
1117
1118 async fn do_play(
1119 &self,
1120 url: String,
1121 play_id: Option<String>,
1122 auto_hangup: Option<bool>,
1123 wait_input_timeout: Option<u32>,
1124 ) -> Result<()> {
1125 let ssrc = rand::random::<u32>();
1126 info!(
1127 session_id = self.session_id,
1128 ssrc, url, play_id, auto_hangup, "play file track"
1129 );
1130
1131 let play_id = play_id.or(Some(url.clone()));
1132
1133 let file_track = FileTrack::new(self.server_side_track_id.clone())
1134 .with_play_id(play_id.clone())
1135 .with_ssrc(ssrc)
1136 .with_path(url)
1137 .with_cancel_token(self.cancel_token.child_token());
1138
1139 {
1140 let mut state = self.call_state.write().await;
1141 state.tts_handle = None;
1142 state.auto_hangup = match auto_hangup {
1143 Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1144 _ => None,
1145 };
1146 state.wait_input_timeout = wait_input_timeout;
1147 }
1148
1149 self.update_track_wrapper(Box::new(file_track), play_id)
1150 .await;
1151 Ok(())
1152 }
1153
1154 async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1155 self.event_sender
1156 .send(SessionEvent::AddHistory {
1157 sender: Some(self.session_id.clone()),
1158 timestamp: crate::media::get_timestamp(),
1159 speaker,
1160 text,
1161 })
1162 .map(|_| ())
1163 .map_err(Into::into)
1164 }
1165
1166 async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1167 {
1168 let mut state = self.call_state.write().await;
1169 state.tts_handle = None;
1170 state.moh = None;
1171 }
1172 self.media_stream
1173 .remove_track(&self.server_side_track_id, graceful)
1174 .await;
1175 Ok(())
1176 }
1177 async fn do_pause(&self) -> Result<()> {
1178 Ok(())
1179 }
1180 async fn do_resume(&self) -> Result<()> {
1181 Ok(())
1182 }
1183 async fn do_hangup(
1184 &self,
1185 reason: Option<CallRecordHangupReason>,
1186 initiator: Option<String>,
1187 headers: Option<HashMap<String, String>>,
1188 ) -> Result<()> {
1189 info!(
1190 session_id = self.session_id,
1191 ?reason,
1192 ?initiator,
1193 ?headers,
1194 "do_hangup"
1195 );
1196
1197 if let Some(headers) = headers {
1199 let h_val = serde_json::to_value(&headers).unwrap_or_default();
1200
1201 {
1203 let mut state = self.call_state.write().await;
1204 let mut extras = state.extras.take().unwrap_or_default();
1205 extras.insert("_hangup_headers".to_string(), h_val.clone());
1206 state.extras = Some(extras);
1207 }
1208
1209 {
1211 let mut pending = self.app_state.pending_params.lock().await;
1212 let params = pending
1213 .entry(self.session_id.clone())
1214 .or_insert_with(std::collections::HashMap::new);
1215 params.insert("_hangup_headers".to_string(), h_val);
1216 }
1217 } else {
1218 let state = self.call_state.read().await;
1220 if let Some(extras) = &state.extras {
1221 if let Some(h_val) = extras.get("_hangup_headers") {
1222 let mut pending = self.app_state.pending_params.lock().await;
1223 let params = pending
1224 .entry(self.session_id.clone())
1225 .or_insert_with(std::collections::HashMap::new);
1226 params.insert("_hangup_headers".to_string(), h_val.clone());
1227 }
1228 }
1229 }
1230
1231 let hangup_reason = match initiator.as_deref() {
1233 Some("caller") => CallRecordHangupReason::ByCaller,
1234 Some("callee") => CallRecordHangupReason::ByCallee,
1235 Some("system") => CallRecordHangupReason::Autohangup,
1236 _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1237 };
1238
1239 self.media_stream
1240 .stop(Some(hangup_reason.to_string()), initiator);
1241
1242 self.call_state
1243 .write()
1244 .await
1245 .set_hangup_reason(hangup_reason);
1246 Ok(())
1247 }
1248
1249 async fn do_refer(
1250 &self,
1251 caller: String,
1252 callee: String,
1253 refer_option: Option<ReferOption>,
1254 ) -> Result<()> {
1255 self.do_interrupt(false).await.ok();
1256
1257 let pause_parent_asr = refer_option
1259 .as_ref()
1260 .and_then(|o| o.pause_parent_asr)
1261 .unwrap_or(false);
1262
1263 let original_asr_option = if pause_parent_asr {
1265 let cs = self.call_state.read().await;
1266 cs.option.as_ref().and_then(|o| o.asr.clone())
1267 } else {
1268 None
1269 };
1270
1271 if pause_parent_asr {
1273 info!(
1274 session_id = self.session_id,
1275 "Pausing parent call ASR during refer"
1276 );
1277 self.media_stream
1278 .remove_processor::<crate::media::asr_processor::AsrProcessor>(
1279 &self.server_side_track_id,
1280 )
1281 .await
1282 .ok();
1283 }
1284
1285 let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1286 if let Some(ref path) = moh {
1287 if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1288 let fallback = "./config/sounds/refer_moh.wav";
1289 if std::path::Path::new(fallback).exists() {
1290 info!(
1291 session_id = self.session_id,
1292 "moh {} not found, using fallback {}", path, fallback
1293 );
1294 moh = Some(fallback.to_string());
1295 }
1296 }
1297 }
1298 let ref_call_id = refer_option
1299 .as_ref()
1300 .and_then(|o| o.call_id.clone())
1301 .unwrap_or_else(|| format!("ref-{}-{}", rand::random::<u32>(), self.session_id));
1302
1303 let session_id = self.session_id.clone();
1304 let track_id = self.server_side_track_id.clone();
1305
1306 let recorder = {
1307 let cs = self.call_state.read().await;
1308 cs.option
1309 .as_ref()
1310 .map(|o| o.recorder.clone())
1311 .unwrap_or_default()
1312 };
1313
1314 let call_option = CallOption {
1315 caller: Some(caller),
1316 callee: Some(callee.clone()),
1317 sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1318 asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1319 denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1320 recorder,
1321 ..Default::default()
1322 };
1323
1324 let mut invite_option = call_option.build_invite_option()?;
1325 invite_option.call_id = Some(ref_call_id);
1326
1327 let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1328
1329 {
1330 let cs = self.call_state.read().await;
1331 if let Some(opt) = cs.option.as_ref() {
1332 if let Some(callee) = opt.callee.as_ref() {
1333 headers.push(rsip::Header::Other(
1334 "X-Referred-To".to_string(),
1335 callee.clone(),
1336 ));
1337 }
1338 if let Some(caller) = opt.caller.as_ref() {
1339 headers.push(rsip::Header::Other(
1340 "X-Referred-From".to_string(),
1341 caller.clone(),
1342 ));
1343 }
1344 }
1345 }
1346
1347 headers.push(rsip::Header::Other(
1348 "X-Referred-Id".to_string(),
1349 self.session_id.clone(),
1350 ));
1351
1352 let ssrc = rand::random::<u32>();
1353 let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1354 start_time: Utc::now(),
1355 ssrc,
1356 option: Some(call_option.clone()),
1357 is_refer: true,
1358 ..Default::default()
1359 }));
1360
1361 {
1362 let mut cs = self.call_state.write().await;
1363 cs.refer_callstate.replace(refer_call_state.clone());
1364 }
1365
1366 let auto_hangup_requested = refer_option
1367 .as_ref()
1368 .and_then(|o| o.auto_hangup)
1369 .unwrap_or(true);
1370
1371 if auto_hangup_requested {
1372 self.call_state.write().await.auto_hangup =
1373 Some((ssrc, CallRecordHangupReason::ByRefer));
1374 } else {
1375 self.call_state.write().await.auto_hangup = None;
1376 }
1377
1378 if !auto_hangup_requested && pause_parent_asr && original_asr_option.is_some() {
1380 let asr_option = original_asr_option.unwrap();
1381 self.call_state.write().await.pending_asr_resume = Some((ssrc, asr_option));
1382 }
1383
1384 let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1385
1386 info!(
1387 session_id = self.session_id,
1388 ssrc,
1389 auto_hangup = auto_hangup_requested,
1390 callee,
1391 timeout_secs,
1392 "do_refer"
1393 );
1394
1395 let r = tokio::time::timeout(
1396 Duration::from_secs(timeout_secs as u64),
1397 self.create_outgoing_sip_track(
1398 self.cancel_token.child_token(),
1399 refer_call_state.clone(),
1400 &track_id,
1401 invite_option,
1402 &call_option,
1403 moh,
1404 auto_hangup_requested,
1405 ),
1406 )
1407 .await;
1408
1409 {
1410 self.call_state.write().await.moh = None;
1411 }
1412
1413 let result = match r {
1414 Ok(res) => res,
1415 Err(_) => {
1416 warn!(
1417 session_id = session_id,
1418 "refer sip track creation timed out after {} seconds", timeout_secs
1419 );
1420 self.event_sender
1421 .send(SessionEvent::Reject {
1422 track_id,
1423 timestamp: crate::media::get_timestamp(),
1424 reason: "Timeout when refer".into(),
1425 code: Some(408),
1426 refer: Some(true),
1427 })
1428 .ok();
1429 return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1430 }
1431 };
1432
1433 match result {
1434 Ok(answer) => {
1435 self.event_sender
1436 .send(SessionEvent::Answer {
1437 timestamp: crate::media::get_timestamp(),
1438 track_id,
1439 sdp: answer,
1440 refer: Some(true),
1441 })
1442 .ok();
1443 }
1444 Err(e) => {
1445 warn!(
1446 session_id = session_id,
1447 "failed to create refer sip track: {}", e
1448 );
1449 match &e {
1450 rsipstack::Error::DialogError(reason, _, code) => {
1451 self.event_sender
1452 .send(SessionEvent::Reject {
1453 track_id,
1454 timestamp: crate::media::get_timestamp(),
1455 reason: reason.clone(),
1456 code: Some(code.code() as u32),
1457 refer: Some(true),
1458 })
1459 .ok();
1460 }
1461 _ => {}
1462 }
1463 return Err(e.into());
1464 }
1465 }
1466 Ok(())
1467 }
1468
1469 async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1470 self.media_stream.mute_track(track_id).await;
1471 Ok(())
1472 }
1473
1474 async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1475 self.media_stream.unmute_track(track_id).await;
1476 Ok(())
1477 }
1478
1479 pub async fn cleanup(&self) -> Result<()> {
1480 self.call_state.write().await.tts_handle = None;
1481 self.media_stream.cleanup().await.ok();
1482 Ok(())
1483 }
1484
1485 pub fn get_callrecord(&self) -> Option<CallRecord> {
1486 self.call_state.try_read().ok().map(|call_state| {
1487 call_state.build_callrecord(
1488 self.app_state.clone(),
1489 self.session_id.clone(),
1490 self.call_type.clone(),
1491 )
1492 })
1493 }
1494
1495 async fn dump_to_file(
1496 &self,
1497 dump_file: &mut File,
1498 cmd_receiver: &mut CommandReceiver,
1499 event_receiver: &mut EventReceiver,
1500 ) {
1501 loop {
1502 select! {
1503 _ = self.cancel_token.cancelled() => {
1504 break;
1505 }
1506 Ok(cmd) = cmd_receiver.recv() => {
1507 CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1508 .await;
1509 }
1510 Ok(event) = event_receiver.recv() => {
1511 if matches!(event, SessionEvent::Binary{..}) {
1512 continue;
1513 }
1514 CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1515 .await;
1516 }
1517 };
1518 }
1519 }
1520
1521 async fn dump_loop(
1522 &self,
1523 dump_events: bool,
1524 mut dump_cmd_receiver: CommandReceiver,
1525 mut dump_event_receiver: EventReceiver,
1526 ) {
1527 if !dump_events {
1528 return;
1529 }
1530
1531 let file_name = self.app_state.get_dump_events_file(&self.session_id);
1532 let mut dump_file = match File::options()
1533 .create(true)
1534 .append(true)
1535 .open(&file_name)
1536 .await
1537 {
1538 Ok(file) => file,
1539 Err(e) => {
1540 warn!(
1541 session_id = self.session_id,
1542 file_name, "failed to open dump events file: {}", e
1543 );
1544 return;
1545 }
1546 };
1547 self.dump_to_file(
1548 &mut dump_file,
1549 &mut dump_cmd_receiver,
1550 &mut dump_event_receiver,
1551 )
1552 .await;
1553
1554 while let Ok(event) = dump_event_receiver.try_recv() {
1555 if matches!(event, SessionEvent::Binary { .. }) {
1556 continue;
1557 }
1558 CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1559 }
1560 }
1561
1562 pub async fn create_rtp_track(
1563 &self,
1564 track_id: TrackId,
1565 ssrc: u32,
1566 enable_srtp: Option<bool>,
1567 ) -> Result<RtcTrack> {
1568 let mut rtc_config = RtcTrackConfig::default();
1569 let use_srtp = enable_srtp
1571 .or(self.app_state.config.enable_srtp)
1572 .unwrap_or(false);
1573 rtc_config.mode = if use_srtp {
1574 rustrtc::TransportMode::Srtp
1575 } else {
1576 rustrtc::TransportMode::Rtp
1577 };
1578
1579 if let Some(codecs) = &self.app_state.config.codecs {
1580 let mut codec_types = Vec::new();
1581 for c in codecs {
1582 match c.to_lowercase().as_str() {
1583 "pcmu" => codec_types.push(CodecType::PCMU),
1584 "pcma" => codec_types.push(CodecType::PCMA),
1585 "g722" => codec_types.push(CodecType::G722),
1586 "g729" => codec_types.push(CodecType::G729),
1587 #[cfg(feature = "opus")]
1588 "opus" => codec_types.push(CodecType::Opus),
1589 "dtmf" | "2833" | "telephone_event" => {
1590 codec_types.push(CodecType::TelephoneEvent)
1591 }
1592 _ => {}
1593 }
1594 }
1595 if !codec_types.is_empty() {
1596 rtc_config.preferred_codec = Some(codec_types[0].clone());
1597 rtc_config.codecs = codec_types;
1598 }
1599 }
1600
1601 if rtc_config.preferred_codec.is_none() {
1602 rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1603 }
1604
1605 rtc_config.rtp_port_range = self
1606 .app_state
1607 .config
1608 .rtp_start_port
1609 .zip(self.app_state.config.rtp_end_port);
1610
1611 if let Some(ref external_ip) = self.app_state.config.external_ip {
1612 rtc_config.external_ip = Some(external_ip.clone());
1613 }
1614 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1615 rtc_config.bind_ip = Some(bind_ip.clone());
1616 }
1617
1618 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1619 rtc_config.enable_ice_lite = self.app_state.config.enable_ice_lite;
1620
1621 let mut track = RtcTrack::new(
1622 self.cancel_token.child_token(),
1623 track_id,
1624 self.track_config.clone(),
1625 rtc_config,
1626 )
1627 .with_ssrc(ssrc);
1628
1629 track.create().await?;
1630
1631 Ok(track)
1632 }
1633
1634 async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1635 let hangup_headers = option
1636 .sip
1637 .as_ref()
1638 .and_then(|s| s.hangup_headers.as_ref())
1639 .map(|headers_map| {
1640 headers_map
1641 .iter()
1642 .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1643 .collect::<Vec<rsip::Header>>()
1644 });
1645 self.call_state.write().await.option = Some(option.clone());
1646 info!(
1647 session_id = self.session_id,
1648 call_type = ?self.call_type,
1649 "setup caller track"
1650 );
1651
1652 let track = match self.call_type {
1653 ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1654 ActiveCallType::WebSocket => {
1655 let audio_receiver = self.call_state.write().await.audio_receiver.take();
1656 if let Some(receiver) = audio_receiver {
1657 Some(self.create_websocket_track(receiver).await?)
1658 } else {
1659 None
1660 }
1661 }
1662 ActiveCallType::Sip => {
1663 if let Some(dialog_id) = self
1664 .invitation
1665 .find_dialog_id_by_session_id(&self.session_id)
1666 {
1667 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1668 return self
1669 .prepare_incoming_sip_track(
1670 self.cancel_token.clone(),
1671 self.call_state.clone(),
1672 &self.session_id,
1673 pending_dialog,
1674 hangup_headers,
1675 )
1676 .await;
1677 }
1678 }
1679
1680 let mut option = option.clone();
1682 if option.sip.is_none()
1683 || option
1684 .sip
1685 .as_ref()
1686 .and_then(|s| s.username.as_ref())
1687 .is_none()
1688 {
1689 if let Some(callee) = &option.callee {
1690 if let Some(cred) = self.app_state.find_credentials_for_callee(callee) {
1691 if option.sip.is_none() {
1692 option.sip = Some(crate::SipOption {
1693 username: Some(cred.username.clone()),
1694 password: Some(cred.password.clone()),
1695 realm: cred.realm.clone(),
1696 ..Default::default()
1697 });
1698 }
1699 }
1700 }
1701 }
1702
1703 let mut invite_option = option.build_invite_option()?;
1704 invite_option.call_id = Some(self.session_id.clone());
1705
1706 match self
1707 .create_outgoing_sip_track(
1708 self.cancel_token.clone(),
1709 self.call_state.clone(),
1710 &self.session_id,
1711 invite_option,
1712 &option,
1713 None,
1714 false,
1715 )
1716 .await
1717 {
1718 Ok(answer) => {
1719 self.event_sender
1720 .send(SessionEvent::Answer {
1721 timestamp: crate::media::get_timestamp(),
1722 track_id: self.session_id.clone(),
1723 sdp: answer,
1724 refer: Some(false),
1725 })
1726 .ok();
1727 return Ok(());
1728 }
1729 Err(e) => {
1730 warn!(
1731 session_id = self.session_id,
1732 "failed to create sip track: {}", e
1733 );
1734 match &e {
1735 rsipstack::Error::DialogError(reason, _, code) => {
1736 self.event_sender
1737 .send(SessionEvent::Reject {
1738 track_id: self.session_id.clone(),
1739 timestamp: crate::media::get_timestamp(),
1740 reason: reason.clone(),
1741 code: Some(code.code() as u32),
1742 refer: Some(false),
1743 })
1744 .ok();
1745 }
1746 _ => {}
1747 }
1748 return Err(e.into());
1749 }
1750 }
1751 }
1752 ActiveCallType::B2bua => {
1753 if let Some(dialog_id) = self
1754 .invitation
1755 .find_dialog_id_by_session_id(&self.session_id)
1756 {
1757 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1758 return self
1759 .prepare_incoming_sip_track(
1760 self.cancel_token.clone(),
1761 self.call_state.clone(),
1762 &self.session_id,
1763 pending_dialog,
1764 hangup_headers,
1765 )
1766 .await;
1767 }
1768 }
1769
1770 warn!(
1771 session_id = self.session_id,
1772 "no pending dialog found for B2BUA call"
1773 );
1774 return Err(anyhow::anyhow!(
1775 "no pending dialog found for session_id: {}",
1776 self.session_id
1777 ));
1778 }
1779 };
1780 match track {
1781 Some(track) => {
1782 self.finish_caller_stack(&option, Some(track)).await?;
1783 }
1784 None => {
1785 warn!(session_id = self.session_id, "no track created for caller");
1786 return Err(anyhow::anyhow!("no track created for caller"));
1787 }
1788 }
1789 Ok(())
1790 }
1791
1792 async fn finish_caller_stack(
1793 &self,
1794 option: &CallOption,
1795 track: Option<Box<dyn Track>>,
1796 ) -> Result<()> {
1797 if let Some(track) = track {
1798 self.setup_track_with_stream(&option, track).await?;
1799 }
1800
1801 {
1802 let call_state = self.call_state.read().await;
1803 if let Some(ref answer) = call_state.answer {
1804 info!(
1805 session_id = self.session_id,
1806 "sending answer event: {}", answer,
1807 );
1808 self.event_sender
1809 .send(SessionEvent::Answer {
1810 timestamp: crate::media::get_timestamp(),
1811 track_id: self.session_id.clone(),
1812 sdp: answer.clone(),
1813 refer: Some(false),
1814 })
1815 .ok();
1816 } else {
1817 warn!(
1818 session_id = self.session_id,
1819 "no answer in state to send event"
1820 );
1821 }
1822 }
1823 Ok(())
1824 }
1825
1826 pub async fn setup_track_with_stream(
1827 &self,
1828 option: &CallOption,
1829 mut track: Box<dyn Track>,
1830 ) -> Result<()> {
1831 let processors = match StreamEngine::create_processors(
1832 self.app_state.stream_engine.clone(),
1833 track.as_ref(),
1834 self.cancel_token.child_token(),
1835 self.event_sender.clone(),
1836 self.media_stream.packet_sender.clone(),
1837 option,
1838 )
1839 .await
1840 {
1841 Ok(processors) => processors,
1842 Err(e) => {
1843 warn!(
1844 session_id = self.session_id,
1845 "failed to prepare stream processors: {}", e
1846 );
1847 vec![]
1848 }
1849 };
1850
1851 for processor in processors {
1853 track.append_processor(processor);
1854 }
1855
1856 self.update_track_wrapper(track, None).await;
1857 Ok(())
1858 }
1859
1860 pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1861 let (ambiance_opt, subscribe) = {
1862 let state = self.call_state.read().await;
1863 let mut opt = state
1864 .option
1865 .as_ref()
1866 .and_then(|o| o.ambiance.clone())
1867 .unwrap_or_default();
1868
1869 if let Some(global) = &self.app_state.config.ambiance {
1870 opt.merge(global);
1871 }
1872
1873 let subscribe = state
1874 .option
1875 .as_ref()
1876 .and_then(|o| o.subscribe)
1877 .unwrap_or_default();
1878
1879 (opt, subscribe)
1880 };
1881 if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1882 match AmbianceProcessor::new(ambiance_opt).await {
1883 Ok(ambiance) => {
1884 info!(session_id = self.session_id, "loaded ambiance processor");
1885 track.append_processor(Box::new(ambiance));
1886 }
1887 Err(e) => {
1888 tracing::error!("failed to load ambiance wav {}", e);
1889 }
1890 }
1891 }
1892
1893 if subscribe && self.call_type != ActiveCallType::WebSocket {
1894 let (track_index, sub_track_id) = if track.id() == &self.server_side_track_id {
1895 (0, self.server_side_track_id.clone())
1896 } else {
1897 (1, self.session_id.clone())
1898 };
1899 let sub_processor =
1900 SubscribeProcessor::new(self.event_sender.clone(), sub_track_id, track_index);
1901 track.append_processor(Box::new(sub_processor));
1902 }
1903
1904 self.call_state.write().await.current_play_id = play_id.clone();
1905 self.media_stream.update_track(track, play_id).await;
1906 }
1907
1908 pub async fn create_websocket_track(
1909 &self,
1910 audio_receiver: WebsocketBytesReceiver,
1911 ) -> Result<Box<dyn Track>> {
1912 let (ssrc, codec) = {
1913 let call_state = self.call_state.read().await;
1914 (
1915 call_state.ssrc,
1916 call_state
1917 .option
1918 .as_ref()
1919 .map(|o| o.codec.clone())
1920 .unwrap_or_default(),
1921 )
1922 };
1923
1924 let ws_track = WebsocketTrack::new(
1925 self.cancel_token.child_token(),
1926 self.session_id.clone(),
1927 self.track_config.clone(),
1928 self.event_sender.clone(),
1929 audio_receiver,
1930 codec,
1931 ssrc,
1932 );
1933
1934 {
1935 let mut call_state = self.call_state.write().await;
1936 call_state.answer_time = Some(Utc::now());
1937 call_state.answer = Some("".to_string());
1938 call_state.last_status_code = 200;
1939 }
1940
1941 Ok(Box::new(ws_track))
1942 }
1943
1944 pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1945 let (ssrc, option) = {
1946 let call_state = self.call_state.read().await;
1947 (
1948 call_state.ssrc,
1949 call_state.option.clone().unwrap_or_default(),
1950 )
1951 };
1952
1953 let mut rtc_config = RtcTrackConfig::default();
1954 rtc_config.mode = rustrtc::TransportMode::WebRtc; rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1956
1957 if let Some(codecs) = &self.app_state.config.codecs {
1958 let mut codec_types = Vec::new();
1959 for c in codecs {
1960 match c.to_lowercase().as_str() {
1961 "pcmu" => codec_types.push(CodecType::PCMU),
1962 "pcma" => codec_types.push(CodecType::PCMA),
1963 "g722" => codec_types.push(CodecType::G722),
1964 "g729" => codec_types.push(CodecType::G729),
1965 #[cfg(feature = "opus")]
1966 "opus" => codec_types.push(CodecType::Opus),
1967 "dtmf" | "2833" | "telephone_event" => {
1968 codec_types.push(CodecType::TelephoneEvent)
1969 }
1970 _ => {}
1971 }
1972 }
1973 if !codec_types.is_empty() {
1974 rtc_config.preferred_codec = Some(codec_types[0].clone());
1975 rtc_config.codecs = codec_types;
1976 }
1977 }
1978
1979 if let Some(ref external_ip) = self.app_state.config.external_ip {
1980 rtc_config.external_ip = Some(external_ip.clone());
1981 }
1982 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1983 rtc_config.bind_ip = Some(bind_ip.clone());
1984 }
1985
1986 let mut webrtc_track = RtcTrack::new(
1987 self.cancel_token.child_token(),
1988 self.session_id.clone(),
1989 self.track_config.clone(),
1990 rtc_config,
1991 )
1992 .with_ssrc(ssrc);
1993
1994 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1995 let offer = match option.enable_ipv6 {
1996 Some(false) | None => {
1997 strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1998 }
1999 _ => option.offer.clone().unwrap_or("".to_string()),
2000 };
2001 let answer: Option<String>;
2002 match webrtc_track.handshake(offer, timeout).await {
2003 Ok(answer_sdp) => {
2004 answer = match option.enable_ipv6 {
2005 Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
2006 Some(true) => Some(answer_sdp.to_string()),
2007 };
2008 }
2009 Err(e) => {
2010 warn!(session_id = self.session_id, "failed to setup track: {}", e);
2011 return Err(anyhow::anyhow!("Failed to setup track: {}", e));
2012 }
2013 }
2014
2015 {
2016 let mut call_state = self.call_state.write().await;
2017 call_state.answer_time = Some(Utc::now());
2018 call_state.answer = answer;
2019 call_state.last_status_code = 200;
2020 }
2021 Ok(Box::new(webrtc_track))
2022 }
2023
2024 async fn create_outgoing_sip_track(
2025 &self,
2026 cancel_token: CancellationToken,
2027 call_state_ref: ActiveCallStateRef,
2028 track_id: &String,
2029 mut invite_option: InviteOption,
2030 call_option: &CallOption,
2031 moh: Option<String>,
2032 auto_hangup: bool,
2033 ) -> Result<String, rsipstack::Error> {
2034 let ssrc = call_state_ref.read().await.ssrc;
2035 let per_call_srtp = call_option.sip.as_ref().and_then(|s| s.enable_srtp);
2036 let rtp_track = self
2037 .create_rtp_track(track_id.clone(), ssrc, per_call_srtp)
2038 .await
2039 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
2040
2041 let offer = Some(
2042 rtp_track
2043 .local_description()
2044 .await
2045 .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
2046 );
2047
2048 {
2049 let mut cs = call_state_ref.write().await;
2050 if let Some(o) = cs.option.as_mut() {
2051 o.offer = offer.clone();
2052 }
2053 cs.start_time = Utc::now();
2054 };
2055
2056 invite_option.offer = offer.clone().map(|s| s.into());
2057
2058 let needs_contact = invite_option.contact.scheme.is_none()
2061 || invite_option
2062 .contact
2063 .host_with_port
2064 .to_string()
2065 .starts_with("127.0.0.1");
2066
2067 if needs_contact {
2068 if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
2069 invite_option.contact = rsip::Uri {
2070 scheme: Some(rsip::Scheme::Sip),
2071 auth: None,
2072 host_with_port: addr.addr.clone(),
2073 params: vec![],
2074 headers: vec![],
2075 };
2076 }
2077 }
2078
2079 let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
2080
2081 if let Some(moh) = moh {
2082 let ssrc_and_moh = {
2083 let mut state = call_state_ref.write().await;
2084 state.moh = Some(moh.clone());
2085 if state.current_play_id.is_none() {
2086 let ssrc = rand::random::<u32>();
2087 Some((ssrc, moh.clone()))
2088 } else {
2089 info!(
2090 session_id = self.session_id,
2091 "Something is playing, MOH will start after it ends"
2092 );
2093 None
2094 }
2095 };
2096
2097 if let Some((ssrc, moh_path)) = ssrc_and_moh {
2098 let file_track = FileTrack::new(self.server_side_track_id.clone())
2099 .with_play_id(Some(moh_path.clone()))
2100 .with_ssrc(ssrc)
2101 .with_path(moh_path.clone())
2102 .with_cancel_token(self.cancel_token.child_token());
2103 self.update_track_wrapper(Box::new(file_track), Some(moh_path))
2104 .await;
2105 }
2106 } else {
2107 let track = rtp_track_to_setup.take().unwrap();
2108 self.setup_track_with_stream(&call_option, track)
2109 .await
2110 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
2111 }
2112
2113 info!(
2114 session_id = self.session_id,
2115 track_id,
2116 contact = %invite_option.contact,
2117 "invite {} -> {} offer: \n{}",
2118 invite_option.caller,
2119 invite_option.callee,
2120 offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
2121 );
2122
2123 let (dlg_state_sender, dlg_state_receiver) =
2124 self.invitation.dialog_layer.new_dialog_state_channel();
2125
2126 let states = InviteDialogStates {
2127 is_client: true,
2128 session_id: self.session_id.clone(),
2129 track_id: track_id.clone(),
2130 event_sender: self.event_sender.clone(),
2131 media_stream: self.media_stream.clone(),
2132 call_state: call_state_ref.clone(),
2133 cancel_token,
2134 terminated_reason: None,
2135 has_early_media: false,
2136 };
2137
2138 let hangup_headers = call_option
2139 .sip
2140 .as_ref()
2141 .and_then(|s| s.hangup_headers.as_ref())
2142 .map(|headers_map| {
2143 headers_map
2144 .iter()
2145 .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
2146 .collect::<Vec<rsip::Header>>()
2147 });
2148
2149 let mut client_dialog_handler = DialogStateReceiverGuard::new(
2150 self.invitation.dialog_layer.clone(),
2151 dlg_state_receiver,
2152 hangup_headers,
2153 );
2154
2155 crate::spawn(async move {
2156 client_dialog_handler.process_dialog(states).await;
2157 });
2158
2159 let (dialog_id, answer) = self
2160 .invitation
2161 .invite(invite_option, dlg_state_sender)
2162 .await?;
2163
2164 self.call_state.write().await.moh = None;
2165
2166 if let Some(track) = rtp_track_to_setup {
2167 info!(
2168 session_id = self.session_id,
2169 track_id, "Stopping MOH and setting up RTP track"
2170 );
2171 self.media_stream
2172 .remove_track(&self.server_side_track_id, false)
2173 .await;
2174
2175 self.setup_track_with_stream(&call_option, track)
2176 .await
2177 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
2178 }
2179
2180 let answer = match answer {
2181 Some(answer) => {
2182 let s = String::from_utf8_lossy(&answer).to_string();
2183 if s.trim().is_empty() {
2184 let cs = call_state_ref.read().await;
2188 match cs.answer.clone() {
2189 Some(early_sdp) if !early_sdp.is_empty() => {
2190 info!(
2191 session_id = self.session_id,
2192 "200 OK has empty body; using early-media SDP from 183"
2193 );
2194 (early_sdp, true )
2195 }
2196 _ => {
2197 warn!(
2198 session_id = self.session_id,
2199 "200 OK has empty body and no early-media SDP available"
2200 );
2201 (s, false)
2202 }
2203 }
2204 } else {
2205 (s, false)
2206 }
2207 }
2208 None => {
2209 let cs = call_state_ref.read().await;
2211 match cs.answer.clone() {
2212 Some(early_sdp) if !early_sdp.is_empty() => {
2213 info!(
2214 session_id = self.session_id,
2215 "200 OK had no answer; using early-media SDP from 183"
2216 );
2217 (early_sdp, true )
2218 }
2219 _ => {
2220 warn!(session_id = self.session_id, "no answer received");
2221 return Err(rsipstack::Error::DialogError(
2222 "No answer received".to_string(),
2223 dialog_id,
2224 rsip::StatusCode::NotAcceptableHere,
2225 ));
2226 }
2227 }
2228 }
2229 };
2230 let (answer, remote_description_already_applied) = answer;
2231
2232 {
2233 let mut cs = call_state_ref.write().await;
2234 if cs.answer.is_none() {
2235 cs.answer = Some(answer.clone());
2236 }
2237 if auto_hangup {
2238 cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
2239 }
2240 }
2241 if !remote_description_already_applied {
2242 self.media_stream
2243 .update_remote_description(&track_id, &answer)
2244 .await
2245 .ok();
2246 }
2247
2248 Ok(answer)
2249 }
2250
2251 pub fn is_webrtc_sdp(sdp: &str) -> bool {
2253 (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
2254 && sdp.contains("a=fingerprint:")
2255 }
2256
2257 pub async fn setup_answer_track(
2258 &self,
2259 ssrc: u32,
2260 option: &CallOption,
2261 offer: String,
2262 ) -> Result<(String, Box<dyn Track>)> {
2263 let offer = match option.enable_ipv6 {
2264 Some(false) | None => strip_ipv6_candidates(&offer),
2265 _ => offer.clone(),
2266 };
2267
2268 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
2269
2270 let mut media_track = if Self::is_webrtc_sdp(&offer) {
2271 let mut rtc_config = RtcTrackConfig::default();
2272 rtc_config.mode = rustrtc::TransportMode::WebRtc;
2273 rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
2274 if let Some(ref external_ip) = self.app_state.config.external_ip {
2275 rtc_config.external_ip = Some(external_ip.clone());
2276 }
2277 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
2278 rtc_config.bind_ip = Some(bind_ip.clone());
2279 }
2280 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
2281 rtc_config.enable_ice_lite = self.app_state.config.enable_ice_lite;
2282
2283 let webrtc_track = RtcTrack::new(
2284 self.cancel_token.child_token(),
2285 self.session_id.clone(),
2286 self.track_config.clone(),
2287 rtc_config,
2288 )
2289 .with_ssrc(ssrc);
2290
2291 Box::new(webrtc_track) as Box<dyn Track>
2292 } else {
2293 let per_call_srtp = option.sip.as_ref().and_then(|s| s.enable_srtp);
2294 let rtp_track = self
2295 .create_rtp_track(self.session_id.clone(), ssrc, per_call_srtp)
2296 .await?;
2297 Box::new(rtp_track) as Box<dyn Track>
2298 };
2299
2300 let answer = match media_track.handshake(offer.clone(), timeout).await {
2301 Ok(answer) => answer,
2302 Err(e) => {
2303 return Err(anyhow::anyhow!("handshake failed: {e}"));
2304 }
2305 };
2306
2307 return Ok((answer, media_track));
2308 }
2309
2310 pub async fn prepare_incoming_sip_track(
2311 &self,
2312 cancel_token: CancellationToken,
2313 call_state_ref: ActiveCallStateRef,
2314 track_id: &String,
2315 pending_dialog: PendingDialog,
2316 hangup_headers: Option<Vec<rsip::Header>>,
2317 ) -> Result<()> {
2318 let state_receiver = pending_dialog.state_receiver;
2319 let states = InviteDialogStates {
2322 is_client: false,
2323 session_id: self.session_id.clone(),
2324 track_id: track_id.clone(),
2325 event_sender: self.event_sender.clone(),
2326 media_stream: self.media_stream.clone(),
2327 call_state: self.call_state.clone(),
2328 cancel_token,
2329 terminated_reason: None,
2330 has_early_media: false,
2331 };
2332
2333 let initial_request = pending_dialog.dialog.initial_request();
2334 let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2335
2336 let (ssrc, option) = {
2337 let call_state = call_state_ref.read().await;
2338 (
2339 call_state.ssrc,
2340 call_state.option.clone().unwrap_or_default(),
2341 )
2342 };
2343
2344 match self.setup_answer_track(ssrc, &option, offer).await {
2345 Ok((offer, track)) => {
2346 self.setup_track_with_stream(&option, track).await?;
2347 {
2348 let mut state = self.call_state.write().await;
2349 state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2350 }
2351 }
2352 Err(e) => {
2353 return Err(anyhow::anyhow!("error creating track: {}", e));
2354 }
2355 }
2356
2357 let mut client_dialog_handler = DialogStateReceiverGuard::new(
2358 self.invitation.dialog_layer.clone(),
2359 state_receiver,
2360 hangup_headers,
2361 );
2362
2363 crate::spawn(async move {
2364 client_dialog_handler.process_dialog(states).await;
2365 });
2366 Ok(())
2367 }
2368}
2369
2370impl Drop for ActiveCall {
2371 fn drop(&mut self) {
2372 info!(session_id = self.session_id, "dropping active call");
2373 if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2374 if let Some(record) = self.get_callrecord() {
2375 if let Err(e) = sender.send(record) {
2376 warn!(
2377 session_id = self.session_id,
2378 "failed to send call record: {}", e
2379 );
2380 }
2381 }
2382 }
2383 }
2384}
2385
2386impl ActiveCallState {
2387 pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2388 if let Some(existing) = &self.option {
2389 if option.asr.is_none() {
2390 option.asr = existing.asr.clone();
2391 }
2392 if option.tts.is_none() {
2393 option.tts = existing.tts.clone();
2394 }
2395 if option.vad.is_none() {
2396 option.vad = existing.vad.clone();
2397 }
2398 if option.denoise.is_none() {
2399 option.denoise = existing.denoise;
2400 }
2401 if option.recorder.is_none() {
2402 option.recorder = existing.recorder.clone();
2403 }
2404 if option.eou.is_none() {
2405 option.eou = existing.eou.clone();
2406 }
2407 if option.extra.is_none() {
2408 option.extra = existing.extra.clone();
2409 }
2410 if option.ambiance.is_none() {
2411 option.ambiance = existing.ambiance.clone();
2412 }
2413 }
2414 option
2415 }
2416
2417 pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2418 if self.hangup_reason.is_none() {
2419 self.hangup_reason = Some(reason);
2420 }
2421 }
2422
2423 pub fn build_hangup_event(
2424 &self,
2425 track_id: TrackId,
2426 initiator: Option<String>,
2427 ) -> crate::event::SessionEvent {
2428 let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2429 let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2430 let extra = self.extras.clone();
2431
2432 crate::event::SessionEvent::Hangup {
2433 track_id,
2434 timestamp: crate::media::get_timestamp(),
2435 reason: Some(format!("{:?}", self.hangup_reason)),
2436 initiator,
2437 start_time: self.start_time.to_rfc3339(),
2438 answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2439 ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2440 hangup_time: Utc::now().to_rfc3339(),
2441 extra,
2442 from: from.map(|f| f.into()),
2443 to: to.map(|f| f.into()),
2444 refer: Some(self.is_refer),
2445 }
2446 }
2447
2448 pub fn build_callrecord(
2449 &self,
2450 app_state: AppState,
2451 session_id: String,
2452 call_type: ActiveCallType,
2453 ) -> CallRecord {
2454 let option = self.option.clone().unwrap_or_default();
2455 let recorder = if option.recorder.is_some() {
2456 let recorder_file = app_state.get_recorder_file(&session_id);
2457 if std::path::Path::new(&recorder_file).exists() {
2458 let file_size = std::fs::metadata(&recorder_file)
2459 .map(|m| m.len())
2460 .unwrap_or(0);
2461 vec![crate::callrecord::CallRecordMedia {
2462 track_id: session_id.clone(),
2463 path: recorder_file,
2464 size: file_size,
2465 extra: None,
2466 }]
2467 } else {
2468 vec![]
2469 }
2470 } else {
2471 vec![]
2472 };
2473
2474 let dump_event_file = app_state.get_dump_events_file(&session_id);
2475 let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2476 Some(dump_event_file)
2477 } else {
2478 None
2479 };
2480
2481 let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2482 if let Ok(rc) = rc.try_read() {
2483 Some(Box::new(rc.build_callrecord(
2484 app_state.clone(),
2485 rc.session_id.clone(),
2486 ActiveCallType::B2bua,
2487 )))
2488 } else {
2489 None
2490 }
2491 });
2492
2493 let caller = option.caller.clone().unwrap_or_default();
2494 let callee = option.callee.clone().unwrap_or_default();
2495
2496 CallRecord {
2497 option: Some(option),
2498 call_id: session_id,
2499 call_type,
2500 start_time: self.start_time,
2501 ring_time: self.ring_time.clone(),
2502 answer_time: self.answer_time.clone(),
2503 end_time: Utc::now(),
2504 caller,
2505 callee,
2506 hangup_reason: self.hangup_reason.clone(),
2507 hangup_messages: Vec::new(),
2508 status_code: self.last_status_code,
2509 extras: self.extras.clone(),
2510 dump_event_file,
2511 recorder,
2512 refer_callrecord,
2513 }
2514 }
2515}