1use super::Command;
2use crate::{
3 CallOption, ReferOption,
4 event::{EventReceiver, EventSender, SessionEvent},
5 media::{
6 TrackId,
7 ambiance::AmbianceProcessor,
8 engine::StreamEngine,
9 negotiate::strip_ipv6_candidates,
10 recorder::RecorderOption,
11 stream::{MediaStream, MediaStreamBuilder},
12 track::{
13 Track, TrackConfig,
14 file::FileTrack,
15 media_pass::MediaPassTrack,
16 rtc::{RtcTrack, RtcTrackConfig},
17 tts::SynthesisHandle,
18 websocket::{WebsocketBytesReceiver, WebsocketTrack},
19 },
20 },
21 synthesis::{SynthesisCommand, SynthesisOption},
22};
23use crate::{
24 app::AppState,
25 call::{
26 CommandReceiver, CommandSender,
27 sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
28 },
29 callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
30 useragent::invitation::PendingDialog,
31};
32use anyhow::Result;
33use audio_codec::CodecType;
34use chrono::{DateTime, Utc};
35use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
36use serde::{Deserialize, Serialize};
37use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
38use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
39use tokio_util::sync::CancellationToken;
40use tracing::{debug, info, warn};
41
#[cfg(test)]
mod tests {
    //! Tests for how `do_tts` chooses the SSRC that is recorded for auto-hangup.
    use super::*;
    use crate::app::AppStateBuilder;
    use crate::callrecord::CallRecordHangupReason;
    use crate::config::Config;
    use crate::media::track::tts::SynthesisHandle;
    use crate::synthesis::{SynthesisCommand, SynthesisOption, SynthesisType};
    use tokio::sync::mpsc;

    /// Builds a SIP `ActiveCall` on a fresh app state and stores a call option
    /// whose `tts` field is `tts`.
    async fn new_call_with_tts(tts: SynthesisOption) -> Result<ActiveCallRef> {
        let mut config = Config::default();
        config.udp_port = 0;
        config.media_cache_path = "/tmp/mediacache".to_string();

        let app_state = AppStateBuilder::new()
            .with_config(config)
            .with_stream_engine(Arc::new(StreamEngine::default()))
            .build()
            .await?;

        let active_call = Arc::new(ActiveCall::new(
            ActiveCallType::Sip,
            CancellationToken::new(),
            "test-session".to_string(),
            app_state.invitation.clone(),
            app_state.clone(),
            TrackConfig::default(),
            None,
            false,
            None,
            None,
        ));

        let mut option = CallOption::default();
        option.tts = Some(tts);
        active_call.call_state.write().await.option = Some(option);

        Ok(active_call)
    }

    /// Registers `handle` as the call's current TTS handle and marks `play_id`
    /// as the play id in progress.
    async fn install_tts_handle(call: &ActiveCall, handle: SynthesisHandle, play_id: &str) {
        let mut state = call.call_state.write().await;
        state.tts_handle = Some(handle);
        state.current_play_id = Some(play_id.to_string());
    }

    #[tokio::test]
    async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
        let active_call = new_call_with_tts(SynthesisOption::default()).await?;

        let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
        let initial_ssrc = 12345;
        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
        install_tts_handle(&active_call, handle, "play_1").await;

        // Same play id as the installed handle: the existing handle must be reused.
        active_call
            .do_tts(
                "hangup now".to_string(),
                None,
                Some("play_1".to_string()),
                Some(true),
                false,
                true,
                None,
                None,
                false,
            )
            .await?;

        {
            let state = active_call.call_state.read().await;
            assert!(state.auto_hangup.is_some());
            let (h_ssrc, reason) = state.auto_hangup.clone().unwrap();
            assert_eq!(
                h_ssrc, initial_ssrc,
                "SSRC should be reused from existing handle"
            );
            assert_eq!(reason, CallRecordHangupReason::BySystem);
        }

        // The command must have been forwarded through the existing handle.
        let cmd = rx.try_recv().expect("Should have received tts command");
        assert_eq!(cmd.text, "hangup now");

        Ok(())
    }

    #[tokio::test]
    async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
        let mut tts_opt = SynthesisOption::default();
        tts_opt.provider = Some(SynthesisType::MsEdge);
        let active_call = new_call_with_tts(tts_opt).await?;

        let (tx, _rx) = mpsc::unbounded_channel();
        let initial_ssrc = 111;
        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
        install_tts_handle(&active_call, handle, "play_1").await;

        // A different play id must not inherit the SSRC of the previous handle.
        active_call
            .do_tts(
                "new play".to_string(),
                None,
                Some("play_2".to_string()),
                Some(true),
                false,
                true,
                None,
                None,
                false,
            )
            .await?;

        {
            let state = active_call.call_state.read().await;
            let (h_ssrc, _) = state.auto_hangup.clone().unwrap();
            assert_ne!(
                h_ssrc, initial_ssrc,
                "Should use a new SSRC for different play_id"
            );
        }

        Ok(())
    }
}
206
/// Parameters supplied when a call is opened.
///
/// Deserialized with camelCase keys; `dump_events` and `ping_interval` are
/// read from the shorter keys `dump` and `ping`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallParams {
    /// Optional identifier supplied by the client.
    /// NOTE(review): presumably used as the session id — confirm against the caller.
    pub id: Option<String>,
    /// Read from the `dump` key.
    /// NOTE(review): presumably feeds `ActiveCall::dump_events` — confirm.
    #[serde(rename = "dump")]
    pub dump_events: Option<bool>,
    /// Read from the `ping` key. Unit is not visible here — TODO confirm.
    #[serde(rename = "ping")]
    pub ping_interval: Option<u32>,
    /// Read from the `serverSideTrack` key.
    /// NOTE(review): presumably overrides the default server-side track id — confirm.
    pub server_side_track: Option<String>,
}
217
/// Kind of call handled by an [`ActiveCall`].
///
/// Variants are (de)serialized in camelCase; `Sip` is the default.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[serde(rename_all = "camelCase")]
pub enum ActiveCallType {
    Webrtc,
    B2bua,
    WebSocket,
    #[default]
    Sip,
}
227
/// Mutable per-call state, shared behind an `RwLock` (see [`ActiveCallStateRef`]).
#[derive(Default)]
pub struct ActiveCallState {
    /// Session identifier of the call this state belongs to.
    pub session_id: String,
    /// Set to `Utc::now()` when the state is created.
    pub start_time: DateTime<Utc>,
    /// Not written in the visible part of this file.
    /// NOTE(review): presumably the time ringing started — confirm.
    pub ring_time: Option<DateTime<Utc>>,
    /// Set in `do_accept` once the dialog has been accepted.
    pub answer_time: Option<DateTime<Utc>>,
    /// NOTE(review): presumably written by `set_hangup_reason` — confirm.
    pub hangup_reason: Option<CallRecordHangupReason>,
    /// Not written in the visible part of this file.
    pub last_status_code: u16,
    /// Call option in effect; `do_tts` reads its `tts` field.
    pub option: Option<CallOption>,
    /// Answer body stored by `do_accept` after it was sent as `application/sdp`.
    pub answer: Option<String>,
    /// Randomly generated when the state is created.
    pub ssrc: u32,
    /// State of the referred call, installed by `do_refer`.
    pub refer_callstate: Option<ActiveCallStateRef>,
    /// Free-form values passed to `ActiveCall::new`.
    pub extras: Option<HashMap<String, serde_json::Value>>,
    /// `true` for the state that `do_refer` builds for the referred call.
    pub is_refer: bool,

