1use super::Command;
2use crate::{
3 CallOption, ReferOption,
4 event::{EventReceiver, EventSender, SessionEvent},
5 media::{
6 TrackId,
7 ambiance::AmbianceProcessor,
8 engine::StreamEngine,
9 negotiate::strip_ipv6_candidates,
10 recorder::RecorderOption,
11 stream::{MediaStream, MediaStreamBuilder},
12 track::{
13 Track, TrackConfig,
14 file::FileTrack,
15 media_pass::MediaPassTrack,
16 rtc::{RtcTrack, RtcTrackConfig},
17 tts::SynthesisHandle,
18 websocket::{WebsocketBytesReceiver, WebsocketTrack},
19 },
20 },
21 synthesis::{SynthesisCommand, SynthesisOption},
22};
23use crate::{
24 app::AppState,
25 call::{
26 CommandReceiver, CommandSender,
27 sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
28 },
29 callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
30 useragent::invitation::PendingDialog,
31};
32use anyhow::Result;
33use audio_codec::CodecType;
34use chrono::{DateTime, Utc};
35use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
36use serde::{Deserialize, Serialize};
37use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
38use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
39use tokio_util::sync::CancellationToken;
40use tracing::{debug, info, warn};
41
42#[cfg(test)]
43mod tests {
44 use super::*;
45 use crate::app::AppStateBuilder;
46 use crate::callrecord::CallRecordHangupReason;
47 use crate::config::Config;
48 use crate::media::track::tts::SynthesisHandle;
49 use crate::synthesis::SynthesisCommand;
50 use tokio::sync::mpsc;
51
52 #[tokio::test]
53 async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
54 let mut config = Config::default();
55 config.udp_port = 0; config.media_cache_path = "/tmp/mediacache".to_string();
57 let stream_engine = Arc::new(StreamEngine::default());
58 let app_state = AppStateBuilder::new()
59 .with_config(config)
60 .with_stream_engine(stream_engine)
61 .build()
62 .await?;
63
64 let cancel_token = CancellationToken::new();
65 let session_id = "test-session".to_string();
66 let track_config = TrackConfig::default();
67
68 let mut option = crate::CallOption::default();
69 option.tts = Some(crate::synthesis::SynthesisOption::default());
70
71 let active_call = Arc::new(ActiveCall::new(
72 ActiveCallType::Sip,
73 cancel_token.clone(),
74 session_id.clone(),
75 app_state.invitation.clone(),
76 app_state.clone(),
77 track_config,
78 None,
79 false,
80 None,
81 None,
82 ));
83
84 {
85 let mut state = active_call.call_state.write().await;
86 state.option = Some(option);
87 }
88
89 let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
90 let initial_ssrc = 12345;
91 let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
92
93 {
95 let mut state = active_call.call_state.write().await;
96 state.tts_handle = Some(handle);
97 state.current_play_id = Some("play_1".to_string());
98 }
99
100 active_call
102 .do_tts(
103 "hangup now".to_string(),
104 None,
105 Some("play_1".to_string()),
106 Some(true),
107 false,
108 true,
109 None,
110 None,
111 false,
112 )
113 .await?;
114
115 {
117 let state = active_call.call_state.read().await;
118 assert!(state.auto_hangup.is_some());
119 let (h_ssrc, reason) = state.auto_hangup.clone().unwrap();
120 assert_eq!(
121 h_ssrc, initial_ssrc,
122 "SSRC should be reused from existing handle"
123 );
124 assert_eq!(reason, CallRecordHangupReason::BySystem);
125 }
126
127 let cmd = rx.try_recv().expect("Should have received tts command");
129 assert_eq!(cmd.text, "hangup now");
130
131 Ok(())
132 }
133
134 #[tokio::test]
135 async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
136 let mut config = Config::default();
137 config.udp_port = 0; config.media_cache_path = "/tmp/mediacache".to_string();
139 let stream_engine = Arc::new(StreamEngine::default());
140 let app_state = AppStateBuilder::new()
141 .with_config(config)
142 .with_stream_engine(stream_engine)
143 .build()
144 .await?;
145
146 let active_call = Arc::new(ActiveCall::new(
147 ActiveCallType::Sip,
148 CancellationToken::new(),
149 "test-session".to_string(),
150 app_state.invitation.clone(),
151 app_state.clone(),
152 TrackConfig::default(),
153 None,
154 false,
155 None,
156 None,
157 ));
158
159 let mut tts_opt = crate::synthesis::SynthesisOption::default();
160 tts_opt.provider = Some(crate::synthesis::SynthesisType::MsEdge);
161 let mut option = crate::CallOption::default();
162 option.tts = Some(tts_opt);
163 {
164 let mut state = active_call.call_state.write().await;
165 state.option = Some(option);
166 }
167
168 let (tx, _rx) = mpsc::unbounded_channel();
169 let initial_ssrc = 111;
170 let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
171
172 {
173 let mut state = active_call.call_state.write().await;
174 state.tts_handle = Some(handle);
175 state.current_play_id = Some("play_1".to_string());
176 }
177
178 active_call
180 .do_tts(
181 "new play".to_string(),
182 None,
183 Some("play_2".to_string()),
184 Some(true),
185 false,
186 true,
187 None,
188 None,
189 false,
190 )
191 .await?;
192
193 {
195 let state = active_call.call_state.read().await;
196 let (h_ssrc, _) = state.auto_hangup.clone().unwrap();
197 assert_ne!(
198 h_ssrc, initial_ssrc,
199 "Should use a new SSRC for different play_id"
200 );
201 }
202
203 Ok(())
204 }
205}
206
207#[derive(Deserialize)]
208#[serde(rename_all = "camelCase")]
209pub struct CallParams {
210 pub id: Option<String>,
211 #[serde(rename = "dump")]
212 pub dump_events: Option<bool>,
213 #[serde(rename = "ping")]
214 pub ping_interval: Option<u32>,
215 pub server_side_track: Option<String>,
216}
217
218#[derive(Debug, Serialize, Deserialize, Clone, Default)]
219#[serde(rename_all = "camelCase")]
220pub enum ActiveCallType {
221 Webrtc,
222 B2bua,
223 WebSocket,
224 #[default]
225 Sip,
226}
227
228#[derive(Default)]
229pub struct ActiveCallState {
230 pub session_id: String,
231 pub start_time: DateTime<Utc>,
232 pub ring_time: Option<DateTime<Utc>>,
233 pub answer_time: Option<DateTime<Utc>>,
234 pub hangup_reason: Option<CallRecordHangupReason>,
235 pub last_status_code: u16,
236 pub option: Option<CallOption>,
237 pub answer: Option<String>,
238 pub ssrc: u32,
239 pub refer_callstate: Option<ActiveCallStateRef>,
240 pub extras: Option<HashMap<String, serde_json::Value>>,
241 pub is_refer: bool,
242
243 pub tts_handle: Option<SynthesisHandle>,
245 pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
246 pub wait_input_timeout: Option<u32>,
247 pub moh: Option<String>,
248 pub current_play_id: Option<String>,
249 pub audio_receiver: Option<WebsocketBytesReceiver>,
250 pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
251}
252
253pub type ActiveCallRef = Arc<ActiveCall>;
254pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
255
256pub struct ActiveCall {
257 pub call_state: ActiveCallStateRef,
258 pub cancel_token: CancellationToken,
259 pub call_type: ActiveCallType,
260 pub session_id: String,
261 pub media_stream: Arc<MediaStream>,
262 pub track_config: TrackConfig,
263 pub event_sender: EventSender,
264 pub app_state: AppState,
265 pub invitation: Invitation,
266 pub cmd_sender: CommandSender,
267 pub dump_events: bool,
268 pub server_side_track_id: TrackId,
269}
270
271pub struct ActiveCallGuard {
272 pub call: ActiveCallRef,
273 pub active_calls: usize,
274}
275
276impl ActiveCallGuard {
277 pub fn new(call: ActiveCallRef) -> Self {
278 let active_calls = {
279 call.app_state
280 .total_calls
281 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
282 let mut calls = call.app_state.active_calls.lock().unwrap();
283 calls.insert(call.session_id.clone(), call.clone());
284 calls.len()
285 };
286 Self { call, active_calls }
287 }
288}
289
290impl Drop for ActiveCallGuard {
291 fn drop(&mut self) {
292 self.call
293 .app_state
294 .active_calls
295 .lock()
296 .unwrap()
297 .remove(&self.call.session_id);
298 }
299}
300
301pub struct ActiveCallReceiver {
302 pub cmd_receiver: CommandReceiver,
303 pub dump_cmd_receiver: CommandReceiver,
304 pub dump_event_receiver: EventReceiver,
305}
306
307impl ActiveCall {
308 pub fn new(
309 call_type: ActiveCallType,
310 cancel_token: CancellationToken,
311 session_id: String,
312 invitation: Invitation,
313 app_state: AppState,
314 track_config: TrackConfig,
315 audio_receiver: Option<WebsocketBytesReceiver>,
316 dump_events: bool,
317 server_side_track_id: Option<TrackId>,
318 extras: Option<HashMap<String, serde_json::Value>>,
319 ) -> Self {
320 let event_sender = crate::event::create_event_sender();
321 let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
322 let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
323 .with_id(session_id.clone())
324 .with_cancel_token(cancel_token.child_token());
325 let media_stream = Arc::new(media_stream_builder.build());
326 let call_state = Arc::new(RwLock::new(ActiveCallState {
327 session_id: session_id.clone(),
328 start_time: Utc::now(),
329 ssrc: rand::random::<u32>(),
330 extras,
331 audio_receiver,
332 ..Default::default()
333 }));
334 Self {
335 cancel_token,
336 call_type,
337 session_id,
338 call_state,
339 media_stream,
340 track_config,
341 event_sender,
342 app_state,
343 invitation,
344 cmd_sender,
345 dump_events,
346 server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
347 }
348 }
349
350 pub async fn enqueue_command(&self, command: Command) -> Result<()> {
351 self.cmd_sender
352 .send(command)
353 .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
354 Ok(())
355 }
356
357 pub fn new_receiver(&self) -> ActiveCallReceiver {
361 ActiveCallReceiver {
362 cmd_receiver: self.cmd_sender.subscribe(),
363 dump_cmd_receiver: self.cmd_sender.subscribe(),
364 dump_event_receiver: self.event_sender.subscribe(),
365 }
366 }
367
368 pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
369 let ActiveCallReceiver {
370 mut cmd_receiver,
371 dump_cmd_receiver,
372 dump_event_receiver,
373 } = receiver;
374
375 let process_command_loop = async move {
376 while let Ok(command) = cmd_receiver.recv().await {
377 match self.dispatch(command).await {
378 Ok(_) => (),
379 Err(e) => {
380 warn!(session_id = self.session_id, "{}", e);
381 self.event_sender
382 .send(SessionEvent::Error {
383 track_id: self.session_id.clone(),
384 timestamp: crate::media::get_timestamp(),
385 sender: "command".to_string(),
386 error: e.to_string(),
387 code: None,
388 })
389 .ok();
390 }
391 }
392 }
393 };
394 self.app_state
395 .total_calls
396 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
397
398 tokio::join!(
399 self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
400 async {
401 select! {
402 _ = process_command_loop => {
403 info!(session_id = self.session_id, "command loop done");
404 }
405 _ = self.process() => {
406 info!(session_id = self.session_id, "call serve done");
407 }
408 _ = self.cancel_token.cancelled() => {
409 info!(session_id = self.session_id, "call cancelled - cleaning up resources");
410 }
411 }
412 self.cancel_token.cancel();
413 }
414 );
415 Ok(())
416 }
417
418 async fn process(&self) -> Result<()> {
419 let mut event_receiver = self.event_sender.subscribe();
420
421 let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
422 let input_timeout_expire_ref = input_timeout_expire.clone();
423 let event_sender = self.event_sender.clone();
424 let wait_input_timeout_loop = async {
425 loop {
426 let (start_time, expire) = { *input_timeout_expire.lock().await };
427 if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
428 info!(session_id = self.session_id, "wait input timeout reached");
429 *input_timeout_expire.lock().await = (0, 0);
430 event_sender
431 .send(SessionEvent::Silence {
432 track_id: self.server_side_track_id.clone(),
433 timestamp: crate::media::get_timestamp(),
434 start_time,
435 duration: expire as u64,
436 samples: None,
437 })
438 .ok();
439 }
440 sleep(Duration::from_millis(100)).await;
441 }
442 };
443 let server_side_track_id = self.server_side_track_id.clone();
444 let event_hook_loop = async move {
445 while let Ok(event) = event_receiver.recv().await {
446 match event {
447 SessionEvent::Speaking { .. }
448 | SessionEvent::Dtmf { .. }
449 | SessionEvent::AsrDelta { .. }
450 | SessionEvent::AsrFinal { .. }
451 | SessionEvent::TrackStart { .. } => {
452 *input_timeout_expire_ref.lock().await = (0, 0);
453 }
454 SessionEvent::TrackEnd {
455 track_id,
456 play_id,
457 ssrc,
458 ..
459 } => {
460 if track_id != server_side_track_id {
461 continue;
462 }
463
464 let (moh_path, auto_hangup, wait_timeout_val) = {
465 let mut state = self.call_state.write().await;
466 if play_id != state.current_play_id {
467 debug!(
468 session_id = self.session_id,
469 ?play_id,
470 current = ?state.current_play_id,
471 "ignoring interrupted track end"
472 );
473 continue;
474 }
475 state.current_play_id = None;
476 (
477 state.moh.clone(),
478 state.auto_hangup.clone(),
479 state.wait_input_timeout.take(),
480 )
481 };
482
483 if let Some(path) = moh_path {
484 info!(session_id = self.session_id, "looping moh: {}", path);
485 let ssrc = rand::random::<u32>();
486 let file_track = FileTrack::new(self.server_side_track_id.clone())
487 .with_play_id(Some(path.clone()))
488 .with_ssrc(ssrc)
489 .with_path(path.clone())
490 .with_cancel_token(self.cancel_token.child_token());
491 self.update_track_wrapper(Box::new(file_track), Some(path))
492 .await;
493 continue;
494 }
495
496 if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
497 if hangup_ssrc == ssrc {
498 info!(
499 session_id = self.session_id,
500 ssrc, "auto hangup when track end track_id:{}", track_id
501 );
502 self.do_hangup(Some(hangup_reason), None).await.ok();
503 }
504 }
505
506 if let Some(timeout) = wait_timeout_val {
507 let expire = if timeout > 0 {
508 (crate::media::get_timestamp(), timeout)
509 } else {
510 (0, 0)
511 };
512 *input_timeout_expire_ref.lock().await = expire;
513 }
514 }
515 SessionEvent::Interrupt { receiver } => {
516 let track_id =
517 receiver.unwrap_or_else(|| self.server_side_track_id.clone());
518 if track_id == self.server_side_track_id {
519 debug!(
520 session_id = self.session_id,
521 "received interrupt event, stopping playback"
522 );
523 self.do_interrupt(true).await.ok();
524 }
525 }
526 SessionEvent::Inactivity { track_id, .. } => {
527 info!(
528 session_id = self.session_id,
529 track_id, "inactivity timeout reached, hanging up"
530 );
531 self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
532 .await
533 .ok();
534 }
535 SessionEvent::Error { track_id, .. } => {
536 if track_id != server_side_track_id {
537 continue;
538 }
539
540 let moh_info = {
541 let mut state = self.call_state.write().await;
542 if let Some(path) = state.moh.clone() {
543 let fallback = "./config/sounds/refer_moh.wav".to_string();
544 let next_path = if path != fallback
545 && std::path::Path::new(&fallback).exists()
546 {
547 info!(
548 session_id = self.session_id,
549 "moh error, switching to fallback: {}", fallback
550 );
551 state.moh = Some(fallback.clone());
552 fallback
553 } else {
554 info!(
555 session_id = self.session_id,
556 "looping moh on error: {}", path
557 );
558 path
559 };
560 Some(next_path)
561 } else {
562 None
563 }
564 };
565
566 if let Some(next_path) = moh_info {
567 let ssrc = rand::random::<u32>();
568 let file_track = FileTrack::new(self.server_side_track_id.clone())
569 .with_play_id(Some(next_path.clone()))
570 .with_ssrc(ssrc)
571 .with_path(next_path.clone())
572 .with_cancel_token(self.cancel_token.child_token());
573 self.update_track_wrapper(Box::new(file_track), Some(next_path))
574 .await;
575 continue;
576 }
577 }
578 _ => {}
579 }
580 }
581 };
582
583 select! {
584 _ = wait_input_timeout_loop=>{
585 info!(session_id = self.session_id, "wait input timeout loop done");
586 }
587 _ = self.media_stream.serve() => {
588 info!(session_id = self.session_id, "media stream loop done");
589 }
590 _ = event_hook_loop => {
591 info!(session_id = self.session_id, "event loop done");
592 }
593 }
594 Ok(())
595 }
596
597 async fn dispatch(&self, command: Command) -> Result<()> {
598 match command {
599 Command::Invite { option } => self.do_invite(option).await,
600 Command::Accept { option } => self.do_accept(option).await,
601 Command::Reject { reason, code } => {
602 self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
603 .await
604 }
605 Command::Ringing {
606 ringtone,
607 recorder,
608 early_media,
609 } => self.do_ringing(ringtone, recorder, early_media).await,
610 Command::Tts {
611 text,
612 speaker,
613 play_id,
614 auto_hangup,
615 streaming,
616 end_of_stream,
617 option,
618 wait_input_timeout,
619 base64,
620 } => {
621 self.do_tts(
622 text,
623 speaker,
624 play_id,
625 auto_hangup,
626 streaming.unwrap_or_default(),
627 end_of_stream.unwrap_or_default(),
628 option,
629 wait_input_timeout,
630 base64.unwrap_or_default(),
631 )
632 .await
633 }
634 Command::Play {
635 url,
636 play_id,
637 auto_hangup,
638 wait_input_timeout,
639 } => {
640 self.do_play(url, play_id, auto_hangup, wait_input_timeout)
641 .await
642 }
643 Command::Hangup { reason, initiator } => {
644 let reason = reason.map(|r| {
645 r.parse::<CallRecordHangupReason>()
646 .unwrap_or(CallRecordHangupReason::BySystem)
647 });
648 self.do_hangup(reason, initiator).await
649 }
650 Command::Refer {
651 caller,
652 callee,
653 options,
654 } => self.do_refer(caller, callee, options).await,
655 Command::Mute { track_id } => self.do_mute(track_id).await,
656 Command::Unmute { track_id } => self.do_unmute(track_id).await,
657 Command::Pause {} => self.do_pause().await,
658 Command::Resume {} => self.do_resume().await,
659 Command::Interrupt {
660 graceful: passage,
661 fade_out_ms: _,
662 } => self.do_interrupt(passage.unwrap_or_default()).await,
663 Command::History { speaker, text } => self.do_history(speaker, text).await,
664 }
665 }
666
667 fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
668 if let Some(recorder_option) = &option.recorder {
669 let mut recorder_file = recorder_option.recorder_file.clone();
670 if recorder_file.contains("{id}") {
671 recorder_file = recorder_file.replace("{id}", &self.session_id);
672 }
673
674 let recorder_file = if recorder_file.is_empty() {
675 self.app_state.get_recorder_file(&self.session_id)
676 } else {
677 let p = Path::new(&recorder_file);
678 p.is_absolute()
679 .then(|| recorder_file.clone())
680 .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
681 };
682 info!(
683 session_id = self.session_id,
684 recorder_file, "created recording file"
685 );
686
687 let track_samplerate = self.track_config.samplerate;
688 let recorder_samplerate = if track_samplerate > 0 {
689 track_samplerate
690 } else {
691 recorder_option.samplerate
692 };
693 let recorder_ptime = if recorder_option.ptime == 0 {
694 200
695 } else {
696 recorder_option.ptime
697 };
698 let requested_format = recorder_option
699 .format
700 .unwrap_or(self.app_state.config.recorder_format());
701 let format = requested_format.effective();
702 if requested_format != format {
703 warn!(
704 session_id = self.session_id,
705 requested = requested_format.extension(),
706 "Recorder format fallback to wav due to unsupported feature"
707 );
708 }
709 let mut recorder_config = RecorderOption {
710 recorder_file,
711 samplerate: recorder_samplerate,
712 ptime: recorder_ptime,
713 format: Some(format),
714 };
715 recorder_config.ensure_path_extension(format);
716 Some(recorder_config)
717 } else {
718 None
719 }
720 }
721
722 async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
723 {
725 let state = self.call_state.read().await;
726 option = state.merge_option(option);
727 }
728
729 option.check_default();
730 if let Some(opt) = self.build_record_option(&option) {
731 self.media_stream.update_recorder_option(opt).await;
732 }
733
734 if let Some(opt) = &option.media_pass {
735 let track_id = self.server_side_track_id.clone();
736 let cancel_token = self.cancel_token.child_token();
737 let ssrc = rand::random::<u32>();
738 let media_pass_track = MediaPassTrack::new(
739 self.session_id.clone(),
740 ssrc,
741 track_id,
742 cancel_token,
743 opt.clone(),
744 );
745 self.update_track_wrapper(Box::new(media_pass_track), None)
746 .await;
747 }
748
749 info!(
750 session_id = self.session_id,
751 call_type = ?self.call_type,
752 sender,
753 ?option,
754 "caller with option"
755 );
756
757 match self.setup_caller_track(&option).await {
758 Ok(_) => return Ok(option),
759 Err(e) => {
760 self.app_state
761 .total_failed_calls
762 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
763 let error_event = crate::event::SessionEvent::Error {
764 track_id: self.session_id.clone(),
765 timestamp: crate::media::get_timestamp(),
766 sender,
767 error: e.to_string(),
768 code: None,
769 };
770 self.event_sender.send(error_event).ok();
771 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
772 .await
773 .ok();
774 return Err(e);
775 }
776 }
777 }
778
779 async fn do_invite(&self, option: CallOption) -> Result<()> {
780 self.invite_or_accept(option, "invite".to_string())
781 .await
782 .map(|_| ())
783 }
784
785 async fn do_accept(&self, mut option: CallOption) -> Result<()> {
786 let has_pending = self
787 .invitation
788 .find_dialog_id_by_session_id(&self.session_id)
789 .is_some();
790 let ready_to_answer_val = {
791 let state = self.call_state.read().await;
792 state.ready_to_answer.is_none()
793 };
794
795 if ready_to_answer_val {
796 if !has_pending {
797 warn!(session_id = self.session_id, "no pending call to accept");
799 let rejet_event = crate::event::SessionEvent::Reject {
800 track_id: self.session_id.clone(),
801 timestamp: crate::media::get_timestamp(),
802 reason: "no pending call".to_string(),
803 refer: None,
804 code: Some(486),
805 };
806 self.event_sender.send(rejet_event).ok();
807 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
808 .await
809 .ok();
810 return Err(anyhow::anyhow!("no pending call to accept"));
811 }
812 option = self.invite_or_accept(option, "accept".to_string()).await?;
813 } else {
814 option.check_default();
815 self.call_state.write().await.option = Some(option.clone());
816 }
817 info!(session_id = self.session_id, ?option, "accepting call");
818 let ready = self.call_state.write().await.ready_to_answer.take();
819 if let Some((answer, track, dialog)) = ready {
820 info!(
821 session_id = self.session_id,
822 track_id = track.as_ref().map(|t| t.id()),
823 "ready to answer with track"
824 );
825
826 let headers = vec![rsip::Header::ContentType(
827 "application/sdp".to_string().into(),
828 )];
829
830 match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
831 Ok(_) => {
832 {
833 let mut state = self.call_state.write().await;
834 state.answer = Some(answer);
835 state.answer_time = Some(Utc::now());
836 }
837 self.finish_caller_stack(&option, track).await?;
838 }
839 Err(e) => {
840 warn!(session_id = self.session_id, "failed to accept call: {}", e);
841 return Err(anyhow::anyhow!("failed to accept call"));
842 }
843 }
844 }
845 return Ok(());
846 }
847
848 async fn do_reject(
849 &self,
850 code: Option<rsip::StatusCode>,
851 reason: Option<String>,
852 ) -> Result<()> {
853 match self
854 .invitation
855 .find_dialog_id_by_session_id(&self.session_id)
856 {
857 Some(id) => {
858 info!(
859 session_id = self.session_id,
860 ?reason,
861 ?code,
862 "rejecting call"
863 );
864 self.invitation.hangup(id, code, reason).await
865 }
866 None => Ok(()),
867 }
868 }
869
870 async fn do_ringing(
871 &self,
872 ringtone: Option<String>,
873 recorder: Option<RecorderOption>,
874 early_media: Option<bool>,
875 ) -> Result<()> {
876 let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
877 if ready_to_answer_val {
878 let option = CallOption {
879 recorder,
880 ..Default::default()
881 };
882 let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
883 }
884
885 let state = self.call_state.read().await;
886 if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
887 let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
888 let headers = vec![rsip::Header::ContentType(
889 "application/sdp".to_string().into(),
890 )];
891 (Some(headers), Some(answer.as_bytes().to_vec()))
892 } else {
893 (None, None)
894 };
895
896 dialog.ringing(headers, body).ok();
897 info!(
898 session_id = self.session_id,
899 ringtone, early_media, "playing ringtone"
900 );
901 if let Some(ringtone_url) = ringtone {
902 drop(state);
903 self.do_play(ringtone_url, None, None, None).await.ok();
904 } else {
905 info!(session_id = self.session_id, "no ringtone to play");
906 }
907 }
908 Ok(())
909 }
910
911 async fn do_tts(
912 &self,
913 text: String,
914 speaker: Option<String>,
915 play_id: Option<String>,
916 auto_hangup: Option<bool>,
917 streaming: bool,
918 end_of_stream: bool,
919 option: Option<SynthesisOption>,
920 wait_input_timeout: Option<u32>,
921 base64: bool,
922 ) -> Result<()> {
923 let tts_option = {
924 let call_state = self.call_state.read().await;
925 match call_state.option.clone().unwrap_or_default().tts {
926 Some(opt) => opt.merge_with(option),
927 None => {
928 if let Some(opt) = option {
929 opt
930 } else {
931 return Err(anyhow::anyhow!("no tts option available"));
932 }
933 }
934 }
935 };
936 let speaker = match speaker {
937 Some(s) => Some(s),
938 None => tts_option.speaker.clone(),
939 };
940
941 let mut play_command = SynthesisCommand {
942 text,
943 speaker,
944 play_id: play_id.clone(),
945 streaming,
946 end_of_stream,
947 option: tts_option,
948 base64,
949 };
950 info!(
951 session_id = self.session_id,
952 provider = ?play_command.option.provider,
953 text = %play_command.text.chars().take(10).collect::<String>(),
954 speaker = play_command.speaker.as_deref(),
955 auto_hangup = auto_hangup.unwrap_or_default(),
956 play_id = play_command.play_id.as_deref(),
957 streaming = play_command.streaming,
958 end_of_stream = play_command.end_of_stream,
959 wait_input_timeout = wait_input_timeout.unwrap_or_default(),
960 is_base64 = play_command.base64,
961 "new synthesis"
962 );
963
964 let ssrc = rand::random::<u32>();
965 let (should_interrupt, picked_ssrc) = {
966 let mut state = self.call_state.write().await;
967
968 let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
969 if play_id.is_some() && state.current_play_id != play_id {
970 (ssrc, true)
971 } else {
972 (handle.ssrc, false)
973 }
974 } else {
975 (ssrc, false)
976 };
977
978 state.auto_hangup = match auto_hangup {
979 Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
980 _ => state.auto_hangup.clone(),
981 };
982 state.wait_input_timeout = wait_input_timeout;
983
984 state.current_play_id = play_id.clone();
985 (changed, target_ssrc)
986 };
987
988 if should_interrupt {
989 let _ = self.do_interrupt(false).await;
990 }
991
992 let existing_handle = self.call_state.read().await.tts_handle.clone();
993 if let Some(tts_handle) = existing_handle {
994 match tts_handle.try_send(play_command) {
995 Ok(_) => return Ok(()),
996 Err(e) => {
997 play_command = e.0;
998 }
999 }
1000 }
1001
1002 let (new_handle, tts_track) = StreamEngine::create_tts_track(
1003 self.app_state.stream_engine.clone(),
1004 self.cancel_token.child_token(),
1005 self.session_id.clone(),
1006 self.server_side_track_id.clone(),
1007 picked_ssrc,
1008 play_id.clone(),
1009 streaming,
1010 &play_command.option,
1011 )
1012 .await?;
1013
1014 new_handle.try_send(play_command)?;
1015 self.call_state.write().await.tts_handle = Some(new_handle);
1016 self.update_track_wrapper(tts_track, play_id).await;
1017 Ok(())
1018 }
1019
1020 async fn do_play(
1021 &self,
1022 url: String,
1023 play_id: Option<String>,
1024 auto_hangup: Option<bool>,
1025 wait_input_timeout: Option<u32>,
1026 ) -> Result<()> {
1027 let ssrc = rand::random::<u32>();
1028 info!(
1029 session_id = self.session_id,
1030 ssrc, url, play_id, auto_hangup, "play file track"
1031 );
1032
1033 let play_id = play_id.or(Some(url.clone()));
1034
1035 let file_track = FileTrack::new(self.server_side_track_id.clone())
1036 .with_play_id(play_id.clone())
1037 .with_ssrc(ssrc)
1038 .with_path(url)
1039 .with_cancel_token(self.cancel_token.child_token());
1040
1041 {
1042 let mut state = self.call_state.write().await;
1043 state.tts_handle = None;
1044 state.auto_hangup = match auto_hangup {
1045 Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1046 _ => None,
1047 };
1048 state.wait_input_timeout = wait_input_timeout;
1049 }
1050
1051 self.update_track_wrapper(Box::new(file_track), play_id)
1052 .await;
1053 Ok(())
1054 }
1055
1056 async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1057 self.event_sender
1058 .send(SessionEvent::AddHistory {
1059 sender: Some(self.session_id.clone()),
1060 timestamp: crate::media::get_timestamp(),
1061 speaker,
1062 text,
1063 })
1064 .map(|_| ())
1065 .map_err(Into::into)
1066 }
1067
1068 async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1069 {
1070 let mut state = self.call_state.write().await;
1071 state.tts_handle = None;
1072 state.moh = None;
1073 }
1074 self.media_stream
1075 .remove_track(&self.server_side_track_id, graceful)
1076 .await;
1077 Ok(())
1078 }
1079 async fn do_pause(&self) -> Result<()> {
1080 Ok(())
1081 }
1082 async fn do_resume(&self) -> Result<()> {
1083 Ok(())
1084 }
1085 async fn do_hangup(
1086 &self,
1087 reason: Option<CallRecordHangupReason>,
1088 initiator: Option<String>,
1089 ) -> Result<()> {
1090 info!(
1091 session_id = self.session_id,
1092 ?reason,
1093 ?initiator,
1094 "do_hangup"
1095 );
1096
1097 let hangup_reason = match initiator.as_deref() {
1099 Some("caller") => CallRecordHangupReason::ByCaller,
1100 Some("callee") => CallRecordHangupReason::ByCallee,
1101 Some("system") => CallRecordHangupReason::Autohangup,
1102 _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1103 };
1104
1105 self.media_stream
1106 .stop(Some(hangup_reason.to_string()), initiator);
1107
1108 self.call_state
1109 .write()
1110 .await
1111 .set_hangup_reason(hangup_reason);
1112 Ok(())
1113 }
1114
1115 async fn do_refer(
1116 &self,
1117 caller: String,
1118 callee: String,
1119 refer_option: Option<ReferOption>,
1120 ) -> Result<()> {
1121 self.do_interrupt(false).await.ok();
1122 let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1123 if let Some(ref path) = moh {
1124 if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1125 let fallback = "./config/sounds/refer_moh.wav";
1126 if std::path::Path::new(fallback).exists() {
1127 info!(
1128 session_id = self.session_id,
1129 "moh {} not found, using fallback {}", path, fallback
1130 );
1131 moh = Some(fallback.to_string());
1132 }
1133 }
1134 }
1135 let session_id = self.session_id.clone();
1136 let track_id = self.server_side_track_id.clone();
1137
1138 let recorder = {
1139 let cs = self.call_state.read().await;
1140 cs.option
1141 .as_ref()
1142 .map(|o| o.recorder.clone())
1143 .unwrap_or_default()
1144 };
1145
1146 let call_option = CallOption {
1147 caller: Some(caller),
1148 callee: Some(callee.clone()),
1149 sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1150 asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1151 denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1152 recorder,
1153 ..Default::default()
1154 };
1155
1156 let mut invite_option = call_option.build_invite_option()?;
1157 invite_option.call_id = Some(self.session_id.clone());
1158
1159 let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1160
1161 {
1162 let cs = self.call_state.read().await;
1163 if let Some(opt) = cs.option.as_ref() {
1164 if let Some(callee) = opt.callee.as_ref() {
1165 headers.push(rsip::Header::Other(
1166 "X-Referred-To".to_string(),
1167 callee.clone(),
1168 ));
1169 }
1170 if let Some(caller) = opt.caller.as_ref() {
1171 headers.push(rsip::Header::Other(
1172 "X-Referred-From".to_string(),
1173 caller.clone(),
1174 ));
1175 }
1176 }
1177 }
1178
1179 headers.push(rsip::Header::Other(
1180 "X-Referred-Id".to_string(),
1181 self.session_id.clone(),
1182 ));
1183
1184 let ssrc = rand::random::<u32>();
1185 let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1186 start_time: Utc::now(),
1187 ssrc,
1188 option: Some(call_option.clone()),
1189 is_refer: true,
1190 ..Default::default()
1191 }));
1192
1193 {
1194 let mut cs = self.call_state.write().await;
1195 cs.refer_callstate.replace(refer_call_state.clone());
1196 }
1197
1198 let auto_hangup_requested = refer_option
1199 .as_ref()
1200 .and_then(|o| o.auto_hangup)
1201 .unwrap_or(true);
1202
1203 if auto_hangup_requested {
1204 self.call_state.write().await.auto_hangup =
1205 Some((ssrc, CallRecordHangupReason::ByRefer));
1206 } else {
1207 self.call_state.write().await.auto_hangup = None;
1208 }
1209
1210 let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1211
1212 info!(
1213 session_id = self.session_id,
1214 ssrc,
1215 auto_hangup = auto_hangup_requested,
1216 callee,
1217 timeout_secs,
1218 "do_refer"
1219 );
1220
1221 let r = tokio::time::timeout(
1222 Duration::from_secs(timeout_secs as u64),
1223 self.create_outgoing_sip_track(
1224 self.cancel_token.child_token(),
1225 refer_call_state.clone(),
1226 &track_id,
1227 invite_option,
1228 &call_option,
1229 moh,
1230 auto_hangup_requested,
1231 ),
1232 )
1233 .await;
1234
1235 {
1236 self.call_state.write().await.moh = None;
1237 }
1238
1239 let result = match r {
1240 Ok(res) => res,
1241 Err(_) => {
1242 warn!(
1243 session_id = session_id,
1244 "refer sip track creation timed out after {} seconds", timeout_secs
1245 );
1246 self.event_sender
1247 .send(SessionEvent::Reject {
1248 track_id,
1249 timestamp: crate::media::get_timestamp(),
1250 reason: "Timeout when refer".into(),
1251 code: Some(408),
1252 refer: Some(true),
1253 })
1254 .ok();
1255 return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1256 }
1257 };
1258
1259 match result {
1260 Ok(answer) => {
1261 self.event_sender
1262 .send(SessionEvent::Answer {
1263 timestamp: crate::media::get_timestamp(),
1264 track_id,
1265 sdp: answer,
1266 refer: Some(true),
1267 })
1268 .ok();
1269 }
1270 Err(e) => {
1271 warn!(
1272 session_id = session_id,
1273 "failed to create refer sip track: {}", e
1274 );
1275 match &e {
1276 rsipstack::Error::DialogError(reason, _, code) => {
1277 self.event_sender
1278 .send(SessionEvent::Reject {
1279 track_id,
1280 timestamp: crate::media::get_timestamp(),
1281 reason: reason.clone(),
1282 code: Some(code.code() as u32),
1283 refer: Some(true),
1284 })
1285 .ok();
1286 }
1287 _ => {}
1288 }
1289 return Err(e.into());
1290 }
1291 }
1292 Ok(())
1293 }
1294
1295 async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1296 self.media_stream.mute_track(track_id).await;
1297 Ok(())
1298 }
1299
1300 async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1301 self.media_stream.unmute_track(track_id).await;
1302 Ok(())
1303 }
1304
1305 pub async fn cleanup(&self) -> Result<()> {
1306 self.call_state.write().await.tts_handle = None;
1307 self.media_stream.cleanup().await.ok();
1308 Ok(())
1309 }
1310
1311 pub fn get_callrecord(&self) -> Option<CallRecord> {
1312 self.call_state.try_read().ok().map(|call_state| {
1313 call_state.build_callrecord(
1314 self.app_state.clone(),
1315 self.session_id.clone(),
1316 self.call_type.clone(),
1317 )
1318 })
1319 }
1320
1321 async fn dump_to_file(
1322 &self,
1323 dump_file: &mut File,
1324 cmd_receiver: &mut CommandReceiver,
1325 event_receiver: &mut EventReceiver,
1326 ) {
1327 loop {
1328 select! {
1329 _ = self.cancel_token.cancelled() => {
1330 break;
1331 }
1332 Ok(cmd) = cmd_receiver.recv() => {
1333 CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1334 .await;
1335 }
1336 Ok(event) = event_receiver.recv() => {
1337 if matches!(event, SessionEvent::Binary{..}) {
1338 continue;
1339 }
1340 CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1341 .await;
1342 }
1343 };
1344 }
1345 }
1346
1347 async fn dump_loop(
1348 &self,
1349 dump_events: bool,
1350 mut dump_cmd_receiver: CommandReceiver,
1351 mut dump_event_receiver: EventReceiver,
1352 ) {
1353 if !dump_events {
1354 return;
1355 }
1356
1357 let file_name = self.app_state.get_dump_events_file(&self.session_id);
1358 let mut dump_file = match File::options()
1359 .create(true)
1360 .append(true)
1361 .open(&file_name)
1362 .await
1363 {
1364 Ok(file) => file,
1365 Err(e) => {
1366 warn!(
1367 session_id = self.session_id,
1368 file_name, "failed to open dump events file: {}", e
1369 );
1370 return;
1371 }
1372 };
1373 self.dump_to_file(
1374 &mut dump_file,
1375 &mut dump_cmd_receiver,
1376 &mut dump_event_receiver,
1377 )
1378 .await;
1379
1380 while let Ok(event) = dump_event_receiver.try_recv() {
1381 if matches!(event, SessionEvent::Binary { .. }) {
1382 continue;
1383 }
1384 CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1385 }
1386 }
1387
1388 pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1389 let mut rtc_config = RtcTrackConfig::default();
1390 rtc_config.mode = rustrtc::TransportMode::Rtp;
1391
1392 if let Some(codecs) = &self.app_state.config.codecs {
1393 let mut codec_types = Vec::new();
1394 for c in codecs {
1395 match c.to_lowercase().as_str() {
1396 "pcmu" => codec_types.push(CodecType::PCMU),
1397 "pcma" => codec_types.push(CodecType::PCMA),
1398 "g722" => codec_types.push(CodecType::G722),
1399 "g729" => codec_types.push(CodecType::G729),
1400 "opus" => codec_types.push(CodecType::Opus),
1401 "dtmf" | "2833" | "telephone_event" => {
1402 codec_types.push(CodecType::TelephoneEvent)
1403 }
1404 _ => {}
1405 }
1406 }
1407 if !codec_types.is_empty() {
1408 rtc_config.preferred_codec = Some(codec_types[0].clone());
1409 rtc_config.codecs = codec_types;
1410 }
1411 }
1412
1413 if rtc_config.preferred_codec.is_none() {
1414 rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1415 }
1416
1417 rtc_config.rtp_port_range = self
1418 .app_state
1419 .config
1420 .rtp_start_port
1421 .zip(self.app_state.config.rtp_end_port);
1422
1423 if let Some(ref external_ip) = self.app_state.config.external_ip {
1424 rtc_config.external_ip = Some(external_ip.clone());
1425 }
1426
1427 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1428
1429 let mut track = RtcTrack::new(
1430 self.cancel_token.child_token(),
1431 track_id,
1432 self.track_config.clone(),
1433 rtc_config,
1434 )
1435 .with_ssrc(ssrc);
1436
1437 track.create().await?;
1438
1439 Ok(track)
1440 }
1441
1442 async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1443 self.call_state.write().await.option = Some(option.clone());
1444 info!(
1445 session_id = self.session_id,
1446 call_type = ?self.call_type,
1447 "setup caller track"
1448 );
1449
1450 let track = match self.call_type {
1451 ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1452 ActiveCallType::WebSocket => {
1453 let audio_receiver = self.call_state.write().await.audio_receiver.take();
1454 if let Some(receiver) = audio_receiver {
1455 Some(self.create_websocket_track(receiver).await?)
1456 } else {
1457 None
1458 }
1459 }
1460 ActiveCallType::Sip => {
1461 if let Some(dialog_id) = self
1462 .invitation
1463 .find_dialog_id_by_session_id(&self.session_id)
1464 {
1465 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1466 return self
1467 .prepare_incoming_sip_track(
1468 self.cancel_token.clone(),
1469 self.call_state.clone(),
1470 &self.session_id,
1471 pending_dialog,
1472 )
1473 .await;
1474 }
1475 }
1476
1477 let mut option = option.clone();
1479 if option.sip.is_none()
1480 || option
1481 .sip
1482 .as_ref()
1483 .and_then(|s| s.username.as_ref())
1484 .is_none()
1485 {
1486 if let Some(callee) = &option.callee {
1487 if let Some(cred) = self.app_state.find_credentials_for_callee(callee) {
1488 if option.sip.is_none() {
1489 option.sip = Some(crate::SipOption {
1490 username: Some(cred.username.clone()),
1491 password: Some(cred.password.clone()),
1492 realm: cred.realm.clone(),
1493 ..Default::default()
1494 });
1495 }
1496 }
1497 }
1498 }
1499
1500 let mut invite_option = option.build_invite_option()?;
1501 invite_option.call_id = Some(self.session_id.clone());
1502
1503 match self
1504 .create_outgoing_sip_track(
1505 self.cancel_token.clone(),
1506 self.call_state.clone(),
1507 &self.session_id,
1508 invite_option,
1509 &option,
1510 None,
1511 false,
1512 )
1513 .await
1514 {
1515 Ok(answer) => {
1516 self.event_sender
1517 .send(SessionEvent::Answer {
1518 timestamp: crate::media::get_timestamp(),
1519 track_id: self.session_id.clone(),
1520 sdp: answer,
1521 refer: Some(false),
1522 })
1523 .ok();
1524 return Ok(());
1525 }
1526 Err(e) => {
1527 warn!(
1528 session_id = self.session_id,
1529 "failed to create sip track: {}", e
1530 );
1531 match &e {
1532 rsipstack::Error::DialogError(reason, _, code) => {
1533 self.event_sender
1534 .send(SessionEvent::Reject {
1535 track_id: self.session_id.clone(),
1536 timestamp: crate::media::get_timestamp(),
1537 reason: reason.clone(),
1538 code: Some(code.code() as u32),
1539 refer: Some(false),
1540 })
1541 .ok();
1542 }
1543 _ => {}
1544 }
1545 return Err(e.into());
1546 }
1547 }
1548 }
1549 ActiveCallType::B2bua => {
1550 if let Some(dialog_id) = self
1551 .invitation
1552 .find_dialog_id_by_session_id(&self.session_id)
1553 {
1554 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1555 return self
1556 .prepare_incoming_sip_track(
1557 self.cancel_token.clone(),
1558 self.call_state.clone(),
1559 &self.session_id,
1560 pending_dialog,
1561 )
1562 .await;
1563 }
1564 }
1565
1566 warn!(
1567 session_id = self.session_id,
1568 "no pending dialog found for B2BUA call"
1569 );
1570 return Err(anyhow::anyhow!(
1571 "no pending dialog found for session_id: {}",
1572 self.session_id
1573 ));
1574 }
1575 };
1576 match track {
1577 Some(track) => {
1578 self.finish_caller_stack(&option, Some(track)).await?;
1579 }
1580 None => {
1581 warn!(session_id = self.session_id, "no track created for caller");
1582 return Err(anyhow::anyhow!("no track created for caller"));
1583 }
1584 }
1585 Ok(())
1586 }
1587
1588 async fn finish_caller_stack(
1589 &self,
1590 option: &CallOption,
1591 track: Option<Box<dyn Track>>,
1592 ) -> Result<()> {
1593 if let Some(track) = track {
1594 self.setup_track_with_stream(&option, track).await?;
1595 }
1596
1597 {
1598 let call_state = self.call_state.read().await;
1599 if let Some(ref answer) = call_state.answer {
1600 info!(
1601 session_id = self.session_id,
1602 "sending answer event: {}", answer,
1603 );
1604 self.event_sender
1605 .send(SessionEvent::Answer {
1606 timestamp: crate::media::get_timestamp(),
1607 track_id: self.session_id.clone(),
1608 sdp: answer.clone(),
1609 refer: Some(false),
1610 })
1611 .ok();
1612 } else {
1613 warn!(
1614 session_id = self.session_id,
1615 "no answer in state to send event"
1616 );
1617 }
1618 }
1619 Ok(())
1620 }
1621
1622 pub async fn setup_track_with_stream(
1623 &self,
1624 option: &CallOption,
1625 mut track: Box<dyn Track>,
1626 ) -> Result<()> {
1627 let processors = match StreamEngine::create_processors(
1628 self.app_state.stream_engine.clone(),
1629 track.as_ref(),
1630 self.cancel_token.child_token(),
1631 self.event_sender.clone(),
1632 self.media_stream.packet_sender.clone(),
1633 option,
1634 )
1635 .await
1636 {
1637 Ok(processors) => processors,
1638 Err(e) => {
1639 warn!(
1640 session_id = self.session_id,
1641 "failed to prepare stream processors: {}", e
1642 );
1643 vec![]
1644 }
1645 };
1646
1647 for processor in processors {
1649 track.append_processor(processor);
1650 }
1651
1652 self.update_track_wrapper(track, None).await;
1653 Ok(())
1654 }
1655
1656 pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1657 let ambiance_opt = {
1658 let state = self.call_state.read().await;
1659 let mut opt = state
1660 .option
1661 .as_ref()
1662 .and_then(|o| o.ambiance.clone())
1663 .unwrap_or_default();
1664
1665 if let Some(global) = &self.app_state.config.ambiance {
1666 opt.merge(global);
1667 }
1668 opt
1669 };
1670 if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1671 match AmbianceProcessor::new(ambiance_opt).await {
1672 Ok(ambiance) => {
1673 info!(session_id = self.session_id, "loaded ambiance processor");
1674 track.append_processor(Box::new(ambiance));
1675 }
1676 Err(e) => {
1677 tracing::error!("failed to load ambiance wav {}", e);
1678 }
1679 }
1680 }
1681 self.call_state.write().await.current_play_id = play_id.clone();
1682 self.media_stream.update_track(track, play_id).await;
1683 }
1684
1685 pub async fn create_websocket_track(
1686 &self,
1687 audio_receiver: WebsocketBytesReceiver,
1688 ) -> Result<Box<dyn Track>> {
1689 let (ssrc, codec) = {
1690 let call_state = self.call_state.read().await;
1691 (
1692 call_state.ssrc,
1693 call_state
1694 .option
1695 .as_ref()
1696 .map(|o| o.codec.clone())
1697 .unwrap_or_default(),
1698 )
1699 };
1700
1701 let ws_track = WebsocketTrack::new(
1702 self.cancel_token.child_token(),
1703 self.session_id.clone(),
1704 self.track_config.clone(),
1705 self.event_sender.clone(),
1706 audio_receiver,
1707 codec,
1708 ssrc,
1709 );
1710
1711 {
1712 let mut call_state = self.call_state.write().await;
1713 call_state.answer_time = Some(Utc::now());
1714 call_state.answer = Some("".to_string());
1715 call_state.last_status_code = 200;
1716 }
1717
1718 Ok(Box::new(ws_track))
1719 }
1720
1721 pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1722 let (ssrc, option) = {
1723 let call_state = self.call_state.read().await;
1724 (
1725 call_state.ssrc,
1726 call_state.option.clone().unwrap_or_default(),
1727 )
1728 };
1729
1730 let mut rtc_config = RtcTrackConfig::default();
1731 rtc_config.mode = rustrtc::TransportMode::WebRtc; rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1733
1734 if let Some(codecs) = &self.app_state.config.codecs {
1735 let mut codec_types = Vec::new();
1736 for c in codecs {
1737 match c.to_lowercase().as_str() {
1738 "pcmu" => codec_types.push(CodecType::PCMU),
1739 "pcma" => codec_types.push(CodecType::PCMA),
1740 "g722" => codec_types.push(CodecType::G722),
1741 "g729" => codec_types.push(CodecType::G729),
1742 "opus" => codec_types.push(CodecType::Opus),
1743 "dtmf" | "2833" | "telephone_event" => {
1744 codec_types.push(CodecType::TelephoneEvent)
1745 }
1746 _ => {}
1747 }
1748 }
1749 if !codec_types.is_empty() {
1750 rtc_config.preferred_codec = Some(codec_types[0].clone());
1751 rtc_config.codecs = codec_types;
1752 }
1753 }
1754
1755 if let Some(ref external_ip) = self.app_state.config.external_ip {
1756 rtc_config.external_ip = Some(external_ip.clone());
1757 }
1758
1759 let mut webrtc_track = RtcTrack::new(
1760 self.cancel_token.child_token(),
1761 self.session_id.clone(),
1762 self.track_config.clone(),
1763 rtc_config,
1764 )
1765 .with_ssrc(ssrc);
1766
1767 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1768 let offer = match option.enable_ipv6 {
1769 Some(false) | None => {
1770 strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1771 }
1772 _ => option.offer.clone().unwrap_or("".to_string()),
1773 };
1774 let answer: Option<String>;
1775 match webrtc_track.handshake(offer, timeout).await {
1776 Ok(answer_sdp) => {
1777 answer = match option.enable_ipv6 {
1778 Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1779 Some(true) => Some(answer_sdp.to_string()),
1780 };
1781 }
1782 Err(e) => {
1783 warn!(session_id = self.session_id, "failed to setup track: {}", e);
1784 return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1785 }
1786 }
1787
1788 {
1789 let mut call_state = self.call_state.write().await;
1790 call_state.answer_time = Some(Utc::now());
1791 call_state.answer = answer;
1792 call_state.last_status_code = 200;
1793 }
1794 Ok(Box::new(webrtc_track))
1795 }
1796
1797 async fn create_outgoing_sip_track(
1798 &self,
1799 cancel_token: CancellationToken,
1800 call_state_ref: ActiveCallStateRef,
1801 track_id: &String,
1802 mut invite_option: InviteOption,
1803 call_option: &CallOption,
1804 moh: Option<String>,
1805 auto_hangup: bool,
1806 ) -> Result<String, rsipstack::Error> {
1807 let ssrc = call_state_ref.read().await.ssrc;
1808 let rtp_track = self
1809 .create_rtp_track(track_id.clone(), ssrc)
1810 .await
1811 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1812
1813 let offer = Some(
1814 rtp_track
1815 .local_description()
1816 .await
1817 .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1818 );
1819
1820 {
1821 let mut cs = call_state_ref.write().await;
1822 if let Some(o) = cs.option.as_mut() {
1823 o.offer = offer.clone();
1824 }
1825 cs.start_time = Utc::now();
1826 };
1827
1828 invite_option.offer = offer.clone().map(|s| s.into());
1829
1830 let needs_contact = invite_option.contact.scheme.is_none()
1833 || invite_option
1834 .contact
1835 .host_with_port
1836 .to_string()
1837 .starts_with("127.0.0.1");
1838
1839 if needs_contact {
1840 if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1841 invite_option.contact = rsip::Uri {
1842 scheme: Some(rsip::Scheme::Sip),
1843 auth: None,
1844 host_with_port: addr.addr.clone(),
1845 params: vec![],
1846 headers: vec![],
1847 };
1848 }
1849 }
1850
1851 let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1852
1853 if let Some(moh) = moh {
1854 let ssrc_and_moh = {
1855 let mut state = call_state_ref.write().await;
1856 state.moh = Some(moh.clone());
1857 if state.current_play_id.is_none() {
1858 let ssrc = rand::random::<u32>();
1859 Some((ssrc, moh.clone()))
1860 } else {
1861 info!(
1862 session_id = self.session_id,
1863 "Something is playing, MOH will start after it ends"
1864 );
1865 None
1866 }
1867 };
1868
1869 if let Some((ssrc, moh_path)) = ssrc_and_moh {
1870 let file_track = FileTrack::new(self.server_side_track_id.clone())
1871 .with_play_id(Some(moh_path.clone()))
1872 .with_ssrc(ssrc)
1873 .with_path(moh_path.clone())
1874 .with_cancel_token(self.cancel_token.child_token());
1875 self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1876 .await;
1877 }
1878 } else {
1879 let track = rtp_track_to_setup.take().unwrap();
1880 self.setup_track_with_stream(&call_option, track)
1881 .await
1882 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1883 }
1884
1885 info!(
1886 session_id = self.session_id,
1887 track_id,
1888 contact = %invite_option.contact,
1889 "invite {} -> {} offer: \n{}",
1890 invite_option.caller,
1891 invite_option.callee,
1892 offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1893 );
1894
1895 let (dlg_state_sender, dlg_state_receiver) =
1896 self.invitation.dialog_layer.new_dialog_state_channel();
1897
1898 let states = InviteDialogStates {
1899 is_client: true,
1900 session_id: self.session_id.clone(),
1901 track_id: track_id.clone(),
1902 event_sender: self.event_sender.clone(),
1903 media_stream: self.media_stream.clone(),
1904 call_state: call_state_ref.clone(),
1905 cancel_token,
1906 terminated_reason: None,
1907 has_early_media: false,
1908 };
1909
1910 let mut client_dialog_handler =
1911 DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), dlg_state_receiver);
1912
1913 crate::spawn(async move {
1914 client_dialog_handler.process_dialog(states).await;
1915 });
1916
1917 let (dialog_id, answer) = self
1918 .invitation
1919 .invite(invite_option, dlg_state_sender)
1920 .await?;
1921
1922 self.call_state.write().await.moh = None;
1923
1924 if let Some(track) = rtp_track_to_setup {
1925 self.setup_track_with_stream(&call_option, track)
1926 .await
1927 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1928 }
1929
1930 let answer = match answer {
1931 Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1932 None => {
1933 warn!(session_id = self.session_id, "no answer received");
1934 return Err(rsipstack::Error::DialogError(
1935 "No answer received".to_string(),
1936 dialog_id,
1937 rsip::StatusCode::NotAcceptableHere,
1938 ));
1939 }
1940 };
1941
1942 {
1943 let mut cs = call_state_ref.write().await;
1944 if cs.answer.is_none() {
1945 cs.answer = Some(answer.clone());
1946 }
1947 if auto_hangup {
1948 cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
1949 }
1950 }
1951
1952 self.media_stream
1953 .update_remote_description(&track_id, &answer)
1954 .await
1955 .ok();
1956
1957 Ok(answer)
1958 }
1959
1960 pub fn is_webrtc_sdp(sdp: &str) -> bool {
1962 (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
1963 && sdp.contains("a=fingerprint:")
1964 }
1965
1966 pub async fn setup_answer_track(
1967 &self,
1968 ssrc: u32,
1969 option: &CallOption,
1970 offer: String,
1971 ) -> Result<(String, Box<dyn Track>)> {
1972 let offer = match option.enable_ipv6 {
1973 Some(false) | None => strip_ipv6_candidates(&offer),
1974 _ => offer.clone(),
1975 };
1976
1977 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1978
1979 let mut media_track = if Self::is_webrtc_sdp(&offer) {
1980 let mut rtc_config = RtcTrackConfig::default();
1981 rtc_config.mode = rustrtc::TransportMode::WebRtc;
1982 rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1983 if let Some(ref external_ip) = self.app_state.config.external_ip {
1984 rtc_config.external_ip = Some(external_ip.clone());
1985 }
1986 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1987
1988 let webrtc_track = RtcTrack::new(
1989 self.cancel_token.child_token(),
1990 self.session_id.clone(),
1991 self.track_config.clone(),
1992 rtc_config,
1993 )
1994 .with_ssrc(ssrc);
1995
1996 Box::new(webrtc_track) as Box<dyn Track>
1997 } else {
1998 let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
1999 Box::new(rtp_track) as Box<dyn Track>
2000 };
2001
2002 let answer = match media_track.handshake(offer.clone(), timeout).await {
2003 Ok(answer) => answer,
2004 Err(e) => {
2005 return Err(anyhow::anyhow!("handshake failed: {e}"));
2006 }
2007 };
2008
2009 return Ok((answer, media_track));
2010 }
2011
2012 pub async fn prepare_incoming_sip_track(
2013 &self,
2014 cancel_token: CancellationToken,
2015 call_state_ref: ActiveCallStateRef,
2016 track_id: &String,
2017 pending_dialog: PendingDialog,
2018 ) -> Result<()> {
2019 let state_receiver = pending_dialog.state_receiver;
2020 let states = InviteDialogStates {
2023 is_client: false,
2024 session_id: self.session_id.clone(),
2025 track_id: track_id.clone(),
2026 event_sender: self.event_sender.clone(),
2027 media_stream: self.media_stream.clone(),
2028 call_state: self.call_state.clone(),
2029 cancel_token,
2030 terminated_reason: None,
2031 has_early_media: false,
2032 };
2033
2034 let initial_request = pending_dialog.dialog.initial_request();
2035 let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2036
2037 let (ssrc, option) = {
2038 let call_state = call_state_ref.read().await;
2039 (
2040 call_state.ssrc,
2041 call_state.option.clone().unwrap_or_default(),
2042 )
2043 };
2044
2045 match self.setup_answer_track(ssrc, &option, offer).await {
2046 Ok((offer, track)) => {
2047 self.setup_track_with_stream(&option, track).await?;
2048 {
2049 let mut state = self.call_state.write().await;
2050 state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2051 }
2052 }
2053 Err(e) => {
2054 return Err(anyhow::anyhow!("error creating track: {}", e));
2055 }
2056 }
2057
2058 let mut client_dialog_handler =
2059 DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), state_receiver);
2060
2061 crate::spawn(async move {
2062 client_dialog_handler.process_dialog(states).await;
2063 });
2064 Ok(())
2065 }
2066}
2067
2068impl Drop for ActiveCall {
2069 fn drop(&mut self) {
2070 info!(session_id = self.session_id, "dropping active call");
2071 if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2072 if let Some(record) = self.get_callrecord() {
2073 if let Err(e) = sender.send(record) {
2074 warn!(
2075 session_id = self.session_id,
2076 "failed to send call record: {}", e
2077 );
2078 }
2079 }
2080 }
2081 }
2082}
2083
2084impl ActiveCallState {
2085 pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2086 if let Some(existing) = &self.option {
2087 if option.asr.is_none() {
2088 option.asr = existing.asr.clone();
2089 }
2090 if option.tts.is_none() {
2091 option.tts = existing.tts.clone();
2092 }
2093 if option.vad.is_none() {
2094 option.vad = existing.vad.clone();
2095 }
2096 if option.denoise.is_none() {
2097 option.denoise = existing.denoise;
2098 }
2099 if option.recorder.is_none() {
2100 option.recorder = existing.recorder.clone();
2101 }
2102 if option.eou.is_none() {
2103 option.eou = existing.eou.clone();
2104 }
2105 if option.extra.is_none() {
2106 option.extra = existing.extra.clone();
2107 }
2108 if option.ambiance.is_none() {
2109 option.ambiance = existing.ambiance.clone();
2110 }
2111 }
2112 option
2113 }
2114
2115 pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2116 if self.hangup_reason.is_none() {
2117 self.hangup_reason = Some(reason);
2118 }
2119 }
2120
2121 pub fn build_hangup_event(
2122 &self,
2123 track_id: TrackId,
2124 initiator: Option<String>,
2125 ) -> crate::event::SessionEvent {
2126 let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2127 let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2128 let extra = self.extras.clone();
2129
2130 crate::event::SessionEvent::Hangup {
2131 track_id,
2132 timestamp: crate::media::get_timestamp(),
2133 reason: Some(format!("{:?}", self.hangup_reason)),
2134 initiator,
2135 start_time: self.start_time.to_rfc3339(),
2136 answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2137 ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2138 hangup_time: Utc::now().to_rfc3339(),
2139 extra,
2140 from: from.map(|f| f.into()),
2141 to: to.map(|f| f.into()),
2142 refer: Some(self.is_refer),
2143 }
2144 }
2145
2146 pub fn build_callrecord(
2147 &self,
2148 app_state: AppState,
2149 session_id: String,
2150 call_type: ActiveCallType,
2151 ) -> CallRecord {
2152 let option = self.option.clone().unwrap_or_default();
2153 let recorder = if option.recorder.is_some() {
2154 let recorder_file = app_state.get_recorder_file(&session_id);
2155 if std::path::Path::new(&recorder_file).exists() {
2156 let file_size = std::fs::metadata(&recorder_file)
2157 .map(|m| m.len())
2158 .unwrap_or(0);
2159 vec![crate::callrecord::CallRecordMedia {
2160 track_id: session_id.clone(),
2161 path: recorder_file,
2162 size: file_size,
2163 extra: None,
2164 }]
2165 } else {
2166 vec![]
2167 }
2168 } else {
2169 vec![]
2170 };
2171
2172 let dump_event_file = app_state.get_dump_events_file(&session_id);
2173 let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2174 Some(dump_event_file)
2175 } else {
2176 None
2177 };
2178
2179 let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2180 if let Ok(rc) = rc.try_read() {
2181 Some(Box::new(rc.build_callrecord(
2182 app_state.clone(),
2183 rc.session_id.clone(),
2184 ActiveCallType::B2bua,
2185 )))
2186 } else {
2187 None
2188 }
2189 });
2190
2191 let caller = option.caller.clone().unwrap_or_default();
2192 let callee = option.callee.clone().unwrap_or_default();
2193
2194 CallRecord {
2195 option: Some(option),
2196 call_id: session_id,
2197 call_type,
2198 start_time: self.start_time,
2199 ring_time: self.ring_time.clone(),
2200 answer_time: self.answer_time.clone(),
2201 end_time: Utc::now(),
2202 caller,
2203 callee,
2204 hangup_reason: self.hangup_reason.clone(),
2205 hangup_messages: Vec::new(),
2206 status_code: self.last_status_code,
2207 extras: self.extras.clone(),
2208 dump_event_file,
2209 recorder,
2210 refer_callrecord,
2211 }
2212 }
2213}