1use super::Command;
2use crate::{
3 CallOption, ReferOption,
4 event::{EventReceiver, EventSender, SessionEvent},
5 media::{
6 TrackId,
7 ambiance::AmbianceProcessor,
8 engine::StreamEngine,
9 negotiate::strip_ipv6_candidates,
10 processor::SubscribeProcessor,
11 recorder::RecorderOption,
12 stream::{MediaStream, MediaStreamBuilder},
13 track::{
14 Track, TrackConfig,
15 file::FileTrack,
16 media_pass::MediaPassTrack,
17 rtc::{RtcTrack, RtcTrackConfig},
18 tts::SynthesisHandle,
19 websocket::{WebsocketBytesReceiver, WebsocketTrack},
20 },
21 },
22 synthesis::{SynthesisCommand, SynthesisOption},
23};
24use crate::{
25 app::AppState,
26 call::{
27 CommandReceiver, CommandSender,
28 sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
29 },
30 callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
31 useragent::invitation::PendingDialog,
32};
33use anyhow::Result;
34use audio_codec::CodecType;
35use chrono::{DateTime, Utc};
36use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
37use serde::{Deserialize, Serialize};
38use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
39use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
40use tokio_util::sync::CancellationToken;
41use tracing::{debug, info, warn};
42
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app::AppStateBuilder;
    use crate::callrecord::CallRecordHangupReason;
    use crate::config::Config;
    use crate::media::track::tts::SynthesisHandle;
    use crate::synthesis::SynthesisCommand;
    use tokio::sync::mpsc;

    /// Builds a SIP `ActiveCall` on top of a throwaway application state and
    /// stores `tts` as the call-level synthesis option.
    ///
    /// The application state is returned as well so that it stays alive for
    /// the whole test.
    async fn sip_call_with_tts(
        tts: crate::synthesis::SynthesisOption,
    ) -> Result<(AppState, Arc<ActiveCall>)> {
        let mut config = Config::default();
        config.udp_port = 0;
        config.media_cache_path = "/tmp/mediacache".to_string();

        let app_state = AppStateBuilder::new()
            .with_config(config)
            .with_stream_engine(Arc::new(StreamEngine::default()))
            .build()
            .await?;

        let call = Arc::new(ActiveCall::new(
            ActiveCallType::Sip,
            CancellationToken::new(),
            "test-session".to_string(),
            app_state.invitation.clone(),
            app_state.clone(),
            TrackConfig::default(),
            None,
            false,
            None,
            None,
        ));

        let mut call_option = crate::CallOption::default();
        call_option.tts = Some(tts);
        call.call_state.write().await.option = Some(call_option);

        Ok((app_state, call))
    }

    /// Pretends that a synthesis track for `play_id` is already running with
    /// the given `ssrc`, and hands back the receiving end of its command
    /// channel. The receiver must be kept alive by the caller.
    async fn install_tts_handle(
        call: &ActiveCall,
        play_id: &str,
        ssrc: u32,
    ) -> mpsc::UnboundedReceiver<SynthesisCommand> {
        let (tx, rx) = mpsc::unbounded_channel::<SynthesisCommand>();
        let handle = SynthesisHandle::new(tx, Some(play_id.to_string()), ssrc);
        {
            let mut state = call.call_state.write().await;
            state.tts_handle = Some(handle);
            state.current_play_id = Some(play_id.to_string());
        }
        rx
    }

    /// A TTS command for the play id that is already active must reuse the
    /// SSRC of the existing handle when registering the auto hangup.
    #[tokio::test]
    async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
        let (_app_state, active_call) =
            sip_call_with_tts(crate::synthesis::SynthesisOption::default()).await?;

        let initial_ssrc = 12345;
        let mut rx = install_tts_handle(&active_call, "play_1", initial_ssrc).await;

        active_call
            .do_tts(
                "hangup now".to_string(),
                None,
                Some("play_1".to_string()),
                Some(true),
                false,
                true,
                None,
                None,
                false,
                None,
            )
            .await?;

        {
            let state = active_call.call_state.read().await;
            assert!(state.auto_hangup.is_some());
            let (h_ssrc, reason) = state.auto_hangup.clone().unwrap();
            assert_eq!(
                h_ssrc, initial_ssrc,
                "SSRC should be reused from existing handle"
            );
            assert_eq!(reason, CallRecordHangupReason::BySystem);
        }

        // The command must have been forwarded to the already running handle.
        let cmd = rx.try_recv().expect("Should have received tts command");
        assert_eq!(cmd.text, "hangup now");

        Ok(())
    }

    /// A TTS command carrying a different play id must not inherit the SSRC
    /// of the handle that is currently installed.
    #[tokio::test]
    async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
        let mut tts_opt = crate::synthesis::SynthesisOption::default();
        tts_opt.provider = Some(crate::synthesis::SynthesisType::Aliyun);
        let (_app_state, active_call) = sip_call_with_tts(tts_opt).await?;

        let initial_ssrc = 111;
        let _rx = install_tts_handle(&active_call, "play_1", initial_ssrc).await;

        active_call
            .do_tts(
                "new play".to_string(),
                None,
                Some("play_2".to_string()),
                Some(true),
                false,
                true,
                None,
                None,
                false,
                None,
            )
            .await?;

        {
            let state = active_call.call_state.read().await;
            let (h_ssrc, _) = state.auto_hangup.clone().unwrap();
            assert_ne!(
                h_ssrc, initial_ssrc,
                "Should use a new SSRC for different play_id"
            );
        }

        Ok(())
    }
}
209
/// Deserializable parameters describing how a call session should be set up.
///
/// Field names are read in camelCase, except for the two explicitly renamed
/// fields: `dump_events` is read from the key `dump` and `ping_interval`
/// from the key `ping`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallParams {
    /// Optional identifier supplied by the client.
    // NOTE(review): presumably used as the session id — confirm at the call site.
    pub id: Option<String>,
    /// Read from the key `dump`.
    #[serde(rename = "dump")]
    pub dump_events: Option<bool>,
    /// Read from the key `ping`.
    // NOTE(review): the unit of the interval is not visible here — confirm.
    #[serde(rename = "ping")]
    pub ping_interval: Option<u32>,
    /// Read from the key `serverSideTrack`.
    pub server_side_track: Option<String>,
}
220
/// Transport/signalling flavour of an active call.
///
/// Variant names are (de)serialized in camelCase; `Sip` is the default.
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum ActiveCallType {
    Webrtc,
    B2bua,
    WebSocket,
    /// Default variant.
    #[default]
    Sip,
}
230
/// Mutable per-call state, shared behind an `RwLock` (see `ActiveCallStateRef`).
#[derive(Default)]
pub struct ActiveCallState {
    /// Identifier of the session this state belongs to.
    pub session_id: String,
    /// Set to `Utc::now()` when the state is created.
    pub start_time: DateTime<Utc>,
    pub ring_time: Option<DateTime<Utc>>,
    /// Set when the pending dialog has been accepted successfully.
    pub answer_time: Option<DateTime<Utc>>,
    pub hangup_reason: Option<CallRecordHangupReason>,
    pub last_status_code: u16,
    /// Call options currently in effect.
    pub option: Option<CallOption>,
    /// Answer body sent with the `application/sdp` content type on accept.
    pub answer: Option<String>,
    /// Randomly generated when the state is created.
    pub ssrc: u32,
    /// State of the outgoing leg created by a refer, if any.
    pub refer_callstate: Option<ActiveCallStateRef>,
    pub extras: Option<HashMap<String, serde_json::Value>>,
    /// `true` for the state of a referred (outgoing) leg.
    pub is_refer: bool,