    /// Handle of the current TTS track; cleared by `do_play` and `do_interrupt`.
    pub tts_handle: Option<SynthesisHandle>,
    /// `(ssrc, reason)`: `process` hangs up with `reason` when the server-side
    /// track ends with a matching `ssrc`.
    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
    /// Taken by `process` when the current playback ends; a value above zero
    /// arms the wait-for-input timer.
    pub wait_input_timeout: Option<u32>,
    /// Music-on-hold path; while set, `process` restarts it whenever the
    /// server-side track ends or errors. Cleared by `do_interrupt`.
    pub moh: Option<String>,
    /// Play id of the playback in progress; `process` ignores track-end
    /// events whose play id differs from it.
    pub current_play_id: Option<String>,
    /// Receiver passed to `ActiveCall::new`.
    /// NOTE(review): presumably audio bytes from a websocket — confirm.
    pub audio_receiver: Option<WebsocketBytesReceiver>,
    /// `(answer, optional track, dialog)` prepared ahead of answering;
    /// `do_ringing` reads it and `do_accept` consumes it.
    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
}
252
/// Shared, reference-counted handle to an [`ActiveCall`].
pub type ActiveCallRef = Arc<ActiveCall>;
/// Shared, lock-protected handle to an [`ActiveCallState`].
pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
255
/// A single call together with the channels and media stream that drive it.
pub struct ActiveCall {
    /// Mutable state shared with the event and command loops.
    pub call_state: ActiveCallStateRef,
    /// Cancelled by `serve` when any of its loops finishes.
    pub cancel_token: CancellationToken,
    pub call_type: ActiveCallType,
    pub session_id: String,
    /// Media stream built in `new` with the session id as its id.
    pub media_stream: Arc<MediaStream>,
    pub track_config: TrackConfig,
    /// Sender side of the session event channel; loops obtain receivers via `subscribe()`.
    pub event_sender: EventSender,
    pub app_state: AppState,
    pub invitation: Invitation,
    /// Broadcast sender for commands, created with capacity 32 in `new`.
    pub cmd_sender: CommandSender,
    /// Passed to `dump_loop` by `serve`.
    pub dump_events: bool,
    /// Id of the server-side playback track; defaults to `"server-side-track"`.
    pub server_side_track_id: TrackId,
}
270
/// Keeps a call registered in `app_state.active_calls` for as long as the guard lives.
pub struct ActiveCallGuard {
    /// The registered call.
    pub call: ActiveCallRef,
    /// Size of the registry right after this call was inserted.
    pub active_calls: usize,
}
275
276impl ActiveCallGuard {
277 pub fn new(call: ActiveCallRef) -> Self {
278 let active_calls = {
279 call.app_state
280 .total_calls
281 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
282 let mut calls = call.app_state.active_calls.lock().unwrap();
283 calls.insert(call.session_id.clone(), call.clone());
284 calls.len()
285 };
286 Self { call, active_calls }
287 }
288}
289
290impl Drop for ActiveCallGuard {
291 fn drop(&mut self) {
292 self.call
293 .app_state
294 .active_calls
295 .lock()
296 .unwrap()
297 .remove(&self.call.session_id);
298 }
299}
300
/// Receivers that `ActiveCall::serve` needs, obtained from `ActiveCall::new_receiver`.
pub struct ActiveCallReceiver {
    /// Commands consumed by the command-dispatch loop.
    pub cmd_receiver: CommandReceiver,
    /// Second subscription to the same command channel, handed to `dump_loop`.
    pub dump_cmd_receiver: CommandReceiver,
    /// Session events handed to `dump_loop`.
    pub dump_event_receiver: EventReceiver,
}
306
307impl ActiveCall {
308 pub fn new(
309 call_type: ActiveCallType,
310 cancel_token: CancellationToken,
311 session_id: String,
312 invitation: Invitation,
313 app_state: AppState,
314 track_config: TrackConfig,
315 audio_receiver: Option<WebsocketBytesReceiver>,
316 dump_events: bool,
317 server_side_track_id: Option<TrackId>,
318 extras: Option<HashMap<String, serde_json::Value>>,
319 ) -> Self {
320 let event_sender = crate::event::create_event_sender();
321 let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
322 let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
323 .with_id(session_id.clone())
324 .with_cancel_token(cancel_token.child_token());
325 let media_stream = Arc::new(media_stream_builder.build());
326 let call_state = Arc::new(RwLock::new(ActiveCallState {
327 session_id: session_id.clone(),
328 start_time: Utc::now(),
329 ssrc: rand::random::<u32>(),
330 extras,
331 audio_receiver,
332 ..Default::default()
333 }));
334 Self {
335 cancel_token,
336 call_type,
337 session_id,
338 call_state,
339 media_stream,
340 track_config,
341 event_sender,
342 app_state,
343 invitation,
344 cmd_sender,
345 dump_events,
346 server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
347 }
348 }
349
350 pub async fn enqueue_command(&self, command: Command) -> Result<()> {
351 self.cmd_sender
352 .send(command)
353 .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
354 Ok(())
355 }
356
357 pub fn new_receiver(&self) -> ActiveCallReceiver {
361 ActiveCallReceiver {
362 cmd_receiver: self.cmd_sender.subscribe(),
363 dump_cmd_receiver: self.cmd_sender.subscribe(),
364 dump_event_receiver: self.event_sender.subscribe(),
365 }
366 }
367
368 pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
369 let ActiveCallReceiver {
370 mut cmd_receiver,
371 dump_cmd_receiver,
372 dump_event_receiver,
373 } = receiver;
374
375 let process_command_loop = async move {
376 while let Ok(command) = cmd_receiver.recv().await {
377 match self.dispatch(command).await {
378 Ok(_) => (),
379 Err(e) => {
380 warn!(session_id = self.session_id, "{}", e);
381 self.event_sender
382 .send(SessionEvent::Error {
383 track_id: self.session_id.clone(),
384 timestamp: crate::media::get_timestamp(),
385 sender: "command".to_string(),
386 error: e.to_string(),
387 code: None,
388 })
389 .ok();
390 }
391 }
392 }
393 };
394 self.app_state
395 .total_calls
396 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
397
398 tokio::join!(
399 self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
400 async {
401 select! {
402 _ = process_command_loop => {
403 info!(session_id = self.session_id, "command loop done");
404 }
405 _ = self.process() => {
406 info!(session_id = self.session_id, "call serve done");
407 }
408 _ = self.cancel_token.cancelled() => {
409 info!(session_id = self.session_id, "call cancelled - cleaning up resources");
410 }
411 }
412 self.cancel_token.cancel();
413 }
414 );
415 Ok(())
416 }
417
    /// Runs the call's media stream together with two helper loops and
    /// returns as soon as any of the three finishes.
    ///
    /// * the wait-for-input timer, which emits `SessionEvent::Silence` when
    ///   no input arrived within the armed timeout;
    /// * the event hook loop, which reacts to session events: disarming the
    ///   timer on input, looping music-on-hold, performing auto-hangup when
    ///   playback ends, handling interrupts and inactivity.
    async fn process(&self) -> Result<()> {
        let mut event_receiver = self.event_sender.subscribe();

        // `(start timestamp, timeout)`; a timeout of 0 means the timer is disarmed.
        let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
        let input_timeout_expire_ref = input_timeout_expire.clone();
        let event_sender = self.event_sender.clone();
        let wait_input_timeout_loop = async {
            loop {
                let (start_time, expire) = { *input_timeout_expire.lock().await };
                // NOTE(review): assumes `get_timestamp()` and `expire` share a unit — confirm.
                if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
                    info!(session_id = self.session_id, "wait input timeout reached");
                    // Disarm before notifying so the event is emitted only once.
                    *input_timeout_expire.lock().await = (0, 0);
                    event_sender
                        .send(SessionEvent::Silence {
                            track_id: self.server_side_track_id.clone(),
                            timestamp: crate::media::get_timestamp(),
                            start_time,
                            duration: expire as u64,
                            samples: None,
                        })
                        .ok();
                }
                // Poll interval of the timer.
                sleep(Duration::from_millis(100)).await;
            }
        };
        let server_side_track_id = self.server_side_track_id.clone();
        let event_hook_loop = async move {
            // NOTE(review): any receive error ends this loop and therefore `process`.
            while let Ok(event) = event_receiver.recv().await {
                match event {
                    // Any sign of input or a newly started track disarms the timer.
                    SessionEvent::Speaking { .. }
                    | SessionEvent::Dtmf { .. }
                    | SessionEvent::AsrDelta { .. }
                    | SessionEvent::AsrFinal { .. }
                    | SessionEvent::TrackStart { .. } => {
                        *input_timeout_expire_ref.lock().await = (0, 0);
                    }
                    SessionEvent::TrackEnd {
                        track_id,
                        play_id,
                        ssrc,
                        ..
                    } => {
                        // Only the server-side playback track is of interest here.
                        if track_id != server_side_track_id {
                            continue;
                        }

                        let (moh_path, auto_hangup, wait_timeout_val) = {
                            let mut state = self.call_state.write().await;
                            // A track end for a play id that is no longer current
                            // belongs to a playback that was replaced; ignore it.
                            if play_id != state.current_play_id {
                                debug!(
                                    session_id = self.session_id,
                                    ?play_id,
                                    current = ?state.current_play_id,
                                    "ignoring interrupted track end"
                                );
                                continue;
                            }
                            state.current_play_id = None;
                            (
                                state.moh.clone(),
                                state.auto_hangup.clone(),
                                // Consumed here so the timer is armed at most once per playback.
                                state.wait_input_timeout.take(),
                            )
                        };

                        // Music-on-hold takes priority: restart it and skip the
                        // auto-hangup and timer handling below.
                        if let Some(path) = moh_path {
                            info!(session_id = self.session_id, "looping moh: {}", path);
                            let ssrc = rand::random::<u32>();
                            let file_track = FileTrack::new(self.server_side_track_id.clone())
                                .with_play_id(Some(path.clone()))
                                .with_ssrc(ssrc)
                                .with_path(path.clone())
                                .with_cancel_token(self.cancel_token.child_token());
                            self.update_track_wrapper(Box::new(file_track), Some(path))
                                .await;
                            continue;
                        }

                        // Hang up only if the track that ended is the one the
                        // auto-hangup was registered for.
                        if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
                            if hangup_ssrc == ssrc {
                                info!(
                                    session_id = self.session_id,
                                    ssrc, "auto hangup when track end track_id:{}", track_id
                                );
                                self.do_hangup(Some(hangup_reason), None).await.ok();
                            }
                        }

                        // Arm (or explicitly disarm, for 0) the wait-for-input timer.
                        if let Some(timeout) = wait_timeout_val {
                            let expire = if timeout > 0 {
                                (crate::media::get_timestamp(), timeout)
                            } else {
                                (0, 0)
                            };
                            *input_timeout_expire_ref.lock().await = expire;
                        }
                    }
                    SessionEvent::Interrupt { receiver } => {
                        // An interrupt without a receiver targets the server-side track.
                        let track_id =
                            receiver.unwrap_or_else(|| self.server_side_track_id.clone());
                        if track_id == self.server_side_track_id {
                            debug!(
                                session_id = self.session_id,
                                "received interrupt event, stopping playback"
                            );
                            self.do_interrupt(true).await.ok();
                        }
                    }
                    SessionEvent::Inactivity { track_id, .. } => {
                        info!(
                            session_id = self.session_id,
                            track_id, "inactivity timeout reached, hanging up"
                        );
                        self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
                            .await
                            .ok();
                    }
                    SessionEvent::Error { track_id, .. } => {
                        if track_id != server_side_track_id {
                            continue;
                        }

                        // When music-on-hold is active, an error on the server-side
                        // track restarts it, switching to the bundled fallback file
                        // if that exists and is not already the one in use.
                        let moh_info = {
                            let mut state = self.call_state.write().await;
                            if let Some(path) = state.moh.clone() {
                                let fallback = "./config/sounds/refer_moh.wav".to_string();
                                let next_path = if path != fallback
                                    && std::path::Path::new(&fallback).exists()
                                {
                                    info!(
                                        session_id = self.session_id,
                                        "moh error, switching to fallback: {}", fallback
                                    );
                                    state.moh = Some(fallback.clone());
                                    fallback
                                } else {
                                    info!(
                                        session_id = self.session_id,
                                        "looping moh on error: {}", path
                                    );
                                    path
                                };
                                Some(next_path)
                            } else {
                                None
                            }
                        };

                        if let Some(next_path) = moh_info {
                            let ssrc = rand::random::<u32>();
                            let file_track = FileTrack::new(self.server_side_track_id.clone())
                                .with_play_id(Some(next_path.clone()))
                                .with_ssrc(ssrc)
                                .with_path(next_path.clone())
                                .with_cancel_token(self.cancel_token.child_token());
                            self.update_track_wrapper(Box::new(file_track), Some(next_path))
                                .await;
                            continue;
                        }
                    }
                    _ => {}
                }
            }
        };

        // The first branch to complete ends the call processing; the other
        // futures are dropped.
        select! {
            _ = wait_input_timeout_loop=>{
                info!(session_id = self.session_id, "wait input timeout loop done");
            }
            _ = self.media_stream.serve() => {
                info!(session_id = self.session_id, "media stream loop done");
            }
            _ = event_hook_loop => {
                info!(session_id = self.session_id, "event loop done");
            }
        }
        Ok(())
    }
