1use super::Command;
2use crate::{
3 CallOption, ReferOption,
4 event::{EventReceiver, EventSender, SessionEvent},
5 media::{
6 TrackId,
7 ambiance::AmbianceProcessor,
8 engine::StreamEngine,
9 negotiate::strip_ipv6_candidates,
10 processor::SubscribeProcessor,
11 recorder::RecorderOption,
12 stream::{MediaStream, MediaStreamBuilder},
13 track::{
14 Track, TrackConfig,
15 file::FileTrack,
16 media_pass::MediaPassTrack,
17 rtc::{RtcTrack, RtcTrackConfig},
18 tts::SynthesisHandle,
19 websocket::{WebsocketBytesReceiver, WebsocketTrack},
20 },
21 },
22 synthesis::{SynthesisCommand, SynthesisOption},
23};
24use crate::{
25 app::AppState,
26 call::{
27 CommandReceiver, CommandSender,
28 sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
29 },
30 callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
31 useragent::invitation::PendingDialog,
32};
33use anyhow::Result;
34use audio_codec::CodecType;
35use chrono::{DateTime, Utc};
36use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
37use serde::{Deserialize, Serialize};
38use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
39use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
40use tokio_util::sync::CancellationToken;
41use tracing::{debug, info, warn};
42
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app::AppStateBuilder;
    use crate::callrecord::CallRecordHangupReason;
    use crate::config::Config;
    use crate::media::track::tts::SynthesisHandle;
    use crate::synthesis::SynthesisCommand;
    use tokio::sync::mpsc;

    /// Play id attached to the synthesis handle that each test pre-installs.
    const SEEDED_PLAY_ID: &str = "play_1";

    /// Builds an application state and a SIP `ActiveCall` on top of it.
    ///
    /// The app state is returned as well so it stays alive for the whole test.
    async fn sip_call_fixture(
        cancel_token: CancellationToken,
    ) -> Result<(AppState, Arc<ActiveCall>)> {
        let mut config = Config::default();
        config.udp_port = 0;
        config.media_cache_path = "/tmp/mediacache".to_string();

        let app_state = AppStateBuilder::new()
            .with_config(config)
            .with_stream_engine(Arc::new(StreamEngine::default()))
            .build()
            .await?;

        let call = Arc::new(ActiveCall::new(
            ActiveCallType::Sip,
            cancel_token,
            "test-session".to_string(),
            app_state.invitation.clone(),
            app_state.clone(),
            TrackConfig::default(),
            None,
            false,
            None,
            None,
        ));
        Ok((app_state, call))
    }

    /// Stores the call option and pretends `handle` is the synthesis handle
    /// currently playing `SEEDED_PLAY_ID`.
    async fn seed_playback(call: &ActiveCall, option: crate::CallOption, handle: SynthesisHandle) {
        let mut state = call.call_state.write().await;
        state.option = Some(option);
        state.tts_handle = Some(handle);
        state.current_play_id = Some(SEEDED_PLAY_ID.to_string());
    }

    /// Issues a non-streaming, end-of-stream TTS request with auto hangup enabled.
    async fn tts_with_auto_hangup(call: &ActiveCall, text: &str, play_id: &str) -> Result<()> {
        call.do_tts(
            text.to_string(),
            None,
            Some(play_id.to_string()),
            Some(true),
            false,
            true,
            None,
            None,
            false,
        )
        .await
    }

    #[tokio::test]
    async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
        let (_app_state, active_call) = sip_call_fixture(CancellationToken::new()).await?;

        let mut option = crate::CallOption::default();
        option.tts = Some(crate::synthesis::SynthesisOption::default());

        let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
        let initial_ssrc: u32 = 12345;
        let handle = SynthesisHandle::new(tx, Some(SEEDED_PLAY_ID.to_string()), initial_ssrc);
        seed_playback(&active_call, option, handle).await;

        // Same play id as the seeded handle: the existing track must be reused.
        tts_with_auto_hangup(&active_call, "hangup now", SEEDED_PLAY_ID).await?;

        let auto_hangup = active_call.call_state.read().await.auto_hangup.clone();
        assert!(auto_hangup.is_some());
        let (h_ssrc, reason) = auto_hangup.unwrap();
        assert_eq!(
            h_ssrc, initial_ssrc,
            "SSRC should be reused from existing handle"
        );
        assert_eq!(reason, CallRecordHangupReason::BySystem);

        // The command must have been forwarded through the seeded handle.
        let cmd = rx.try_recv().expect("Should have received tts command");
        assert_eq!(cmd.text, "hangup now");

        Ok(())
    }

    #[tokio::test]
    async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
        let (_app_state, active_call) = sip_call_fixture(CancellationToken::new()).await?;

        let mut tts_opt = crate::synthesis::SynthesisOption::default();
        tts_opt.provider = Some(crate::synthesis::SynthesisType::Aliyun);
        let mut option = crate::CallOption::default();
        option.tts = Some(tts_opt);

        let (tx, _rx) = mpsc::unbounded_channel();
        let initial_ssrc: u32 = 111;
        let handle = SynthesisHandle::new(tx, Some(SEEDED_PLAY_ID.to_string()), initial_ssrc);
        seed_playback(&active_call, option, handle).await;

        // A different play id must not inherit the SSRC of the seeded handle.
        tts_with_auto_hangup(&active_call, "new play", "play_2").await?;

        let (h_ssrc, _) = active_call
            .call_state
            .read()
            .await
            .auto_hangup
            .clone()
            .unwrap();
        assert_ne!(
            h_ssrc, initial_ssrc,
            "Should use a new SSRC for different play_id"
        );

        Ok(())
    }
}
207
/// Parameters deserialized for a call request.
///
/// Field names are read in camelCase unless an explicit `rename` is given.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallParams {
    /// Optional identifier supplied with the request.
    pub id: Option<String>,
    /// Read from the `dump` key.
    #[serde(rename = "dump")]
    pub dump_events: Option<bool>,
    /// Read from the `ping` key.
    // NOTE(review): the unit of this interval is not visible here — confirm.
    pub ping_interval: Option<u32>,
    /// Read from the `serverSideTrack` key.
    pub server_side_track: Option<String>,
}
218
/// Kind of call handled by an [`ActiveCall`].
///
/// Variants are (de)serialized with camelCase names; `Sip` is the default.
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum ActiveCallType {
    Webrtc,
    B2bua,
    WebSocket,
    #[default]
    Sip,
}
228
/// Mutable per-call state shared behind an `RwLock` (see `ActiveCallStateRef`).
#[derive(Default)]
pub struct ActiveCallState {
    /// Session identifier of the call this state belongs to.
    pub session_id: String,
    /// Set to `Utc::now()` when the state is created.
    pub start_time: DateTime<Utc>,
    pub ring_time: Option<DateTime<Utc>>,
    /// Set by `do_accept` once the dialog has been accepted.
    pub answer_time: Option<DateTime<Utc>>,
    pub hangup_reason: Option<CallRecordHangupReason>,
    pub last_status_code: u16,
    /// Call options; stored by `do_accept` and read by `do_tts`.
    pub option: Option<CallOption>,
    /// Answer body stored by `do_accept` after it was sent as `application/sdp`.
    pub answer: Option<String>,
    /// Initialised with a random value in `ActiveCall::new`.
    pub ssrc: u32,
    /// State of the referred leg, installed by `do_refer`.
    pub refer_callstate: Option<ActiveCallStateRef>,
    /// Caller-supplied extra values passed to `ActiveCall::new`.
    pub extras: Option<HashMap<String, serde_json::Value>>,
    /// `true` for the state object that `do_refer` creates for the referred leg.
    pub is_refer: bool,