    /// Handle of the running synthesis track; follow-up TTS commands are
    /// sent through it instead of creating a new track.
    pub tts_handle: Option<SynthesisHandle>,
    /// `(ssrc, reason)`: when the server-side track carrying this SSRC ends,
    /// the call is hung up with `reason`.
    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
    /// Timeout that is armed once the current playback ends; consumed
    /// (taken) by the event loop at that point.
    // NOTE(review): unit is whatever `crate::media::get_timestamp` uses — confirm.
    pub wait_input_timeout: Option<u32>,
    /// Path of the music-on-hold file that is replayed whenever the
    /// server-side track ends or fails.
    pub moh: Option<String>,
    /// Play id of the playback currently considered active; track-end events
    /// with a different play id are ignored.
    pub current_play_id: Option<String>,
    pub audio_receiver: Option<WebsocketBytesReceiver>,
    /// Prepared answer for an incoming call: the answer body, an optional
    /// track and the server dialog to accept or ring on.
    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
}
255
/// Shared, reference-counted handle to an [`ActiveCall`].
pub type ActiveCallRef = Arc<ActiveCall>;
/// Shared, lock-protected handle to an [`ActiveCallState`].
pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
258
/// A single call session: its shared state, media stream, event/command
/// channels and the handles needed to drive the signalling side.
pub struct ActiveCall {
    /// Mutable state shared between the command and event loops.
    pub call_state: ActiveCallStateRef,
    /// Cancelling this token ends `serve`; child tokens are handed to tracks.
    pub cancel_token: CancellationToken,
    pub call_type: ActiveCallType,
    pub session_id: String,
    /// Media stream built with the session id and a child cancel token.
    pub media_stream: Arc<MediaStream>,
    pub track_config: TrackConfig,
    /// Sender side of the session event channel.
    pub event_sender: EventSender,
    pub app_state: AppState,
    pub invitation: Invitation,
    /// Broadcast sender for commands, created with a capacity of 32.
    pub cmd_sender: CommandSender,
    /// Forwarded to the dump loop when the call is served.
    pub dump_events: bool,
    /// Id of the track used for server-originated audio (TTS, file playback,
    /// music on hold); defaults to `"server-side-track"`.
    pub server_side_track_id: TrackId,
}
273
/// Guard that keeps a call registered in `app_state.active_calls` for as
/// long as it is alive; the entry is removed again when the guard is dropped.
pub struct ActiveCallGuard {
    /// The call that was registered.
    pub call: ActiveCallRef,
    /// Number of registered calls right after this one was inserted.
    pub active_calls: usize,
}
278
279impl ActiveCallGuard {
280 pub fn new(call: ActiveCallRef) -> Self {
281 let active_calls = {
282 call.app_state
283 .total_calls
284 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
285 let mut calls = call.app_state.active_calls.lock().unwrap();
286 calls.insert(call.session_id.clone(), call.clone());
287 calls.len()
288 };
289 Self { call, active_calls }
290 }
291}
292
293impl Drop for ActiveCallGuard {
294 fn drop(&mut self) {
295 self.call
296 .app_state
297 .active_calls
298 .lock()
299 .unwrap()
300 .remove(&self.call.session_id);
301 }
302}
303
/// Bundle of receivers handed to `ActiveCall::serve`, created up front by
/// `ActiveCall::new_receiver`.
pub struct ActiveCallReceiver {
    /// Commands consumed by the command-processing loop.
    pub cmd_receiver: CommandReceiver,
    /// Second subscription to the same command channel, passed to the dump loop.
    pub dump_cmd_receiver: CommandReceiver,
    /// Subscription to the session events, passed to the dump loop.
    pub dump_event_receiver: EventReceiver,
}
309
310impl ActiveCall {
311 pub fn new(
312 call_type: ActiveCallType,
313 cancel_token: CancellationToken,
314 session_id: String,
315 invitation: Invitation,
316 app_state: AppState,
317 track_config: TrackConfig,
318 audio_receiver: Option<WebsocketBytesReceiver>,
319 dump_events: bool,
320 server_side_track_id: Option<TrackId>,
321 extras: Option<HashMap<String, serde_json::Value>>,
322 ) -> Self {
323 let event_sender = crate::event::create_event_sender();
324 let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
325 let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
326 .with_id(session_id.clone())
327 .with_cancel_token(cancel_token.child_token());
328 let media_stream = Arc::new(media_stream_builder.build());
329 let call_state = Arc::new(RwLock::new(ActiveCallState {
330 session_id: session_id.clone(),
331 start_time: Utc::now(),
332 ssrc: rand::random::<u32>(),
333 extras,
334 audio_receiver,
335 ..Default::default()
336 }));
337 Self {
338 cancel_token,
339 call_type,
340 session_id,
341 call_state,
342 media_stream,
343 track_config,
344 event_sender,
345 app_state,
346 invitation,
347 cmd_sender,
348 dump_events,
349 server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
350 }
351 }
352
353 pub async fn enqueue_command(&self, command: Command) -> Result<()> {
354 self.cmd_sender
355 .send(command)
356 .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
357 Ok(())
358 }
359
360 pub fn new_receiver(&self) -> ActiveCallReceiver {
364 ActiveCallReceiver {
365 cmd_receiver: self.cmd_sender.subscribe(),
366 dump_cmd_receiver: self.cmd_sender.subscribe(),
367 dump_event_receiver: self.event_sender.subscribe(),
368 }
369 }
370
371 pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
372 let ActiveCallReceiver {
373 mut cmd_receiver,
374 dump_cmd_receiver,
375 dump_event_receiver,
376 } = receiver;
377
378 let process_command_loop = async move {
379 while let Ok(command) = cmd_receiver.recv().await {
380 match self.dispatch(command).await {
381 Ok(_) => (),
382 Err(e) => {
383 warn!(session_id = self.session_id, "{}", e);
384 self.event_sender
385 .send(SessionEvent::Error {
386 track_id: self.session_id.clone(),
387 timestamp: crate::media::get_timestamp(),
388 sender: "command".to_string(),
389 error: e.to_string(),
390 code: None,
391 })
392 .ok();
393 }
394 }
395 }
396 };
397 self.app_state
398 .total_calls
399 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
400
401 tokio::join!(
402 self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
403 async {
404 select! {
405 _ = process_command_loop => {
406 info!(session_id = self.session_id, "command loop done");
407 }
408 _ = self.process() => {
409 info!(session_id = self.session_id, "call serve done");
410 }
411 _ = self.cancel_token.cancelled() => {
412 info!(session_id = self.session_id, "call cancelled - cleaning up resources");
413 }
414 }
415 self.cancel_token.cancel();
416 }
417 );
418 Ok(())
419 }
420
    /// Drives the media stream and reacts to session events until one of the
    /// internal loops finishes.
    ///
    /// Three futures are raced:
    /// * a poller that emits a `Silence` event once the armed input timeout
    ///   has elapsed,
    /// * the media stream itself,
    /// * an event hook that handles track end / interrupt / inactivity /
    ///   error events for the server-side track.
    async fn process(&self) -> Result<()> {
        let mut event_receiver = self.event_sender.subscribe();

        // (start timestamp, timeout); a timeout of 0 means "not armed".
        let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
        let input_timeout_expire_ref = input_timeout_expire.clone();
        let event_sender = self.event_sender.clone();
        let wait_input_timeout_loop = async {
            loop {
                let (start_time, expire) = { *input_timeout_expire.lock().await };
                if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
                    info!(session_id = self.session_id, "wait input timeout reached");
                    // Disarm before notifying so the event is sent only once.
                    *input_timeout_expire.lock().await = (0, 0);
                    event_sender
                        .send(SessionEvent::Silence {
                            track_id: self.server_side_track_id.clone(),
                            timestamp: crate::media::get_timestamp(),
                            start_time,
                            duration: expire as u64,
                            samples: None,
                        })
                        .ok();
                }
                // The timer is polled every 100 ms.
                sleep(Duration::from_millis(100)).await;
            }
        };
        let server_side_track_id = self.server_side_track_id.clone();
        let event_hook_loop = async move {
            while let Ok(event) = event_receiver.recv().await {
                match event {
                    // Any of these events disarms the input timeout.
                    SessionEvent::Speaking { .. }
                    | SessionEvent::Dtmf { .. }
                    | SessionEvent::AsrDelta { .. }
                    | SessionEvent::AsrFinal { .. }
                    | SessionEvent::TrackStart { .. } => {
                        *input_timeout_expire_ref.lock().await = (0, 0);
                    }
                    SessionEvent::TrackEnd {
                        track_id,
                        play_id,
                        ssrc,
                        ..
                    } => {
                        // Only the server-side track is of interest here.
                        if track_id != server_side_track_id {
                            continue;
                        }

                        let (moh_path, auto_hangup, wait_timeout_val) = {
                            let mut state = self.call_state.write().await;
                            // A track end for a play id that is no longer
                            // current belongs to an interrupted playback.
                            if play_id != state.current_play_id {
                                debug!(
                                    session_id = self.session_id,
                                    ?play_id,
                                    current = ?state.current_play_id,
                                    "ignoring interrupted track end"
                                );
                                continue;
                            }
                            state.current_play_id = None;
                            (
                                state.moh.clone(),
                                state.auto_hangup.clone(),
                                state.wait_input_timeout.take(),
                            )
                        };

                        // Music on hold is restarted with a fresh SSRC; auto
                        // hangup and the input timeout are skipped in that case.
                        if let Some(path) = moh_path {
                            info!(session_id = self.session_id, "looping moh: {}", path);
                            let ssrc = rand::random::<u32>();
                            let file_track = FileTrack::new(self.server_side_track_id.clone())
                                .with_play_id(Some(path.clone()))
                                .with_ssrc(ssrc)
                                .with_path(path.clone())
                                .with_cancel_token(self.cancel_token.child_token());
                            self.update_track_wrapper(Box::new(file_track), Some(path))
                                .await;
                            continue;
                        }

                        // Hang up only when the ended track is the one the
                        // auto hangup was registered for.
                        if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
                            if hangup_ssrc == ssrc {
                                info!(
                                    session_id = self.session_id,
                                    ssrc, "auto hangup when track end track_id:{}", track_id
                                );
                                self.do_hangup(Some(hangup_reason), None).await.ok();
                            }
                        }

                        // Arm the input timeout starting now, or disarm it
                        // when the requested timeout is 0.
                        if let Some(timeout) = wait_timeout_val {
                            let expire = if timeout > 0 {
                                (crate::media::get_timestamp(), timeout)
                            } else {
                                (0, 0)
                            };
                            *input_timeout_expire_ref.lock().await = expire;
                        }
                    }
                    SessionEvent::Interrupt { receiver } => {
                        // Without an explicit receiver the interrupt targets
                        // the server-side track.
                        let track_id =
                            receiver.unwrap_or_else(|| self.server_side_track_id.clone());
                        if track_id == self.server_side_track_id {
                            debug!(
                                session_id = self.session_id,
                                "received interrupt event, stopping playback"
                            );
                            self.do_interrupt(true).await.ok();
                        }
                    }
                    SessionEvent::Inactivity { track_id, .. } => {
                        info!(
                            session_id = self.session_id,
                            track_id, "inactivity timeout reached, hanging up"
                        );
                        self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
                            .await
                            .ok();
                    }
                    SessionEvent::Error { track_id, .. } => {
                        if track_id != server_side_track_id {
                            continue;
                        }

                        // While music on hold is configured, an error on the
                        // server-side track restarts playback: with the
                        // fallback file if it exists and is not already in
                        // use, otherwise with the same path again.
                        let moh_info = {
                            let mut state = self.call_state.write().await;
                            if let Some(path) = state.moh.clone() {
                                let fallback = "./config/sounds/refer_moh.wav".to_string();
                                let next_path = if path != fallback
                                    && std::path::Path::new(&fallback).exists()
                                {
                                    info!(
                                        session_id = self.session_id,
                                        "moh error, switching to fallback: {}", fallback
                                    );
                                    state.moh = Some(fallback.clone());
                                    fallback
                                } else {
                                    info!(
                                        session_id = self.session_id,
                                        "looping moh on error: {}", path
                                    );
                                    path
                                };
                                Some(next_path)
                            } else {
                                None
                            }
                        };

                        if let Some(next_path) = moh_info {
                            let ssrc = rand::random::<u32>();
                            let file_track = FileTrack::new(self.server_side_track_id.clone())
                                .with_play_id(Some(next_path.clone()))
                                .with_ssrc(ssrc)
                                .with_path(next_path.clone())
                                .with_cancel_token(self.cancel_token.child_token());
                            self.update_track_wrapper(Box::new(file_track), Some(next_path))
                                .await;
                            continue;
                        }
                    }
                    _ => {}
                }
            }
        };

        // The first future to complete ends processing; the others are dropped.
        select! {
            _ = wait_input_timeout_loop=>{
                info!(session_id = self.session_id, "wait input timeout loop done");
            }
            _ = self.media_stream.serve() => {
                info!(session_id = self.session_id, "media stream loop done");
            }
            _ = event_hook_loop => {
                info!(session_id = self.session_id, "event loop done");
            }
        }
        Ok(())
    }
