1use super::Command;
2use crate::{
3 CallOption, ReferOption,
4 event::{EventReceiver, EventSender, SessionEvent},
5 media::{
6 TrackId,
7 ambiance::AmbianceProcessor,
8 engine::StreamEngine,
9 negotiate::strip_ipv6_candidates,
10 recorder::RecorderOption,
11 stream::{MediaStream, MediaStreamBuilder},
12 track::{
13 Track, TrackConfig,
14 file::FileTrack,
15 media_pass::MediaPassTrack,
16 rtc::{RtcTrack, RtcTrackConfig},
17 tts::SynthesisHandle,
18 websocket::{WebsocketBytesReceiver, WebsocketTrack},
19 },
20 },
21 synthesis::{SynthesisCommand, SynthesisOption},
22};
23use crate::{
24 app::AppState,
25 call::{
26 CommandReceiver, CommandSender,
27 sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
28 },
29 callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
30 useragent::invitation::PendingDialog,
31};
32use anyhow::Result;
33use audio_codec::CodecType;
34use chrono::{DateTime, Utc};
35use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
36use serde::{Deserialize, Serialize};
37use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
38use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
39use tokio_util::sync::CancellationToken;
40use tracing::{debug, info, warn};
41
42#[derive(Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub struct CallParams {
45 pub id: Option<String>,
46 #[serde(rename = "dump")]
47 pub dump_events: Option<bool>,
48 #[serde(rename = "ping")]
49 pub ping_interval: Option<u32>,
50 pub server_side_track: Option<String>,
51}
52
53#[derive(Debug, Serialize, Deserialize, Clone, Default)]
54#[serde(rename_all = "camelCase")]
55pub enum ActiveCallType {
56 Webrtc,
57 B2bua,
58 WebSocket,
59 #[default]
60 Sip,
61}
62
63#[derive(Default)]
64pub struct ActiveCallState {
65 pub session_id: String,
66 pub start_time: DateTime<Utc>,
67 pub ring_time: Option<DateTime<Utc>>,
68 pub answer_time: Option<DateTime<Utc>>,
69 pub hangup_reason: Option<CallRecordHangupReason>,
70 pub last_status_code: u16,
71 pub option: Option<CallOption>,
72 pub answer: Option<String>,
73 pub ssrc: u32,
74 pub refer_callstate: Option<ActiveCallStateRef>,
75 pub extras: Option<HashMap<String, serde_json::Value>>,
76 pub is_refer: bool,
77
78 pub tts_handle: Option<SynthesisHandle>,
80 pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
81 pub wait_input_timeout: Option<u32>,
82 pub moh: Option<String>,
83 pub current_play_id: Option<String>,
84 pub audio_receiver: Option<WebsocketBytesReceiver>,
85 pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
86}
87
88pub type ActiveCallRef = Arc<ActiveCall>;
89pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
90
91pub struct ActiveCall {
92 pub call_state: ActiveCallStateRef,
93 pub cancel_token: CancellationToken,
94 pub call_type: ActiveCallType,
95 pub session_id: String,
96 pub media_stream: Arc<MediaStream>,
97 pub track_config: TrackConfig,
98 pub event_sender: EventSender,
99 pub app_state: AppState,
100 pub invitation: Invitation,
101 pub cmd_sender: CommandSender,
102 pub dump_events: bool,
103 pub server_side_track_id: TrackId,
104}
105
106pub struct ActiveCallGuard {
107 pub call: ActiveCallRef,
108 pub active_calls: usize,
109}
110
111impl ActiveCallGuard {
112 pub fn new(call: ActiveCallRef) -> Self {
113 let active_calls = {
114 call.app_state
115 .total_calls
116 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
117 let mut calls = call.app_state.active_calls.lock().unwrap();
118 calls.insert(call.session_id.clone(), call.clone());
119 calls.len()
120 };
121 Self { call, active_calls }
122 }
123}
124
125impl Drop for ActiveCallGuard {
126 fn drop(&mut self) {
127 self.call
128 .app_state
129 .active_calls
130 .lock()
131 .unwrap()
132 .remove(&self.call.session_id);
133 }
134}
135
136pub struct ActiveCallReceiver {
137 pub cmd_receiver: CommandReceiver,
138 pub dump_cmd_receiver: CommandReceiver,
139 pub dump_event_receiver: EventReceiver,
140}
141
142impl ActiveCall {
143 pub fn new(
144 call_type: ActiveCallType,
145 cancel_token: CancellationToken,
146 session_id: String,
147 invitation: Invitation,
148 app_state: AppState,
149 track_config: TrackConfig,
150 audio_receiver: Option<WebsocketBytesReceiver>,
151 dump_events: bool,
152 server_side_track_id: Option<TrackId>,
153 extras: Option<HashMap<String, serde_json::Value>>,
154 ) -> Self {
155 let event_sender = crate::event::create_event_sender();
156 let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
157 let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
158 .with_id(session_id.clone())
159 .with_cancel_token(cancel_token.child_token());
160 let media_stream = Arc::new(media_stream_builder.build());
161 let call_state = Arc::new(RwLock::new(ActiveCallState {
162 session_id: session_id.clone(),
163 start_time: Utc::now(),
164 ssrc: rand::random::<u32>(),
165 extras,
166 audio_receiver,
167 ..Default::default()
168 }));
169 Self {
170 cancel_token,
171 call_type,
172 session_id,
173 call_state,
174 media_stream,
175 track_config,
176 event_sender,
177 app_state,
178 invitation,
179 cmd_sender,
180 dump_events,
181 server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
182 }
183 }
184
185 pub async fn enqueue_command(&self, command: Command) -> Result<()> {
186 self.cmd_sender
187 .send(command)
188 .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
189 Ok(())
190 }
191
192 pub fn new_receiver(&self) -> ActiveCallReceiver {
196 ActiveCallReceiver {
197 cmd_receiver: self.cmd_sender.subscribe(),
198 dump_cmd_receiver: self.cmd_sender.subscribe(),
199 dump_event_receiver: self.event_sender.subscribe(),
200 }
201 }
202
203 pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
204 let ActiveCallReceiver {
205 mut cmd_receiver,
206 dump_cmd_receiver,
207 dump_event_receiver,
208 } = receiver;
209
210 let process_command_loop = async move {
211 while let Ok(command) = cmd_receiver.recv().await {
212 match self.dispatch(command).await {
213 Ok(_) => (),
214 Err(e) => {
215 warn!(session_id = self.session_id, "{}", e);
216 self.event_sender
217 .send(SessionEvent::Error {
218 track_id: self.session_id.clone(),
219 timestamp: crate::media::get_timestamp(),
220 sender: "command".to_string(),
221 error: e.to_string(),
222 code: None,
223 })
224 .ok();
225 }
226 }
227 }
228 };
229 self.app_state
230 .total_calls
231 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
232
233 tokio::join!(
234 self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
235 async {
236 select! {
237 _ = process_command_loop => {
238 info!(session_id = self.session_id, "command loop done");
239 }
240 _ = self.process() => {
241 info!(session_id = self.session_id, "call serve done");
242 }
243 _ = self.cancel_token.cancelled() => {
244 info!(session_id = self.session_id, "call cancelled - cleaning up resources");
245 }
246 }
247 self.cancel_token.cancel();
248 }
249 );
250 Ok(())
251 }
252
253 async fn process(&self) -> Result<()> {
254 let mut event_receiver = self.event_sender.subscribe();
255
256 let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
257 let input_timeout_expire_ref = input_timeout_expire.clone();
258 let event_sender = self.event_sender.clone();
259 let wait_input_timeout_loop = async {
260 loop {
261 let (start_time, expire) = { *input_timeout_expire.lock().await };
262 if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
263 info!(session_id = self.session_id, "wait input timeout reached");
264 *input_timeout_expire.lock().await = (0, 0);
265 event_sender
266 .send(SessionEvent::Silence {
267 track_id: self.server_side_track_id.clone(),
268 timestamp: crate::media::get_timestamp(),
269 start_time,
270 duration: expire as u64,
271 samples: None,
272 })
273 .ok();
274 }
275 sleep(Duration::from_millis(100)).await;
276 }
277 };
278 let server_side_track_id = self.server_side_track_id.clone();
279 let event_hook_loop = async move {
280 while let Ok(event) = event_receiver.recv().await {
281 match event {
282 SessionEvent::Speaking { .. }
283 | SessionEvent::Dtmf { .. }
284 | SessionEvent::AsrDelta { .. }
285 | SessionEvent::AsrFinal { .. }
286 | SessionEvent::TrackStart { .. } => {
287 *input_timeout_expire_ref.lock().await = (0, 0);
288 }
289 SessionEvent::TrackEnd {
290 track_id,
291 play_id,
292 ssrc,
293 ..
294 } => {
295 if track_id != server_side_track_id {
296 continue;
297 }
298
299 let (moh_path, auto_hangup, wait_timeout_val) = {
300 let mut state = self.call_state.write().await;
301 if play_id != state.current_play_id {
302 debug!(
303 session_id = self.session_id,
304 ?play_id,
305 current = ?state.current_play_id,
306 "ignoring interrupted track end"
307 );
308 continue;
309 }
310 state.current_play_id = None;
311 (
312 state.moh.clone(),
313 state.auto_hangup.clone(),
314 state.wait_input_timeout.take(),
315 )
316 };
317
318 if let Some(path) = moh_path {
319 info!(session_id = self.session_id, "looping moh: {}", path);
320 let ssrc = rand::random::<u32>();
321 let file_track = FileTrack::new(self.server_side_track_id.clone())
322 .with_play_id(Some(path.clone()))
323 .with_ssrc(ssrc)
324 .with_path(path.clone())
325 .with_cancel_token(self.cancel_token.child_token());
326 self.update_track_wrapper(Box::new(file_track), Some(path))
327 .await;
328 continue;
329 }
330
331 if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
332 if hangup_ssrc == ssrc {
333 info!(
334 session_id = self.session_id,
335 ssrc, "auto hangup when track end track_id:{}", track_id
336 );
337 self.do_hangup(Some(hangup_reason), None).await.ok();
338 }
339 }
340
341 if let Some(timeout) = wait_timeout_val {
342 let expire = if timeout > 0 {
343 (crate::media::get_timestamp(), timeout)
344 } else {
345 (0, 0)
346 };
347 *input_timeout_expire_ref.lock().await = expire;
348 }
349 }
350 SessionEvent::Interrupt { receiver } => {
351 let track_id =
352 receiver.unwrap_or_else(|| self.server_side_track_id.clone());
353 if track_id == self.server_side_track_id {
354 debug!(
355 session_id = self.session_id,
356 "received interrupt event, stopping playback"
357 );
358 self.do_interrupt(true).await.ok();
359 }
360 }
361 SessionEvent::Inactivity { track_id, .. } => {
362 info!(
363 session_id = self.session_id,
364 track_id, "inactivity timeout reached, hanging up"
365 );
366 self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
367 .await
368 .ok();
369 }
370 SessionEvent::Error { track_id, .. } => {
371 if track_id != server_side_track_id {
372 continue;
373 }
374
375 let moh_info = {
376 let mut state = self.call_state.write().await;
377 if let Some(path) = state.moh.clone() {
378 let fallback = "./config/sounds/refer_moh.wav".to_string();
379 let next_path = if path != fallback
380 && std::path::Path::new(&fallback).exists()
381 {
382 info!(
383 session_id = self.session_id,
384 "moh error, switching to fallback: {}", fallback
385 );
386 state.moh = Some(fallback.clone());
387 fallback
388 } else {
389 info!(
390 session_id = self.session_id,
391 "looping moh on error: {}", path
392 );
393 path
394 };
395 Some(next_path)
396 } else {
397 None
398 }
399 };
400
401 if let Some(next_path) = moh_info {
402 let ssrc = rand::random::<u32>();
403 let file_track = FileTrack::new(self.server_side_track_id.clone())
404 .with_play_id(Some(next_path.clone()))
405 .with_ssrc(ssrc)
406 .with_path(next_path.clone())
407 .with_cancel_token(self.cancel_token.child_token());
408 self.update_track_wrapper(Box::new(file_track), Some(next_path))
409 .await;
410 continue;
411 }
412 }
413 _ => {}
414 }
415 }
416 };
417
418 select! {
419 _ = wait_input_timeout_loop=>{
420 info!(session_id = self.session_id, "wait input timeout loop done");
421 }
422 _ = self.media_stream.serve() => {
423 info!(session_id = self.session_id, "media stream loop done");
424 }
425 _ = event_hook_loop => {
426 info!(session_id = self.session_id, "event loop done");
427 }
428 }
429 Ok(())
430 }
431
432 async fn dispatch(&self, command: Command) -> Result<()> {
433 match command {
434 Command::Invite { option } => self.do_invite(option).await,
435 Command::Accept { option } => self.do_accept(option).await,
436 Command::Reject { reason, code } => {
437 self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
438 .await
439 }
440 Command::Ringing {
441 ringtone,
442 recorder,
443 early_media,
444 } => self.do_ringing(ringtone, recorder, early_media).await,
445 Command::Tts {
446 text,
447 speaker,
448 play_id,
449 auto_hangup,
450 streaming,
451 end_of_stream,
452 option,
453 wait_input_timeout,
454 base64,
455 } => {
456 self.do_tts(
457 text,
458 speaker,
459 play_id,
460 auto_hangup,
461 streaming.unwrap_or_default(),
462 end_of_stream.unwrap_or_default(),
463 option,
464 wait_input_timeout,
465 base64.unwrap_or_default(),
466 )
467 .await
468 }
469 Command::Play {
470 url,
471 play_id,
472 auto_hangup,
473 wait_input_timeout,
474 } => {
475 self.do_play(url, play_id, auto_hangup, wait_input_timeout)
476 .await
477 }
478 Command::Hangup { reason, initiator } => {
479 let reason = reason.map(|r| {
480 r.parse::<CallRecordHangupReason>()
481 .unwrap_or(CallRecordHangupReason::BySystem)
482 });
483 self.do_hangup(reason, initiator).await
484 }
485 Command::Refer {
486 caller,
487 callee,
488 options,
489 } => self.do_refer(caller, callee, options).await,
490 Command::Mute { track_id } => self.do_mute(track_id).await,
491 Command::Unmute { track_id } => self.do_unmute(track_id).await,
492 Command::Pause {} => self.do_pause().await,
493 Command::Resume {} => self.do_resume().await,
494 Command::Interrupt {
495 graceful: passage,
496 fade_out_ms: _,
497 } => self.do_interrupt(passage.unwrap_or_default()).await,
498 Command::History { speaker, text } => self.do_history(speaker, text).await,
499 }
500 }
501
502 fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
503 if let Some(recorder_option) = &option.recorder {
504 let mut recorder_file = recorder_option.recorder_file.clone();
505 if recorder_file.contains("{id}") {
506 recorder_file = recorder_file.replace("{id}", &self.session_id);
507 }
508
509 let recorder_file = if recorder_file.is_empty() {
510 self.app_state.get_recorder_file(&self.session_id)
511 } else {
512 let p = Path::new(&recorder_file);
513 p.is_absolute()
514 .then(|| recorder_file.clone())
515 .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
516 };
517 info!(
518 session_id = self.session_id,
519 recorder_file, "created recording file"
520 );
521
522 let track_samplerate = self.track_config.samplerate;
523 let recorder_samplerate = if track_samplerate > 0 {
524 track_samplerate
525 } else {
526 recorder_option.samplerate
527 };
528 let recorder_ptime = if recorder_option.ptime == 0 {
529 200
530 } else {
531 recorder_option.ptime
532 };
533 let requested_format = recorder_option
534 .format
535 .unwrap_or(self.app_state.config.recorder_format());
536 let format = requested_format.effective();
537 if requested_format != format {
538 warn!(
539 session_id = self.session_id,
540 requested = requested_format.extension(),
541 "Recorder format fallback to wav due to unsupported feature"
542 );
543 }
544 let mut recorder_config = RecorderOption {
545 recorder_file,
546 samplerate: recorder_samplerate,
547 ptime: recorder_ptime,
548 format: Some(format),
549 };
550 recorder_config.ensure_path_extension(format);
551 Some(recorder_config)
552 } else {
553 None
554 }
555 }
556
557 async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
558 {
560 let state = self.call_state.read().await;
561 option = state.merge_option(option);
562 }
563
564 option.check_default();
565 if let Some(opt) = self.build_record_option(&option) {
566 self.media_stream.update_recorder_option(opt).await;
567 }
568
569 if let Some(opt) = &option.media_pass {
570 let track_id = self.server_side_track_id.clone();
571 let cancel_token = self.cancel_token.child_token();
572 let ssrc = rand::random::<u32>();
573 let media_pass_track = MediaPassTrack::new(
574 self.session_id.clone(),
575 ssrc,
576 track_id,
577 cancel_token,
578 opt.clone(),
579 );
580 self.update_track_wrapper(Box::new(media_pass_track), None)
581 .await;
582 }
583
584 info!(
585 session_id = self.session_id,
586 call_type = ?self.call_type,
587 sender,
588 ?option,
589 "caller with option"
590 );
591
592 match self.setup_caller_track(&option).await {
593 Ok(_) => return Ok(option),
594 Err(e) => {
595 self.app_state
596 .total_failed_calls
597 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
598 let error_event = crate::event::SessionEvent::Error {
599 track_id: self.session_id.clone(),
600 timestamp: crate::media::get_timestamp(),
601 sender,
602 error: e.to_string(),
603 code: None,
604 };
605 self.event_sender.send(error_event).ok();
606 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
607 .await
608 .ok();
609 return Err(e);
610 }
611 }
612 }
613
614 async fn do_invite(&self, option: CallOption) -> Result<()> {
615 self.invite_or_accept(option, "invite".to_string())
616 .await
617 .map(|_| ())
618 }
619
620 async fn do_accept(&self, mut option: CallOption) -> Result<()> {
621 let has_pending = self.invitation.has_pending_call(&self.session_id).is_some();
622 let ready_to_answer_val = {
623 let state = self.call_state.read().await;
624 state.ready_to_answer.is_none()
625 };
626
627 if ready_to_answer_val {
628 if !has_pending {
629 warn!(session_id = self.session_id, "no pending call to accept");
631 let rejet_event = crate::event::SessionEvent::Reject {
632 track_id: self.session_id.clone(),
633 timestamp: crate::media::get_timestamp(),
634 reason: "no pending call".to_string(),
635 refer: None,
636 code: Some(486),
637 };
638 self.event_sender.send(rejet_event).ok();
639 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
640 .await
641 .ok();
642 return Err(anyhow::anyhow!("no pending call to accept"));
643 }
644 option = self.invite_or_accept(option, "accept".to_string()).await?;
645 } else {
646 option.check_default();
647 self.call_state.write().await.option = Some(option.clone());
648 }
649 info!(session_id = self.session_id, ?option, "accepting call");
650 let ready = self.call_state.write().await.ready_to_answer.take();
651 if let Some((answer, track, dialog)) = ready {
652 info!(
653 session_id = self.session_id,
654 track_id = track.as_ref().map(|t| t.id()),
655 "ready to answer with track"
656 );
657
658 let headers = vec![rsip::Header::ContentType(
659 "application/sdp".to_string().into(),
660 )];
661
662 match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
663 Ok(_) => {
664 {
665 let mut state = self.call_state.write().await;
666 state.answer = Some(answer);
667 state.answer_time = Some(Utc::now());
668 }
669 self.finish_caller_stack(&option, track).await?;
670 }
671 Err(e) => {
672 warn!(session_id = self.session_id, "failed to accept call: {}", e);
673 return Err(anyhow::anyhow!("failed to accept call"));
674 }
675 }
676 }
677 return Ok(());
678 }
679
680 async fn do_reject(
681 &self,
682 code: Option<rsip::StatusCode>,
683 reason: Option<String>,
684 ) -> Result<()> {
685 match self.invitation.has_pending_call(&self.session_id) {
686 Some(id) => {
687 info!(
688 session_id = self.session_id,
689 ?reason,
690 ?code,
691 "rejecting call"
692 );
693 self.invitation.hangup(id, code, reason).await
694 }
695 None => Ok(()),
696 }
697 }
698
699 async fn do_ringing(
700 &self,
701 ringtone: Option<String>,
702 recorder: Option<RecorderOption>,
703 early_media: Option<bool>,
704 ) -> Result<()> {
705 let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
706 if ready_to_answer_val {
707 let option = CallOption {
708 recorder,
709 ..Default::default()
710 };
711 let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
712 }
713
714 let state = self.call_state.read().await;
715 if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
716 let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
717 let headers = vec![rsip::Header::ContentType(
718 "application/sdp".to_string().into(),
719 )];
720 (Some(headers), Some(answer.as_bytes().to_vec()))
721 } else {
722 (None, None)
723 };
724
725 dialog.ringing(headers, body).ok();
726 info!(
727 session_id = self.session_id,
728 ringtone, early_media, "playing ringtone"
729 );
730 if let Some(ringtone_url) = ringtone {
731 drop(state);
732 self.do_play(ringtone_url, None, None, None).await.ok();
733 } else {
734 info!(session_id = self.session_id, "no ringtone to play");
735 }
736 }
737 Ok(())
738 }
739
740 async fn do_tts(
741 &self,
742 text: String,
743 speaker: Option<String>,
744 play_id: Option<String>,
745 auto_hangup: Option<bool>,
746 streaming: bool,
747 end_of_stream: bool,
748 option: Option<SynthesisOption>,
749 wait_input_timeout: Option<u32>,
750 base64: bool,
751 ) -> Result<()> {
752 let tts_option = {
753 let call_state = self.call_state.read().await;
754 match call_state.option.clone().unwrap_or_default().tts {
755 Some(opt) => opt.merge_with(option),
756 None => {
757 if let Some(opt) = option {
758 opt
759 } else {
760 return Err(anyhow::anyhow!("no tts option available"));
761 }
762 }
763 }
764 };
765 let speaker = match speaker {
766 Some(s) => Some(s),
767 None => tts_option.speaker.clone(),
768 };
769
770 let mut play_command = SynthesisCommand {
771 text,
772 speaker,
773 play_id: play_id.clone(),
774 streaming,
775 end_of_stream,
776 option: tts_option,
777 base64,
778 };
779 info!(
780 session_id = self.session_id,
781 provider = ?play_command.option.provider,
782 text = %play_command.text.chars().take(10).collect::<String>(),
783 speaker = play_command.speaker.as_deref(),
784 auto_hangup = auto_hangup.unwrap_or_default(),
785 play_id = play_command.play_id.as_deref(),
786 streaming = play_command.streaming,
787 end_of_stream = play_command.end_of_stream,
788 wait_input_timeout = wait_input_timeout.unwrap_or_default(),
789 is_base64 = play_command.base64,
790 "new synthesis"
791 );
792
793 let ssrc = rand::random::<u32>();
794 let should_interrupt = {
795 let mut state = self.call_state.write().await;
796 state.auto_hangup = match auto_hangup {
797 Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
798 _ => None,
799 };
800 state.wait_input_timeout = wait_input_timeout;
801
802 let changed = play_id.is_some() && state.current_play_id != play_id;
803 state.current_play_id = play_id.clone();
804 changed
805 };
806
807 if should_interrupt {
808 let _ = self.do_interrupt(false).await;
809 }
810
811 let existing_handle = self.call_state.read().await.tts_handle.clone();
812 if let Some(tts_handle) = existing_handle {
813 match tts_handle.try_send(play_command) {
814 Ok(_) => return Ok(()),
815 Err(e) => {
816 play_command = e.0;
817 }
818 }
819 }
820
821 let (new_handle, tts_track) = StreamEngine::create_tts_track(
822 self.app_state.stream_engine.clone(),
823 self.cancel_token.child_token(),
824 self.session_id.clone(),
825 self.server_side_track_id.clone(),
826 ssrc,
827 play_id.clone(),
828 streaming,
829 &play_command.option,
830 )
831 .await?;
832
833 new_handle.try_send(play_command)?;
834 self.call_state.write().await.tts_handle = Some(new_handle);
835 self.update_track_wrapper(tts_track, play_id).await;
836 Ok(())
837 }
838
839 async fn do_play(
840 &self,
841 url: String,
842 play_id: Option<String>,
843 auto_hangup: Option<bool>,
844 wait_input_timeout: Option<u32>,
845 ) -> Result<()> {
846 let ssrc = rand::random::<u32>();
847 info!(
848 session_id = self.session_id,
849 ssrc, url, play_id, auto_hangup, "play file track"
850 );
851
852 let play_id = play_id.or(Some(url.clone()));
853
854 let file_track = FileTrack::new(self.server_side_track_id.clone())
855 .with_play_id(play_id.clone())
856 .with_ssrc(ssrc)
857 .with_path(url)
858 .with_cancel_token(self.cancel_token.child_token());
859
860 {
861 let mut state = self.call_state.write().await;
862 state.tts_handle = None;
863 state.auto_hangup = match auto_hangup {
864 Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
865 _ => None,
866 };
867 state.wait_input_timeout = wait_input_timeout;
868 }
869
870 self.update_track_wrapper(Box::new(file_track), play_id)
871 .await;
872 Ok(())
873 }
874
875 async fn do_history(&self, speaker: String, text: String) -> Result<()> {
876 self.event_sender
877 .send(SessionEvent::AddHistory {
878 sender: Some(self.session_id.clone()),
879 timestamp: crate::media::get_timestamp(),
880 speaker,
881 text,
882 })
883 .map(|_| ())
884 .map_err(Into::into)
885 }
886
887 async fn do_interrupt(&self, graceful: bool) -> Result<()> {
888 {
889 let mut state = self.call_state.write().await;
890 state.tts_handle = None;
891 state.moh = None;
892 }
893 self.media_stream
894 .remove_track(&self.server_side_track_id, graceful)
895 .await;
896 Ok(())
897 }
898 async fn do_pause(&self) -> Result<()> {
899 Ok(())
900 }
901 async fn do_resume(&self) -> Result<()> {
902 Ok(())
903 }
904 async fn do_hangup(
905 &self,
906 reason: Option<CallRecordHangupReason>,
907 initiator: Option<String>,
908 ) -> Result<()> {
909 info!(
910 session_id = self.session_id,
911 ?reason,
912 ?initiator,
913 "do_hangup"
914 );
915
916 let hangup_reason = match initiator.as_deref() {
918 Some("caller") => CallRecordHangupReason::ByCaller,
919 Some("callee") => CallRecordHangupReason::ByCallee,
920 Some("system") => CallRecordHangupReason::Autohangup,
921 _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
922 };
923
924 self.media_stream
925 .stop(Some(hangup_reason.to_string()), initiator);
926
927 self.call_state
928 .write()
929 .await
930 .set_hangup_reason(hangup_reason);
931 Ok(())
932 }
933
934 async fn do_refer(
935 &self,
936 caller: String,
937 callee: String,
938 refer_option: Option<ReferOption>,
939 ) -> Result<()> {
940 self.do_interrupt(false).await.ok();
941 let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
942 if let Some(ref path) = moh {
943 if !path.starts_with("http") && !std::path::Path::new(path).exists() {
944 let fallback = "./config/sounds/refer_moh.wav";
945 if std::path::Path::new(fallback).exists() {
946 info!(
947 session_id = self.session_id,
948 "moh {} not found, using fallback {}", path, fallback
949 );
950 moh = Some(fallback.to_string());
951 }
952 }
953 }
954 let session_id = self.session_id.clone();
955 let track_id = self.server_side_track_id.clone();
956
957 let recorder = {
958 let cs = self.call_state.read().await;
959 cs.option
960 .as_ref()
961 .map(|o| o.recorder.clone())
962 .unwrap_or_default()
963 };
964
965 let call_option = CallOption {
966 caller: Some(caller),
967 callee: Some(callee.clone()),
968 sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
969 asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
970 denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
971 recorder,
972 ..Default::default()
973 };
974
975 let mut invite_option = call_option.build_invite_option()?;
976 invite_option.call_id = Some(self.session_id.clone());
977
978 let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
979
980 {
981 let cs = self.call_state.read().await;
982 if let Some(opt) = cs.option.as_ref() {
983 if let Some(callee) = opt.callee.as_ref() {
984 headers.push(rsip::Header::Other(
985 "X-Referred-To".to_string(),
986 callee.clone(),
987 ));
988 }
989 if let Some(caller) = opt.caller.as_ref() {
990 headers.push(rsip::Header::Other(
991 "X-Referred-From".to_string(),
992 caller.clone(),
993 ));
994 }
995 }
996 }
997
998 headers.push(rsip::Header::Other(
999 "X-Referred-Id".to_string(),
1000 self.session_id.clone(),
1001 ));
1002
1003 let ssrc = rand::random::<u32>();
1004 let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1005 start_time: Utc::now(),
1006 ssrc,
1007 option: Some(call_option.clone()),
1008 is_refer: true,
1009 ..Default::default()
1010 }));
1011
1012 {
1013 let mut cs = self.call_state.write().await;
1014 cs.refer_callstate.replace(refer_call_state.clone());
1015 }
1016
1017 let auto_hangup_requested = refer_option
1018 .as_ref()
1019 .and_then(|o| o.auto_hangup)
1020 .unwrap_or(true);
1021
1022 if auto_hangup_requested {
1023 self.call_state.write().await.auto_hangup =
1024 Some((ssrc, CallRecordHangupReason::ByRefer));
1025 } else {
1026 self.call_state.write().await.auto_hangup = None;
1027 }
1028
1029 let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1030
1031 info!(
1032 session_id = self.session_id,
1033 ssrc,
1034 auto_hangup = auto_hangup_requested,
1035 callee,
1036 timeout_secs,
1037 "do_refer"
1038 );
1039
1040 let r = tokio::time::timeout(
1041 Duration::from_secs(timeout_secs as u64),
1042 self.create_outgoing_sip_track(
1043 self.cancel_token.child_token(),
1044 refer_call_state.clone(),
1045 &track_id,
1046 invite_option,
1047 &call_option,
1048 moh,
1049 auto_hangup_requested,
1050 ),
1051 )
1052 .await;
1053
1054 {
1055 self.call_state.write().await.moh = None;
1056 }
1057
1058 let result = match r {
1059 Ok(res) => res,
1060 Err(_) => {
1061 warn!(
1062 session_id = session_id,
1063 "refer sip track creation timed out after {} seconds", timeout_secs
1064 );
1065 self.event_sender
1066 .send(SessionEvent::Reject {
1067 track_id,
1068 timestamp: crate::media::get_timestamp(),
1069 reason: "Timeout when refer".into(),
1070 code: Some(408),
1071 refer: Some(true),
1072 })
1073 .ok();
1074 return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1075 }
1076 };
1077
1078 match result {
1079 Ok(answer) => {
1080 self.event_sender
1081 .send(SessionEvent::Answer {
1082 timestamp: crate::media::get_timestamp(),
1083 track_id,
1084 sdp: answer,
1085 refer: Some(true),
1086 })
1087 .ok();
1088 }
1089 Err(e) => {
1090 warn!(
1091 session_id = session_id,
1092 "failed to create refer sip track: {}", e
1093 );
1094 match &e {
1095 rsipstack::Error::DialogError(reason, _, code) => {
1096 self.event_sender
1097 .send(SessionEvent::Reject {
1098 track_id,
1099 timestamp: crate::media::get_timestamp(),
1100 reason: reason.clone(),
1101 code: Some(code.code() as u32),
1102 refer: Some(true),
1103 })
1104 .ok();
1105 }
1106 _ => {}
1107 }
1108 return Err(e.into());
1109 }
1110 }
1111 Ok(())
1112 }
1113
1114 async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1115 self.media_stream.mute_track(track_id).await;
1116 Ok(())
1117 }
1118
1119 async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1120 self.media_stream.unmute_track(track_id).await;
1121 Ok(())
1122 }
1123
1124 pub async fn cleanup(&self) -> Result<()> {
1125 self.call_state.write().await.tts_handle = None;
1126 self.media_stream.cleanup().await.ok();
1127 Ok(())
1128 }
1129
1130 pub fn get_callrecord(&self) -> Option<CallRecord> {
1131 self.call_state.try_read().ok().map(|call_state| {
1132 call_state.build_callrecord(
1133 self.app_state.clone(),
1134 self.session_id.clone(),
1135 self.call_type.clone(),
1136 )
1137 })
1138 }
1139
1140 async fn dump_to_file(
1141 &self,
1142 dump_file: &mut File,
1143 cmd_receiver: &mut CommandReceiver,
1144 event_receiver: &mut EventReceiver,
1145 ) {
1146 loop {
1147 select! {
1148 _ = self.cancel_token.cancelled() => {
1149 break;
1150 }
1151 Ok(cmd) = cmd_receiver.recv() => {
1152 CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1153 .await;
1154 }
1155 Ok(event) = event_receiver.recv() => {
1156 if matches!(event, SessionEvent::Binary{..}) {
1157 continue;
1158 }
1159 CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1160 .await;
1161 }
1162 };
1163 }
1164 }
1165
1166 async fn dump_loop(
1167 &self,
1168 dump_events: bool,
1169 mut dump_cmd_receiver: CommandReceiver,
1170 mut dump_event_receiver: EventReceiver,
1171 ) {
1172 if !dump_events {
1173 return;
1174 }
1175
1176 let file_name = self.app_state.get_dump_events_file(&self.session_id);
1177 let mut dump_file = match File::options()
1178 .create(true)
1179 .append(true)
1180 .open(&file_name)
1181 .await
1182 {
1183 Ok(file) => file,
1184 Err(e) => {
1185 warn!(
1186 session_id = self.session_id,
1187 file_name, "failed to open dump events file: {}", e
1188 );
1189 return;
1190 }
1191 };
1192 self.dump_to_file(
1193 &mut dump_file,
1194 &mut dump_cmd_receiver,
1195 &mut dump_event_receiver,
1196 )
1197 .await;
1198
1199 while let Ok(event) = dump_event_receiver.try_recv() {
1200 if matches!(event, SessionEvent::Binary { .. }) {
1201 continue;
1202 }
1203 CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1204 }
1205 }
1206
1207 pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1208 let mut rtc_config = RtcTrackConfig::default();
1209 rtc_config.mode = rustrtc::TransportMode::Rtp;
1210
1211 if let Some(codecs) = &self.app_state.config.codecs {
1212 let mut codec_types = Vec::new();
1213 for c in codecs {
1214 match c.to_lowercase().as_str() {
1215 "pcmu" => codec_types.push(CodecType::PCMU),
1216 "pcma" => codec_types.push(CodecType::PCMA),
1217 "g722" => codec_types.push(CodecType::G722),
1218 "g729" => codec_types.push(CodecType::G729),
1219 "opus" => codec_types.push(CodecType::Opus),
1220 "dtmf" | "2833" | "telephone_event" => {
1221 codec_types.push(CodecType::TelephoneEvent)
1222 }
1223 _ => {}
1224 }
1225 }
1226 if !codec_types.is_empty() {
1227 rtc_config.preferred_codec = Some(codec_types[0].clone());
1228 rtc_config.codecs = codec_types;
1229 }
1230 }
1231
1232 if rtc_config.preferred_codec.is_none() {
1233 rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1234 }
1235
1236 rtc_config.rtp_port_range = self
1237 .app_state
1238 .config
1239 .rtp_start_port
1240 .zip(self.app_state.config.rtp_end_port);
1241
1242 if let Some(ref external_ip) = self.app_state.config.external_ip {
1243 rtc_config.external_ip = Some(external_ip.clone());
1244 }
1245
1246 let mut track = RtcTrack::new(
1247 self.cancel_token.child_token(),
1248 track_id,
1249 self.track_config.clone(),
1250 rtc_config,
1251 )
1252 .with_ssrc(ssrc);
1253
1254 track.create().await?;
1255
1256 Ok(track)
1257 }
1258
1259 async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1260 self.call_state.write().await.option = Some(option.clone());
1261 info!(
1262 session_id = self.session_id,
1263 call_type = ?self.call_type,
1264 "setup caller track"
1265 );
1266
1267 let track = match self.call_type {
1268 ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1269 ActiveCallType::WebSocket => {
1270 let audio_receiver = self.call_state.write().await.audio_receiver.take();
1271 if let Some(receiver) = audio_receiver {
1272 Some(self.create_websocket_track(receiver).await?)
1273 } else {
1274 None
1275 }
1276 }
1277 ActiveCallType::Sip => {
1278 if let Some(pending_dialog) = self.invitation.get_pending_call(&self.session_id) {
1279 return self
1280 .prepare_incoming_sip_track(
1281 self.cancel_token.clone(),
1282 self.call_state.clone(),
1283 &self.session_id,
1284 pending_dialog,
1285 )
1286 .await;
1287 }
1288
1289 let invite_option = option.build_invite_option()?;
1290 match self
1291 .create_outgoing_sip_track(
1292 self.cancel_token.clone(),
1293 self.call_state.clone(),
1294 &self.session_id,
1295 invite_option,
1296 &option,
1297 None,
1298 false,
1299 )
1300 .await
1301 {
1302 Ok(answer) => {
1303 self.event_sender
1304 .send(SessionEvent::Answer {
1305 timestamp: crate::media::get_timestamp(),
1306 track_id: self.session_id.clone(),
1307 sdp: answer,
1308 refer: Some(false),
1309 })
1310 .ok();
1311 return Ok(());
1312 }
1313 Err(e) => {
1314 warn!(
1315 session_id = self.session_id,
1316 "failed to create sip track: {}", e
1317 );
1318 match &e {
1319 rsipstack::Error::DialogError(reason, _, code) => {
1320 self.event_sender
1321 .send(SessionEvent::Reject {
1322 track_id: self.session_id.clone(),
1323 timestamp: crate::media::get_timestamp(),
1324 reason: reason.clone(),
1325 code: Some(code.code() as u32),
1326 refer: Some(false),
1327 })
1328 .ok();
1329 }
1330 _ => {}
1331 }
1332 return Err(e.into());
1333 }
1334 }
1335 }
1336 ActiveCallType::B2bua => match self.invitation.get_pending_call(&self.session_id) {
1337 Some(pending_dialog) => {
1338 return self
1339 .prepare_incoming_sip_track(
1340 self.cancel_token.clone(),
1341 self.call_state.clone(),
1342 &self.session_id,
1343 pending_dialog,
1344 )
1345 .await;
1346 }
1347 None => {
1348 warn!(
1349 session_id = self.session_id,
1350 "no pending dialog found for B2BUA call"
1351 );
1352 return Err(anyhow::anyhow!(
1353 "no pending dialog found for session_id: {}",
1354 self.session_id
1355 ));
1356 }
1357 },
1358 };
1359 match track {
1360 Some(track) => {
1361 self.finish_caller_stack(&option, Some(track)).await?;
1362 }
1363 None => {
1364 warn!(session_id = self.session_id, "no track created for caller");
1365 return Err(anyhow::anyhow!("no track created for caller"));
1366 }
1367 }
1368 Ok(())
1369 }
1370
1371 async fn finish_caller_stack(
1372 &self,
1373 option: &CallOption,
1374 track: Option<Box<dyn Track>>,
1375 ) -> Result<()> {
1376 if let Some(track) = track {
1377 self.setup_track_with_stream(&option, track).await?;
1378 }
1379
1380 {
1381 let call_state = self.call_state.read().await;
1382 if let Some(ref answer) = call_state.answer {
1383 info!(
1384 session_id = self.session_id,
1385 "sending answer event: {}", answer,
1386 );
1387 self.event_sender
1388 .send(SessionEvent::Answer {
1389 timestamp: crate::media::get_timestamp(),
1390 track_id: self.session_id.clone(),
1391 sdp: answer.clone(),
1392 refer: Some(false),
1393 })
1394 .ok();
1395 } else {
1396 warn!(
1397 session_id = self.session_id,
1398 "no answer in state to send event"
1399 );
1400 }
1401 }
1402 Ok(())
1403 }
1404
1405 pub async fn setup_track_with_stream(
1406 &self,
1407 option: &CallOption,
1408 mut track: Box<dyn Track>,
1409 ) -> Result<()> {
1410 let processors = match StreamEngine::create_processors(
1411 self.app_state.stream_engine.clone(),
1412 track.as_ref(),
1413 self.cancel_token.child_token(),
1414 self.event_sender.clone(),
1415 self.media_stream.packet_sender.clone(),
1416 option,
1417 )
1418 .await
1419 {
1420 Ok(processors) => processors,
1421 Err(e) => {
1422 warn!(
1423 session_id = self.session_id,
1424 "failed to prepare stream processors: {}", e
1425 );
1426 vec![]
1427 }
1428 };
1429
1430 for processor in processors {
1432 track.append_processor(processor);
1433 }
1434
1435 self.update_track_wrapper(track, None).await;
1436 Ok(())
1437 }
1438
1439 pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1440 let ambiance_opt = {
1441 let state = self.call_state.read().await;
1442 let mut opt = state
1443 .option
1444 .as_ref()
1445 .and_then(|o| o.ambiance.clone())
1446 .unwrap_or_default();
1447
1448 if let Some(global) = &self.app_state.config.ambiance {
1449 opt.merge(global);
1450 }
1451 opt
1452 };
1453 if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1454 match AmbianceProcessor::new(ambiance_opt).await {
1455 Ok(ambiance) => {
1456 info!(session_id = self.session_id, "loaded ambiance processor");
1457 track.append_processor(Box::new(ambiance));
1458 }
1459 Err(e) => {
1460 tracing::error!("failed to load ambiance wav {}", e);
1461 }
1462 }
1463 }
1464 self.call_state.write().await.current_play_id = play_id.clone();
1465 self.media_stream.update_track(track, play_id).await;
1466 }
1467
1468 pub async fn create_websocket_track(
1469 &self,
1470 audio_receiver: WebsocketBytesReceiver,
1471 ) -> Result<Box<dyn Track>> {
1472 let (ssrc, codec) = {
1473 let call_state = self.call_state.read().await;
1474 (
1475 call_state.ssrc,
1476 call_state
1477 .option
1478 .as_ref()
1479 .map(|o| o.codec.clone())
1480 .unwrap_or_default(),
1481 )
1482 };
1483
1484 let ws_track = WebsocketTrack::new(
1485 self.cancel_token.child_token(),
1486 self.session_id.clone(),
1487 self.track_config.clone(),
1488 self.event_sender.clone(),
1489 audio_receiver,
1490 codec,
1491 ssrc,
1492 );
1493
1494 {
1495 let mut call_state = self.call_state.write().await;
1496 call_state.answer_time = Some(Utc::now());
1497 call_state.answer = Some("".to_string());
1498 call_state.last_status_code = 200;
1499 }
1500
1501 Ok(Box::new(ws_track))
1502 }
1503
1504 pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1505 let (ssrc, option) = {
1506 let call_state = self.call_state.read().await;
1507 (
1508 call_state.ssrc,
1509 call_state.option.clone().unwrap_or_default(),
1510 )
1511 };
1512
1513 let mut rtc_config = RtcTrackConfig::default();
1514 rtc_config.mode = rustrtc::TransportMode::WebRtc; rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1516
1517 if let Some(codecs) = &self.app_state.config.codecs {
1518 let mut codec_types = Vec::new();
1519 for c in codecs {
1520 match c.to_lowercase().as_str() {
1521 "pcmu" => codec_types.push(CodecType::PCMU),
1522 "pcma" => codec_types.push(CodecType::PCMA),
1523 "g722" => codec_types.push(CodecType::G722),
1524 "g729" => codec_types.push(CodecType::G729),
1525 "opus" => codec_types.push(CodecType::Opus),
1526 "dtmf" | "2833" | "telephone_event" => {
1527 codec_types.push(CodecType::TelephoneEvent)
1528 }
1529 _ => {}
1530 }
1531 }
1532 if !codec_types.is_empty() {
1533 rtc_config.preferred_codec = Some(codec_types[0].clone());
1534 rtc_config.codecs = codec_types;
1535 }
1536 }
1537
1538 if let Some(ref external_ip) = self.app_state.config.external_ip {
1539 rtc_config.external_ip = Some(external_ip.clone());
1540 }
1541
1542 let mut webrtc_track = RtcTrack::new(
1543 self.cancel_token.child_token(),
1544 self.session_id.clone(),
1545 self.track_config.clone(),
1546 rtc_config,
1547 )
1548 .with_ssrc(ssrc);
1549
1550 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1551 let offer = match option.enable_ipv6 {
1552 Some(false) | None => {
1553 strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1554 }
1555 _ => option.offer.clone().unwrap_or("".to_string()),
1556 };
1557 let answer: Option<String>;
1558 match webrtc_track.handshake(offer, timeout).await {
1559 Ok(answer_sdp) => {
1560 answer = match option.enable_ipv6 {
1561 Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1562 Some(true) => Some(answer_sdp.to_string()),
1563 };
1564 }
1565 Err(e) => {
1566 warn!(session_id = self.session_id, "failed to setup track: {}", e);
1567 return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1568 }
1569 }
1570
1571 {
1572 let mut call_state = self.call_state.write().await;
1573 call_state.answer_time = Some(Utc::now());
1574 call_state.answer = answer;
1575 call_state.last_status_code = 200;
1576 }
1577 Ok(Box::new(webrtc_track))
1578 }
1579
1580 async fn create_outgoing_sip_track(
1581 &self,
1582 cancel_token: CancellationToken,
1583 call_state_ref: ActiveCallStateRef,
1584 track_id: &String,
1585 mut invite_option: InviteOption,
1586 call_option: &CallOption,
1587 moh: Option<String>,
1588 auto_hangup: bool,
1589 ) -> Result<String, rsipstack::Error> {
1590 let ssrc = call_state_ref.read().await.ssrc;
1591 let rtp_track = self
1592 .create_rtp_track(track_id.clone(), ssrc)
1593 .await
1594 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1595
1596 let offer = Some(
1597 rtp_track
1598 .local_description()
1599 .await
1600 .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1601 );
1602
1603 {
1604 let mut cs = call_state_ref.write().await;
1605 if let Some(o) = cs.option.as_mut() {
1606 o.offer = offer.clone();
1607 }
1608 cs.start_time = Utc::now();
1609 };
1610
1611 invite_option.offer = offer.clone().map(|s| s.into());
1612
1613 let needs_contact = invite_option.contact.scheme.is_none()
1616 || invite_option
1617 .contact
1618 .host_with_port
1619 .to_string()
1620 .starts_with("127.0.0.1");
1621
1622 if needs_contact {
1623 if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1624 invite_option.contact = rsip::Uri {
1625 scheme: Some(rsip::Scheme::Sip),
1626 auth: None,
1627 host_with_port: addr.addr.clone(),
1628 params: vec![],
1629 headers: vec![],
1630 };
1631 }
1632 }
1633
1634 let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1635
1636 if let Some(moh) = moh {
1637 let ssrc_and_moh = {
1638 let mut state = call_state_ref.write().await;
1639 state.moh = Some(moh.clone());
1640 if state.current_play_id.is_none() {
1641 let ssrc = rand::random::<u32>();
1642 Some((ssrc, moh.clone()))
1643 } else {
1644 info!(
1645 session_id = self.session_id,
1646 "Something is playing, MOH will start after it ends"
1647 );
1648 None
1649 }
1650 };
1651
1652 if let Some((ssrc, moh_path)) = ssrc_and_moh {
1653 let file_track = FileTrack::new(self.server_side_track_id.clone())
1654 .with_play_id(Some(moh_path.clone()))
1655 .with_ssrc(ssrc)
1656 .with_path(moh_path.clone())
1657 .with_cancel_token(self.cancel_token.child_token());
1658 self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1659 .await;
1660 }
1661 } else {
1662 let track = rtp_track_to_setup.take().unwrap();
1663 self.setup_track_with_stream(&call_option, track)
1664 .await
1665 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1666 }
1667
1668 info!(
1669 session_id = self.session_id,
1670 track_id,
1671 contact = %invite_option.contact,
1672 "invite {} -> {} offer: \n{}",
1673 invite_option.caller,
1674 invite_option.callee,
1675 offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1676 );
1677
1678 let (dlg_state_sender, dlg_state_receiver) =
1679 self.invitation.dialog_layer.new_dialog_state_channel();
1680
1681 let states = InviteDialogStates {
1682 is_client: true,
1683 session_id: self.session_id.clone(),
1684 track_id: track_id.clone(),
1685 event_sender: self.event_sender.clone(),
1686 media_stream: self.media_stream.clone(),
1687 call_state: call_state_ref.clone(),
1688 cancel_token,
1689 terminated_reason: None,
1690 has_early_media: false,
1691 };
1692
1693 let mut client_dialog_handler =
1694 DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), dlg_state_receiver);
1695
1696 crate::spawn(async move {
1697 client_dialog_handler.process_dialog(states).await;
1698 });
1699
1700 let (dialog_id, answer) = self
1701 .invitation
1702 .invite(invite_option, dlg_state_sender)
1703 .await?;
1704
1705 self.call_state.write().await.moh = None;
1706
1707 if let Some(track) = rtp_track_to_setup {
1708 self.setup_track_with_stream(&call_option, track)
1709 .await
1710 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1711 }
1712
1713 let answer = match answer {
1714 Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1715 None => {
1716 warn!(session_id = self.session_id, "no answer received");
1717 return Err(rsipstack::Error::DialogError(
1718 "No answer received".to_string(),
1719 dialog_id,
1720 rsip::StatusCode::NotAcceptableHere,
1721 ));
1722 }
1723 };
1724
1725 {
1726 let mut cs = call_state_ref.write().await;
1727 if cs.answer.is_none() {
1728 cs.answer = Some(answer.clone());
1729 }
1730 if auto_hangup {
1731 cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
1732 }
1733 }
1734
1735 self.media_stream
1736 .update_remote_description(&track_id, &answer)
1737 .await
1738 .ok();
1739
1740 Ok(answer)
1741 }
1742
1743 pub fn is_webrtc_sdp(sdp: &str) -> bool {
1745 (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
1746 && sdp.contains("a=fingerprint:")
1747 }
1748
1749 pub async fn setup_answer_track(
1750 &self,
1751 ssrc: u32,
1752 option: &CallOption,
1753 offer: String,
1754 ) -> Result<(String, Box<dyn Track>)> {
1755 let offer = match option.enable_ipv6 {
1756 Some(false) | None => strip_ipv6_candidates(&offer),
1757 _ => offer.clone(),
1758 };
1759
1760 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1761
1762 let mut media_track = if Self::is_webrtc_sdp(&offer) {
1763 let mut rtc_config = RtcTrackConfig::default();
1764 rtc_config.mode = rustrtc::TransportMode::WebRtc;
1765 rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1766 if let Some(ref external_ip) = self.app_state.config.external_ip {
1767 rtc_config.external_ip = Some(external_ip.clone());
1768 }
1769
1770 let webrtc_track = RtcTrack::new(
1771 self.cancel_token.child_token(),
1772 self.session_id.clone(),
1773 self.track_config.clone(),
1774 rtc_config,
1775 )
1776 .with_ssrc(ssrc);
1777
1778 Box::new(webrtc_track) as Box<dyn Track>
1779 } else {
1780 let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
1781 Box::new(rtp_track) as Box<dyn Track>
1782 };
1783
1784 let answer = match media_track.handshake(offer.clone(), timeout).await {
1785 Ok(answer) => answer,
1786 Err(e) => {
1787 return Err(anyhow::anyhow!("handshake failed: {e}"));
1788 }
1789 };
1790
1791 return Ok((answer, media_track));
1792 }
1793
1794 pub async fn prepare_incoming_sip_track(
1795 &self,
1796 cancel_token: CancellationToken,
1797 call_state_ref: ActiveCallStateRef,
1798 track_id: &String,
1799 pending_dialog: PendingDialog,
1800 ) -> Result<()> {
1801 let state_receiver = pending_dialog.state_receiver;
1802 let states = InviteDialogStates {
1805 is_client: false,
1806 session_id: self.session_id.clone(),
1807 track_id: track_id.clone(),
1808 event_sender: self.event_sender.clone(),
1809 media_stream: self.media_stream.clone(),
1810 call_state: self.call_state.clone(),
1811 cancel_token,
1812 terminated_reason: None,
1813 has_early_media: false,
1814 };
1815
1816 let initial_request = pending_dialog.dialog.initial_request();
1817 let offer = String::from_utf8_lossy(&initial_request.body).to_string();
1818
1819 let (ssrc, option) = {
1820 let call_state = call_state_ref.read().await;
1821 (
1822 call_state.ssrc,
1823 call_state.option.clone().unwrap_or_default(),
1824 )
1825 };
1826
1827 match self.setup_answer_track(ssrc, &option, offer).await {
1828 Ok((offer, track)) => {
1829 self.setup_track_with_stream(&option, track).await?;
1830 {
1831 let mut state = self.call_state.write().await;
1832 state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
1833 }
1834 }
1835 Err(e) => {
1836 return Err(anyhow::anyhow!("error creating track: {}", e));
1837 }
1838 }
1839
1840 let mut client_dialog_handler =
1841 DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), state_receiver);
1842
1843 crate::spawn(async move {
1844 client_dialog_handler.process_dialog(states).await;
1845 });
1846 Ok(())
1847 }
1848}
1849
1850impl Drop for ActiveCall {
1851 fn drop(&mut self) {
1852 info!(session_id = self.session_id, "dropping active call");
1853 if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
1854 if let Some(record) = self.get_callrecord() {
1855 if let Err(e) = sender.send(record) {
1856 warn!(
1857 session_id = self.session_id,
1858 "failed to send call record: {}", e
1859 );
1860 }
1861 }
1862 }
1863 }
1864}
1865
1866impl ActiveCallState {
1867 pub fn merge_option(&self, mut option: CallOption) -> CallOption {
1868 if let Some(existing) = &self.option {
1869 if option.asr.is_none() {
1870 option.asr = existing.asr.clone();
1871 }
1872 if option.tts.is_none() {
1873 option.tts = existing.tts.clone();
1874 }
1875 if option.vad.is_none() {
1876 option.vad = existing.vad.clone();
1877 }
1878 if option.denoise.is_none() {
1879 option.denoise = existing.denoise;
1880 }
1881 if option.recorder.is_none() {
1882 option.recorder = existing.recorder.clone();
1883 }
1884 if option.eou.is_none() {
1885 option.eou = existing.eou.clone();
1886 }
1887 if option.extra.is_none() {
1888 option.extra = existing.extra.clone();
1889 }
1890 if option.ambiance.is_none() {
1891 option.ambiance = existing.ambiance.clone();
1892 }
1893 }
1894 option
1895 }
1896
1897 pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
1898 if self.hangup_reason.is_none() {
1899 self.hangup_reason = Some(reason);
1900 }
1901 }
1902
1903 pub fn build_hangup_event(
1904 &self,
1905 track_id: TrackId,
1906 initiator: Option<String>,
1907 ) -> crate::event::SessionEvent {
1908 let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
1909 let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
1910 let extra = self.extras.clone();
1911
1912 crate::event::SessionEvent::Hangup {
1913 track_id,
1914 timestamp: crate::media::get_timestamp(),
1915 reason: Some(format!("{:?}", self.hangup_reason)),
1916 initiator,
1917 start_time: self.start_time.to_rfc3339(),
1918 answer_time: self.answer_time.map(|t| t.to_rfc3339()),
1919 ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
1920 hangup_time: Utc::now().to_rfc3339(),
1921 extra,
1922 from: from.map(|f| f.into()),
1923 to: to.map(|f| f.into()),
1924 refer: Some(self.is_refer),
1925 }
1926 }
1927
1928 pub fn build_callrecord(
1929 &self,
1930 app_state: AppState,
1931 session_id: String,
1932 call_type: ActiveCallType,
1933 ) -> CallRecord {
1934 let option = self.option.clone().unwrap_or_default();
1935 let recorder = if option.recorder.is_some() {
1936 let recorder_file = app_state.get_recorder_file(&session_id);
1937 if std::path::Path::new(&recorder_file).exists() {
1938 let file_size = std::fs::metadata(&recorder_file)
1939 .map(|m| m.len())
1940 .unwrap_or(0);
1941 vec![crate::callrecord::CallRecordMedia {
1942 track_id: session_id.clone(),
1943 path: recorder_file,
1944 size: file_size,
1945 extra: None,
1946 }]
1947 } else {
1948 vec![]
1949 }
1950 } else {
1951 vec![]
1952 };
1953
1954 let dump_event_file = app_state.get_dump_events_file(&session_id);
1955 let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
1956 Some(dump_event_file)
1957 } else {
1958 None
1959 };
1960
1961 let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
1962 if let Ok(rc) = rc.try_read() {
1963 Some(Box::new(rc.build_callrecord(
1964 app_state.clone(),
1965 rc.session_id.clone(),
1966 ActiveCallType::B2bua,
1967 )))
1968 } else {
1969 None
1970 }
1971 });
1972
1973 let caller = option.caller.clone().unwrap_or_default();
1974 let callee = option.callee.clone().unwrap_or_default();
1975
1976 CallRecord {
1977 option: Some(option),
1978 call_id: session_id,
1979 call_type,
1980 start_time: self.start_time,
1981 ring_time: self.ring_time.clone(),
1982 answer_time: self.answer_time.clone(),
1983 end_time: Utc::now(),
1984 caller,
1985 callee,
1986 hangup_reason: self.hangup_reason.clone(),
1987 hangup_messages: Vec::new(),
1988 status_code: self.last_status_code,
1989 extras: self.extras.clone(),
1990 dump_event_file,
1991 recorder,
1992 refer_callrecord,
1993 }
1994 }
1995}