    /// Handle used by `do_tts` to push further commands to the current synthesis track.
    pub tts_handle: Option<SynthesisHandle>,
    /// `(ssrc, reason)`: hang up with `reason` when the server-side track carrying
    /// `ssrc` ends (checked in `process`).
    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
    /// Taken by `process` when the server-side track ends to arm the input timeout.
    // NOTE(review): unit looks like milliseconds (compared against `get_timestamp`) — confirm.
    pub wait_input_timeout: Option<u32>,
    /// When set, `process` replays this path each time the server-side track ends.
    pub moh: Option<String>,
    /// Play id of the current playback; track-end events with another id are ignored.
    pub current_play_id: Option<String>,
    /// Receiver passed to `ActiveCall::new`.
    pub audio_receiver: Option<WebsocketBytesReceiver>,
    /// `(answer, track, dialog)` consumed by `do_accept` to answer the call.
    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
}
253
/// Shared, reference-counted handle to an [`ActiveCall`].
pub type ActiveCallRef = Arc<ActiveCall>;
/// Shared, lock-protected handle to an [`ActiveCallState`].
pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
256
/// A single call session: its state, media stream, and command/event channels.
pub struct ActiveCall {
    /// Mutable state guarded by an async `RwLock`.
    pub call_state: ActiveCallStateRef,
    /// Cancelled when `serve` finishes; child tokens are handed to tracks.
    pub cancel_token: CancellationToken,
    pub call_type: ActiveCallType,
    pub session_id: String,
    /// Media stream built in `new` with the session id as its id.
    pub media_stream: Arc<MediaStream>,
    pub track_config: TrackConfig,
    /// Sender side of the session event channel.
    pub event_sender: EventSender,
    pub app_state: AppState,
    pub invitation: Invitation,
    /// Broadcast sender through which commands are queued (`enqueue_command`).
    pub cmd_sender: CommandSender,
    /// Forwarded to `dump_loop` by `serve`.
    pub dump_events: bool,
    /// Id of the server-side track; defaults to `"server-side-track"` in `new`.
    pub server_side_track_id: TrackId,
}
271
/// Guard that keeps a call registered in `app_state.active_calls` for its lifetime.
///
/// The call is inserted on construction and removed again when the guard is dropped.
pub struct ActiveCallGuard {
    /// The registered call.
    pub call: ActiveCallRef,
    /// Number of registered calls right after this one was inserted.
    pub active_calls: usize,
}
276
277impl ActiveCallGuard {
278 pub fn new(call: ActiveCallRef) -> Self {
279 let active_calls = {
280 call.app_state
281 .total_calls
282 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
283 let mut calls = call.app_state.active_calls.lock().unwrap();
284 calls.insert(call.session_id.clone(), call.clone());
285 calls.len()
286 };
287 Self { call, active_calls }
288 }
289}
290
291impl Drop for ActiveCallGuard {
292 fn drop(&mut self) {
293 self.call
294 .app_state
295 .active_calls
296 .lock()
297 .unwrap()
298 .remove(&self.call.session_id);
299 }
300}
301
/// Bundle of receivers created by `ActiveCall::new_receiver` and consumed by `serve`.
pub struct ActiveCallReceiver {
    /// Commands to be dispatched by the command loop.
    pub cmd_receiver: CommandReceiver,
    /// Second subscription to the same command channel, handed to `dump_loop`.
    pub dump_cmd_receiver: CommandReceiver,
    /// Subscription to the session event channel, handed to `dump_loop`.
    pub dump_event_receiver: EventReceiver,
}
307
308impl ActiveCall {
309 pub fn new(
310 call_type: ActiveCallType,
311 cancel_token: CancellationToken,
312 session_id: String,
313 invitation: Invitation,
314 app_state: AppState,
315 track_config: TrackConfig,
316 audio_receiver: Option<WebsocketBytesReceiver>,
317 dump_events: bool,
318 server_side_track_id: Option<TrackId>,
319 extras: Option<HashMap<String, serde_json::Value>>,
320 ) -> Self {
321 let event_sender = crate::event::create_event_sender();
322 let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
323 let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
324 .with_id(session_id.clone())
325 .with_cancel_token(cancel_token.child_token());
326 let media_stream = Arc::new(media_stream_builder.build());
327 let call_state = Arc::new(RwLock::new(ActiveCallState {
328 session_id: session_id.clone(),
329 start_time: Utc::now(),
330 ssrc: rand::random::<u32>(),
331 extras,
332 audio_receiver,
333 ..Default::default()
334 }));
335 Self {
336 cancel_token,
337 call_type,
338 session_id,
339 call_state,
340 media_stream,
341 track_config,
342 event_sender,
343 app_state,
344 invitation,
345 cmd_sender,
346 dump_events,
347 server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
348 }
349 }
350
351 pub async fn enqueue_command(&self, command: Command) -> Result<()> {
352 self.cmd_sender
353 .send(command)
354 .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
355 Ok(())
356 }
357
358 pub fn new_receiver(&self) -> ActiveCallReceiver {
362 ActiveCallReceiver {
363 cmd_receiver: self.cmd_sender.subscribe(),
364 dump_cmd_receiver: self.cmd_sender.subscribe(),
365 dump_event_receiver: self.event_sender.subscribe(),
366 }
367 }
368
369 pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
370 let ActiveCallReceiver {
371 mut cmd_receiver,
372 dump_cmd_receiver,
373 dump_event_receiver,
374 } = receiver;
375
376 let process_command_loop = async move {
377 while let Ok(command) = cmd_receiver.recv().await {
378 match self.dispatch(command).await {
379 Ok(_) => (),
380 Err(e) => {
381 warn!(session_id = self.session_id, "{}", e);
382 self.event_sender
383 .send(SessionEvent::Error {
384 track_id: self.session_id.clone(),
385 timestamp: crate::media::get_timestamp(),
386 sender: "command".to_string(),
387 error: e.to_string(),
388 code: None,
389 })
390 .ok();
391 }
392 }
393 }
394 };
395 self.app_state
396 .total_calls
397 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
398
399 tokio::join!(
400 self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
401 async {
402 select! {
403 _ = process_command_loop => {
404 info!(session_id = self.session_id, "command loop done");
405 }
406 _ = self.process() => {
407 info!(session_id = self.session_id, "call serve done");
408 }
409 _ = self.cancel_token.cancelled() => {
410 info!(session_id = self.session_id, "call cancelled - cleaning up resources");
411 }
412 }
413 self.cancel_token.cancel();
414 }
415 );
416 Ok(())
417 }
418
/// Drives the call's media stream together with two helper loops until any of them ends.
///
/// * The input-timeout loop polls every 100 ms and emits a `SessionEvent::Silence`
///   on the server-side track once an armed timeout has elapsed.
/// * The event hook loop reacts to session events: it resets or arms the input
///   timeout, replays music-on-hold, performs auto hangup, handles interrupts and
///   hangs up on inactivity.
///
/// Always returns `Ok(())`.
async fn process(&self) -> Result<()> {
    let mut event_receiver = self.event_sender.subscribe();

    // `(start_time, expire)`; `expire == 0` means no timeout is armed.
    let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
    let input_timeout_expire_ref = input_timeout_expire.clone();
    let event_sender = self.event_sender.clone();
    let wait_input_timeout_loop = async {
        loop {
            let (start_time, expire) = { *input_timeout_expire.lock().await };
            if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
                info!(session_id = self.session_id, "wait input timeout reached");
                // Disarm before notifying so the event is sent once per armed timeout.
                *input_timeout_expire.lock().await = (0, 0);
                event_sender
                    .send(SessionEvent::Silence {
                        track_id: self.server_side_track_id.clone(),
                        timestamp: crate::media::get_timestamp(),
                        start_time,
                        duration: expire as u64,
                        samples: None,
                    })
                    .ok();
            }
            sleep(Duration::from_millis(100)).await;
        }
    };
    let server_side_track_id = self.server_side_track_id.clone();
    let event_hook_loop = async move {
        while let Ok(event) = event_receiver.recv().await {
            match event {
                // Any of these events disarms the input timeout.
                SessionEvent::Speaking { .. }
                | SessionEvent::Dtmf { .. }
                | SessionEvent::AsrDelta { .. }
                | SessionEvent::AsrFinal { .. }
                | SessionEvent::TrackStart { .. } => {
                    *input_timeout_expire_ref.lock().await = (0, 0);
                }
                SessionEvent::TrackEnd {
                    track_id,
                    play_id,
                    ssrc,
                    ..
                } => {
                    // Only the server-side playback track is of interest here.
                    if track_id != server_side_track_id {
                        continue;
                    }

                    let (moh_path, auto_hangup, wait_timeout_val) = {
                        let mut state = self.call_state.write().await;
                        // A track end whose play id is not the current one is ignored.
                        if play_id != state.current_play_id {
                            debug!(
                                session_id = self.session_id,
                                ?play_id,
                                current = ?state.current_play_id,
                                "ignoring interrupted track end"
                            );
                            continue;
                        }
                        state.current_play_id = None;
                        (
                            state.moh.clone(),
                            state.auto_hangup.clone(),
                            state.wait_input_timeout.take(),
                        )
                    };

                    // Music-on-hold is replayed from the start whenever it ends.
                    if let Some(path) = moh_path {
                        info!(session_id = self.session_id, "looping moh: {}", path);
                        let ssrc = rand::random::<u32>();
                        let file_track = FileTrack::new(self.server_side_track_id.clone())
                            .with_play_id(Some(path.clone()))
                            .with_ssrc(ssrc)
                            .with_path(path.clone())
                            .with_cancel_token(self.cancel_token.child_token());
                        self.update_track_wrapper(Box::new(file_track), Some(path))
                            .await;
                        continue;
                    }

                    // Hang up only when the ended track carries the SSRC that was armed.
                    if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
                        if hangup_ssrc == ssrc {
                            info!(
                                session_id = self.session_id,
                                ssrc, "auto hangup when track end track_id:{}", track_id
                            );
                            self.do_hangup(Some(hangup_reason), None).await.ok();
                        }
                    }

                    // Arm the input timeout from now; a value of 0 disarms it.
                    if let Some(timeout) = wait_timeout_val {
                        let expire = if timeout > 0 {
                            (crate::media::get_timestamp(), timeout)
                        } else {
                            (0, 0)
                        };
                        *input_timeout_expire_ref.lock().await = expire;
                    }
                }
                SessionEvent::Interrupt { receiver } => {
                    // An interrupt without a receiver targets the server-side track.
                    let track_id =
                        receiver.unwrap_or_else(|| self.server_side_track_id.clone());
                    if track_id == self.server_side_track_id {
                        debug!(
                            session_id = self.session_id,
                            "received interrupt event, stopping playback"
                        );
                        self.do_interrupt(true).await.ok();
                    }
                }
                SessionEvent::Inactivity { track_id, .. } => {
                    info!(
                        session_id = self.session_id,
                        track_id, "inactivity timeout reached, hanging up"
                    );
                    self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
                        .await
                        .ok();
                }
                SessionEvent::Error { track_id, .. } => {
                    if track_id != server_side_track_id {
                        continue;
                    }

                    // While music-on-hold is active, an error on the server-side track
                    // restarts playback: with the fallback file if it exists and is not
                    // already in use, otherwise with the same path again.
                    let moh_info = {
                        let mut state = self.call_state.write().await;
                        if let Some(path) = state.moh.clone() {
                            let fallback = "./config/sounds/refer_moh.wav".to_string();
                            let next_path = if path != fallback
                                && std::path::Path::new(&fallback).exists()
                            {
                                info!(
                                    session_id = self.session_id,
                                    "moh error, switching to fallback: {}", fallback
                                );
                                state.moh = Some(fallback.clone());
                                fallback
                            } else {
                                info!(
                                    session_id = self.session_id,
                                    "looping moh on error: {}", path
                                );
                                path
                            };
                            Some(next_path)
                        } else {
                            None
                        }
                    };

                    if let Some(next_path) = moh_info {
                        let ssrc = rand::random::<u32>();
                        let file_track = FileTrack::new(self.server_side_track_id.clone())
                            .with_play_id(Some(next_path.clone()))
                            .with_ssrc(ssrc)
                            .with_path(next_path.clone())
                            .with_cancel_token(self.cancel_token.child_token());
                        self.update_track_wrapper(Box::new(file_track), Some(next_path))
                            .await;
                        continue;
                    }
                }
                _ => {}
            }
        }
    };

    // The first future to complete ends processing; the other two are dropped.
    select! {
        _ = wait_input_timeout_loop=>{
            info!(session_id = self.session_id, "wait input timeout loop done");
        }
        _ = self.media_stream.serve() => {
            info!(session_id = self.session_id, "media stream loop done");
        }
        _ = event_hook_loop => {
            info!(session_id = self.session_id, "event loop done");
        }
    }
    Ok(())
}
597
598 async fn dispatch(&self, command: Command) -> Result<()> {
599 match command {
600 Command::Invite { option } => self.do_invite(option).await,
601 Command::Accept { option } => self.do_accept(option).await,
602 Command::Reject { reason, code } => {
603 self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
604 .await
605 }
606 Command::Ringing {
607 ringtone,
608 recorder,
609 early_media,
610 } => self.do_ringing(ringtone, recorder, early_media).await,
611 Command::Tts {
612 text,
613 speaker,
614 play_id,
615 auto_hangup,
616 streaming,
617 end_of_stream,
618 option,
619 wait_input_timeout,
620 base64,
621 } => {
622 self.do_tts(
623 text,
624 speaker,
625 play_id,
626 auto_hangup,
627 streaming.unwrap_or_default(),
628 end_of_stream.unwrap_or_default(),
629 option,
630 wait_input_timeout,
631 base64.unwrap_or_default(),
632 )
633 .await
634 }
635 Command::Play {
636 url,
637 play_id,
638 auto_hangup,
639 wait_input_timeout,
640 } => {
641 self.do_play(url, play_id, auto_hangup, wait_input_timeout)
642 .await
643 }
644 Command::Hangup { reason, initiator } => {
645 let reason = reason.map(|r| {
646 r.parse::<CallRecordHangupReason>()
647 .unwrap_or(CallRecordHangupReason::BySystem)
648 });
649 self.do_hangup(reason, initiator).await
650 }
651 Command::Refer {
652 caller,
653 callee,
654 options,
655 } => self.do_refer(caller, callee, options).await,
656 Command::Mute { track_id } => self.do_mute(track_id).await,
657 Command::Unmute { track_id } => self.do_unmute(track_id).await,
658 Command::Pause {} => self.do_pause().await,
659 Command::Resume {} => self.do_resume().await,
660 Command::Interrupt {
661 graceful: passage,
662 fade_out_ms: _,
663 } => self.do_interrupt(passage.unwrap_or_default()).await,
664 Command::History { speaker, text } => self.do_history(speaker, text).await,
665 }
666 }
667
668 fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
669 if let Some(recorder_option) = &option.recorder {
670 let mut recorder_file = recorder_option.recorder_file.clone();
671 if recorder_file.contains("{id}") {
672 recorder_file = recorder_file.replace("{id}", &self.session_id);
673 }
674
675 let recorder_file = if recorder_file.is_empty() {
676 self.app_state.get_recorder_file(&self.session_id)
677 } else {
678 let p = Path::new(&recorder_file);
679 p.is_absolute()
680 .then(|| recorder_file.clone())
681 .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
682 };
683 info!(
684 session_id = self.session_id,
685 recorder_file, "created recording file"
686 );
687
688 let track_samplerate = self.track_config.samplerate;
689 let recorder_samplerate = if track_samplerate > 0 {
690 track_samplerate
691 } else {
692 recorder_option.samplerate
693 };
694 let recorder_ptime = if recorder_option.ptime == 0 {
695 200
696 } else {
697 recorder_option.ptime
698 };
699 let requested_format = recorder_option
700 .format
701 .unwrap_or(self.app_state.config.recorder_format());
702 let format = requested_format.effective();
703 if requested_format != format {
704 warn!(
705 session_id = self.session_id,
706 requested = requested_format.extension(),
707 "Recorder format fallback to wav due to unsupported feature"
708 );
709 }
710 let mut recorder_config = RecorderOption {
711 recorder_file,
712 samplerate: recorder_samplerate,
713 ptime: recorder_ptime,
714 format: Some(format),
715 };
716 recorder_config.ensure_path_extension(format);
717 Some(recorder_config)
718 } else {
719 None
720 }
721 }
722
723 async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
724 {
726 let state = self.call_state.read().await;
727 option = state.merge_option(option);
728 }
729
730 option.check_default();
731 if let Some(opt) = self.build_record_option(&option) {
732 self.media_stream.update_recorder_option(opt).await;
733 }
734
735 if let Some(opt) = &option.media_pass {
736 let track_id = self.server_side_track_id.clone();
737 let cancel_token = self.cancel_token.child_token();
738 let ssrc = rand::random::<u32>();
739 let media_pass_track = MediaPassTrack::new(
740 self.session_id.clone(),
741 ssrc,
742 track_id,
743 cancel_token,
744 opt.clone(),
745 );
746 self.update_track_wrapper(Box::new(media_pass_track), None)
747 .await;
748 }
749
750 info!(
751 session_id = self.session_id,
752 call_type = ?self.call_type,
753 sender,
754 ?option,
755 "caller with option"
756 );
757
758 match self.setup_caller_track(&option).await {
759 Ok(_) => return Ok(option),
760 Err(e) => {
761 self.app_state
762 .total_failed_calls
763 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
764 let error_event = crate::event::SessionEvent::Error {
765 track_id: self.session_id.clone(),
766 timestamp: crate::media::get_timestamp(),
767 sender,
768 error: e.to_string(),
769 code: None,
770 };
771 self.event_sender.send(error_event).ok();
772 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
773 .await
774 .ok();
775 return Err(e);
776 }
777 }
778 }
779
780 async fn do_invite(&self, option: CallOption) -> Result<()> {
781 self.invite_or_accept(option, "invite".to_string())
782 .await
783 .map(|_| ())
784 }
785
786 async fn do_accept(&self, mut option: CallOption) -> Result<()> {
787 let has_pending = self
788 .invitation
789 .find_dialog_id_by_session_id(&self.session_id)
790 .is_some();
791 let ready_to_answer_val = {
792 let state = self.call_state.read().await;
793 state.ready_to_answer.is_none()
794 };
795
796 if ready_to_answer_val {
797 if !has_pending {
798 warn!(session_id = self.session_id, "no pending call to accept");
800 let rejet_event = crate::event::SessionEvent::Reject {
801 track_id: self.session_id.clone(),
802 timestamp: crate::media::get_timestamp(),
803 reason: "no pending call".to_string(),
804 refer: None,
805 code: Some(486),
806 };
807 self.event_sender.send(rejet_event).ok();
808 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
809 .await
810 .ok();
811 return Err(anyhow::anyhow!("no pending call to accept"));
812 }
813 option = self.invite_or_accept(option, "accept".to_string()).await?;
814 } else {
815 option.check_default();
816 self.call_state.write().await.option = Some(option.clone());
817 }
818 info!(session_id = self.session_id, ?option, "accepting call");
819 let ready = self.call_state.write().await.ready_to_answer.take();
820 if let Some((answer, track, dialog)) = ready {
821 info!(
822 session_id = self.session_id,
823 track_id = track.as_ref().map(|t| t.id()),
824 "ready to answer with track"
825 );
826
827 let headers = vec![rsip::Header::ContentType(
828 "application/sdp".to_string().into(),
829 )];
830
831 match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
832 Ok(_) => {
833 {
834 let mut state = self.call_state.write().await;
835 state.answer = Some(answer);
836 state.answer_time = Some(Utc::now());
837 }
838 self.finish_caller_stack(&option, track).await?;
839 }
840 Err(e) => {
841 warn!(session_id = self.session_id, "failed to accept call: {}", e);
842 return Err(anyhow::anyhow!("failed to accept call"));
843 }
844 }
845 }
846 return Ok(());
847 }
848
849 async fn do_reject(
850 &self,
851 code: Option<rsip::StatusCode>,
852 reason: Option<String>,
853 ) -> Result<()> {
854 match self
855 .invitation
856 .find_dialog_id_by_session_id(&self.session_id)
857 {
858 Some(id) => {
859 info!(
860 session_id = self.session_id,
861 ?reason,
862 ?code,
863 "rejecting call"
864 );
865 self.invitation.hangup(id, code, reason).await
866 }
867 None => Ok(()),
868 }
869 }
870
871 async fn do_ringing(
872 &self,
873 ringtone: Option<String>,
874 recorder: Option<RecorderOption>,
875 early_media: Option<bool>,
876 ) -> Result<()> {
877 let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
878 if ready_to_answer_val {
879 let option = CallOption {
880 recorder,
881 ..Default::default()
882 };
883 let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
884 }
885
886 let state = self.call_state.read().await;
887 if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
888 let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
889 let headers = vec![rsip::Header::ContentType(
890 "application/sdp".to_string().into(),
891 )];
892 (Some(headers), Some(answer.as_bytes().to_vec()))
893 } else {
894 (None, None)
895 };
896
897 dialog.ringing(headers, body).ok();
898 info!(
899 session_id = self.session_id,
900 ringtone, early_media, "playing ringtone"
901 );
902 if let Some(ringtone_url) = ringtone {
903 drop(state);
904 self.do_play(ringtone_url, None, None, None).await.ok();
905 } else {
906 info!(session_id = self.session_id, "no ringtone to play");
907 }
908 }
909 Ok(())
910 }
911
912 async fn do_tts(
913 &self,
914 text: String,
915 speaker: Option<String>,
916 play_id: Option<String>,
917 auto_hangup: Option<bool>,
918 streaming: bool,
919 end_of_stream: bool,
920 option: Option<SynthesisOption>,
921 wait_input_timeout: Option<u32>,
922 base64: bool,
923 ) -> Result<()> {
924 let tts_option = {
925 let call_state = self.call_state.read().await;
926 match call_state.option.clone().unwrap_or_default().tts {
927 Some(opt) => opt.merge_with(option),
928 None => {
929 if let Some(opt) = option {
930 opt
931 } else {
932 return Err(anyhow::anyhow!("no tts option available"));
933 }
934 }
935 }
936 };
937 let speaker = match speaker {
938 Some(s) => Some(s),
939 None => tts_option.speaker.clone(),
940 };
941
942 let mut play_command = SynthesisCommand {
943 text,
944 speaker,
945 play_id: play_id.clone(),
946 streaming,
947 end_of_stream,
948 option: tts_option,
949 base64,
950 };
951 info!(
952 session_id = self.session_id,
953 provider = ?play_command.option.provider,
954 text = %play_command.text.chars().take(10).collect::<String>(),
955 speaker = play_command.speaker.as_deref(),
956 auto_hangup = auto_hangup.unwrap_or_default(),
957 play_id = play_command.play_id.as_deref(),
958 streaming = play_command.streaming,
959 end_of_stream = play_command.end_of_stream,
960 wait_input_timeout = wait_input_timeout.unwrap_or_default(),
961 is_base64 = play_command.base64,
962 "new synthesis"
963 );
964
965 let ssrc = rand::random::<u32>();
966 let (should_interrupt, picked_ssrc) = {
967 let mut state = self.call_state.write().await;
968
969 let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
970 if play_id.is_some() && state.current_play_id != play_id {
971 (ssrc, true)
972 } else {
973 (handle.ssrc, false)
974 }
975 } else {
976 (ssrc, false)
977 };
978
979 state.auto_hangup = match auto_hangup {
980 Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
981 _ => state.auto_hangup.clone(),
982 };
983 state.wait_input_timeout = wait_input_timeout;
984
985 state.current_play_id = play_id.clone();
986 (changed, target_ssrc)
987 };
988
989 if should_interrupt {
990 let _ = self.do_interrupt(false).await;
991 }
992
993 let existing_handle = self.call_state.read().await.tts_handle.clone();
994 if let Some(tts_handle) = existing_handle {
995 match tts_handle.try_send(play_command) {
996 Ok(_) => return Ok(()),
997 Err(e) => {
998 play_command = e.0;
999 }
1000 }
1001 }
1002
1003 let (new_handle, tts_track) = StreamEngine::create_tts_track(
1004 self.app_state.stream_engine.clone(),
1005 self.cancel_token.child_token(),
1006 self.session_id.clone(),
1007 self.server_side_track_id.clone(),
1008 picked_ssrc,
1009 play_id.clone(),
1010 streaming,
1011 &play_command.option,
1012 )
1013 .await?;
1014
1015 new_handle.try_send(play_command)?;
1016 self.call_state.write().await.tts_handle = Some(new_handle);
1017 self.update_track_wrapper(tts_track, play_id).await;
1018 Ok(())
1019 }
1020
1021 async fn do_play(
1022 &self,
1023 url: String,
1024 play_id: Option<String>,
1025 auto_hangup: Option<bool>,
1026 wait_input_timeout: Option<u32>,
1027 ) -> Result<()> {
1028 let ssrc = rand::random::<u32>();
1029 info!(
1030 session_id = self.session_id,
1031 ssrc, url, play_id, auto_hangup, "play file track"
1032 );
1033
1034 let play_id = play_id.or(Some(url.clone()));
1035
1036 let file_track = FileTrack::new(self.server_side_track_id.clone())
1037 .with_play_id(play_id.clone())
1038 .with_ssrc(ssrc)
1039 .with_path(url)
1040 .with_cancel_token(self.cancel_token.child_token());
1041
1042 {
1043 let mut state = self.call_state.write().await;
1044 state.tts_handle = None;
1045 state.auto_hangup = match auto_hangup {
1046 Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1047 _ => None,
1048 };
1049 state.wait_input_timeout = wait_input_timeout;
1050 }
1051
1052 self.update_track_wrapper(Box::new(file_track), play_id)
1053 .await;
1054 Ok(())
1055 }
1056
1057 async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1058 self.event_sender
1059 .send(SessionEvent::AddHistory {
1060 sender: Some(self.session_id.clone()),
1061 timestamp: crate::media::get_timestamp(),
1062 speaker,
1063 text,
1064 })
1065 .map(|_| ())
1066 .map_err(Into::into)
1067 }
1068
1069 async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1070 {
1071 let mut state = self.call_state.write().await;
1072 state.tts_handle = None;
1073 state.moh = None;
1074 }
1075 self.media_stream
1076 .remove_track(&self.server_side_track_id, graceful)
1077 .await;
1078 Ok(())
1079 }
/// Handler for the pause command; currently does nothing and returns `Ok(())`.
async fn do_pause(&self) -> Result<()> {
    Ok(())
}
/// Handler for the resume command; currently does nothing and returns `Ok(())`.
async fn do_resume(&self) -> Result<()> {
    Ok(())
}
1086 async fn do_hangup(
1087 &self,
1088 reason: Option<CallRecordHangupReason>,
1089 initiator: Option<String>,
1090 ) -> Result<()> {
1091 info!(
1092 session_id = self.session_id,
1093 ?reason,
1094 ?initiator,
1095 "do_hangup"
1096 );
1097
1098 let hangup_reason = match initiator.as_deref() {
1100 Some("caller") => CallRecordHangupReason::ByCaller,
1101 Some("callee") => CallRecordHangupReason::ByCallee,
1102 Some("system") => CallRecordHangupReason::Autohangup,
1103 _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1104 };
1105
1106 self.media_stream
1107 .stop(Some(hangup_reason.to_string()), initiator);
1108
1109 self.call_state
1110 .write()
1111 .await
1112 .set_hangup_reason(hangup_reason);
1113 Ok(())
1114 }
1115
1116 async fn do_refer(
1117 &self,
1118 caller: String,
1119 callee: String,
1120 refer_option: Option<ReferOption>,
1121 ) -> Result<()> {
1122 self.do_interrupt(false).await.ok();
1123 let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1124 if let Some(ref path) = moh {
1125 if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1126 let fallback = "./config/sounds/refer_moh.wav";
1127 if std::path::Path::new(fallback).exists() {
1128 info!(
1129 session_id = self.session_id,
1130 "moh {} not found, using fallback {}", path, fallback
1131 );
1132 moh = Some(fallback.to_string());
1133 }
1134 }
1135 }
1136 let session_id = self.session_id.clone();
1137 let track_id = self.server_side_track_id.clone();
1138
1139 let recorder = {
1140 let cs = self.call_state.read().await;
1141 cs.option
1142 .as_ref()
1143 .map(|o| o.recorder.clone())
1144 .unwrap_or_default()
1145 };
1146
1147 let call_option = CallOption {
1148 caller: Some(caller),
1149 callee: Some(callee.clone()),
1150 sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1151 asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1152 denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1153 recorder,
1154 ..Default::default()
1155 };
1156
1157 let mut invite_option = call_option.build_invite_option()?;
1158 invite_option.call_id = Some(self.session_id.clone());
1159
1160 let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1161
1162 {
1163 let cs = self.call_state.read().await;
1164 if let Some(opt) = cs.option.as_ref() {
1165 if let Some(callee) = opt.callee.as_ref() {
1166 headers.push(rsip::Header::Other(
1167 "X-Referred-To".to_string(),
1168 callee.clone(),
1169 ));
1170 }
1171 if let Some(caller) = opt.caller.as_ref() {
1172 headers.push(rsip::Header::Other(
1173 "X-Referred-From".to_string(),
1174 caller.clone(),
1175 ));
1176 }
1177 }
1178 }
1179
1180 headers.push(rsip::Header::Other(
1181 "X-Referred-Id".to_string(),
1182 self.session_id.clone(),
1183 ));
1184
1185 let ssrc = rand::random::<u32>();
1186 let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1187 start_time: Utc::now(),
1188 ssrc,
1189 option: Some(call_option.clone()),
1190 is_refer: true,
1191 ..Default::default()
1192 }));
1193
1194 {
1195 let mut cs = self.call_state.write().await;
1196 cs.refer_callstate.replace(refer_call_state.clone());
1197 }
1198
1199 let auto_hangup_requested = refer_option
1200 .as_ref()
1201 .and_then(|o| o.auto_hangup)
1202 .unwrap_or(true);
1203
1204 if auto_hangup_requested {
1205 self.call_state.write().await.auto_hangup =
1206 Some((ssrc, CallRecordHangupReason::ByRefer));
1207 } else {
1208 self.call_state.write().await.auto_hangup = None;
1209 }
1210
1211 let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1212
1213 info!(
1214 session_id = self.session_id,
1215 ssrc,
1216 auto_hangup = auto_hangup_requested,
1217 callee,
1218 timeout_secs,
1219 "do_refer"
1220 );
1221
1222 let r = tokio::time::timeout(
1223 Duration::from_secs(timeout_secs as u64),
1224 self.create_outgoing_sip_track(
1225 self.cancel_token.child_token(),
1226 refer_call_state.clone(),
1227 &track_id,
1228 invite_option,
1229 &call_option,
1230 moh,
1231 auto_hangup_requested,
1232 ),
1233 )
1234 .await;
1235
1236 {
1237 self.call_state.write().await.moh = None;
1238 }
1239
1240 let result = match r {
1241 Ok(res) => res,
1242 Err(_) => {
1243 warn!(
1244 session_id = session_id,
1245 "refer sip track creation timed out after {} seconds", timeout_secs
1246 );
1247 self.event_sender
1248 .send(SessionEvent::Reject {
1249 track_id,
1250 timestamp: crate::media::get_timestamp(),
1251 reason: "Timeout when refer".into(),
1252 code: Some(408),
1253 refer: Some(true),
1254 })
1255 .ok();
1256 return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1257 }
1258 };
1259
1260 match result {
1261 Ok(answer) => {
1262 self.event_sender
1263 .send(SessionEvent::Answer {
1264 timestamp: crate::media::get_timestamp(),
1265 track_id,
1266 sdp: answer,
1267 refer: Some(true),
1268 })
1269 .ok();
1270 }
1271 Err(e) => {
1272 warn!(
1273 session_id = session_id,
1274 "failed to create refer sip track: {}", e
1275 );
1276 match &e {
1277 rsipstack::Error::DialogError(reason, _, code) => {
1278 self.event_sender
1279 .send(SessionEvent::Reject {
1280 track_id,
1281 timestamp: crate::media::get_timestamp(),
1282 reason: reason.clone(),
1283 code: Some(code.code() as u32),
1284 refer: Some(true),
1285 })
1286 .ok();
1287 }
1288 _ => {}
1289 }
1290 return Err(e.into());
1291 }
1292 }
1293 Ok(())
1294 }
1295
1296 async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1297 self.media_stream.mute_track(track_id).await;
1298 Ok(())
1299 }
1300
1301 async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1302 self.media_stream.unmute_track(track_id).await;
1303 Ok(())
1304 }
1305
1306 pub async fn cleanup(&self) -> Result<()> {
1307 self.call_state.write().await.tts_handle = None;
1308 self.media_stream.cleanup().await.ok();
1309 Ok(())
1310 }
1311
1312 pub fn get_callrecord(&self) -> Option<CallRecord> {
1313 self.call_state.try_read().ok().map(|call_state| {
1314 call_state.build_callrecord(
1315 self.app_state.clone(),
1316 self.session_id.clone(),
1317 self.call_type.clone(),
1318 )
1319 })
1320 }
1321
1322 async fn dump_to_file(
1323 &self,
1324 dump_file: &mut File,
1325 cmd_receiver: &mut CommandReceiver,
1326 event_receiver: &mut EventReceiver,
1327 ) {
1328 loop {
1329 select! {
1330 _ = self.cancel_token.cancelled() => {
1331 break;
1332 }
1333 Ok(cmd) = cmd_receiver.recv() => {
1334 CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1335 .await;
1336 }
1337 Ok(event) = event_receiver.recv() => {
1338 if matches!(event, SessionEvent::Binary{..}) {
1339 continue;
1340 }
1341 CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1342 .await;
1343 }
1344 };
1345 }
1346 }
1347
1348 async fn dump_loop(
1349 &self,
1350 dump_events: bool,
1351 mut dump_cmd_receiver: CommandReceiver,
1352 mut dump_event_receiver: EventReceiver,
1353 ) {
1354 if !dump_events {
1355 return;
1356 }
1357
1358 let file_name = self.app_state.get_dump_events_file(&self.session_id);
1359 let mut dump_file = match File::options()
1360 .create(true)
1361 .append(true)
1362 .open(&file_name)
1363 .await
1364 {
1365 Ok(file) => file,
1366 Err(e) => {
1367 warn!(
1368 session_id = self.session_id,
1369 file_name, "failed to open dump events file: {}", e
1370 );
1371 return;
1372 }
1373 };
1374 self.dump_to_file(
1375 &mut dump_file,
1376 &mut dump_cmd_receiver,
1377 &mut dump_event_receiver,
1378 )
1379 .await;
1380
1381 while let Ok(event) = dump_event_receiver.try_recv() {
1382 if matches!(event, SessionEvent::Binary { .. }) {
1383 continue;
1384 }
1385 CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1386 }
1387 }
1388
1389 pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1390 let mut rtc_config = RtcTrackConfig::default();
1391 rtc_config.mode = rustrtc::TransportMode::Rtp;
1392
1393 if let Some(codecs) = &self.app_state.config.codecs {
1394 let mut codec_types = Vec::new();
1395 for c in codecs {
1396 match c.to_lowercase().as_str() {
1397 "pcmu" => codec_types.push(CodecType::PCMU),
1398 "pcma" => codec_types.push(CodecType::PCMA),
1399 "g722" => codec_types.push(CodecType::G722),
1400 "g729" => codec_types.push(CodecType::G729),
1401 "opus" => codec_types.push(CodecType::Opus),
1402 "dtmf" | "2833" | "telephone_event" => {
1403 codec_types.push(CodecType::TelephoneEvent)
1404 }
1405 _ => {}
1406 }
1407 }
1408 if !codec_types.is_empty() {
1409 rtc_config.preferred_codec = Some(codec_types[0].clone());
1410 rtc_config.codecs = codec_types;
1411 }
1412 }
1413
1414 if rtc_config.preferred_codec.is_none() {
1415 rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1416 }
1417
1418 rtc_config.rtp_port_range = self
1419 .app_state
1420 .config
1421 .rtp_start_port
1422 .zip(self.app_state.config.rtp_end_port);
1423
1424 if let Some(ref external_ip) = self.app_state.config.external_ip {
1425 rtc_config.external_ip = Some(external_ip.clone());
1426 }
1427 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1428 rtc_config.bind_ip = Some(bind_ip.clone());
1429 }
1430
1431 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1432
1433 let mut track = RtcTrack::new(
1434 self.cancel_token.child_token(),
1435 track_id,
1436 self.track_config.clone(),
1437 rtc_config,
1438 )
1439 .with_ssrc(ssrc);
1440
1441 track.create().await?;
1442
1443 Ok(track)
1444 }
1445
1446 async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1447 let hangup_headers = option
1448 .sip
1449 .as_ref()
1450 .and_then(|s| s.hangup_headers.as_ref())
1451 .map(|headers_map| {
1452 headers_map
1453 .iter()
1454 .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1455 .collect::<Vec<rsip::Header>>()
1456 });
1457 self.call_state.write().await.option = Some(option.clone());
1458 info!(
1459 session_id = self.session_id,
1460 call_type = ?self.call_type,
1461 "setup caller track"
1462 );
1463
1464 let track = match self.call_type {
1465 ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1466 ActiveCallType::WebSocket => {
1467 let audio_receiver = self.call_state.write().await.audio_receiver.take();
1468 if let Some(receiver) = audio_receiver {
1469 Some(self.create_websocket_track(receiver).await?)
1470 } else {
1471 None
1472 }
1473 }
1474 ActiveCallType::Sip => {
1475 if let Some(dialog_id) = self
1476 .invitation
1477 .find_dialog_id_by_session_id(&self.session_id)
1478 {
1479 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1480 return self
1481 .prepare_incoming_sip_track(
1482 self.cancel_token.clone(),
1483 self.call_state.clone(),
1484 &self.session_id,
1485 pending_dialog,
1486 hangup_headers,
1487 )
1488 .await;
1489 }
1490 }
1491
1492 let mut option = option.clone();
1494 if option.sip.is_none()
1495 || option
1496 .sip
1497 .as_ref()
1498 .and_then(|s| s.username.as_ref())
1499 .is_none()
1500 {
1501 if let Some(callee) = &option.callee {
1502 if let Some(cred) = self.app_state.find_credentials_for_callee(callee) {
1503 if option.sip.is_none() {
1504 option.sip = Some(crate::SipOption {
1505 username: Some(cred.username.clone()),
1506 password: Some(cred.password.clone()),
1507 realm: cred.realm.clone(),
1508 ..Default::default()
1509 });
1510 }
1511 }
1512 }
1513 }
1514
1515 let mut invite_option = option.build_invite_option()?;
1516 invite_option.call_id = Some(self.session_id.clone());
1517
1518 match self
1519 .create_outgoing_sip_track(
1520 self.cancel_token.clone(),
1521 self.call_state.clone(),
1522 &self.session_id,
1523 invite_option,
1524 &option,
1525 None,
1526 false,
1527 )
1528 .await
1529 {
1530 Ok(answer) => {
1531 self.event_sender
1532 .send(SessionEvent::Answer {
1533 timestamp: crate::media::get_timestamp(),
1534 track_id: self.session_id.clone(),
1535 sdp: answer,
1536 refer: Some(false),
1537 })
1538 .ok();
1539 return Ok(());
1540 }
1541 Err(e) => {
1542 warn!(
1543 session_id = self.session_id,
1544 "failed to create sip track: {}", e
1545 );
1546 match &e {
1547 rsipstack::Error::DialogError(reason, _, code) => {
1548 self.event_sender
1549 .send(SessionEvent::Reject {
1550 track_id: self.session_id.clone(),
1551 timestamp: crate::media::get_timestamp(),
1552 reason: reason.clone(),
1553 code: Some(code.code() as u32),
1554 refer: Some(false),
1555 })
1556 .ok();
1557 }
1558 _ => {}
1559 }
1560 return Err(e.into());
1561 }
1562 }
1563 }
1564 ActiveCallType::B2bua => {
1565 if let Some(dialog_id) = self
1566 .invitation
1567 .find_dialog_id_by_session_id(&self.session_id)
1568 {
1569 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1570 return self
1571 .prepare_incoming_sip_track(
1572 self.cancel_token.clone(),
1573 self.call_state.clone(),
1574 &self.session_id,
1575 pending_dialog,
1576 hangup_headers,
1577 )
1578 .await;
1579 }
1580 }
1581
1582 warn!(
1583 session_id = self.session_id,
1584 "no pending dialog found for B2BUA call"
1585 );
1586 return Err(anyhow::anyhow!(
1587 "no pending dialog found for session_id: {}",
1588 self.session_id
1589 ));
1590 }
1591 };
1592 match track {
1593 Some(track) => {
1594 self.finish_caller_stack(&option, Some(track)).await?;
1595 }
1596 None => {
1597 warn!(session_id = self.session_id, "no track created for caller");
1598 return Err(anyhow::anyhow!("no track created for caller"));
1599 }
1600 }
1601 Ok(())
1602 }
1603
1604 async fn finish_caller_stack(
1605 &self,
1606 option: &CallOption,
1607 track: Option<Box<dyn Track>>,
1608 ) -> Result<()> {
1609 if let Some(track) = track {
1610 self.setup_track_with_stream(&option, track).await?;
1611 }
1612
1613 {
1614 let call_state = self.call_state.read().await;
1615 if let Some(ref answer) = call_state.answer {
1616 info!(
1617 session_id = self.session_id,
1618 "sending answer event: {}", answer,
1619 );
1620 self.event_sender
1621 .send(SessionEvent::Answer {
1622 timestamp: crate::media::get_timestamp(),
1623 track_id: self.session_id.clone(),
1624 sdp: answer.clone(),
1625 refer: Some(false),
1626 })
1627 .ok();
1628 } else {
1629 warn!(
1630 session_id = self.session_id,
1631 "no answer in state to send event"
1632 );
1633 }
1634 }
1635 Ok(())
1636 }
1637
1638 pub async fn setup_track_with_stream(
1639 &self,
1640 option: &CallOption,
1641 mut track: Box<dyn Track>,
1642 ) -> Result<()> {
1643 let processors = match StreamEngine::create_processors(
1644 self.app_state.stream_engine.clone(),
1645 track.as_ref(),
1646 self.cancel_token.child_token(),
1647 self.event_sender.clone(),
1648 self.media_stream.packet_sender.clone(),
1649 option,
1650 )
1651 .await
1652 {
1653 Ok(processors) => processors,
1654 Err(e) => {
1655 warn!(
1656 session_id = self.session_id,
1657 "failed to prepare stream processors: {}", e
1658 );
1659 vec![]
1660 }
1661 };
1662
1663 for processor in processors {
1665 track.append_processor(processor);
1666 }
1667
1668 self.update_track_wrapper(track, None).await;
1669 Ok(())
1670 }
1671
1672 pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1673 let (ambiance_opt, subscribe) = {
1674 let state = self.call_state.read().await;
1675 let mut opt = state
1676 .option
1677 .as_ref()
1678 .and_then(|o| o.ambiance.clone())
1679 .unwrap_or_default();
1680
1681 if let Some(global) = &self.app_state.config.ambiance {
1682 opt.merge(global);
1683 }
1684
1685 let subscribe = state
1686 .option
1687 .as_ref()
1688 .and_then(|o| o.subscribe)
1689 .unwrap_or_default();
1690
1691 (opt, subscribe)
1692 };
1693 if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1694 match AmbianceProcessor::new(ambiance_opt).await {
1695 Ok(ambiance) => {
1696 info!(session_id = self.session_id, "loaded ambiance processor");
1697 track.append_processor(Box::new(ambiance));
1698 }
1699 Err(e) => {
1700 tracing::error!("failed to load ambiance wav {}", e);
1701 }
1702 }
1703 }
1704
1705 if subscribe && self.call_type != ActiveCallType::WebSocket {
1706 let (track_index, sub_track_id) = if track.id() == &self.server_side_track_id {
1707 (0, self.server_side_track_id.clone())
1708 } else {
1709 (1, self.session_id.clone())
1710 };
1711 let sub_processor =
1712 SubscribeProcessor::new(self.event_sender.clone(), sub_track_id, track_index);
1713 track.append_processor(Box::new(sub_processor));
1714 }
1715
1716 self.call_state.write().await.current_play_id = play_id.clone();
1717 self.media_stream.update_track(track, play_id).await;
1718 }
1719
1720 pub async fn create_websocket_track(
1721 &self,
1722 audio_receiver: WebsocketBytesReceiver,
1723 ) -> Result<Box<dyn Track>> {
1724 let (ssrc, codec) = {
1725 let call_state = self.call_state.read().await;
1726 (
1727 call_state.ssrc,
1728 call_state
1729 .option
1730 .as_ref()
1731 .map(|o| o.codec.clone())
1732 .unwrap_or_default(),
1733 )
1734 };
1735
1736 let ws_track = WebsocketTrack::new(
1737 self.cancel_token.child_token(),
1738 self.session_id.clone(),
1739 self.track_config.clone(),
1740 self.event_sender.clone(),
1741 audio_receiver,
1742 codec,
1743 ssrc,
1744 );
1745
1746 {
1747 let mut call_state = self.call_state.write().await;
1748 call_state.answer_time = Some(Utc::now());
1749 call_state.answer = Some("".to_string());
1750 call_state.last_status_code = 200;
1751 }
1752
1753 Ok(Box::new(ws_track))
1754 }
1755
1756 pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1757 let (ssrc, option) = {
1758 let call_state = self.call_state.read().await;
1759 (
1760 call_state.ssrc,
1761 call_state.option.clone().unwrap_or_default(),
1762 )
1763 };
1764
1765 let mut rtc_config = RtcTrackConfig::default();
1766 rtc_config.mode = rustrtc::TransportMode::WebRtc; rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1768
1769 if let Some(codecs) = &self.app_state.config.codecs {
1770 let mut codec_types = Vec::new();
1771 for c in codecs {
1772 match c.to_lowercase().as_str() {
1773 "pcmu" => codec_types.push(CodecType::PCMU),
1774 "pcma" => codec_types.push(CodecType::PCMA),
1775 "g722" => codec_types.push(CodecType::G722),
1776 "g729" => codec_types.push(CodecType::G729),
1777 "opus" => codec_types.push(CodecType::Opus),
1778 "dtmf" | "2833" | "telephone_event" => {
1779 codec_types.push(CodecType::TelephoneEvent)
1780 }
1781 _ => {}
1782 }
1783 }
1784 if !codec_types.is_empty() {
1785 rtc_config.preferred_codec = Some(codec_types[0].clone());
1786 rtc_config.codecs = codec_types;
1787 }
1788 }
1789
1790 if let Some(ref external_ip) = self.app_state.config.external_ip {
1791 rtc_config.external_ip = Some(external_ip.clone());
1792 }
1793 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1794 rtc_config.bind_ip = Some(bind_ip.clone());
1795 }
1796
1797 let mut webrtc_track = RtcTrack::new(
1798 self.cancel_token.child_token(),
1799 self.session_id.clone(),
1800 self.track_config.clone(),
1801 rtc_config,
1802 )
1803 .with_ssrc(ssrc);
1804
1805 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1806 let offer = match option.enable_ipv6 {
1807 Some(false) | None => {
1808 strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1809 }
1810 _ => option.offer.clone().unwrap_or("".to_string()),
1811 };
1812 let answer: Option<String>;
1813 match webrtc_track.handshake(offer, timeout).await {
1814 Ok(answer_sdp) => {
1815 answer = match option.enable_ipv6 {
1816 Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1817 Some(true) => Some(answer_sdp.to_string()),
1818 };
1819 }
1820 Err(e) => {
1821 warn!(session_id = self.session_id, "failed to setup track: {}", e);
1822 return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1823 }
1824 }
1825
1826 {
1827 let mut call_state = self.call_state.write().await;
1828 call_state.answer_time = Some(Utc::now());
1829 call_state.answer = answer;
1830 call_state.last_status_code = 200;
1831 }
1832 Ok(Box::new(webrtc_track))
1833 }
1834
1835 async fn create_outgoing_sip_track(
1836 &self,
1837 cancel_token: CancellationToken,
1838 call_state_ref: ActiveCallStateRef,
1839 track_id: &String,
1840 mut invite_option: InviteOption,
1841 call_option: &CallOption,
1842 moh: Option<String>,
1843 auto_hangup: bool,
1844 ) -> Result<String, rsipstack::Error> {
1845 let ssrc = call_state_ref.read().await.ssrc;
1846 let rtp_track = self
1847 .create_rtp_track(track_id.clone(), ssrc)
1848 .await
1849 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1850
1851 let offer = Some(
1852 rtp_track
1853 .local_description()
1854 .await
1855 .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1856 );
1857
1858 {
1859 let mut cs = call_state_ref.write().await;
1860 if let Some(o) = cs.option.as_mut() {
1861 o.offer = offer.clone();
1862 }
1863 cs.start_time = Utc::now();
1864 };
1865
1866 invite_option.offer = offer.clone().map(|s| s.into());
1867
1868 let needs_contact = invite_option.contact.scheme.is_none()
1871 || invite_option
1872 .contact
1873 .host_with_port
1874 .to_string()
1875 .starts_with("127.0.0.1");
1876
1877 if needs_contact {
1878 if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1879 invite_option.contact = rsip::Uri {
1880 scheme: Some(rsip::Scheme::Sip),
1881 auth: None,
1882 host_with_port: addr.addr.clone(),
1883 params: vec![],
1884 headers: vec![],
1885 };
1886 }
1887 }
1888
1889 let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1890
1891 if let Some(moh) = moh {
1892 let ssrc_and_moh = {
1893 let mut state = call_state_ref.write().await;
1894 state.moh = Some(moh.clone());
1895 if state.current_play_id.is_none() {
1896 let ssrc = rand::random::<u32>();
1897 Some((ssrc, moh.clone()))
1898 } else {
1899 info!(
1900 session_id = self.session_id,
1901 "Something is playing, MOH will start after it ends"
1902 );
1903 None
1904 }
1905 };
1906
1907 if let Some((ssrc, moh_path)) = ssrc_and_moh {
1908 let file_track = FileTrack::new(self.server_side_track_id.clone())
1909 .with_play_id(Some(moh_path.clone()))
1910 .with_ssrc(ssrc)
1911 .with_path(moh_path.clone())
1912 .with_cancel_token(self.cancel_token.child_token());
1913 self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1914 .await;
1915 }
1916 } else {
1917 let track = rtp_track_to_setup.take().unwrap();
1918 self.setup_track_with_stream(&call_option, track)
1919 .await
1920 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1921 }
1922
1923 info!(
1924 session_id = self.session_id,
1925 track_id,
1926 contact = %invite_option.contact,
1927 "invite {} -> {} offer: \n{}",
1928 invite_option.caller,
1929 invite_option.callee,
1930 offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1931 );
1932
1933 let (dlg_state_sender, dlg_state_receiver) =
1934 self.invitation.dialog_layer.new_dialog_state_channel();
1935
1936 let states = InviteDialogStates {
1937 is_client: true,
1938 session_id: self.session_id.clone(),
1939 track_id: track_id.clone(),
1940 event_sender: self.event_sender.clone(),
1941 media_stream: self.media_stream.clone(),
1942 call_state: call_state_ref.clone(),
1943 cancel_token,
1944 terminated_reason: None,
1945 has_early_media: false,
1946 };
1947
1948 let hangup_headers = call_option
1949 .sip
1950 .as_ref()
1951 .and_then(|s| s.hangup_headers.as_ref())
1952 .map(|headers_map| {
1953 headers_map
1954 .iter()
1955 .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1956 .collect::<Vec<rsip::Header>>()
1957 });
1958
1959 let mut client_dialog_handler = DialogStateReceiverGuard::new(
1960 self.invitation.dialog_layer.clone(),
1961 dlg_state_receiver,
1962 hangup_headers,
1963 );
1964
1965 crate::spawn(async move {
1966 client_dialog_handler.process_dialog(states).await;
1967 });
1968
1969 let (dialog_id, answer) = self
1970 .invitation
1971 .invite(invite_option, dlg_state_sender)
1972 .await?;
1973
1974 self.call_state.write().await.moh = None;
1975
1976 if let Some(track) = rtp_track_to_setup {
1977 self.setup_track_with_stream(&call_option, track)
1978 .await
1979 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1980 }
1981
1982 let answer = match answer {
1983 Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1984 None => {
1985 warn!(session_id = self.session_id, "no answer received");
1986 return Err(rsipstack::Error::DialogError(
1987 "No answer received".to_string(),
1988 dialog_id,
1989 rsip::StatusCode::NotAcceptableHere,
1990 ));
1991 }
1992 };
1993
1994 {
1995 let mut cs = call_state_ref.write().await;
1996 if cs.answer.is_none() {
1997 cs.answer = Some(answer.clone());
1998 }
1999 if auto_hangup {
2000 cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
2001 }
2002 }
2003
2004 self.media_stream
2005 .update_remote_description(&track_id, &answer)
2006 .await
2007 .ok();
2008
2009 Ok(answer)
2010 }
2011
2012 pub fn is_webrtc_sdp(sdp: &str) -> bool {
2014 (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
2015 && sdp.contains("a=fingerprint:")
2016 }
2017
2018 pub async fn setup_answer_track(
2019 &self,
2020 ssrc: u32,
2021 option: &CallOption,
2022 offer: String,
2023 ) -> Result<(String, Box<dyn Track>)> {
2024 let offer = match option.enable_ipv6 {
2025 Some(false) | None => strip_ipv6_candidates(&offer),
2026 _ => offer.clone(),
2027 };
2028
2029 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
2030
2031 let mut media_track = if Self::is_webrtc_sdp(&offer) {
2032 let mut rtc_config = RtcTrackConfig::default();
2033 rtc_config.mode = rustrtc::TransportMode::WebRtc;
2034 rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
2035 if let Some(ref external_ip) = self.app_state.config.external_ip {
2036 rtc_config.external_ip = Some(external_ip.clone());
2037 }
2038 if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
2039 rtc_config.bind_ip = Some(bind_ip.clone());
2040 }
2041 rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
2042
2043 let webrtc_track = RtcTrack::new(
2044 self.cancel_token.child_token(),
2045 self.session_id.clone(),
2046 self.track_config.clone(),
2047 rtc_config,
2048 )
2049 .with_ssrc(ssrc);
2050
2051 Box::new(webrtc_track) as Box<dyn Track>
2052 } else {
2053 let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
2054 Box::new(rtp_track) as Box<dyn Track>
2055 };
2056
2057 let answer = match media_track.handshake(offer.clone(), timeout).await {
2058 Ok(answer) => answer,
2059 Err(e) => {
2060 return Err(anyhow::anyhow!("handshake failed: {e}"));
2061 }
2062 };
2063
2064 return Ok((answer, media_track));
2065 }
2066
2067 pub async fn prepare_incoming_sip_track(
2068 &self,
2069 cancel_token: CancellationToken,
2070 call_state_ref: ActiveCallStateRef,
2071 track_id: &String,
2072 pending_dialog: PendingDialog,
2073 hangup_headers: Option<Vec<rsip::Header>>,
2074 ) -> Result<()> {
2075 let state_receiver = pending_dialog.state_receiver;
2076 let states = InviteDialogStates {
2079 is_client: false,
2080 session_id: self.session_id.clone(),
2081 track_id: track_id.clone(),
2082 event_sender: self.event_sender.clone(),
2083 media_stream: self.media_stream.clone(),
2084 call_state: self.call_state.clone(),
2085 cancel_token,
2086 terminated_reason: None,
2087 has_early_media: false,
2088 };
2089
2090 let initial_request = pending_dialog.dialog.initial_request();
2091 let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2092
2093 let (ssrc, option) = {
2094 let call_state = call_state_ref.read().await;
2095 (
2096 call_state.ssrc,
2097 call_state.option.clone().unwrap_or_default(),
2098 )
2099 };
2100
2101 match self.setup_answer_track(ssrc, &option, offer).await {
2102 Ok((offer, track)) => {
2103 self.setup_track_with_stream(&option, track).await?;
2104 {
2105 let mut state = self.call_state.write().await;
2106 state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2107 }
2108 }
2109 Err(e) => {
2110 return Err(anyhow::anyhow!("error creating track: {}", e));
2111 }
2112 }
2113
2114 let mut client_dialog_handler = DialogStateReceiverGuard::new(
2115 self.invitation.dialog_layer.clone(),
2116 state_receiver,
2117 hangup_headers,
2118 );
2119
2120 crate::spawn(async move {
2121 client_dialog_handler.process_dialog(states).await;
2122 });
2123 Ok(())
2124 }
2125}
2126
2127impl Drop for ActiveCall {
2128 fn drop(&mut self) {
2129 info!(session_id = self.session_id, "dropping active call");
2130 if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2131 if let Some(record) = self.get_callrecord() {
2132 if let Err(e) = sender.send(record) {
2133 warn!(
2134 session_id = self.session_id,
2135 "failed to send call record: {}", e
2136 );
2137 }
2138 }
2139 }
2140 }
2141}
2142
2143impl ActiveCallState {
2144 pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2145 if let Some(existing) = &self.option {
2146 if option.asr.is_none() {
2147 option.asr = existing.asr.clone();
2148 }
2149 if option.tts.is_none() {
2150 option.tts = existing.tts.clone();
2151 }
2152 if option.vad.is_none() {
2153 option.vad = existing.vad.clone();
2154 }
2155 if option.denoise.is_none() {
2156 option.denoise = existing.denoise;
2157 }
2158 if option.recorder.is_none() {
2159 option.recorder = existing.recorder.clone();
2160 }
2161 if option.eou.is_none() {
2162 option.eou = existing.eou.clone();
2163 }
2164 if option.extra.is_none() {
2165 option.extra = existing.extra.clone();
2166 }
2167 if option.ambiance.is_none() {
2168 option.ambiance = existing.ambiance.clone();
2169 }
2170 }
2171 option
2172 }
2173
2174 pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2175 if self.hangup_reason.is_none() {
2176 self.hangup_reason = Some(reason);
2177 }
2178 }
2179
2180 pub fn build_hangup_event(
2181 &self,
2182 track_id: TrackId,
2183 initiator: Option<String>,
2184 ) -> crate::event::SessionEvent {
2185 let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2186 let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2187 let extra = self.extras.clone();
2188
2189 crate::event::SessionEvent::Hangup {
2190 track_id,
2191 timestamp: crate::media::get_timestamp(),
2192 reason: Some(format!("{:?}", self.hangup_reason)),
2193 initiator,
2194 start_time: self.start_time.to_rfc3339(),
2195 answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2196 ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2197 hangup_time: Utc::now().to_rfc3339(),
2198 extra,
2199 from: from.map(|f| f.into()),
2200 to: to.map(|f| f.into()),
2201 refer: Some(self.is_refer),
2202 }
2203 }
2204
2205 pub fn build_callrecord(
2206 &self,
2207 app_state: AppState,
2208 session_id: String,
2209 call_type: ActiveCallType,
2210 ) -> CallRecord {
2211 let option = self.option.clone().unwrap_or_default();
2212 let recorder = if option.recorder.is_some() {
2213 let recorder_file = app_state.get_recorder_file(&session_id);
2214 if std::path::Path::new(&recorder_file).exists() {
2215 let file_size = std::fs::metadata(&recorder_file)
2216 .map(|m| m.len())
2217 .unwrap_or(0);
2218 vec![crate::callrecord::CallRecordMedia {
2219 track_id: session_id.clone(),
2220 path: recorder_file,
2221 size: file_size,
2222 extra: None,
2223 }]
2224 } else {
2225 vec![]
2226 }
2227 } else {
2228 vec![]
2229 };
2230
2231 let dump_event_file = app_state.get_dump_events_file(&session_id);
2232 let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2233 Some(dump_event_file)
2234 } else {
2235 None
2236 };
2237
2238 let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2239 if let Ok(rc) = rc.try_read() {
2240 Some(Box::new(rc.build_callrecord(
2241 app_state.clone(),
2242 rc.session_id.clone(),
2243 ActiveCallType::B2bua,
2244 )))
2245 } else {
2246 None
2247 }
2248 });
2249
2250 let caller = option.caller.clone().unwrap_or_default();
2251 let callee = option.callee.clone().unwrap_or_default();
2252
2253 CallRecord {
2254 option: Some(option),
2255 call_id: session_id,
2256 call_type,
2257 start_time: self.start_time,
2258 ring_time: self.ring_time.clone(),
2259 answer_time: self.answer_time.clone(),
2260 end_time: Utc::now(),
2261 caller,
2262 callee,
2263 hangup_reason: self.hangup_reason.clone(),
2264 hangup_messages: Vec::new(),
2265 status_code: self.last_status_code,
2266 extras: self.extras.clone(),
2267 dump_event_file,
2268 recorder,
2269 refer_callrecord,
2270 }
2271 }
2272}