599
600 async fn dispatch(&self, command: Command) -> Result<()> {
601 match command {
602 Command::Invite { option } => self.do_invite(option).await,
603 Command::Accept { option } => self.do_accept(option).await,
604 Command::Reject { reason, code } => {
605 self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
606 .await
607 }
608 Command::Ringing {
609 ringtone,
610 recorder,
611 early_media,
612 } => self.do_ringing(ringtone, recorder, early_media).await,
613 Command::Tts {
614 text,
615 speaker,
616 play_id,
617 auto_hangup,
618 streaming,
619 end_of_stream,
620 option,
621 wait_input_timeout,
622 base64,
623 cache_key,
624 } => {
625 self.do_tts(
626 text,
627 speaker,
628 play_id,
629 auto_hangup,
630 streaming.unwrap_or_default(),
631 end_of_stream.unwrap_or_default(),
632 option,
633 wait_input_timeout,
634 base64.unwrap_or_default(),
635 cache_key,
636 )
637 .await
638 }
639 Command::Play {
640 url,
641 play_id,
642 auto_hangup,
643 wait_input_timeout,
644 } => {
645 self.do_play(url, play_id, auto_hangup, wait_input_timeout)
646 .await
647 }
648 Command::Hangup { reason, initiator } => {
649 let reason = reason.map(|r| {
650 r.parse::<CallRecordHangupReason>()
651 .unwrap_or(CallRecordHangupReason::BySystem)
652 });
653 self.do_hangup(reason, initiator).await
654 }
655 Command::Refer {
656 caller,
657 callee,
658 options,
659 } => self.do_refer(caller, callee, options).await,
660 Command::Mute { track_id } => self.do_mute(track_id).await,
661 Command::Unmute { track_id } => self.do_unmute(track_id).await,
662 Command::Pause {} => self.do_pause().await,
663 Command::Resume {} => self.do_resume().await,
664 Command::Interrupt {
665 graceful: passage,
666 fade_out_ms: _,
667 } => self.do_interrupt(passage.unwrap_or_default()).await,
668 Command::History { speaker, text } => self.do_history(speaker, text).await,
669 }
670 }
671
672 fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
673 if let Some(recorder_option) = &option.recorder {
674 let mut recorder_file = recorder_option.recorder_file.clone();
675 if recorder_file.contains("{id}") {
676 recorder_file = recorder_file.replace("{id}", &self.session_id);
677 }
678
679 let recorder_file = if recorder_file.is_empty() {
680 self.app_state.get_recorder_file(&self.session_id)
681 } else {
682 let p = Path::new(&recorder_file);
683 p.is_absolute()
684 .then(|| recorder_file.clone())
685 .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
686 };
687 info!(
688 session_id = self.session_id,
689 recorder_file, "created recording file"
690 );
691
692 let track_samplerate = self.track_config.samplerate;
693 let recorder_samplerate = if track_samplerate > 0 {
694 track_samplerate
695 } else {
696 recorder_option.samplerate
697 };
698 let recorder_ptime = if recorder_option.ptime == 0 {
699 200
700 } else {
701 recorder_option.ptime
702 };
703 let requested_format = recorder_option
704 .format
705 .unwrap_or(self.app_state.config.recorder_format());
706 let format = requested_format.effective();
707 if requested_format != format {
708 warn!(
709 session_id = self.session_id,
710 requested = requested_format.extension(),
711 "Recorder format fallback to wav due to unsupported feature"
712 );
713 }
714 let mut recorder_config = RecorderOption {
715 recorder_file,
716 samplerate: recorder_samplerate,
717 ptime: recorder_ptime,
718 format: Some(format),
719 };
720 recorder_config.ensure_path_extension(format);
721 Some(recorder_config)
722 } else {
723 None
724 }
725 }
726
727 async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
728 {
730 let state = self.call_state.read().await;
731 option = state.merge_option(option);
732 }
733
734 option.check_default();
735 if let Some(opt) = self.build_record_option(&option) {
736 self.media_stream.update_recorder_option(opt).await;
737 }
738
739 if let Some(opt) = &option.media_pass {
740 let track_id = self.server_side_track_id.clone();
741 let cancel_token = self.cancel_token.child_token();
742 let ssrc = rand::random::<u32>();
743 let media_pass_track = MediaPassTrack::new(
744 self.session_id.clone(),
745 ssrc,
746 track_id,
747 cancel_token,
748 opt.clone(),
749 );
750 self.update_track_wrapper(Box::new(media_pass_track), None)
751 .await;
752 }
753
754 info!(
755 session_id = self.session_id,
756 call_type = ?self.call_type,
757 sender,
758 ?option,
759 "caller with option"
760 );
761
762 match self.setup_caller_track(&option).await {
763 Ok(_) => return Ok(option),
764 Err(e) => {
765 self.app_state
766 .total_failed_calls
767 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
768 let error_event = crate::event::SessionEvent::Error {
769 track_id: self.session_id.clone(),
770 timestamp: crate::media::get_timestamp(),
771 sender,
772 error: e.to_string(),
773 code: None,
774 };
775 self.event_sender.send(error_event).ok();
776 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
777 .await
778 .ok();
779 return Err(e);
780 }
781 }
782 }
783
784 async fn do_invite(&self, option: CallOption) -> Result<()> {
785 self.invite_or_accept(option, "invite".to_string())
786 .await
787 .map(|_| ())
788 }
789
790 async fn do_accept(&self, mut option: CallOption) -> Result<()> {
791 let has_pending = self
792 .invitation
793 .find_dialog_id_by_session_id(&self.session_id)
794 .is_some();
795 let ready_to_answer_val = {
796 let state = self.call_state.read().await;
797 state.ready_to_answer.is_none()
798 };
799
800 if ready_to_answer_val {
801 if !has_pending {
802 warn!(session_id = self.session_id, "no pending call to accept");
804 let rejet_event = crate::event::SessionEvent::Reject {
805 track_id: self.session_id.clone(),
806 timestamp: crate::media::get_timestamp(),
807 reason: "no pending call".to_string(),
808 refer: None,
809 code: Some(486),
810 };
811 self.event_sender.send(rejet_event).ok();
812 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
813 .await
814 .ok();
815 return Err(anyhow::anyhow!("no pending call to accept"));
816 }
817 option = self.invite_or_accept(option, "accept".to_string()).await?;
818 } else {
819 option.check_default();
820 self.call_state.write().await.option = Some(option.clone());
821 }
822 info!(session_id = self.session_id, ?option, "accepting call");
823 let ready = self.call_state.write().await.ready_to_answer.take();
824 if let Some((answer, track, dialog)) = ready {
825 info!(
826 session_id = self.session_id,
827 track_id = track.as_ref().map(|t| t.id()),
828 "ready to answer with track"
829 );
830
831 let headers = vec![rsip::Header::ContentType(
832 "application/sdp".to_string().into(),
833 )];
834
835 match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
836 Ok(_) => {
837 {
838 let mut state = self.call_state.write().await;
839 state.answer = Some(answer);
840 state.answer_time = Some(Utc::now());
841 }
842 self.finish_caller_stack(&option, track).await?;
843 }
844 Err(e) => {
845 warn!(session_id = self.session_id, "failed to accept call: {}", e);
846 return Err(anyhow::anyhow!("failed to accept call"));
847 }
848 }
849 }
850 return Ok(());
851 }
852
853 async fn do_reject(
854 &self,
855 code: Option<rsip::StatusCode>,
856 reason: Option<String>,
857 ) -> Result<()> {
858 match self
859 .invitation
860 .find_dialog_id_by_session_id(&self.session_id)
861 {
862 Some(id) => {
863 info!(
864 session_id = self.session_id,
865 ?reason,
866 ?code,
867 "rejecting call"
868 );
869 self.invitation.hangup(id, code, reason).await
870 }
871 None => Ok(()),
872 }
873 }
874
    /// Handles the ringing command.
    ///
    /// If no answer has been prepared yet, the call is prepared first using
    /// a default option that only carries `recorder`. When a prepared answer
    /// exists afterwards, a ringing response is sent on its dialog — with
    /// the answer as `application/sdp` body when early media is requested or
    /// a ringtone is given, without headers and body otherwise — and the
    /// ringtone, if any, is played.
    async fn do_ringing(
        &self,
        ringtone: Option<String>,
        recorder: Option<RecorderOption>,
        early_media: Option<bool>,
    ) -> Result<()> {
        let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
        if ready_to_answer_val {
            let option = CallOption {
                recorder,
                ..Default::default()
            };
            let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
        }

        let state = self.call_state.read().await;
        if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
            let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
                let headers = vec![rsip::Header::ContentType(
                    "application/sdp".to_string().into(),
                )];
                (Some(headers), Some(answer.as_bytes().to_vec()))
            } else {
                (None, None)
            };

            // Failure to send the ringing response is ignored on purpose.
            dialog.ringing(headers, body).ok();
            info!(
                session_id = self.session_id,
                ringtone, early_media, "playing ringtone"
            );
            if let Some(ringtone_url) = ringtone {
                // `do_play` takes the write lock on `call_state`, so the read
                // guard has to be released before calling it.
                drop(state);
                self.do_play(ringtone_url, None, None, None).await.ok();
            } else {
                info!(session_id = self.session_id, "no ringtone to play");
            }
        }
        Ok(())
    }