596
597 async fn dispatch(&self, command: Command) -> Result<()> {
598 match command {
599 Command::Invite { option } => self.do_invite(option).await,
600 Command::Accept { option } => self.do_accept(option).await,
601 Command::Reject { reason, code } => {
602 self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
603 .await
604 }
605 Command::Ringing {
606 ringtone,
607 recorder,
608 early_media,
609 } => self.do_ringing(ringtone, recorder, early_media).await,
610 Command::Tts {
611 text,
612 speaker,
613 play_id,
614 auto_hangup,
615 streaming,
616 end_of_stream,
617 option,
618 wait_input_timeout,
619 base64,
620 } => {
621 self.do_tts(
622 text,
623 speaker,
624 play_id,
625 auto_hangup,
626 streaming.unwrap_or_default(),
627 end_of_stream.unwrap_or_default(),
628 option,
629 wait_input_timeout,
630 base64.unwrap_or_default(),
631 )
632 .await
633 }
634 Command::Play {
635 url,
636 play_id,
637 auto_hangup,
638 wait_input_timeout,
639 } => {
640 self.do_play(url, play_id, auto_hangup, wait_input_timeout)
641 .await
642 }
643 Command::Hangup { reason, initiator } => {
644 let reason = reason.map(|r| {
645 r.parse::<CallRecordHangupReason>()
646 .unwrap_or(CallRecordHangupReason::BySystem)
647 });
648 self.do_hangup(reason, initiator).await
649 }
650 Command::Refer {
651 caller,
652 callee,
653 options,
654 } => self.do_refer(caller, callee, options).await,
655 Command::Mute { track_id } => self.do_mute(track_id).await,
656 Command::Unmute { track_id } => self.do_unmute(track_id).await,
657 Command::Pause {} => self.do_pause().await,
658 Command::Resume {} => self.do_resume().await,
659 Command::Interrupt {
660 graceful: passage,
661 fade_out_ms: _,
662 } => self.do_interrupt(passage.unwrap_or_default()).await,
663 Command::History { speaker, text } => self.do_history(speaker, text).await,
664 }
665 }
666
667 fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
668 if let Some(recorder_option) = &option.recorder {
669 let mut recorder_file = recorder_option.recorder_file.clone();
670 if recorder_file.contains("{id}") {
671 recorder_file = recorder_file.replace("{id}", &self.session_id);
672 }
673
674 let recorder_file = if recorder_file.is_empty() {
675 self.app_state.get_recorder_file(&self.session_id)
676 } else {
677 let p = Path::new(&recorder_file);
678 p.is_absolute()
679 .then(|| recorder_file.clone())
680 .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
681 };
682 info!(
683 session_id = self.session_id,
684 recorder_file, "created recording file"
685 );
686
687 let track_samplerate = self.track_config.samplerate;
688 let recorder_samplerate = if track_samplerate > 0 {
689 track_samplerate
690 } else {
691 recorder_option.samplerate
692 };
693 let recorder_ptime = if recorder_option.ptime == 0 {
694 200
695 } else {
696 recorder_option.ptime
697 };
698 let requested_format = recorder_option
699 .format
700 .unwrap_or(self.app_state.config.recorder_format());
701 let format = requested_format.effective();
702 if requested_format != format {
703 warn!(
704 session_id = self.session_id,
705 requested = requested_format.extension(),
706 "Recorder format fallback to wav due to unsupported feature"
707 );
708 }
709 let mut recorder_config = RecorderOption {
710 recorder_file,
711 samplerate: recorder_samplerate,
712 ptime: recorder_ptime,
713 format: Some(format),
714 };
715 recorder_config.ensure_path_extension(format);
716 Some(recorder_config)
717 } else {
718 None
719 }
720 }
721
722 async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
723 {
725 let state = self.call_state.read().await;
726 option = state.merge_option(option);
727 }
728
729 option.check_default();
730 if let Some(opt) = self.build_record_option(&option) {
731 self.media_stream.update_recorder_option(opt).await;
732 }
733
734 if let Some(opt) = &option.media_pass {
735 let track_id = self.server_side_track_id.clone();
736 let cancel_token = self.cancel_token.child_token();
737 let ssrc = rand::random::<u32>();
738 let media_pass_track = MediaPassTrack::new(
739 self.session_id.clone(),
740 ssrc,
741 track_id,
742 cancel_token,
743 opt.clone(),
744 );
745 self.update_track_wrapper(Box::new(media_pass_track), None)
746 .await;
747 }
748
749 info!(
750 session_id = self.session_id,
751 call_type = ?self.call_type,
752 sender,
753 ?option,
754 "caller with option"
755 );
756
757 match self.setup_caller_track(&option).await {
758 Ok(_) => return Ok(option),
759 Err(e) => {
760 self.app_state
761 .total_failed_calls
762 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
763 let error_event = crate::event::SessionEvent::Error {
764 track_id: self.session_id.clone(),
765 timestamp: crate::media::get_timestamp(),
766 sender,
767 error: e.to_string(),
768 code: None,
769 };
770 self.event_sender.send(error_event).ok();
771 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
772 .await
773 .ok();
774 return Err(e);
775 }
776 }
777 }
778
779 async fn do_invite(&self, option: CallOption) -> Result<()> {
780 self.invite_or_accept(option, "invite".to_string())
781 .await
782 .map(|_| ())
783 }
784
785 async fn do_accept(&self, mut option: CallOption) -> Result<()> {
786 let has_pending = self
787 .invitation
788 .find_dialog_id_by_session_id(&self.session_id)
789 .is_some();
790 let ready_to_answer_val = {
791 let state = self.call_state.read().await;
792 state.ready_to_answer.is_none()
793 };
794
795 if ready_to_answer_val {
796 if !has_pending {
797 warn!(session_id = self.session_id, "no pending call to accept");
799 let rejet_event = crate::event::SessionEvent::Reject {
800 track_id: self.session_id.clone(),
801 timestamp: crate::media::get_timestamp(),
802 reason: "no pending call".to_string(),
803 refer: None,
804 code: Some(486),
805 };
806 self.event_sender.send(rejet_event).ok();
807 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
808 .await
809 .ok();
810 return Err(anyhow::anyhow!("no pending call to accept"));
811 }
812 option = self.invite_or_accept(option, "accept".to_string()).await?;
813 } else {
814 option.check_default();
815 self.call_state.write().await.option = Some(option.clone());
816 }
817 info!(session_id = self.session_id, ?option, "accepting call");
818 let ready = self.call_state.write().await.ready_to_answer.take();
819 if let Some((answer, track, dialog)) = ready {
820 info!(
821 session_id = self.session_id,
822 track_id = track.as_ref().map(|t| t.id()),
823 "ready to answer with track"
824 );
825
826 let headers = vec![rsip::Header::ContentType(
827 "application/sdp".to_string().into(),
828 )];
829
830 match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
831 Ok(_) => {
832 {
833 let mut state = self.call_state.write().await;
834 state.answer = Some(answer);
835 state.answer_time = Some(Utc::now());
836 }
837 self.finish_caller_stack(&option, track).await?;
838 }
839 Err(e) => {
840 warn!(session_id = self.session_id, "failed to accept call: {}", e);
841 return Err(anyhow::anyhow!("failed to accept call"));
842 }
843 }
844 }
845 return Ok(());
846 }
847
848 async fn do_reject(
849 &self,
850 code: Option<rsip::StatusCode>,
851 reason: Option<String>,
852 ) -> Result<()> {
853 match self
854 .invitation
855 .find_dialog_id_by_session_id(&self.session_id)
856 {
857 Some(id) => {
858 info!(
859 session_id = self.session_id,
860 ?reason,
861 ?code,
862 "rejecting call"
863 );
864 self.invitation.hangup(id, code, reason).await
865 }
866 None => Ok(()),
867 }
868 }
869
870 async fn do_ringing(
871 &self,
872 ringtone: Option<String>,
873 recorder: Option<RecorderOption>,
874 early_media: Option<bool>,
875 ) -> Result<()> {
876 let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
877 if ready_to_answer_val {
878 let option = CallOption {
879 recorder,
880 ..Default::default()
881 };
882 let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
883 }
884
885 let state = self.call_state.read().await;
886 if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
887 let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
888 let headers = vec![rsip::Header::ContentType(
889 "application/sdp".to_string().into(),
890 )];
891 (Some(headers), Some(answer.as_bytes().to_vec()))
892 } else {
893 (None, None)
894 };
895
896 dialog.ringing(headers, body).ok();
897 info!(
898 session_id = self.session_id,
899 ringtone, early_media, "playing ringtone"
900 );
901 if let Some(ringtone_url) = ringtone {
902 drop(state);
903 self.do_play(ringtone_url, None, None, None).await.ok();
904 } else {
905 info!(session_id = self.session_id, "no ringtone to play");
906 }
907 }
908 Ok(())
909 }
910
    /// Handles `Command::Tts`: queues `text` for synthesis on the server-side track.
    ///
    /// * `speaker` – overrides the speaker from the merged TTS option.
    /// * `play_id` – identifies the playback; a `Some` id that differs from
    ///   the current one interrupts the running TTS track.
    /// * `auto_hangup` – `Some(true)` registers a hangup for when the
    ///   playback's track ends; any other value keeps the previous setting.
    /// * `option` – merged over the call's stored TTS option.
    /// * `wait_input_timeout` – stored for `process` to arm the input timer.
    ///
    /// The command is sent through the existing TTS handle when that works;
    /// otherwise a new TTS track is created and installed.
    ///
    /// # Errors
    /// Fails when neither the call nor the command provides a TTS option,
    /// when the TTS track cannot be created, or when the new handle rejects
    /// the command.
    async fn do_tts(
        &self,
        text: String,
        speaker: Option<String>,
        play_id: Option<String>,
        auto_hangup: Option<bool>,
        streaming: bool,
        end_of_stream: bool,
        option: Option<SynthesisOption>,
        wait_input_timeout: Option<u32>,
        base64: bool,
    ) -> Result<()> {
        // The call-level TTS option is the base; the per-command option is
        // merged over it or used alone when the call has none.
        let tts_option = {
            let call_state = self.call_state.read().await;
            match call_state.option.clone().unwrap_or_default().tts {
                Some(opt) => opt.merge_with(option),
                None => {
                    if let Some(opt) = option {
                        opt
                    } else {
                        return Err(anyhow::anyhow!("no tts option available"));
                    }
                }
            }
        };
        let speaker = match speaker {
            Some(s) => Some(s),
            None => tts_option.speaker.clone(),
        };

        let mut play_command = SynthesisCommand {
            text,
            speaker,
            play_id: play_id.clone(),
            streaming,
            end_of_stream,
            option: tts_option,
            base64,
        };
        info!(
            session_id = self.session_id,
            provider = ?play_command.option.provider,
            text = %play_command.text.chars().take(10).collect::<String>(),
            speaker = play_command.speaker.as_deref(),
            auto_hangup = auto_hangup.unwrap_or_default(),
            play_id = play_command.play_id.as_deref(),
            streaming = play_command.streaming,
            end_of_stream = play_command.end_of_stream,
            wait_input_timeout = wait_input_timeout.unwrap_or_default(),
            is_base64 = play_command.base64,
            "new synthesis"
        );

        // Candidate SSRC, used only when the existing handle is not reused.
        let ssrc = rand::random::<u32>();
        let (should_interrupt, picked_ssrc) = {
            let mut state = self.call_state.write().await;

            // With an existing handle: a new, different play id gets a fresh
            // SSRC and interrupts the current playback; otherwise the
            // handle's SSRC is reused. Without a handle the fresh SSRC is used.
            let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
                if play_id.is_some() && state.current_play_id != play_id {
                    (ssrc, true)
                } else {
                    (handle.ssrc, false)
                }
            } else {
                (ssrc, false)
            };

            // Only `Some(true)` overwrites; anything else keeps what was set before.
            state.auto_hangup = match auto_hangup {
                Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
                _ => state.auto_hangup.clone(),
            };
            state.wait_input_timeout = wait_input_timeout;

            state.current_play_id = play_id.clone();
            (changed, target_ssrc)
        };

        if should_interrupt {
            // `do_interrupt` also clears `tts_handle`, so the lookup below finds none.
            let _ = self.do_interrupt(false).await;
        }

        let existing_handle = self.call_state.read().await.tts_handle.clone();
        if let Some(tts_handle) = existing_handle {
            match tts_handle.try_send(play_command) {
                Ok(_) => return Ok(()),
                Err(e) => {
                    // Recover the command from the send error and fall
                    // through to creating a new track.
                    play_command = e.0;
                }
            }
        }

        let (new_handle, tts_track) = StreamEngine::create_tts_track(
            self.app_state.stream_engine.clone(),
            self.cancel_token.child_token(),
            self.session_id.clone(),
            self.server_side_track_id.clone(),
            picked_ssrc,
            play_id.clone(),
            streaming,
            &play_command.option,
        )
        .await?;

        new_handle.try_send(play_command)?;
        self.call_state.write().await.tts_handle = Some(new_handle);
        self.update_track_wrapper(tts_track, play_id).await;
        Ok(())
    }