915
    /// Handles the TTS command: sends `text` to the running synthesis track
    /// or creates a new one.
    ///
    /// * The call-level TTS option is merged with the per-command `option`;
    ///   if neither exists an error is returned.
    /// * `speaker` falls back to the speaker of the resolved option.
    /// * If a synthesis handle exists and `play_id` is given and differs from
    ///   the current play id, the current playback is interrupted and a new
    ///   SSRC is used; otherwise the SSRC of the existing handle is reused.
    /// * `auto_hangup == Some(true)` registers a hangup for the chosen SSRC;
    ///   any other value leaves a previously registered hangup untouched.
    async fn do_tts(
        &self,
        text: String,
        speaker: Option<String>,
        play_id: Option<String>,
        auto_hangup: Option<bool>,
        streaming: bool,
        end_of_stream: bool,
        option: Option<SynthesisOption>,
        wait_input_timeout: Option<u32>,
        base64: bool,
        cache_key: Option<String>,
    ) -> Result<()> {
        let tts_option = {
            let call_state = self.call_state.read().await;
            match call_state.option.clone().unwrap_or_default().tts {
                Some(opt) => opt.merge_with(option),
                None => {
                    if let Some(opt) = option {
                        opt
                    } else {
                        return Err(anyhow::anyhow!("no tts option available"));
                    }
                }
            }
        };
        let speaker = match speaker {
            Some(s) => Some(s),
            None => tts_option.speaker.clone(),
        };

        let mut play_command = SynthesisCommand {
            text,
            speaker,
            play_id: play_id.clone(),
            streaming,
            end_of_stream,
            option: tts_option,
            base64,
            cache_key,
        };
        // Only the first 10 characters of the text are logged.
        info!(
            session_id = self.session_id,
            provider = ?play_command.option.provider,
            text = %play_command.text.chars().take(10).collect::<String>(),
            speaker = play_command.speaker.as_deref(),
            auto_hangup = auto_hangup.unwrap_or_default(),
            play_id = play_command.play_id.as_deref(),
            streaming = play_command.streaming,
            end_of_stream = play_command.end_of_stream,
            wait_input_timeout = wait_input_timeout.unwrap_or_default(),
            is_base64 = play_command.base64,
            cache_key = play_command.cache_key.as_deref(),
            "new synthesis"
        );

        // Candidate SSRC, used only when the existing handle is not reused.
        let ssrc = rand::random::<u32>();
        let (should_interrupt, picked_ssrc) = {
            let mut state = self.call_state.write().await;

            let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
                if play_id.is_some() && state.current_play_id != play_id {
                    // A different play id replaces the running playback.
                    (ssrc, true)
                } else {
                    (handle.ssrc, false)
                }
            } else {
                (ssrc, false)
            };

            state.auto_hangup = match auto_hangup {
                Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
                _ => state.auto_hangup.clone(),
            };
            state.wait_input_timeout = wait_input_timeout;

            state.current_play_id = play_id.clone();
            (changed, target_ssrc)
        };

        if should_interrupt {
            // Clears `tts_handle`, so a new track is created below.
            let _ = self.do_interrupt(false).await;
        }

        let existing_handle = self.call_state.read().await.tts_handle.clone();
        if let Some(tts_handle) = existing_handle {
            match tts_handle.try_send(play_command) {
                Ok(_) => return Ok(()),
                Err(e) => {
                    // Take the command back out of the error and fall
                    // through to creating a new track.
                    play_command = e.0;
                }
            }
        }

        let (new_handle, tts_track) = StreamEngine::create_tts_track(
            self.app_state.stream_engine.clone(),
            self.cancel_token.child_token(),
            self.session_id.clone(),
            self.server_side_track_id.clone(),
            picked_ssrc,
            play_id.clone(),
            streaming,
            &play_command.option,
        )
        .await?;

        new_handle.try_send(play_command)?;
        self.call_state.write().await.tts_handle = Some(new_handle);
        self.update_track_wrapper(tts_track, play_id).await;
        Ok(())
    }
1027
1028 async fn do_play(
1029 &self,
1030 url: String,
1031 play_id: Option<String>,
1032 auto_hangup: Option<bool>,
1033 wait_input_timeout: Option<u32>,
1034 ) -> Result<()> {
1035 let ssrc = rand::random::<u32>();
1036 info!(
1037 session_id = self.session_id,
1038 ssrc, url, play_id, auto_hangup, "play file track"
1039 );
1040
1041 let play_id = play_id.or(Some(url.clone()));
1042
1043 let file_track = FileTrack::new(self.server_side_track_id.clone())
1044 .with_play_id(play_id.clone())
1045 .with_ssrc(ssrc)
1046 .with_path(url)
1047 .with_cancel_token(self.cancel_token.child_token());
1048
1049 {
1050 let mut state = self.call_state.write().await;
1051 state.tts_handle = None;
1052 state.auto_hangup = match auto_hangup {
1053 Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1054 _ => None,
1055 };
1056 state.wait_input_timeout = wait_input_timeout;
1057 }
1058
1059 self.update_track_wrapper(Box::new(file_track), play_id)
1060 .await;
1061 Ok(())
1062 }
1063
1064 async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1065 self.event_sender
1066 .send(SessionEvent::AddHistory {
1067 sender: Some(self.session_id.clone()),
1068 timestamp: crate::media::get_timestamp(),
1069 speaker,
1070 text,
1071 })
1072 .map(|_| ())
1073 .map_err(Into::into)
1074 }
1075
1076 async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1077 {
1078 let mut state = self.call_state.write().await;
1079 state.tts_handle = None;
1080 state.moh = None;
1081 }
1082 self.media_stream
1083 .remove_track(&self.server_side_track_id, graceful)
1084 .await;
1085 Ok(())
1086 }
    /// Handles the pause command. Currently a no-op that always succeeds.
    async fn do_pause(&self) -> Result<()> {
        Ok(())
    }
    /// Handles the resume command. Currently a no-op that always succeeds.
    async fn do_resume(&self) -> Result<()> {
        Ok(())
    }