1019
1020 async fn do_play(
1021 &self,
1022 url: String,
1023 play_id: Option<String>,
1024 auto_hangup: Option<bool>,
1025 wait_input_timeout: Option<u32>,
1026 ) -> Result<()> {
1027 let ssrc = rand::random::<u32>();
1028 info!(
1029 session_id = self.session_id,
1030 ssrc, url, play_id, auto_hangup, "play file track"
1031 );
1032
1033 let play_id = play_id.or(Some(url.clone()));
1034
1035 let file_track = FileTrack::new(self.server_side_track_id.clone())
1036 .with_play_id(play_id.clone())
1037 .with_ssrc(ssrc)
1038 .with_path(url)
1039 .with_cancel_token(self.cancel_token.child_token());
1040
1041 {
1042 let mut state = self.call_state.write().await;
1043 state.tts_handle = None;
1044 state.auto_hangup = match auto_hangup {
1045 Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1046 _ => None,
1047 };
1048 state.wait_input_timeout = wait_input_timeout;
1049 }
1050
1051 self.update_track_wrapper(Box::new(file_track), play_id)
1052 .await;
1053 Ok(())
1054 }
1055
1056 async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1057 self.event_sender
1058 .send(SessionEvent::AddHistory {
1059 sender: Some(self.session_id.clone()),
1060 timestamp: crate::media::get_timestamp(),
1061 speaker,
1062 text,
1063 })
1064 .map(|_| ())
1065 .map_err(Into::into)
1066 }
1067
1068 async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1069 {
1070 let mut state = self.call_state.write().await;
1071 state.tts_handle = None;
1072 state.moh = None;
1073 }
1074 self.media_stream
1075 .remove_track(&self.server_side_track_id, graceful)
1076 .await;
1077 Ok(())
1078 }
    /// Handles `Command::Pause`. Currently a no-op that always succeeds.
    async fn do_pause(&self) -> Result<()> {
        Ok(())
    }
    /// Handles `Command::Resume`. Currently a no-op that always succeeds.
    async fn do_resume(&self) -> Result<()> {
        Ok(())
    }