1093 async fn do_hangup(
1094 &self,
1095 reason: Option<CallRecordHangupReason>,
1096 initiator: Option<String>,
1097 ) -> Result<()> {
1098 info!(
1099 session_id = self.session_id,
1100 ?reason,
1101 ?initiator,
1102 "do_hangup"
1103 );
1104
1105 let hangup_reason = match initiator.as_deref() {
1107 Some("caller") => CallRecordHangupReason::ByCaller,
1108 Some("callee") => CallRecordHangupReason::ByCallee,
1109 Some("system") => CallRecordHangupReason::Autohangup,
1110 _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1111 };
1112
1113 self.media_stream
1114 .stop(Some(hangup_reason.to_string()), initiator);
1115
1116 self.call_state
1117 .write()
1118 .await
1119 .set_hangup_reason(hangup_reason);
1120 Ok(())
1121 }
1122
1123 async fn do_refer(
1124 &self,
1125 caller: String,
1126 callee: String,
1127 refer_option: Option<ReferOption>,
1128 ) -> Result<()> {
1129 self.do_interrupt(false).await.ok();
1130 let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1131 if let Some(ref path) = moh {
1132 if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1133 let fallback = "./config/sounds/refer_moh.wav";
1134 if std::path::Path::new(fallback).exists() {
1135 info!(
1136 session_id = self.session_id,
1137 "moh {} not found, using fallback {}", path, fallback
1138 );
1139 moh = Some(fallback.to_string());
1140 }
1141 }
1142 }
1143 let session_id = self.session_id.clone();
1144 let track_id = self.server_side_track_id.clone();
1145
1146 let recorder = {
1147 let cs = self.call_state.read().await;
1148 cs.option
1149 .as_ref()
1150 .map(|o| o.recorder.clone())
1151 .unwrap_or_default()
1152 };
1153
1154 let call_option = CallOption {
1155 caller: Some(caller),
1156 callee: Some(callee.clone()),
1157 sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1158 asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1159 denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1160 recorder,
1161 ..Default::default()
1162 };
1163
1164 let mut invite_option = call_option.build_invite_option()?;
1165 invite_option.call_id = Some(self.session_id.clone());
1166
1167 let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1168
1169 {
1170 let cs = self.call_state.read().await;
1171 if let Some(opt) = cs.option.as_ref() {
1172 if let Some(callee) = opt.callee.as_ref() {
1173 headers.push(rsip::Header::Other(
1174 "X-Referred-To".to_string(),
1175 callee.clone(),
1176 ));
1177 }
1178 if let Some(caller) = opt.caller.as_ref() {
1179 headers.push(rsip::Header::Other(
1180 "X-Referred-From".to_string(),
1181 caller.clone(),
1182 ));
1183 }
1184 }
1185 }
1186
1187 headers.push(rsip::Header::Other(
1188 "X-Referred-Id".to_string(),
1189 self.session_id.clone(),
1190 ));
1191
1192 let ssrc = rand::random::<u32>();
1193 let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1194 start_time: Utc::now(),
1195 ssrc,
1196 option: Some(call_option.clone()),
1197 is_refer: true,
1198 ..Default::default()
1199 }));
1200
1201 {
1202 let mut cs = self.call_state.write().await;
1203 cs.refer_callstate.replace(refer_call_state.clone());
1204 }
1205
1206 let auto_hangup_requested = refer_option
1207 .as_ref()
1208 .and_then(|o| o.auto_hangup)
1209 .unwrap_or(true);
1210
1211 if auto_hangup_requested {
1212 self.call_state.write().await.auto_hangup =
1213 Some((ssrc, CallRecordHangupReason::ByRefer));
1214 } else {
1215 self.call_state.write().await.auto_hangup = None;
1216 }
1217
1218 let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1219
1220 info!(
1221 session_id = self.session_id,
1222 ssrc,
1223 auto_hangup = auto_hangup_requested,
1224 callee,
1225 timeout_secs,
1226 "do_refer"
1227 );
1228
1229 let r = tokio::time::timeout(
1230 Duration::from_secs(timeout_secs as u64),
1231 self.create_outgoing_sip_track(
1232 self.cancel_token.child_token(),
1233 refer_call_state.clone(),
1234 &track_id,
1235 invite_option,
1236 &call_option,
1237 moh,
1238 auto_hangup_requested,
1239 ),
1240 )
1241 .await;
1242
1243 {
1244 self.call_state.write().await.moh = None;
1245 }
1246
1247 let result = match r {
1248 Ok(res) => res,
1249 Err(_) => {
1250 warn!(
1251 session_id = session_id,
1252 "refer sip track creation timed out after {} seconds", timeout_secs
1253 );
1254 self.event_sender
1255 .send(SessionEvent::Reject {
1256 track_id,
1257 timestamp: crate::media::get_timestamp(),
1258 reason: "Timeout when refer".into(),
1259 code: Some(408),
1260 refer: Some(true),
1261 })
1262 .ok();
1263 return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1264 }
1265 };
1266
1267 match result {
1268 Ok(answer) => {
1269 self.event_sender
1270 .send(SessionEvent::Answer {
1271 timestamp: crate::media::get_timestamp(),
1272 track_id,
1273 sdp: answer,
1274 refer: Some(true),
1275 })
1276 .ok();
1277 }
1278 Err(e) => {
1279 warn!(
1280 session_id = session_id,
1281 "failed to create refer sip track: {}", e
1282 );
1283 match &e {
1284 rsipstack::Error::DialogError(reason, _, code) => {
1285 self.event_sender
1286 .send(SessionEvent::Reject {
1287 track_id,
1288 timestamp: crate::media::get_timestamp(),
1289 reason: reason.clone(),
1290 code: Some(code.code() as u32),
1291 refer: Some(true),
1292 })
1293 .ok();
1294 }
1295 _ => {}
1296 }
1297 return Err(e.into());
1298 }
1299 }
1300 Ok(())
1301 }
1302
1303 async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1304 self.media_stream.mute_track(track_id).await;
1305 Ok(())
1306 }
1307
1308 async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1309 self.media_stream.unmute_track(track_id).await;
1310 Ok(())
1311 }
1312
1313 pub async fn cleanup(&self) -> Result<()> {
1314 self.call_state.write().await.tts_handle = None;
1315 self.media_stream.cleanup().await.ok();
1316 Ok(())
1317 }
1318
1319 pub fn get_callrecord(&self) -> Option<CallRecord> {
1320 self.call_state.try_read().ok().map(|call_state| {
1321 call_state.build_callrecord(
1322 self.app_state.clone(),
1323 self.session_id.clone(),
1324 self.call_type.clone(),
1325 )
1326 })
1327 }
1328
1329 async fn dump_to_file(
1330 &self,
1331 dump_file: &mut File,
1332 cmd_receiver: &mut CommandReceiver,
1333 event_receiver: &mut EventReceiver,
1334 ) {
1335 loop {
1336 select! {
1337 _ = self.cancel_token.cancelled() => {
1338 break;
1339 }
1340 Ok(cmd) = cmd_receiver.recv() => {
1341 CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1342 .await;
1343 }
1344 Ok(event) = event_receiver.recv() => {
1345 if matches!(event, SessionEvent::Binary{..}) {
1346 continue;
1347 }
1348 CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1349 .await;
1350 }
1351 };
1352 }
1353 }
1354
1355 async fn dump_loop(
1356 &self,
1357 dump_events: bool,
1358 mut dump_cmd_receiver: CommandReceiver,
1359 mut dump_event_receiver: EventReceiver,
1360 ) {
1361 if !dump_events {
1362 return;
1363 }
1364
1365 let file_name = self.app_state.get_dump_events_file(&self.session_id);
1366 let mut dump_file = match File::options()
1367 .create(true)
1368 .append(true)
1369 .open(&file_name)
1370 .await
1371 {
1372 Ok(file) => file,
1373 Err(e) => {
1374 warn!(
1375 session_id = self.session_id,
1376 file_name, "failed to open dump events file: {}", e
1377 );
1378 return;
1379 }
1380 };
1381 self.dump_to_file(
1382 &mut dump_file,
1383 &mut dump_cmd_receiver,
1384 &mut dump_event_receiver,
1385 )
1386 .await;
1387
1388 while let Ok(event) = dump_event_receiver.try_recv() {
1389 if matches!(event, SessionEvent::Binary { .. }) {
1390 continue;
1391 }
1392 CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1393 }
1394 }
1395
1396 pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1397 let mut rtc_config = RtcTrackConfig::default();
1398 rtc_config.mode = rustrtc::TransportMode::Rtp;
1399
1400 if let Some(codecs) = &self.app_state.config.codecs {
1401 let mut codec_types = Vec::new();
1402 for c in codecs {
1403 match c.to_lowercase().as_str() {
1404 "pcmu" => codec_types.push(CodecType::PCMU),
1405 "pcma" => codec_types.push(CodecType::PCMA),
1406 "g722" => codec_types.push(CodecType::G722),
1407 "g729" => codec_types.push(CodecType::G729),
1408 "opus" => codec_types.push(CodecType::Opus),
1409 "dtmf" | "2833" | "telephone_event" => {
1410 codec_types.push(CodecType::TelephoneEvent)
1411 }
1412 _ => {}
1413 }
1414 }
1415 if !codec_types.is_empty() {
1416 rtc_config.preferred_codec = Some(codec_types[0].clone());
1417 rtc_config.codecs = codec_types;
1418 }
1419 }
1420
1421 if rtc_config.preferred_codec.is_none() {
1422 rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1423 }
1424
1425 rtc_config.rtp_port_range = self
1426 .app_state
1427 .config
1428 .rtp_start_port
1429 .zip(self.app_state.config.rtp_end_port);
1430
1431 if let Some(ref external_ip) = self.app_state.config.external_ip {
1432 rtc_config.external_ip = Some(external_ip.clone());
1433 }
1434 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1435 rtc_config.bind_ip = Some(bind_ip.clone());
1436 }
1437
1438 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1439
1440 let mut track = RtcTrack::new(
1441 self.cancel_token.child_token(),
1442 track_id,
1443 self.track_config.clone(),
1444 rtc_config,
1445 )
1446 .with_ssrc(ssrc);
1447
1448 track.create().await?;
1449
1450 Ok(track)
1451 }
1452
1453 async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1454 let hangup_headers = option
1455 .sip
1456 .as_ref()
1457 .and_then(|s| s.hangup_headers.as_ref())
1458 .map(|headers_map| {
1459 headers_map
1460 .iter()
1461 .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1462 .collect::<Vec<rsip::Header>>()
1463 });
1464 self.call_state.write().await.option = Some(option.clone());
1465 info!(
1466 session_id = self.session_id,
1467 call_type = ?self.call_type,
1468 "setup caller track"
1469 );
1470
1471 let track = match self.call_type {
1472 ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1473 ActiveCallType::WebSocket => {
1474 let audio_receiver = self.call_state.write().await.audio_receiver.take();
1475 if let Some(receiver) = audio_receiver {
1476 Some(self.create_websocket_track(receiver).await?)
1477 } else {
1478 None
1479 }
1480 }
1481 ActiveCallType::Sip => {
1482 if let Some(dialog_id) = self
1483 .invitation
1484 .find_dialog_id_by_session_id(&self.session_id)
1485 {
1486 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1487 return self
1488 .prepare_incoming_sip_track(
1489 self.cancel_token.clone(),
1490 self.call_state.clone(),
1491 &self.session_id,
1492 pending_dialog,
1493 hangup_headers,
1494 )
1495 .await;
1496 }
1497 }
1498
1499 let mut option = option.clone();
1501 if option.sip.is_none()
1502 || option
1503 .sip
1504 .as_ref()
1505 .and_then(|s| s.username.as_ref())
1506 .is_none()
1507 {
1508 if let Some(callee) = &option.callee {
1509 if let Some(cred) = self.app_state.find_credentials_for_callee(callee) {
1510 if option.sip.is_none() {
1511 option.sip = Some(crate::SipOption {
1512 username: Some(cred.username.clone()),
1513 password: Some(cred.password.clone()),
1514 realm: cred.realm.clone(),
1515 ..Default::default()
1516 });
1517 }
1518 }
1519 }
1520 }
1521
1522 let mut invite_option = option.build_invite_option()?;
1523 invite_option.call_id = Some(self.session_id.clone());
1524
1525 match self
1526 .create_outgoing_sip_track(
1527 self.cancel_token.clone(),
1528 self.call_state.clone(),
1529 &self.session_id,
1530 invite_option,
1531 &option,
1532 None,
1533 false,
1534 )
1535 .await
1536 {
1537 Ok(answer) => {
1538 self.event_sender
1539 .send(SessionEvent::Answer {
1540 timestamp: crate::media::get_timestamp(),
1541 track_id: self.session_id.clone(),
1542 sdp: answer,
1543 refer: Some(false),
1544 })
1545 .ok();
1546 return Ok(());
1547 }
1548 Err(e) => {
1549 warn!(
1550 session_id = self.session_id,
1551 "failed to create sip track: {}", e
1552 );
1553 match &e {
1554 rsipstack::Error::DialogError(reason, _, code) => {
1555 self.event_sender
1556 .send(SessionEvent::Reject {
1557 track_id: self.session_id.clone(),
1558 timestamp: crate::media::get_timestamp(),
1559 reason: reason.clone(),
1560 code: Some(code.code() as u32),
1561 refer: Some(false),
1562 })
1563 .ok();
1564 }
1565 _ => {}
1566 }
1567 return Err(e.into());
1568 }
1569 }
1570 }
1571 ActiveCallType::B2bua => {
1572 if let Some(dialog_id) = self
1573 .invitation
1574 .find_dialog_id_by_session_id(&self.session_id)
1575 {
1576 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1577 return self
1578 .prepare_incoming_sip_track(
1579 self.cancel_token.clone(),
1580 self.call_state.clone(),
1581 &self.session_id,
1582 pending_dialog,
1583 hangup_headers,
1584 )
1585 .await;
1586 }
1587 }
1588
1589 warn!(
1590 session_id = self.session_id,
1591 "no pending dialog found for B2BUA call"
1592 );
1593 return Err(anyhow::anyhow!(
1594 "no pending dialog found for session_id: {}",
1595 self.session_id
1596 ));
1597 }
1598 };
1599 match track {
1600 Some(track) => {
1601 self.finish_caller_stack(&option, Some(track)).await?;
1602 }
1603 None => {
1604 warn!(session_id = self.session_id, "no track created for caller");
1605 return Err(anyhow::anyhow!("no track created for caller"));
1606 }
1607 }
1608 Ok(())
1609 }
1610
1611 async fn finish_caller_stack(
1612 &self,
1613 option: &CallOption,
1614 track: Option<Box<dyn Track>>,
1615 ) -> Result<()> {
1616 if let Some(track) = track {
1617 self.setup_track_with_stream(&option, track).await?;
1618 }
1619
1620 {
1621 let call_state = self.call_state.read().await;
1622 if let Some(ref answer) = call_state.answer {
1623 info!(
1624 session_id = self.session_id,
1625 "sending answer event: {}", answer,
1626 );
1627 self.event_sender
1628 .send(SessionEvent::Answer {
1629 timestamp: crate::media::get_timestamp(),
1630 track_id: self.session_id.clone(),
1631 sdp: answer.clone(),
1632 refer: Some(false),
1633 })
1634 .ok();
1635 } else {
1636 warn!(
1637 session_id = self.session_id,
1638 "no answer in state to send event"
1639 );
1640 }
1641 }
1642 Ok(())
1643 }
1644
1645 pub async fn setup_track_with_stream(
1646 &self,
1647 option: &CallOption,
1648 mut track: Box<dyn Track>,
1649 ) -> Result<()> {
1650 let processors = match StreamEngine::create_processors(
1651 self.app_state.stream_engine.clone(),
1652 track.as_ref(),
1653 self.cancel_token.child_token(),
1654 self.event_sender.clone(),
1655 self.media_stream.packet_sender.clone(),
1656 option,
1657 )
1658 .await
1659 {
1660 Ok(processors) => processors,
1661 Err(e) => {
1662 warn!(
1663 session_id = self.session_id,
1664 "failed to prepare stream processors: {}", e
1665 );
1666 vec![]
1667 }
1668 };
1669
1670 for processor in processors {
1672 track.append_processor(processor);
1673 }
1674
1675 self.update_track_wrapper(track, None).await;
1676 Ok(())
1677 }
1678
1679 pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1680 let (ambiance_opt, subscribe) = {
1681 let state = self.call_state.read().await;
1682 let mut opt = state
1683 .option
1684 .as_ref()
1685 .and_then(|o| o.ambiance.clone())
1686 .unwrap_or_default();
1687
1688 if let Some(global) = &self.app_state.config.ambiance {
1689 opt.merge(global);
1690 }
1691
1692 let subscribe = state
1693 .option
1694 .as_ref()
1695 .and_then(|o| o.subscribe)
1696 .unwrap_or_default();
1697
1698 (opt, subscribe)
1699 };
1700 if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1701 match AmbianceProcessor::new(ambiance_opt).await {
1702 Ok(ambiance) => {
1703 info!(session_id = self.session_id, "loaded ambiance processor");
1704 track.append_processor(Box::new(ambiance));
1705 }
1706 Err(e) => {
1707 tracing::error!("failed to load ambiance wav {}", e);
1708 }
1709 }
1710 }
1711
1712 if subscribe && self.call_type != ActiveCallType::WebSocket {
1713 let (track_index, sub_track_id) = if track.id() == &self.server_side_track_id {
1714 (0, self.server_side_track_id.clone())
1715 } else {
1716 (1, self.session_id.clone())
1717 };
1718 let sub_processor =
1719 SubscribeProcessor::new(self.event_sender.clone(), sub_track_id, track_index);
1720 track.append_processor(Box::new(sub_processor));
1721 }
1722
1723 self.call_state.write().await.current_play_id = play_id.clone();
1724 self.media_stream.update_track(track, play_id).await;
1725 }
1726
1727 pub async fn create_websocket_track(
1728 &self,
1729 audio_receiver: WebsocketBytesReceiver,
1730 ) -> Result<Box<dyn Track>> {
1731 let (ssrc, codec) = {
1732 let call_state = self.call_state.read().await;
1733 (
1734 call_state.ssrc,
1735 call_state
1736 .option
1737 .as_ref()
1738 .map(|o| o.codec.clone())
1739 .unwrap_or_default(),
1740 )
1741 };
1742
1743 let ws_track = WebsocketTrack::new(
1744 self.cancel_token.child_token(),
1745 self.session_id.clone(),
1746 self.track_config.clone(),
1747 self.event_sender.clone(),
1748 audio_receiver,
1749 codec,
1750 ssrc,
1751 );
1752
1753 {
1754 let mut call_state = self.call_state.write().await;
1755 call_state.answer_time = Some(Utc::now());
1756 call_state.answer = Some("".to_string());
1757 call_state.last_status_code = 200;
1758 }
1759
1760 Ok(Box::new(ws_track))
1761 }
1762
1763 pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1764 let (ssrc, option) = {
1765 let call_state = self.call_state.read().await;
1766 (
1767 call_state.ssrc,
1768 call_state.option.clone().unwrap_or_default(),
1769 )
1770 };
1771
1772 let mut rtc_config = RtcTrackConfig::default();
1773 rtc_config.mode = rustrtc::TransportMode::WebRtc; rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1775
1776 if let Some(codecs) = &self.app_state.config.codecs {
1777 let mut codec_types = Vec::new();
1778 for c in codecs {
1779 match c.to_lowercase().as_str() {
1780 "pcmu" => codec_types.push(CodecType::PCMU),
1781 "pcma" => codec_types.push(CodecType::PCMA),
1782 "g722" => codec_types.push(CodecType::G722),
1783 "g729" => codec_types.push(CodecType::G729),
1784 "opus" => codec_types.push(CodecType::Opus),
1785 "dtmf" | "2833" | "telephone_event" => {
1786 codec_types.push(CodecType::TelephoneEvent)
1787 }
1788 _ => {}
1789 }
1790 }
1791 if !codec_types.is_empty() {
1792 rtc_config.preferred_codec = Some(codec_types[0].clone());
1793 rtc_config.codecs = codec_types;
1794 }
1795 }
1796
1797 if let Some(ref external_ip) = self.app_state.config.external_ip {
1798 rtc_config.external_ip = Some(external_ip.clone());
1799 }
1800 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1801 rtc_config.bind_ip = Some(bind_ip.clone());
1802 }
1803
1804 let mut webrtc_track = RtcTrack::new(
1805 self.cancel_token.child_token(),
1806 self.session_id.clone(),
1807 self.track_config.clone(),
1808 rtc_config,
1809 )
1810 .with_ssrc(ssrc);
1811
1812 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1813 let offer = match option.enable_ipv6 {
1814 Some(false) | None => {
1815 strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1816 }
1817 _ => option.offer.clone().unwrap_or("".to_string()),
1818 };
1819 let answer: Option<String>;
1820 match webrtc_track.handshake(offer, timeout).await {
1821 Ok(answer_sdp) => {
1822 answer = match option.enable_ipv6 {
1823 Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1824 Some(true) => Some(answer_sdp.to_string()),
1825 };
1826 }
1827 Err(e) => {
1828 warn!(session_id = self.session_id, "failed to setup track: {}", e);
1829 return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1830 }
1831 }
1832
1833 {
1834 let mut call_state = self.call_state.write().await;
1835 call_state.answer_time = Some(Utc::now());
1836 call_state.answer = answer;
1837 call_state.last_status_code = 200;
1838 }
1839 Ok(Box::new(webrtc_track))
1840 }
1841
1842 async fn create_outgoing_sip_track(
1843 &self,
1844 cancel_token: CancellationToken,
1845 call_state_ref: ActiveCallStateRef,
1846 track_id: &String,
1847 mut invite_option: InviteOption,
1848 call_option: &CallOption,
1849 moh: Option<String>,
1850 auto_hangup: bool,
1851 ) -> Result<String, rsipstack::Error> {
1852 let ssrc = call_state_ref.read().await.ssrc;
1853 let rtp_track = self
1854 .create_rtp_track(track_id.clone(), ssrc)
1855 .await
1856 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1857
1858 let offer = Some(
1859 rtp_track
1860 .local_description()
1861 .await
1862 .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1863 );
1864
1865 {
1866 let mut cs = call_state_ref.write().await;
1867 if let Some(o) = cs.option.as_mut() {
1868 o.offer = offer.clone();
1869 }
1870 cs.start_time = Utc::now();
1871 };
1872
1873 invite_option.offer = offer.clone().map(|s| s.into());
1874
1875 let needs_contact = invite_option.contact.scheme.is_none()
1878 || invite_option
1879 .contact
1880 .host_with_port
1881 .to_string()
1882 .starts_with("127.0.0.1");
1883
1884 if needs_contact {
1885 if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1886 invite_option.contact = rsip::Uri {
1887 scheme: Some(rsip::Scheme::Sip),
1888 auth: None,
1889 host_with_port: addr.addr.clone(),
1890 params: vec![],
1891 headers: vec![],
1892 };
1893 }
1894 }
1895
1896 let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1897
1898 if let Some(moh) = moh {
1899 let ssrc_and_moh = {
1900 let mut state = call_state_ref.write().await;
1901 state.moh = Some(moh.clone());
1902 if state.current_play_id.is_none() {
1903 let ssrc = rand::random::<u32>();
1904 Some((ssrc, moh.clone()))
1905 } else {
1906 info!(
1907 session_id = self.session_id,
1908 "Something is playing, MOH will start after it ends"
1909 );
1910 None
1911 }
1912 };
1913
1914 if let Some((ssrc, moh_path)) = ssrc_and_moh {
1915 let file_track = FileTrack::new(self.server_side_track_id.clone())
1916 .with_play_id(Some(moh_path.clone()))
1917 .with_ssrc(ssrc)
1918 .with_path(moh_path.clone())
1919 .with_cancel_token(self.cancel_token.child_token());
1920 self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1921 .await;
1922 }
1923 } else {
1924 let track = rtp_track_to_setup.take().unwrap();
1925 self.setup_track_with_stream(&call_option, track)
1926 .await
1927 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1928 }
1929
1930 info!(
1931 session_id = self.session_id,
1932 track_id,
1933 contact = %invite_option.contact,
1934 "invite {} -> {} offer: \n{}",
1935 invite_option.caller,
1936 invite_option.callee,
1937 offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1938 );
1939
1940 let (dlg_state_sender, dlg_state_receiver) =
1941 self.invitation.dialog_layer.new_dialog_state_channel();
1942
1943 let states = InviteDialogStates {
1944 is_client: true,
1945 session_id: self.session_id.clone(),
1946 track_id: track_id.clone(),
1947 event_sender: self.event_sender.clone(),
1948 media_stream: self.media_stream.clone(),
1949 call_state: call_state_ref.clone(),
1950 cancel_token,
1951 terminated_reason: None,
1952 has_early_media: false,
1953 };
1954
1955 let hangup_headers = call_option
1956 .sip
1957 .as_ref()
1958 .and_then(|s| s.hangup_headers.as_ref())
1959 .map(|headers_map| {
1960 headers_map
1961 .iter()
1962 .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1963 .collect::<Vec<rsip::Header>>()
1964 });
1965
1966 let mut client_dialog_handler = DialogStateReceiverGuard::new(
1967 self.invitation.dialog_layer.clone(),
1968 dlg_state_receiver,
1969 hangup_headers,
1970 );
1971
1972 crate::spawn(async move {
1973 client_dialog_handler.process_dialog(states).await;
1974 });
1975
1976 let (dialog_id, answer) = self
1977 .invitation
1978 .invite(invite_option, dlg_state_sender)
1979 .await?;
1980
1981 self.call_state.write().await.moh = None;
1982
1983 if let Some(track) = rtp_track_to_setup {
1984 self.setup_track_with_stream(&call_option, track)
1985 .await
1986 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1987 }
1988
1989 let answer = match answer {
1990 Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1991 None => {
1992 warn!(session_id = self.session_id, "no answer received");
1993 return Err(rsipstack::Error::DialogError(
1994 "No answer received".to_string(),
1995 dialog_id,
1996 rsip::StatusCode::NotAcceptableHere,
1997 ));
1998 }
1999 };
2000
2001 {
2002 let mut cs = call_state_ref.write().await;
2003 if cs.answer.is_none() {
2004 cs.answer = Some(answer.clone());
2005 }
2006 if auto_hangup {
2007 cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
2008 }
2009 }
2010
2011 self.media_stream
2012 .update_remote_description(&track_id, &answer)
2013 .await
2014 .ok();
2015
2016 Ok(answer)
2017 }
2018
2019 pub fn is_webrtc_sdp(sdp: &str) -> bool {
2021 (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
2022 && sdp.contains("a=fingerprint:")
2023 }
2024
2025 pub async fn setup_answer_track(
2026 &self,
2027 ssrc: u32,
2028 option: &CallOption,
2029 offer: String,
2030 ) -> Result<(String, Box<dyn Track>)> {
2031 let offer = match option.enable_ipv6 {
2032 Some(false) | None => strip_ipv6_candidates(&offer),
2033 _ => offer.clone(),
2034 };
2035
2036 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
2037
2038 let mut media_track = if Self::is_webrtc_sdp(&offer) {
2039 let mut rtc_config = RtcTrackConfig::default();
2040 rtc_config.mode = rustrtc::TransportMode::WebRtc;
2041 rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
2042 if let Some(ref external_ip) = self.app_state.config.external_ip {
2043 rtc_config.external_ip = Some(external_ip.clone());
2044 }
2045 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
2046 rtc_config.bind_ip = Some(bind_ip.clone());
2047 }
2048 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
2049
2050 let webrtc_track = RtcTrack::new(
2051 self.cancel_token.child_token(),
2052 self.session_id.clone(),
2053 self.track_config.clone(),
2054 rtc_config,
2055 )
2056 .with_ssrc(ssrc);
2057
2058 Box::new(webrtc_track) as Box<dyn Track>
2059 } else {
2060 let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
2061 Box::new(rtp_track) as Box<dyn Track>
2062 };
2063
2064 let answer = match media_track.handshake(offer.clone(), timeout).await {
2065 Ok(answer) => answer,
2066 Err(e) => {
2067 return Err(anyhow::anyhow!("handshake failed: {e}"));
2068 }
2069 };
2070
2071 return Ok((answer, media_track));
2072 }
2073
2074 pub async fn prepare_incoming_sip_track(
2075 &self,
2076 cancel_token: CancellationToken,
2077 call_state_ref: ActiveCallStateRef,
2078 track_id: &String,
2079 pending_dialog: PendingDialog,
2080 hangup_headers: Option<Vec<rsip::Header>>,
2081 ) -> Result<()> {
2082 let state_receiver = pending_dialog.state_receiver;
2083 let states = InviteDialogStates {
2086 is_client: false,
2087 session_id: self.session_id.clone(),
2088 track_id: track_id.clone(),
2089 event_sender: self.event_sender.clone(),
2090 media_stream: self.media_stream.clone(),
2091 call_state: self.call_state.clone(),
2092 cancel_token,
2093 terminated_reason: None,
2094 has_early_media: false,
2095 };
2096
2097 let initial_request = pending_dialog.dialog.initial_request();
2098 let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2099
2100 let (ssrc, option) = {
2101 let call_state = call_state_ref.read().await;
2102 (
2103 call_state.ssrc,
2104 call_state.option.clone().unwrap_or_default(),
2105 )
2106 };
2107
2108 match self.setup_answer_track(ssrc, &option, offer).await {
2109 Ok((offer, track)) => {
2110 self.setup_track_with_stream(&option, track).await?;
2111 {
2112 let mut state = self.call_state.write().await;
2113 state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2114 }
2115 }
2116 Err(e) => {
2117 return Err(anyhow::anyhow!("error creating track: {}", e));
2118 }
2119 }
2120
2121 let mut client_dialog_handler = DialogStateReceiverGuard::new(
2122 self.invitation.dialog_layer.clone(),
2123 state_receiver,
2124 hangup_headers,
2125 );
2126
2127 crate::spawn(async move {
2128 client_dialog_handler.process_dialog(states).await;
2129 });
2130 Ok(())
2131 }
2132}
2133
2134impl Drop for ActiveCall {
2135 fn drop(&mut self) {
2136 info!(session_id = self.session_id, "dropping active call");
2137 if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2138 if let Some(record) = self.get_callrecord() {
2139 if let Err(e) = sender.send(record) {
2140 warn!(
2141 session_id = self.session_id,
2142 "failed to send call record: {}", e
2143 );
2144 }
2145 }
2146 }
2147 }
2148}
2149
2150impl ActiveCallState {
2151 pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2152 if let Some(existing) = &self.option {
2153 if option.asr.is_none() {
2154 option.asr = existing.asr.clone();
2155 }
2156 if option.tts.is_none() {
2157 option.tts = existing.tts.clone();
2158 }
2159 if option.vad.is_none() {
2160 option.vad = existing.vad.clone();
2161 }
2162 if option.denoise.is_none() {
2163 option.denoise = existing.denoise;
2164 }
2165 if option.recorder.is_none() {
2166 option.recorder = existing.recorder.clone();
2167 }
2168 if option.eou.is_none() {
2169 option.eou = existing.eou.clone();
2170 }
2171 if option.extra.is_none() {
2172 option.extra = existing.extra.clone();
2173 }
2174 if option.ambiance.is_none() {
2175 option.ambiance = existing.ambiance.clone();
2176 }
2177 }
2178 option
2179 }
2180
2181 pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2182 if self.hangup_reason.is_none() {
2183 self.hangup_reason = Some(reason);
2184 }
2185 }
2186
2187 pub fn build_hangup_event(
2188 &self,
2189 track_id: TrackId,
2190 initiator: Option<String>,
2191 ) -> crate::event::SessionEvent {
2192 let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2193 let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2194 let extra = self.extras.clone();
2195
2196 crate::event::SessionEvent::Hangup {
2197 track_id,
2198 timestamp: crate::media::get_timestamp(),
2199 reason: Some(format!("{:?}", self.hangup_reason)),
2200 initiator,
2201 start_time: self.start_time.to_rfc3339(),
2202 answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2203 ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2204 hangup_time: Utc::now().to_rfc3339(),
2205 extra,
2206 from: from.map(|f| f.into()),
2207 to: to.map(|f| f.into()),
2208 refer: Some(self.is_refer),
2209 }
2210 }
2211
2212 pub fn build_callrecord(
2213 &self,
2214 app_state: AppState,
2215 session_id: String,
2216 call_type: ActiveCallType,
2217 ) -> CallRecord {
2218 let option = self.option.clone().unwrap_or_default();
2219 let recorder = if option.recorder.is_some() {
2220 let recorder_file = app_state.get_recorder_file(&session_id);
2221 if std::path::Path::new(&recorder_file).exists() {
2222 let file_size = std::fs::metadata(&recorder_file)
2223 .map(|m| m.len())
2224 .unwrap_or(0);
2225 vec![crate::callrecord::CallRecordMedia {
2226 track_id: session_id.clone(),
2227 path: recorder_file,
2228 size: file_size,
2229 extra: None,
2230 }]
2231 } else {
2232 vec![]
2233 }
2234 } else {
2235 vec![]
2236 };
2237
2238 let dump_event_file = app_state.get_dump_events_file(&session_id);
2239 let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2240 Some(dump_event_file)
2241 } else {
2242 None
2243 };
2244
2245 let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2246 if let Ok(rc) = rc.try_read() {
2247 Some(Box::new(rc.build_callrecord(
2248 app_state.clone(),
2249 rc.session_id.clone(),
2250 ActiveCallType::B2bua,
2251 )))
2252 } else {
2253 None
2254 }
2255 });
2256
2257 let caller = option.caller.clone().unwrap_or_default();
2258 let callee = option.callee.clone().unwrap_or_default();
2259
2260 CallRecord {
2261 option: Some(option),
2262 call_id: session_id,
2263 call_type,
2264 start_time: self.start_time,
2265 ring_time: self.ring_time.clone(),
2266 answer_time: self.answer_time.clone(),
2267 end_time: Utc::now(),
2268 caller,
2269 callee,
2270 hangup_reason: self.hangup_reason.clone(),
2271 hangup_messages: Vec::new(),
2272 status_code: self.last_status_code,
2273 extras: self.extras.clone(),
2274 dump_event_file,
2275 recorder,
2276 refer_callrecord,
2277 }
2278 }
2279}