1085 async fn do_hangup(
1086 &self,
1087 reason: Option<CallRecordHangupReason>,
1088 initiator: Option<String>,
1089 ) -> Result<()> {
1090 info!(
1091 session_id = self.session_id,
1092 ?reason,
1093 ?initiator,
1094 "do_hangup"
1095 );
1096
1097 let hangup_reason = match initiator.as_deref() {
1099 Some("caller") => CallRecordHangupReason::ByCaller,
1100 Some("callee") => CallRecordHangupReason::ByCallee,
1101 Some("system") => CallRecordHangupReason::Autohangup,
1102 _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1103 };
1104
1105 self.media_stream
1106 .stop(Some(hangup_reason.to_string()), initiator);
1107
1108 self.call_state
1109 .write()
1110 .await
1111 .set_hangup_reason(hangup_reason);
1112 Ok(())
1113 }
1114
1115 async fn do_refer(
1116 &self,
1117 caller: String,
1118 callee: String,
1119 refer_option: Option<ReferOption>,
1120 ) -> Result<()> {
1121 self.do_interrupt(false).await.ok();
1122 let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1123 if let Some(ref path) = moh {
1124 if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1125 let fallback = "./config/sounds/refer_moh.wav";
1126 if std::path::Path::new(fallback).exists() {
1127 info!(
1128 session_id = self.session_id,
1129 "moh {} not found, using fallback {}", path, fallback
1130 );
1131 moh = Some(fallback.to_string());
1132 }
1133 }
1134 }
1135 let session_id = self.session_id.clone();
1136 let track_id = self.server_side_track_id.clone();
1137
1138 let recorder = {
1139 let cs = self.call_state.read().await;
1140 cs.option
1141 .as_ref()
1142 .map(|o| o.recorder.clone())
1143 .unwrap_or_default()
1144 };
1145
1146 let call_option = CallOption {
1147 caller: Some(caller),
1148 callee: Some(callee.clone()),
1149 sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1150 asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1151 denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1152 recorder,
1153 ..Default::default()
1154 };
1155
1156 let mut invite_option = call_option.build_invite_option()?;
1157 invite_option.call_id = Some(self.session_id.clone());
1158
1159 let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1160
1161 {
1162 let cs = self.call_state.read().await;
1163 if let Some(opt) = cs.option.as_ref() {
1164 if let Some(callee) = opt.callee.as_ref() {
1165 headers.push(rsip::Header::Other(
1166 "X-Referred-To".to_string(),
1167 callee.clone(),
1168 ));
1169 }
1170 if let Some(caller) = opt.caller.as_ref() {
1171 headers.push(rsip::Header::Other(
1172 "X-Referred-From".to_string(),
1173 caller.clone(),
1174 ));
1175 }
1176 }
1177 }
1178
1179 headers.push(rsip::Header::Other(
1180 "X-Referred-Id".to_string(),
1181 self.session_id.clone(),
1182 ));
1183
1184 let ssrc = rand::random::<u32>();
1185 let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1186 start_time: Utc::now(),
1187 ssrc,
1188 option: Some(call_option.clone()),
1189 is_refer: true,
1190 ..Default::default()
1191 }));
1192
1193 {
1194 let mut cs = self.call_state.write().await;
1195 cs.refer_callstate.replace(refer_call_state.clone());
1196 }
1197
1198 let auto_hangup_requested = refer_option
1199 .as_ref()
1200 .and_then(|o| o.auto_hangup)
1201 .unwrap_or(true);
1202
1203 if auto_hangup_requested {
1204 self.call_state.write().await.auto_hangup =
1205 Some((ssrc, CallRecordHangupReason::ByRefer));
1206 } else {
1207 self.call_state.write().await.auto_hangup = None;
1208 }
1209
1210 let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1211
1212 info!(
1213 session_id = self.session_id,
1214 ssrc,
1215 auto_hangup = auto_hangup_requested,
1216 callee,
1217 timeout_secs,
1218 "do_refer"
1219 );
1220
1221 let r = tokio::time::timeout(
1222 Duration::from_secs(timeout_secs as u64),
1223 self.create_outgoing_sip_track(
1224 self.cancel_token.child_token(),
1225 refer_call_state.clone(),
1226 &track_id,
1227 invite_option,
1228 &call_option,
1229 moh,
1230 auto_hangup_requested,
1231 ),
1232 )
1233 .await;
1234
1235 {
1236 self.call_state.write().await.moh = None;
1237 }
1238
1239 let result = match r {
1240 Ok(res) => res,
1241 Err(_) => {
1242 warn!(
1243 session_id = session_id,
1244 "refer sip track creation timed out after {} seconds", timeout_secs
1245 );
1246 self.event_sender
1247 .send(SessionEvent::Reject {
1248 track_id,
1249 timestamp: crate::media::get_timestamp(),
1250 reason: "Timeout when refer".into(),
1251 code: Some(408),
1252 refer: Some(true),
1253 })
1254 .ok();
1255 return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1256 }
1257 };
1258
1259 match result {
1260 Ok(answer) => {
1261 self.event_sender
1262 .send(SessionEvent::Answer {
1263 timestamp: crate::media::get_timestamp(),
1264 track_id,
1265 sdp: answer,
1266 refer: Some(true),
1267 })
1268 .ok();
1269 }
1270 Err(e) => {
1271 warn!(
1272 session_id = session_id,
1273 "failed to create refer sip track: {}", e
1274 );
1275 match &e {
1276 rsipstack::Error::DialogError(reason, _, code) => {
1277 self.event_sender
1278 .send(SessionEvent::Reject {
1279 track_id,
1280 timestamp: crate::media::get_timestamp(),
1281 reason: reason.clone(),
1282 code: Some(code.code() as u32),
1283 refer: Some(true),
1284 })
1285 .ok();
1286 }
1287 _ => {}
1288 }
1289 return Err(e.into());
1290 }
1291 }
1292 Ok(())
1293 }
1294
1295 async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1296 self.media_stream.mute_track(track_id).await;
1297 Ok(())
1298 }
1299
1300 async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1301 self.media_stream.unmute_track(track_id).await;
1302 Ok(())
1303 }
1304
1305 pub async fn cleanup(&self) -> Result<()> {
1306 self.call_state.write().await.tts_handle = None;
1307 self.media_stream.cleanup().await.ok();
1308 Ok(())
1309 }
1310
1311 pub fn get_callrecord(&self) -> Option<CallRecord> {
1312 self.call_state.try_read().ok().map(|call_state| {
1313 call_state.build_callrecord(
1314 self.app_state.clone(),
1315 self.session_id.clone(),
1316 self.call_type.clone(),
1317 )
1318 })
1319 }
1320
1321 async fn dump_to_file(
1322 &self,
1323 dump_file: &mut File,
1324 cmd_receiver: &mut CommandReceiver,
1325 event_receiver: &mut EventReceiver,
1326 ) {
1327 loop {
1328 select! {
1329 _ = self.cancel_token.cancelled() => {
1330 break;
1331 }
1332 Ok(cmd) = cmd_receiver.recv() => {
1333 CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1334 .await;
1335 }
1336 Ok(event) = event_receiver.recv() => {
1337 if matches!(event, SessionEvent::Binary{..}) {
1338 continue;
1339 }
1340 CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1341 .await;
1342 }
1343 };
1344 }
1345 }
1346
1347 async fn dump_loop(
1348 &self,
1349 dump_events: bool,
1350 mut dump_cmd_receiver: CommandReceiver,
1351 mut dump_event_receiver: EventReceiver,
1352 ) {
1353 if !dump_events {
1354 return;
1355 }
1356
1357 let file_name = self.app_state.get_dump_events_file(&self.session_id);
1358 let mut dump_file = match File::options()
1359 .create(true)
1360 .append(true)
1361 .open(&file_name)
1362 .await
1363 {
1364 Ok(file) => file,
1365 Err(e) => {
1366 warn!(
1367 session_id = self.session_id,
1368 file_name, "failed to open dump events file: {}", e
1369 );
1370 return;
1371 }
1372 };
1373 self.dump_to_file(
1374 &mut dump_file,
1375 &mut dump_cmd_receiver,
1376 &mut dump_event_receiver,
1377 )
1378 .await;
1379
1380 while let Ok(event) = dump_event_receiver.try_recv() {
1381 if matches!(event, SessionEvent::Binary { .. }) {
1382 continue;
1383 }
1384 CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1385 }
1386 }
1387
1388 pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1389 let mut rtc_config = RtcTrackConfig::default();
1390 rtc_config.mode = rustrtc::TransportMode::Rtp;
1391
1392 if let Some(codecs) = &self.app_state.config.codecs {
1393 let mut codec_types = Vec::new();
1394 for c in codecs {
1395 match c.to_lowercase().as_str() {
1396 "pcmu" => codec_types.push(CodecType::PCMU),
1397 "pcma" => codec_types.push(CodecType::PCMA),
1398 "g722" => codec_types.push(CodecType::G722),
1399 "g729" => codec_types.push(CodecType::G729),
1400 "opus" => codec_types.push(CodecType::Opus),
1401 "dtmf" | "2833" | "telephone_event" => {
1402 codec_types.push(CodecType::TelephoneEvent)
1403 }
1404 _ => {}
1405 }
1406 }
1407 if !codec_types.is_empty() {
1408 rtc_config.preferred_codec = Some(codec_types[0].clone());
1409 rtc_config.codecs = codec_types;
1410 }
1411 }
1412
1413 if rtc_config.preferred_codec.is_none() {
1414 rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1415 }
1416
1417 rtc_config.rtp_port_range = self
1418 .app_state
1419 .config
1420 .rtp_start_port
1421 .zip(self.app_state.config.rtp_end_port);
1422
1423 if let Some(ref external_ip) = self.app_state.config.external_ip {
1424 rtc_config.external_ip = Some(external_ip.clone());
1425 }
1426
1427 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1428
1429 let mut track = RtcTrack::new(
1430 self.cancel_token.child_token(),
1431 track_id,
1432 self.track_config.clone(),
1433 rtc_config,
1434 )
1435 .with_ssrc(ssrc);
1436
1437 track.create().await?;
1438
1439 Ok(track)
1440 }
1441
1442 async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1443 self.call_state.write().await.option = Some(option.clone());
1444 info!(
1445 session_id = self.session_id,
1446 call_type = ?self.call_type,
1447 "setup caller track"
1448 );
1449
1450 let track = match self.call_type {
1451 ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1452 ActiveCallType::WebSocket => {
1453 let audio_receiver = self.call_state.write().await.audio_receiver.take();
1454 if let Some(receiver) = audio_receiver {
1455 Some(self.create_websocket_track(receiver).await?)
1456 } else {
1457 None
1458 }
1459 }
1460 ActiveCallType::Sip => {
1461 if let Some(dialog_id) = self
1462 .invitation
1463 .find_dialog_id_by_session_id(&self.session_id)
1464 {
1465 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1466 return self
1467 .prepare_incoming_sip_track(
1468 self.cancel_token.clone(),
1469 self.call_state.clone(),
1470 &self.session_id,
1471 pending_dialog,
1472 )
1473 .await;
1474 }
1475 }
1476
1477 let mut invite_option = option.build_invite_option()?;
1478 invite_option.call_id = Some(self.session_id.clone());
1479
1480 match self
1481 .create_outgoing_sip_track(
1482 self.cancel_token.clone(),
1483 self.call_state.clone(),
1484 &self.session_id,
1485 invite_option,
1486 &option,
1487 None,
1488 false,
1489 )
1490 .await
1491 {
1492 Ok(answer) => {
1493 self.event_sender
1494 .send(SessionEvent::Answer {
1495 timestamp: crate::media::get_timestamp(),
1496 track_id: self.session_id.clone(),
1497 sdp: answer,
1498 refer: Some(false),
1499 })
1500 .ok();
1501 return Ok(());
1502 }
1503 Err(e) => {
1504 warn!(
1505 session_id = self.session_id,
1506 "failed to create sip track: {}", e
1507 );
1508 match &e {
1509 rsipstack::Error::DialogError(reason, _, code) => {
1510 self.event_sender
1511 .send(SessionEvent::Reject {
1512 track_id: self.session_id.clone(),
1513 timestamp: crate::media::get_timestamp(),
1514 reason: reason.clone(),
1515 code: Some(code.code() as u32),
1516 refer: Some(false),
1517 })
1518 .ok();
1519 }
1520 _ => {}
1521 }
1522 return Err(e.into());
1523 }
1524 }
1525 }
1526 ActiveCallType::B2bua => {
1527 if let Some(dialog_id) = self
1528 .invitation
1529 .find_dialog_id_by_session_id(&self.session_id)
1530 {
1531 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1532 return self
1533 .prepare_incoming_sip_track(
1534 self.cancel_token.clone(),
1535 self.call_state.clone(),
1536 &self.session_id,
1537 pending_dialog,
1538 )
1539 .await;
1540 }
1541 }
1542
1543 warn!(
1544 session_id = self.session_id,
1545 "no pending dialog found for B2BUA call"
1546 );
1547 return Err(anyhow::anyhow!(
1548 "no pending dialog found for session_id: {}",
1549 self.session_id
1550 ));
1551 }
1552 };
1553 match track {
1554 Some(track) => {
1555 self.finish_caller_stack(&option, Some(track)).await?;
1556 }
1557 None => {
1558 warn!(session_id = self.session_id, "no track created for caller");
1559 return Err(anyhow::anyhow!("no track created for caller"));
1560 }
1561 }
1562 Ok(())
1563 }
1564
1565 async fn finish_caller_stack(
1566 &self,
1567 option: &CallOption,
1568 track: Option<Box<dyn Track>>,
1569 ) -> Result<()> {
1570 if let Some(track) = track {
1571 self.setup_track_with_stream(&option, track).await?;
1572 }
1573
1574 {
1575 let call_state = self.call_state.read().await;
1576 if let Some(ref answer) = call_state.answer {
1577 info!(
1578 session_id = self.session_id,
1579 "sending answer event: {}", answer,
1580 );
1581 self.event_sender
1582 .send(SessionEvent::Answer {
1583 timestamp: crate::media::get_timestamp(),
1584 track_id: self.session_id.clone(),
1585 sdp: answer.clone(),
1586 refer: Some(false),
1587 })
1588 .ok();
1589 } else {
1590 warn!(
1591 session_id = self.session_id,
1592 "no answer in state to send event"
1593 );
1594 }
1595 }
1596 Ok(())
1597 }
1598
1599 pub async fn setup_track_with_stream(
1600 &self,
1601 option: &CallOption,
1602 mut track: Box<dyn Track>,
1603 ) -> Result<()> {
1604 let processors = match StreamEngine::create_processors(
1605 self.app_state.stream_engine.clone(),
1606 track.as_ref(),
1607 self.cancel_token.child_token(),
1608 self.event_sender.clone(),
1609 self.media_stream.packet_sender.clone(),
1610 option,
1611 )
1612 .await
1613 {
1614 Ok(processors) => processors,
1615 Err(e) => {
1616 warn!(
1617 session_id = self.session_id,
1618 "failed to prepare stream processors: {}", e
1619 );
1620 vec![]
1621 }
1622 };
1623
1624 for processor in processors {
1626 track.append_processor(processor);
1627 }
1628
1629 self.update_track_wrapper(track, None).await;
1630 Ok(())
1631 }
1632
1633 pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1634 let ambiance_opt = {
1635 let state = self.call_state.read().await;
1636 let mut opt = state
1637 .option
1638 .as_ref()
1639 .and_then(|o| o.ambiance.clone())
1640 .unwrap_or_default();
1641
1642 if let Some(global) = &self.app_state.config.ambiance {
1643 opt.merge(global);
1644 }
1645 opt
1646 };
1647 if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1648 match AmbianceProcessor::new(ambiance_opt).await {
1649 Ok(ambiance) => {
1650 info!(session_id = self.session_id, "loaded ambiance processor");
1651 track.append_processor(Box::new(ambiance));
1652 }
1653 Err(e) => {
1654 tracing::error!("failed to load ambiance wav {}", e);
1655 }
1656 }
1657 }
1658 self.call_state.write().await.current_play_id = play_id.clone();
1659 self.media_stream.update_track(track, play_id).await;
1660 }
1661
1662 pub async fn create_websocket_track(
1663 &self,
1664 audio_receiver: WebsocketBytesReceiver,
1665 ) -> Result<Box<dyn Track>> {
1666 let (ssrc, codec) = {
1667 let call_state = self.call_state.read().await;
1668 (
1669 call_state.ssrc,
1670 call_state
1671 .option
1672 .as_ref()
1673 .map(|o| o.codec.clone())
1674 .unwrap_or_default(),
1675 )
1676 };
1677
1678 let ws_track = WebsocketTrack::new(
1679 self.cancel_token.child_token(),
1680 self.session_id.clone(),
1681 self.track_config.clone(),
1682 self.event_sender.clone(),
1683 audio_receiver,
1684 codec,
1685 ssrc,
1686 );
1687
1688 {
1689 let mut call_state = self.call_state.write().await;
1690 call_state.answer_time = Some(Utc::now());
1691 call_state.answer = Some("".to_string());
1692 call_state.last_status_code = 200;
1693 }
1694
1695 Ok(Box::new(ws_track))
1696 }
1697
1698 pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1699 let (ssrc, option) = {
1700 let call_state = self.call_state.read().await;
1701 (
1702 call_state.ssrc,
1703 call_state.option.clone().unwrap_or_default(),
1704 )
1705 };
1706
1707 let mut rtc_config = RtcTrackConfig::default();
1708 rtc_config.mode = rustrtc::TransportMode::WebRtc; rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1710
1711 if let Some(codecs) = &self.app_state.config.codecs {
1712 let mut codec_types = Vec::new();
1713 for c in codecs {
1714 match c.to_lowercase().as_str() {
1715 "pcmu" => codec_types.push(CodecType::PCMU),
1716 "pcma" => codec_types.push(CodecType::PCMA),
1717 "g722" => codec_types.push(CodecType::G722),
1718 "g729" => codec_types.push(CodecType::G729),
1719 "opus" => codec_types.push(CodecType::Opus),
1720 "dtmf" | "2833" | "telephone_event" => {
1721 codec_types.push(CodecType::TelephoneEvent)
1722 }
1723 _ => {}
1724 }
1725 }
1726 if !codec_types.is_empty() {
1727 rtc_config.preferred_codec = Some(codec_types[0].clone());
1728 rtc_config.codecs = codec_types;
1729 }
1730 }
1731
1732 if let Some(ref external_ip) = self.app_state.config.external_ip {
1733 rtc_config.external_ip = Some(external_ip.clone());
1734 }
1735
1736 let mut webrtc_track = RtcTrack::new(
1737 self.cancel_token.child_token(),
1738 self.session_id.clone(),
1739 self.track_config.clone(),
1740 rtc_config,
1741 )
1742 .with_ssrc(ssrc);
1743
1744 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1745 let offer = match option.enable_ipv6 {
1746 Some(false) | None => {
1747 strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1748 }
1749 _ => option.offer.clone().unwrap_or("".to_string()),
1750 };
1751 let answer: Option<String>;
1752 match webrtc_track.handshake(offer, timeout).await {
1753 Ok(answer_sdp) => {
1754 answer = match option.enable_ipv6 {
1755 Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1756 Some(true) => Some(answer_sdp.to_string()),
1757 };
1758 }
1759 Err(e) => {
1760 warn!(session_id = self.session_id, "failed to setup track: {}", e);
1761 return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1762 }
1763 }
1764
1765 {
1766 let mut call_state = self.call_state.write().await;
1767 call_state.answer_time = Some(Utc::now());
1768 call_state.answer = answer;
1769 call_state.last_status_code = 200;
1770 }
1771 Ok(Box::new(webrtc_track))
1772 }
1773
1774 async fn create_outgoing_sip_track(
1775 &self,
1776 cancel_token: CancellationToken,
1777 call_state_ref: ActiveCallStateRef,
1778 track_id: &String,
1779 mut invite_option: InviteOption,
1780 call_option: &CallOption,
1781 moh: Option<String>,
1782 auto_hangup: bool,
1783 ) -> Result<String, rsipstack::Error> {
1784 let ssrc = call_state_ref.read().await.ssrc;
1785 let rtp_track = self
1786 .create_rtp_track(track_id.clone(), ssrc)
1787 .await
1788 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1789
1790 let offer = Some(
1791 rtp_track
1792 .local_description()
1793 .await
1794 .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1795 );
1796
1797 {
1798 let mut cs = call_state_ref.write().await;
1799 if let Some(o) = cs.option.as_mut() {
1800 o.offer = offer.clone();
1801 }
1802 cs.start_time = Utc::now();
1803 };
1804
1805 invite_option.offer = offer.clone().map(|s| s.into());
1806
1807 let needs_contact = invite_option.contact.scheme.is_none()
1810 || invite_option
1811 .contact
1812 .host_with_port
1813 .to_string()
1814 .starts_with("127.0.0.1");
1815
1816 if needs_contact {
1817 if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1818 invite_option.contact = rsip::Uri {
1819 scheme: Some(rsip::Scheme::Sip),
1820 auth: None,
1821 host_with_port: addr.addr.clone(),
1822 params: vec![],
1823 headers: vec![],
1824 };
1825 }
1826 }
1827
1828 let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1829
1830 if let Some(moh) = moh {
1831 let ssrc_and_moh = {
1832 let mut state = call_state_ref.write().await;
1833 state.moh = Some(moh.clone());
1834 if state.current_play_id.is_none() {
1835 let ssrc = rand::random::<u32>();
1836 Some((ssrc, moh.clone()))
1837 } else {
1838 info!(
1839 session_id = self.session_id,
1840 "Something is playing, MOH will start after it ends"
1841 );
1842 None
1843 }
1844 };
1845
1846 if let Some((ssrc, moh_path)) = ssrc_and_moh {
1847 let file_track = FileTrack::new(self.server_side_track_id.clone())
1848 .with_play_id(Some(moh_path.clone()))
1849 .with_ssrc(ssrc)
1850 .with_path(moh_path.clone())
1851 .with_cancel_token(self.cancel_token.child_token());
1852 self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1853 .await;
1854 }
1855 } else {
1856 let track = rtp_track_to_setup.take().unwrap();
1857 self.setup_track_with_stream(&call_option, track)
1858 .await
1859 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1860 }
1861
1862 info!(
1863 session_id = self.session_id,
1864 track_id,
1865 contact = %invite_option.contact,
1866 "invite {} -> {} offer: \n{}",
1867 invite_option.caller,
1868 invite_option.callee,
1869 offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1870 );
1871
1872 let (dlg_state_sender, dlg_state_receiver) =
1873 self.invitation.dialog_layer.new_dialog_state_channel();
1874
1875 let states = InviteDialogStates {
1876 is_client: true,
1877 session_id: self.session_id.clone(),
1878 track_id: track_id.clone(),
1879 event_sender: self.event_sender.clone(),
1880 media_stream: self.media_stream.clone(),
1881 call_state: call_state_ref.clone(),
1882 cancel_token,
1883 terminated_reason: None,
1884 has_early_media: false,
1885 };
1886
1887 let mut client_dialog_handler =
1888 DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), dlg_state_receiver);
1889
1890 crate::spawn(async move {
1891 client_dialog_handler.process_dialog(states).await;
1892 });
1893
1894 let (dialog_id, answer) = self
1895 .invitation
1896 .invite(invite_option, dlg_state_sender)
1897 .await?;
1898
1899 self.call_state.write().await.moh = None;
1900
1901 if let Some(track) = rtp_track_to_setup {
1902 self.setup_track_with_stream(&call_option, track)
1903 .await
1904 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1905 }
1906
1907 let answer = match answer {
1908 Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1909 None => {
1910 warn!(session_id = self.session_id, "no answer received");
1911 return Err(rsipstack::Error::DialogError(
1912 "No answer received".to_string(),
1913 dialog_id,
1914 rsip::StatusCode::NotAcceptableHere,
1915 ));
1916 }
1917 };
1918
1919 {
1920 let mut cs = call_state_ref.write().await;
1921 if cs.answer.is_none() {
1922 cs.answer = Some(answer.clone());
1923 }
1924 if auto_hangup {
1925 cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
1926 }
1927 }
1928
1929 self.media_stream
1930 .update_remote_description(&track_id, &answer)
1931 .await
1932 .ok();
1933
1934 Ok(answer)
1935 }
1936
/// Heuristically decides whether `sdp` describes a WebRTC session.
///
/// The SDP must contain a DTLS fingerprint attribute (`a=fingerprint:`) and at
/// least one ICE credential attribute (`a=ice-ufrag:` or `a=ice-pwd:`).
pub fn is_webrtc_sdp(sdp: &str) -> bool {
    if !sdp.contains("a=fingerprint:") {
        return false;
    }
    ["a=ice-ufrag:", "a=ice-pwd:"]
        .iter()
        .any(|attribute| sdp.contains(*attribute))
}
1942
1943 pub async fn setup_answer_track(
1944 &self,
1945 ssrc: u32,
1946 option: &CallOption,
1947 offer: String,
1948 ) -> Result<(String, Box<dyn Track>)> {
1949 let offer = match option.enable_ipv6 {
1950 Some(false) | None => strip_ipv6_candidates(&offer),
1951 _ => offer.clone(),
1952 };
1953
1954 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1955
1956 let mut media_track = if Self::is_webrtc_sdp(&offer) {
1957 let mut rtc_config = RtcTrackConfig::default();
1958 rtc_config.mode = rustrtc::TransportMode::WebRtc;
1959 rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1960 if let Some(ref external_ip) = self.app_state.config.external_ip {
1961 rtc_config.external_ip = Some(external_ip.clone());
1962 }
1963 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1964
1965 let webrtc_track = RtcTrack::new(
1966 self.cancel_token.child_token(),
1967 self.session_id.clone(),
1968 self.track_config.clone(),
1969 rtc_config,
1970 )
1971 .with_ssrc(ssrc);
1972
1973 Box::new(webrtc_track) as Box<dyn Track>
1974 } else {
1975 let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
1976 Box::new(rtp_track) as Box<dyn Track>
1977 };
1978
1979 let answer = match media_track.handshake(offer.clone(), timeout).await {
1980 Ok(answer) => answer,
1981 Err(e) => {
1982 return Err(anyhow::anyhow!("handshake failed: {e}"));
1983 }
1984 };
1985
1986 return Ok((answer, media_track));
1987 }
1988
1989 pub async fn prepare_incoming_sip_track(
1990 &self,
1991 cancel_token: CancellationToken,
1992 call_state_ref: ActiveCallStateRef,
1993 track_id: &String,
1994 pending_dialog: PendingDialog,
1995 ) -> Result<()> {
1996 let state_receiver = pending_dialog.state_receiver;
1997 let states = InviteDialogStates {
2000 is_client: false,
2001 session_id: self.session_id.clone(),
2002 track_id: track_id.clone(),
2003 event_sender: self.event_sender.clone(),
2004 media_stream: self.media_stream.clone(),
2005 call_state: self.call_state.clone(),
2006 cancel_token,
2007 terminated_reason: None,
2008 has_early_media: false,
2009 };
2010
2011 let initial_request = pending_dialog.dialog.initial_request();
2012 let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2013
2014 let (ssrc, option) = {
2015 let call_state = call_state_ref.read().await;
2016 (
2017 call_state.ssrc,
2018 call_state.option.clone().unwrap_or_default(),
2019 )
2020 };
2021
2022 match self.setup_answer_track(ssrc, &option, offer).await {
2023 Ok((offer, track)) => {
2024 self.setup_track_with_stream(&option, track).await?;
2025 {
2026 let mut state = self.call_state.write().await;
2027 state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2028 }
2029 }
2030 Err(e) => {
2031 return Err(anyhow::anyhow!("error creating track: {}", e));
2032 }
2033 }
2034
2035 let mut client_dialog_handler =
2036 DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), state_receiver);
2037
2038 crate::spawn(async move {
2039 client_dialog_handler.process_dialog(states).await;
2040 });
2041 Ok(())
2042 }
2043}
2044
2045impl Drop for ActiveCall {
2046 fn drop(&mut self) {
2047 info!(session_id = self.session_id, "dropping active call");
2048 if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2049 if let Some(record) = self.get_callrecord() {
2050 if let Err(e) = sender.send(record) {
2051 warn!(
2052 session_id = self.session_id,
2053 "failed to send call record: {}", e
2054 );
2055 }
2056 }
2057 }
2058 }
2059}
2060
2061impl ActiveCallState {
2062 pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2063 if let Some(existing) = &self.option {
2064 if option.asr.is_none() {
2065 option.asr = existing.asr.clone();
2066 }
2067 if option.tts.is_none() {
2068 option.tts = existing.tts.clone();
2069 }
2070 if option.vad.is_none() {
2071 option.vad = existing.vad.clone();
2072 }
2073 if option.denoise.is_none() {
2074 option.denoise = existing.denoise;
2075 }
2076 if option.recorder.is_none() {
2077 option.recorder = existing.recorder.clone();
2078 }
2079 if option.eou.is_none() {
2080 option.eou = existing.eou.clone();
2081 }
2082 if option.extra.is_none() {
2083 option.extra = existing.extra.clone();
2084 }
2085 if option.ambiance.is_none() {
2086 option.ambiance = existing.ambiance.clone();
2087 }
2088 }
2089 option
2090 }
2091
2092 pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2093 if self.hangup_reason.is_none() {
2094 self.hangup_reason = Some(reason);
2095 }
2096 }
2097
2098 pub fn build_hangup_event(
2099 &self,
2100 track_id: TrackId,
2101 initiator: Option<String>,
2102 ) -> crate::event::SessionEvent {
2103 let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2104 let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2105 let extra = self.extras.clone();
2106
2107 crate::event::SessionEvent::Hangup {
2108 track_id,
2109 timestamp: crate::media::get_timestamp(),
2110 reason: Some(format!("{:?}", self.hangup_reason)),
2111 initiator,
2112 start_time: self.start_time.to_rfc3339(),
2113 answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2114 ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2115 hangup_time: Utc::now().to_rfc3339(),
2116 extra,
2117 from: from.map(|f| f.into()),
2118 to: to.map(|f| f.into()),
2119 refer: Some(self.is_refer),
2120 }
2121 }
2122
2123 pub fn build_callrecord(
2124 &self,
2125 app_state: AppState,
2126 session_id: String,
2127 call_type: ActiveCallType,
2128 ) -> CallRecord {
2129 let option = self.option.clone().unwrap_or_default();
2130 let recorder = if option.recorder.is_some() {
2131 let recorder_file = app_state.get_recorder_file(&session_id);
2132 if std::path::Path::new(&recorder_file).exists() {
2133 let file_size = std::fs::metadata(&recorder_file)
2134 .map(|m| m.len())
2135 .unwrap_or(0);
2136 vec![crate::callrecord::CallRecordMedia {
2137 track_id: session_id.clone(),
2138 path: recorder_file,
2139 size: file_size,
2140 extra: None,
2141 }]
2142 } else {
2143 vec![]
2144 }
2145 } else {
2146 vec![]
2147 };
2148
2149 let dump_event_file = app_state.get_dump_events_file(&session_id);
2150 let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2151 Some(dump_event_file)
2152 } else {
2153 None
2154 };
2155
2156 let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2157 if let Ok(rc) = rc.try_read() {
2158 Some(Box::new(rc.build_callrecord(
2159 app_state.clone(),
2160 rc.session_id.clone(),
2161 ActiveCallType::B2bua,
2162 )))
2163 } else {
2164 None
2165 }
2166 });
2167
2168 let caller = option.caller.clone().unwrap_or_default();
2169 let callee = option.callee.clone().unwrap_or_default();
2170
2171 CallRecord {
2172 option: Some(option),
2173 call_id: session_id,
2174 call_type,
2175 start_time: self.start_time,
2176 ring_time: self.ring_time.clone(),
2177 answer_time: self.answer_time.clone(),
2178 end_time: Utc::now(),
2179 caller,
2180 callee,
2181 hangup_reason: self.hangup_reason.clone(),
2182 hangup_messages: Vec::new(),
2183 status_code: self.last_status_code,
2184 extras: self.extras.clone(),
2185 dump_event_file,
2186 recorder,
2187 refer_callrecord,
2188 }
2189 }
2190}