1use super::Command;
2use crate::{
3 CallOption, ReferOption,
4 event::{EventReceiver, EventSender, SessionEvent},
5 media::{
6 TrackId,
7 ambiance::AmbianceProcessor,
8 engine::StreamEngine,
9 negotiate::strip_ipv6_candidates,
10 recorder::RecorderOption,
11 stream::{MediaStream, MediaStreamBuilder},
12 track::{
13 Track, TrackConfig,
14 file::FileTrack,
15 media_pass::MediaPassTrack,
16 rtc::{RtcTrack, RtcTrackConfig},
17 tts::SynthesisHandle,
18 websocket::{WebsocketBytesReceiver, WebsocketTrack},
19 },
20 },
21 synthesis::{SynthesisCommand, SynthesisOption},
22};
23use crate::{
24 app::AppState,
25 call::{
26 CommandReceiver, CommandSender,
27 sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
28 },
29 callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
30 useragent::invitation::PendingDialog,
31};
32use anyhow::Result;
33use audio_codec::CodecType;
34use chrono::{DateTime, Utc};
35use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
36use serde::{Deserialize, Serialize};
37use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
38use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
39use tokio_util::sync::CancellationToken;
40use tracing::{debug, info, warn};
41
42#[derive(Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub struct CallParams {
45 pub id: Option<String>,
46 #[serde(rename = "dump")]
47 pub dump_events: Option<bool>,
48 #[serde(rename = "ping")]
49 pub ping_interval: Option<u32>,
50 pub server_side_track: Option<String>,
51}
52
53#[derive(Debug, Serialize, Deserialize, Clone, Default)]
54#[serde(rename_all = "camelCase")]
55pub enum ActiveCallType {
56 Webrtc,
57 B2bua,
58 WebSocket,
59 #[default]
60 Sip,
61}
62
63#[derive(Default)]
64pub struct ActiveCallState {
65 pub session_id: String,
66 pub start_time: DateTime<Utc>,
67 pub ring_time: Option<DateTime<Utc>>,
68 pub answer_time: Option<DateTime<Utc>>,
69 pub hangup_reason: Option<CallRecordHangupReason>,
70 pub last_status_code: u16,
71 pub option: Option<CallOption>,
72 pub answer: Option<String>,
73 pub ssrc: u32,
74 pub refer_callstate: Option<ActiveCallStateRef>,
75 pub extras: Option<HashMap<String, serde_json::Value>>,
76 pub is_refer: bool,
77
78 pub tts_handle: Option<SynthesisHandle>,
80 pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
81 pub wait_input_timeout: Option<u32>,
82 pub moh: Option<String>,
83 pub current_play_id: Option<String>,
84 pub audio_receiver: Option<WebsocketBytesReceiver>,
85 pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
86}
87
88pub type ActiveCallRef = Arc<ActiveCall>;
89pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
90
91pub struct ActiveCall {
92 pub call_state: ActiveCallStateRef,
93 pub cancel_token: CancellationToken,
94 pub call_type: ActiveCallType,
95 pub session_id: String,
96 pub media_stream: Arc<MediaStream>,
97 pub track_config: TrackConfig,
98 pub event_sender: EventSender,
99 pub app_state: AppState,
100 pub invitation: Invitation,
101 pub cmd_sender: CommandSender,
102 pub dump_events: bool,
103 pub server_side_track_id: TrackId,
104}
105
106pub struct ActiveCallGuard {
107 pub call: ActiveCallRef,
108 pub active_calls: usize,
109}
110
111impl ActiveCallGuard {
112 pub fn new(call: ActiveCallRef) -> Self {
113 let active_calls = {
114 call.app_state
115 .total_calls
116 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
117 let mut calls = call.app_state.active_calls.lock().unwrap();
118 calls.insert(call.session_id.clone(), call.clone());
119 calls.len()
120 };
121 Self { call, active_calls }
122 }
123}
124
125impl Drop for ActiveCallGuard {
126 fn drop(&mut self) {
127 self.call
128 .app_state
129 .active_calls
130 .lock()
131 .unwrap()
132 .remove(&self.call.session_id);
133 }
134}
135
136pub struct ActiveCallReceiver {
137 pub cmd_receiver: CommandReceiver,
138 pub dump_cmd_receiver: CommandReceiver,
139 pub dump_event_receiver: EventReceiver,
140}
141
142impl ActiveCall {
143 pub fn new(
144 call_type: ActiveCallType,
145 cancel_token: CancellationToken,
146 session_id: String,
147 invitation: Invitation,
148 app_state: AppState,
149 track_config: TrackConfig,
150 audio_receiver: Option<WebsocketBytesReceiver>,
151 dump_events: bool,
152 server_side_track_id: Option<TrackId>,
153 extras: Option<HashMap<String, serde_json::Value>>,
154 ) -> Self {
155 let event_sender = crate::event::create_event_sender();
156 let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
157 let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
158 .with_id(session_id.clone())
159 .with_cancel_token(cancel_token.child_token());
160 let media_stream = Arc::new(media_stream_builder.build());
161 let call_state = Arc::new(RwLock::new(ActiveCallState {
162 session_id: session_id.clone(),
163 start_time: Utc::now(),
164 ssrc: rand::random::<u32>(),
165 extras,
166 audio_receiver,
167 ..Default::default()
168 }));
169 Self {
170 cancel_token,
171 call_type,
172 session_id,
173 call_state,
174 media_stream,
175 track_config,
176 event_sender,
177 app_state,
178 invitation,
179 cmd_sender,
180 dump_events,
181 server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
182 }
183 }
184
185 pub async fn enqueue_command(&self, command: Command) -> Result<()> {
186 self.cmd_sender
187 .send(command)
188 .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
189 Ok(())
190 }
191
192 pub fn new_receiver(&self) -> ActiveCallReceiver {
196 ActiveCallReceiver {
197 cmd_receiver: self.cmd_sender.subscribe(),
198 dump_cmd_receiver: self.cmd_sender.subscribe(),
199 dump_event_receiver: self.event_sender.subscribe(),
200 }
201 }
202
203 pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
204 let ActiveCallReceiver {
205 mut cmd_receiver,
206 dump_cmd_receiver,
207 dump_event_receiver,
208 } = receiver;
209
210 let process_command_loop = async move {
211 while let Ok(command) = cmd_receiver.recv().await {
212 match self.dispatch(command).await {
213 Ok(_) => (),
214 Err(e) => {
215 warn!(session_id = self.session_id, "{}", e);
216 self.event_sender
217 .send(SessionEvent::Error {
218 track_id: self.session_id.clone(),
219 timestamp: crate::media::get_timestamp(),
220 sender: "command".to_string(),
221 error: e.to_string(),
222 code: None,
223 })
224 .ok();
225 }
226 }
227 }
228 };
229 self.app_state
230 .total_calls
231 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
232
233 tokio::join!(
234 self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
235 async {
236 select! {
237 _ = process_command_loop => {
238 info!(session_id = self.session_id, "command loop done");
239 }
240 _ = self.process() => {
241 info!(session_id = self.session_id, "call serve done");
242 }
243 _ = self.cancel_token.cancelled() => {
244 info!(session_id = self.session_id, "call cancelled - cleaning up resources");
245 }
246 }
247 self.cancel_token.cancel();
248 }
249 );
250 Ok(())
251 }
252
253 async fn process(&self) -> Result<()> {
254 let mut event_receiver = self.event_sender.subscribe();
255
256 let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
257 let input_timeout_expire_ref = input_timeout_expire.clone();
258 let event_sender = self.event_sender.clone();
259 let wait_input_timeout_loop = async {
260 loop {
261 let (start_time, expire) = { *input_timeout_expire.lock().await };
262 if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
263 info!(session_id = self.session_id, "wait input timeout reached");
264 *input_timeout_expire.lock().await = (0, 0);
265 event_sender
266 .send(SessionEvent::Silence {
267 track_id: self.server_side_track_id.clone(),
268 timestamp: crate::media::get_timestamp(),
269 start_time,
270 duration: expire as u64,
271 samples: None,
272 })
273 .ok();
274 }
275 sleep(Duration::from_millis(100)).await;
276 }
277 };
278 let server_side_track_id = self.server_side_track_id.clone();
279 let event_hook_loop = async move {
280 while let Ok(event) = event_receiver.recv().await {
281 match event {
282 SessionEvent::Speaking { .. }
283 | SessionEvent::Dtmf { .. }
284 | SessionEvent::AsrDelta { .. }
285 | SessionEvent::AsrFinal { .. }
286 | SessionEvent::TrackStart { .. } => {
287 *input_timeout_expire_ref.lock().await = (0, 0);
288 }
289 SessionEvent::TrackEnd {
290 track_id,
291 play_id,
292 ssrc,
293 ..
294 } => {
295 if track_id != server_side_track_id {
296 continue;
297 }
298
299 let (moh_path, auto_hangup, wait_timeout_val) = {
300 let mut state = self.call_state.write().await;
301 if play_id != state.current_play_id {
302 debug!(
303 session_id = self.session_id,
304 ?play_id,
305 current = ?state.current_play_id,
306 "ignoring interrupted track end"
307 );
308 continue;
309 }
310 state.current_play_id = None;
311 (
312 state.moh.clone(),
313 state.auto_hangup.clone(),
314 state.wait_input_timeout.take(),
315 )
316 };
317
318 if let Some(path) = moh_path {
319 info!(session_id = self.session_id, "looping moh: {}", path);
320 let ssrc = rand::random::<u32>();
321 let file_track = FileTrack::new(self.server_side_track_id.clone())
322 .with_play_id(Some(path.clone()))
323 .with_ssrc(ssrc)
324 .with_path(path.clone())
325 .with_cancel_token(self.cancel_token.child_token());
326 self.update_track_wrapper(Box::new(file_track), Some(path))
327 .await;
328 continue;
329 }
330
331 if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
332 if hangup_ssrc == ssrc {
333 info!(
334 session_id = self.session_id,
335 ssrc, "auto hangup when track end track_id:{}", track_id
336 );
337 self.do_hangup(Some(hangup_reason), None).await.ok();
338 }
339 }
340
341 if let Some(timeout) = wait_timeout_val {
342 let expire = if timeout > 0 {
343 (crate::media::get_timestamp(), timeout)
344 } else {
345 (0, 0)
346 };
347 *input_timeout_expire_ref.lock().await = expire;
348 }
349 }
350 SessionEvent::Interrupt { receiver } => {
351 let track_id =
352 receiver.unwrap_or_else(|| self.server_side_track_id.clone());
353 if track_id == self.server_side_track_id {
354 debug!(
355 session_id = self.session_id,
356 "received interrupt event, stopping playback"
357 );
358 self.do_interrupt(true).await.ok();
359 }
360 }
361 SessionEvent::Inactivity { track_id, .. } => {
362 info!(
363 session_id = self.session_id,
364 track_id, "inactivity timeout reached, hanging up"
365 );
366 self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
367 .await
368 .ok();
369 }
370 SessionEvent::Error { track_id, .. } => {
371 if track_id != server_side_track_id {
372 continue;
373 }
374
375 let moh_info = {
376 let mut state = self.call_state.write().await;
377 if let Some(path) = state.moh.clone() {
378 let fallback = "./config/sounds/refer_moh.wav".to_string();
379 let next_path = if path != fallback
380 && std::path::Path::new(&fallback).exists()
381 {
382 info!(
383 session_id = self.session_id,
384 "moh error, switching to fallback: {}", fallback
385 );
386 state.moh = Some(fallback.clone());
387 fallback
388 } else {
389 info!(
390 session_id = self.session_id,
391 "looping moh on error: {}", path
392 );
393 path
394 };
395 Some(next_path)
396 } else {
397 None
398 }
399 };
400
401 if let Some(next_path) = moh_info {
402 let ssrc = rand::random::<u32>();
403 let file_track = FileTrack::new(self.server_side_track_id.clone())
404 .with_play_id(Some(next_path.clone()))
405 .with_ssrc(ssrc)
406 .with_path(next_path.clone())
407 .with_cancel_token(self.cancel_token.child_token());
408 self.update_track_wrapper(Box::new(file_track), Some(next_path))
409 .await;
410 continue;
411 }
412 }
413 _ => {}
414 }
415 }
416 };
417
418 select! {
419 _ = wait_input_timeout_loop=>{
420 info!(session_id = self.session_id, "wait input timeout loop done");
421 }
422 _ = self.media_stream.serve() => {
423 info!(session_id = self.session_id, "media stream loop done");
424 }
425 _ = event_hook_loop => {
426 info!(session_id = self.session_id, "event loop done");
427 }
428 }
429 Ok(())
430 }
431
432 async fn dispatch(&self, command: Command) -> Result<()> {
433 match command {
434 Command::Invite { option } => self.do_invite(option).await,
435 Command::Accept { option } => self.do_accept(option).await,
436 Command::Reject { reason, code } => {
437 self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
438 .await
439 }
440 Command::Ringing {
441 ringtone,
442 recorder,
443 early_media,
444 } => self.do_ringing(ringtone, recorder, early_media).await,
445 Command::Tts {
446 text,
447 speaker,
448 play_id,
449 auto_hangup,
450 streaming,
451 end_of_stream,
452 option,
453 wait_input_timeout,
454 base64,
455 } => {
456 self.do_tts(
457 text,
458 speaker,
459 play_id,
460 auto_hangup,
461 streaming.unwrap_or_default(),
462 end_of_stream.unwrap_or_default(),
463 option,
464 wait_input_timeout,
465 base64.unwrap_or_default(),
466 )
467 .await
468 }
469 Command::Play {
470 url,
471 play_id,
472 auto_hangup,
473 wait_input_timeout,
474 } => {
475 self.do_play(url, play_id, auto_hangup, wait_input_timeout)
476 .await
477 }
478 Command::Hangup { reason, initiator } => {
479 let reason = reason.map(|r| {
480 r.parse::<CallRecordHangupReason>()
481 .unwrap_or(CallRecordHangupReason::BySystem)
482 });
483 self.do_hangup(reason, initiator).await
484 }
485 Command::Refer {
486 caller,
487 callee,
488 options,
489 } => self.do_refer(caller, callee, options).await,
490 Command::Mute { track_id } => self.do_mute(track_id).await,
491 Command::Unmute { track_id } => self.do_unmute(track_id).await,
492 Command::Pause {} => self.do_pause().await,
493 Command::Resume {} => self.do_resume().await,
494 Command::Interrupt {
495 graceful: passage,
496 fade_out_ms: _,
497 } => self.do_interrupt(passage.unwrap_or_default()).await,
498 Command::History { speaker, text } => self.do_history(speaker, text).await,
499 }
500 }
501
502 fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
503 if let Some(recorder_option) = &option.recorder {
504 let mut recorder_file = recorder_option.recorder_file.clone();
505 if recorder_file.contains("{id}") {
506 recorder_file = recorder_file.replace("{id}", &self.session_id);
507 }
508
509 let recorder_file = if recorder_file.is_empty() {
510 self.app_state.get_recorder_file(&self.session_id)
511 } else {
512 let p = Path::new(&recorder_file);
513 p.is_absolute()
514 .then(|| recorder_file.clone())
515 .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
516 };
517 info!(
518 session_id = self.session_id,
519 recorder_file, "created recording file"
520 );
521
522 let track_samplerate = self.track_config.samplerate;
523 let recorder_samplerate = if track_samplerate > 0 {
524 track_samplerate
525 } else {
526 recorder_option.samplerate
527 };
528 let recorder_ptime = if recorder_option.ptime == 0 {
529 200
530 } else {
531 recorder_option.ptime
532 };
533 let requested_format = recorder_option
534 .format
535 .unwrap_or(self.app_state.config.recorder_format());
536 let format = requested_format.effective();
537 if requested_format != format {
538 warn!(
539 session_id = self.session_id,
540 requested = requested_format.extension(),
541 "Recorder format fallback to wav due to unsupported feature"
542 );
543 }
544 let mut recorder_config = RecorderOption {
545 recorder_file,
546 samplerate: recorder_samplerate,
547 ptime: recorder_ptime,
548 format: Some(format),
549 };
550 recorder_config.ensure_path_extension(format);
551 Some(recorder_config)
552 } else {
553 None
554 }
555 }
556
557 async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
558 {
560 let state = self.call_state.read().await;
561 option = state.merge_option(option);
562 }
563
564 option.check_default();
565 if let Some(opt) = self.build_record_option(&option) {
566 self.media_stream.update_recorder_option(opt).await;
567 }
568
569 if let Some(opt) = &option.media_pass {
570 let track_id = self.server_side_track_id.clone();
571 let cancel_token = self.cancel_token.child_token();
572 let ssrc = rand::random::<u32>();
573 let media_pass_track = MediaPassTrack::new(
574 self.session_id.clone(),
575 ssrc,
576 track_id,
577 cancel_token,
578 opt.clone(),
579 );
580 self.update_track_wrapper(Box::new(media_pass_track), None)
581 .await;
582 }
583
584 info!(
585 session_id = self.session_id,
586 call_type = ?self.call_type,
587 sender,
588 ?option,
589 "caller with option"
590 );
591
592 match self.setup_caller_track(&option).await {
593 Ok(_) => return Ok(option),
594 Err(e) => {
595 self.app_state
596 .total_failed_calls
597 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
598 let error_event = crate::event::SessionEvent::Error {
599 track_id: self.session_id.clone(),
600 timestamp: crate::media::get_timestamp(),
601 sender,
602 error: e.to_string(),
603 code: None,
604 };
605 self.event_sender.send(error_event).ok();
606 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
607 .await
608 .ok();
609 return Err(e);
610 }
611 }
612 }
613
614 async fn do_invite(&self, option: CallOption) -> Result<()> {
615 self.invite_or_accept(option, "invite".to_string())
616 .await
617 .map(|_| ())
618 }
619
620 async fn do_accept(&self, mut option: CallOption) -> Result<()> {
621 let has_pending = self
622 .invitation
623 .find_dialog_id_by_session_id(&self.session_id)
624 .is_some();
625 let ready_to_answer_val = {
626 let state = self.call_state.read().await;
627 state.ready_to_answer.is_none()
628 };
629
630 if ready_to_answer_val {
631 if !has_pending {
632 warn!(session_id = self.session_id, "no pending call to accept");
634 let rejet_event = crate::event::SessionEvent::Reject {
635 track_id: self.session_id.clone(),
636 timestamp: crate::media::get_timestamp(),
637 reason: "no pending call".to_string(),
638 refer: None,
639 code: Some(486),
640 };
641 self.event_sender.send(rejet_event).ok();
642 self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
643 .await
644 .ok();
645 return Err(anyhow::anyhow!("no pending call to accept"));
646 }
647 option = self.invite_or_accept(option, "accept".to_string()).await?;
648 } else {
649 option.check_default();
650 self.call_state.write().await.option = Some(option.clone());
651 }
652 info!(session_id = self.session_id, ?option, "accepting call");
653 let ready = self.call_state.write().await.ready_to_answer.take();
654 if let Some((answer, track, dialog)) = ready {
655 info!(
656 session_id = self.session_id,
657 track_id = track.as_ref().map(|t| t.id()),
658 "ready to answer with track"
659 );
660
661 let headers = vec![rsip::Header::ContentType(
662 "application/sdp".to_string().into(),
663 )];
664
665 match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
666 Ok(_) => {
667 {
668 let mut state = self.call_state.write().await;
669 state.answer = Some(answer);
670 state.answer_time = Some(Utc::now());
671 }
672 self.finish_caller_stack(&option, track).await?;
673 }
674 Err(e) => {
675 warn!(session_id = self.session_id, "failed to accept call: {}", e);
676 return Err(anyhow::anyhow!("failed to accept call"));
677 }
678 }
679 }
680 return Ok(());
681 }
682
683 async fn do_reject(
684 &self,
685 code: Option<rsip::StatusCode>,
686 reason: Option<String>,
687 ) -> Result<()> {
688 match self
689 .invitation
690 .find_dialog_id_by_session_id(&self.session_id)
691 {
692 Some(id) => {
693 info!(
694 session_id = self.session_id,
695 ?reason,
696 ?code,
697 "rejecting call"
698 );
699 self.invitation.hangup(id, code, reason).await
700 }
701 None => Ok(()),
702 }
703 }
704
705 async fn do_ringing(
706 &self,
707 ringtone: Option<String>,
708 recorder: Option<RecorderOption>,
709 early_media: Option<bool>,
710 ) -> Result<()> {
711 let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
712 if ready_to_answer_val {
713 let option = CallOption {
714 recorder,
715 ..Default::default()
716 };
717 let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
718 }
719
720 let state = self.call_state.read().await;
721 if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
722 let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
723 let headers = vec![rsip::Header::ContentType(
724 "application/sdp".to_string().into(),
725 )];
726 (Some(headers), Some(answer.as_bytes().to_vec()))
727 } else {
728 (None, None)
729 };
730
731 dialog.ringing(headers, body).ok();
732 info!(
733 session_id = self.session_id,
734 ringtone, early_media, "playing ringtone"
735 );
736 if let Some(ringtone_url) = ringtone {
737 drop(state);
738 self.do_play(ringtone_url, None, None, None).await.ok();
739 } else {
740 info!(session_id = self.session_id, "no ringtone to play");
741 }
742 }
743 Ok(())
744 }
745
746 async fn do_tts(
747 &self,
748 text: String,
749 speaker: Option<String>,
750 play_id: Option<String>,
751 auto_hangup: Option<bool>,
752 streaming: bool,
753 end_of_stream: bool,
754 option: Option<SynthesisOption>,
755 wait_input_timeout: Option<u32>,
756 base64: bool,
757 ) -> Result<()> {
758 let tts_option = {
759 let call_state = self.call_state.read().await;
760 match call_state.option.clone().unwrap_or_default().tts {
761 Some(opt) => opt.merge_with(option),
762 None => {
763 if let Some(opt) = option {
764 opt
765 } else {
766 return Err(anyhow::anyhow!("no tts option available"));
767 }
768 }
769 }
770 };
771 let speaker = match speaker {
772 Some(s) => Some(s),
773 None => tts_option.speaker.clone(),
774 };
775
776 let mut play_command = SynthesisCommand {
777 text,
778 speaker,
779 play_id: play_id.clone(),
780 streaming,
781 end_of_stream,
782 option: tts_option,
783 base64,
784 };
785 info!(
786 session_id = self.session_id,
787 provider = ?play_command.option.provider,
788 text = %play_command.text.chars().take(10).collect::<String>(),
789 speaker = play_command.speaker.as_deref(),
790 auto_hangup = auto_hangup.unwrap_or_default(),
791 play_id = play_command.play_id.as_deref(),
792 streaming = play_command.streaming,
793 end_of_stream = play_command.end_of_stream,
794 wait_input_timeout = wait_input_timeout.unwrap_or_default(),
795 is_base64 = play_command.base64,
796 "new synthesis"
797 );
798
799 let ssrc = rand::random::<u32>();
800 let should_interrupt = {
801 let mut state = self.call_state.write().await;
802 state.auto_hangup = match auto_hangup {
803 Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
804 _ => None,
805 };
806 state.wait_input_timeout = wait_input_timeout;
807
808 let changed = play_id.is_some() && state.current_play_id != play_id;
809 state.current_play_id = play_id.clone();
810 changed
811 };
812
813 if should_interrupt {
814 let _ = self.do_interrupt(false).await;
815 }
816
817 let existing_handle = self.call_state.read().await.tts_handle.clone();
818 if let Some(tts_handle) = existing_handle {
819 match tts_handle.try_send(play_command) {
820 Ok(_) => return Ok(()),
821 Err(e) => {
822 play_command = e.0;
823 }
824 }
825 }
826
827 let (new_handle, tts_track) = StreamEngine::create_tts_track(
828 self.app_state.stream_engine.clone(),
829 self.cancel_token.child_token(),
830 self.session_id.clone(),
831 self.server_side_track_id.clone(),
832 ssrc,
833 play_id.clone(),
834 streaming,
835 &play_command.option,
836 )
837 .await?;
838
839 new_handle.try_send(play_command)?;
840 self.call_state.write().await.tts_handle = Some(new_handle);
841 self.update_track_wrapper(tts_track, play_id).await;
842 Ok(())
843 }
844
845 async fn do_play(
846 &self,
847 url: String,
848 play_id: Option<String>,
849 auto_hangup: Option<bool>,
850 wait_input_timeout: Option<u32>,
851 ) -> Result<()> {
852 let ssrc = rand::random::<u32>();
853 info!(
854 session_id = self.session_id,
855 ssrc, url, play_id, auto_hangup, "play file track"
856 );
857
858 let play_id = play_id.or(Some(url.clone()));
859
860 let file_track = FileTrack::new(self.server_side_track_id.clone())
861 .with_play_id(play_id.clone())
862 .with_ssrc(ssrc)
863 .with_path(url)
864 .with_cancel_token(self.cancel_token.child_token());
865
866 {
867 let mut state = self.call_state.write().await;
868 state.tts_handle = None;
869 state.auto_hangup = match auto_hangup {
870 Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
871 _ => None,
872 };
873 state.wait_input_timeout = wait_input_timeout;
874 }
875
876 self.update_track_wrapper(Box::new(file_track), play_id)
877 .await;
878 Ok(())
879 }
880
881 async fn do_history(&self, speaker: String, text: String) -> Result<()> {
882 self.event_sender
883 .send(SessionEvent::AddHistory {
884 sender: Some(self.session_id.clone()),
885 timestamp: crate::media::get_timestamp(),
886 speaker,
887 text,
888 })
889 .map(|_| ())
890 .map_err(Into::into)
891 }
892
893 async fn do_interrupt(&self, graceful: bool) -> Result<()> {
894 {
895 let mut state = self.call_state.write().await;
896 state.tts_handle = None;
897 state.moh = None;
898 }
899 self.media_stream
900 .remove_track(&self.server_side_track_id, graceful)
901 .await;
902 Ok(())
903 }
904 async fn do_pause(&self) -> Result<()> {
905 Ok(())
906 }
907 async fn do_resume(&self) -> Result<()> {
908 Ok(())
909 }
910 async fn do_hangup(
911 &self,
912 reason: Option<CallRecordHangupReason>,
913 initiator: Option<String>,
914 ) -> Result<()> {
915 info!(
916 session_id = self.session_id,
917 ?reason,
918 ?initiator,
919 "do_hangup"
920 );
921
922 let hangup_reason = match initiator.as_deref() {
924 Some("caller") => CallRecordHangupReason::ByCaller,
925 Some("callee") => CallRecordHangupReason::ByCallee,
926 Some("system") => CallRecordHangupReason::Autohangup,
927 _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
928 };
929
930 self.media_stream
931 .stop(Some(hangup_reason.to_string()), initiator);
932
933 self.call_state
934 .write()
935 .await
936 .set_hangup_reason(hangup_reason);
937 Ok(())
938 }
939
940 async fn do_refer(
941 &self,
942 caller: String,
943 callee: String,
944 refer_option: Option<ReferOption>,
945 ) -> Result<()> {
946 self.do_interrupt(false).await.ok();
947 let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
948 if let Some(ref path) = moh {
949 if !path.starts_with("http") && !std::path::Path::new(path).exists() {
950 let fallback = "./config/sounds/refer_moh.wav";
951 if std::path::Path::new(fallback).exists() {
952 info!(
953 session_id = self.session_id,
954 "moh {} not found, using fallback {}", path, fallback
955 );
956 moh = Some(fallback.to_string());
957 }
958 }
959 }
960 let session_id = self.session_id.clone();
961 let track_id = self.server_side_track_id.clone();
962
963 let recorder = {
964 let cs = self.call_state.read().await;
965 cs.option
966 .as_ref()
967 .map(|o| o.recorder.clone())
968 .unwrap_or_default()
969 };
970
971 let call_option = CallOption {
972 caller: Some(caller),
973 callee: Some(callee.clone()),
974 sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
975 asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
976 denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
977 recorder,
978 ..Default::default()
979 };
980
981 let mut invite_option = call_option.build_invite_option()?;
982 invite_option.call_id = Some(self.session_id.clone());
983
984 let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
985
986 {
987 let cs = self.call_state.read().await;
988 if let Some(opt) = cs.option.as_ref() {
989 if let Some(callee) = opt.callee.as_ref() {
990 headers.push(rsip::Header::Other(
991 "X-Referred-To".to_string(),
992 callee.clone(),
993 ));
994 }
995 if let Some(caller) = opt.caller.as_ref() {
996 headers.push(rsip::Header::Other(
997 "X-Referred-From".to_string(),
998 caller.clone(),
999 ));
1000 }
1001 }
1002 }
1003
1004 headers.push(rsip::Header::Other(
1005 "X-Referred-Id".to_string(),
1006 self.session_id.clone(),
1007 ));
1008
1009 let ssrc = rand::random::<u32>();
1010 let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1011 start_time: Utc::now(),
1012 ssrc,
1013 option: Some(call_option.clone()),
1014 is_refer: true,
1015 ..Default::default()
1016 }));
1017
1018 {
1019 let mut cs = self.call_state.write().await;
1020 cs.refer_callstate.replace(refer_call_state.clone());
1021 }
1022
1023 let auto_hangup_requested = refer_option
1024 .as_ref()
1025 .and_then(|o| o.auto_hangup)
1026 .unwrap_or(true);
1027
1028 if auto_hangup_requested {
1029 self.call_state.write().await.auto_hangup =
1030 Some((ssrc, CallRecordHangupReason::ByRefer));
1031 } else {
1032 self.call_state.write().await.auto_hangup = None;
1033 }
1034
1035 let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1036
1037 info!(
1038 session_id = self.session_id,
1039 ssrc,
1040 auto_hangup = auto_hangup_requested,
1041 callee,
1042 timeout_secs,
1043 "do_refer"
1044 );
1045
1046 let r = tokio::time::timeout(
1047 Duration::from_secs(timeout_secs as u64),
1048 self.create_outgoing_sip_track(
1049 self.cancel_token.child_token(),
1050 refer_call_state.clone(),
1051 &track_id,
1052 invite_option,
1053 &call_option,
1054 moh,
1055 auto_hangup_requested,
1056 ),
1057 )
1058 .await;
1059
1060 {
1061 self.call_state.write().await.moh = None;
1062 }
1063
1064 let result = match r {
1065 Ok(res) => res,
1066 Err(_) => {
1067 warn!(
1068 session_id = session_id,
1069 "refer sip track creation timed out after {} seconds", timeout_secs
1070 );
1071 self.event_sender
1072 .send(SessionEvent::Reject {
1073 track_id,
1074 timestamp: crate::media::get_timestamp(),
1075 reason: "Timeout when refer".into(),
1076 code: Some(408),
1077 refer: Some(true),
1078 })
1079 .ok();
1080 return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1081 }
1082 };
1083
1084 match result {
1085 Ok(answer) => {
1086 self.event_sender
1087 .send(SessionEvent::Answer {
1088 timestamp: crate::media::get_timestamp(),
1089 track_id,
1090 sdp: answer,
1091 refer: Some(true),
1092 })
1093 .ok();
1094 }
1095 Err(e) => {
1096 warn!(
1097 session_id = session_id,
1098 "failed to create refer sip track: {}", e
1099 );
1100 match &e {
1101 rsipstack::Error::DialogError(reason, _, code) => {
1102 self.event_sender
1103 .send(SessionEvent::Reject {
1104 track_id,
1105 timestamp: crate::media::get_timestamp(),
1106 reason: reason.clone(),
1107 code: Some(code.code() as u32),
1108 refer: Some(true),
1109 })
1110 .ok();
1111 }
1112 _ => {}
1113 }
1114 return Err(e.into());
1115 }
1116 }
1117 Ok(())
1118 }
1119
1120 async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1121 self.media_stream.mute_track(track_id).await;
1122 Ok(())
1123 }
1124
1125 async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1126 self.media_stream.unmute_track(track_id).await;
1127 Ok(())
1128 }
1129
1130 pub async fn cleanup(&self) -> Result<()> {
1131 self.call_state.write().await.tts_handle = None;
1132 self.media_stream.cleanup().await.ok();
1133 Ok(())
1134 }
1135
1136 pub fn get_callrecord(&self) -> Option<CallRecord> {
1137 self.call_state.try_read().ok().map(|call_state| {
1138 call_state.build_callrecord(
1139 self.app_state.clone(),
1140 self.session_id.clone(),
1141 self.call_type.clone(),
1142 )
1143 })
1144 }
1145
1146 async fn dump_to_file(
1147 &self,
1148 dump_file: &mut File,
1149 cmd_receiver: &mut CommandReceiver,
1150 event_receiver: &mut EventReceiver,
1151 ) {
1152 loop {
1153 select! {
1154 _ = self.cancel_token.cancelled() => {
1155 break;
1156 }
1157 Ok(cmd) = cmd_receiver.recv() => {
1158 CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1159 .await;
1160 }
1161 Ok(event) = event_receiver.recv() => {
1162 if matches!(event, SessionEvent::Binary{..}) {
1163 continue;
1164 }
1165 CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1166 .await;
1167 }
1168 };
1169 }
1170 }
1171
1172 async fn dump_loop(
1173 &self,
1174 dump_events: bool,
1175 mut dump_cmd_receiver: CommandReceiver,
1176 mut dump_event_receiver: EventReceiver,
1177 ) {
1178 if !dump_events {
1179 return;
1180 }
1181
1182 let file_name = self.app_state.get_dump_events_file(&self.session_id);
1183 let mut dump_file = match File::options()
1184 .create(true)
1185 .append(true)
1186 .open(&file_name)
1187 .await
1188 {
1189 Ok(file) => file,
1190 Err(e) => {
1191 warn!(
1192 session_id = self.session_id,
1193 file_name, "failed to open dump events file: {}", e
1194 );
1195 return;
1196 }
1197 };
1198 self.dump_to_file(
1199 &mut dump_file,
1200 &mut dump_cmd_receiver,
1201 &mut dump_event_receiver,
1202 )
1203 .await;
1204
1205 while let Ok(event) = dump_event_receiver.try_recv() {
1206 if matches!(event, SessionEvent::Binary { .. }) {
1207 continue;
1208 }
1209 CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1210 }
1211 }
1212
1213 pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1214 let mut rtc_config = RtcTrackConfig::default();
1215 rtc_config.mode = rustrtc::TransportMode::Rtp;
1216
1217 if let Some(codecs) = &self.app_state.config.codecs {
1218 let mut codec_types = Vec::new();
1219 for c in codecs {
1220 match c.to_lowercase().as_str() {
1221 "pcmu" => codec_types.push(CodecType::PCMU),
1222 "pcma" => codec_types.push(CodecType::PCMA),
1223 "g722" => codec_types.push(CodecType::G722),
1224 "g729" => codec_types.push(CodecType::G729),
1225 "opus" => codec_types.push(CodecType::Opus),
1226 "dtmf" | "2833" | "telephone_event" => {
1227 codec_types.push(CodecType::TelephoneEvent)
1228 }
1229 _ => {}
1230 }
1231 }
1232 if !codec_types.is_empty() {
1233 rtc_config.preferred_codec = Some(codec_types[0].clone());
1234 rtc_config.codecs = codec_types;
1235 }
1236 }
1237
1238 if rtc_config.preferred_codec.is_none() {
1239 rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1240 }
1241
1242 rtc_config.rtp_port_range = self
1243 .app_state
1244 .config
1245 .rtp_start_port
1246 .zip(self.app_state.config.rtp_end_port);
1247
1248 if let Some(ref external_ip) = self.app_state.config.external_ip {
1249 rtc_config.external_ip = Some(external_ip.clone());
1250 }
1251
1252 let mut track = RtcTrack::new(
1253 self.cancel_token.child_token(),
1254 track_id,
1255 self.track_config.clone(),
1256 rtc_config,
1257 )
1258 .with_ssrc(ssrc);
1259
1260 track.create().await?;
1261
1262 Ok(track)
1263 }
1264
1265 async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1266 self.call_state.write().await.option = Some(option.clone());
1267 info!(
1268 session_id = self.session_id,
1269 call_type = ?self.call_type,
1270 "setup caller track"
1271 );
1272
1273 let track = match self.call_type {
1274 ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1275 ActiveCallType::WebSocket => {
1276 let audio_receiver = self.call_state.write().await.audio_receiver.take();
1277 if let Some(receiver) = audio_receiver {
1278 Some(self.create_websocket_track(receiver).await?)
1279 } else {
1280 None
1281 }
1282 }
1283 ActiveCallType::Sip => {
1284 if let Some(dialog_id) = self
1285 .invitation
1286 .find_dialog_id_by_session_id(&self.session_id)
1287 {
1288 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1289 return self
1290 .prepare_incoming_sip_track(
1291 self.cancel_token.clone(),
1292 self.call_state.clone(),
1293 &self.session_id,
1294 pending_dialog,
1295 )
1296 .await;
1297 }
1298 }
1299
1300 let mut invite_option = option.build_invite_option()?;
1301 invite_option.call_id = Some(self.session_id.clone());
1302
1303 match self
1304 .create_outgoing_sip_track(
1305 self.cancel_token.clone(),
1306 self.call_state.clone(),
1307 &self.session_id,
1308 invite_option,
1309 &option,
1310 None,
1311 false,
1312 )
1313 .await
1314 {
1315 Ok(answer) => {
1316 self.event_sender
1317 .send(SessionEvent::Answer {
1318 timestamp: crate::media::get_timestamp(),
1319 track_id: self.session_id.clone(),
1320 sdp: answer,
1321 refer: Some(false),
1322 })
1323 .ok();
1324 return Ok(());
1325 }
1326 Err(e) => {
1327 warn!(
1328 session_id = self.session_id,
1329 "failed to create sip track: {}", e
1330 );
1331 match &e {
1332 rsipstack::Error::DialogError(reason, _, code) => {
1333 self.event_sender
1334 .send(SessionEvent::Reject {
1335 track_id: self.session_id.clone(),
1336 timestamp: crate::media::get_timestamp(),
1337 reason: reason.clone(),
1338 code: Some(code.code() as u32),
1339 refer: Some(false),
1340 })
1341 .ok();
1342 }
1343 _ => {}
1344 }
1345 return Err(e.into());
1346 }
1347 }
1348 }
1349 ActiveCallType::B2bua => {
1350 if let Some(dialog_id) = self
1351 .invitation
1352 .find_dialog_id_by_session_id(&self.session_id)
1353 {
1354 if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1355 return self
1356 .prepare_incoming_sip_track(
1357 self.cancel_token.clone(),
1358 self.call_state.clone(),
1359 &self.session_id,
1360 pending_dialog,
1361 )
1362 .await;
1363 }
1364 }
1365
1366 warn!(
1367 session_id = self.session_id,
1368 "no pending dialog found for B2BUA call"
1369 );
1370 return Err(anyhow::anyhow!(
1371 "no pending dialog found for session_id: {}",
1372 self.session_id
1373 ));
1374 }
1375 };
1376 match track {
1377 Some(track) => {
1378 self.finish_caller_stack(&option, Some(track)).await?;
1379 }
1380 None => {
1381 warn!(session_id = self.session_id, "no track created for caller");
1382 return Err(anyhow::anyhow!("no track created for caller"));
1383 }
1384 }
1385 Ok(())
1386 }
1387
1388 async fn finish_caller_stack(
1389 &self,
1390 option: &CallOption,
1391 track: Option<Box<dyn Track>>,
1392 ) -> Result<()> {
1393 if let Some(track) = track {
1394 self.setup_track_with_stream(&option, track).await?;
1395 }
1396
1397 {
1398 let call_state = self.call_state.read().await;
1399 if let Some(ref answer) = call_state.answer {
1400 info!(
1401 session_id = self.session_id,
1402 "sending answer event: {}", answer,
1403 );
1404 self.event_sender
1405 .send(SessionEvent::Answer {
1406 timestamp: crate::media::get_timestamp(),
1407 track_id: self.session_id.clone(),
1408 sdp: answer.clone(),
1409 refer: Some(false),
1410 })
1411 .ok();
1412 } else {
1413 warn!(
1414 session_id = self.session_id,
1415 "no answer in state to send event"
1416 );
1417 }
1418 }
1419 Ok(())
1420 }
1421
1422 pub async fn setup_track_with_stream(
1423 &self,
1424 option: &CallOption,
1425 mut track: Box<dyn Track>,
1426 ) -> Result<()> {
1427 let processors = match StreamEngine::create_processors(
1428 self.app_state.stream_engine.clone(),
1429 track.as_ref(),
1430 self.cancel_token.child_token(),
1431 self.event_sender.clone(),
1432 self.media_stream.packet_sender.clone(),
1433 option,
1434 )
1435 .await
1436 {
1437 Ok(processors) => processors,
1438 Err(e) => {
1439 warn!(
1440 session_id = self.session_id,
1441 "failed to prepare stream processors: {}", e
1442 );
1443 vec![]
1444 }
1445 };
1446
1447 for processor in processors {
1449 track.append_processor(processor);
1450 }
1451
1452 self.update_track_wrapper(track, None).await;
1453 Ok(())
1454 }
1455
1456 pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1457 let ambiance_opt = {
1458 let state = self.call_state.read().await;
1459 let mut opt = state
1460 .option
1461 .as_ref()
1462 .and_then(|o| o.ambiance.clone())
1463 .unwrap_or_default();
1464
1465 if let Some(global) = &self.app_state.config.ambiance {
1466 opt.merge(global);
1467 }
1468 opt
1469 };
1470 if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1471 match AmbianceProcessor::new(ambiance_opt).await {
1472 Ok(ambiance) => {
1473 info!(session_id = self.session_id, "loaded ambiance processor");
1474 track.append_processor(Box::new(ambiance));
1475 }
1476 Err(e) => {
1477 tracing::error!("failed to load ambiance wav {}", e);
1478 }
1479 }
1480 }
1481 self.call_state.write().await.current_play_id = play_id.clone();
1482 self.media_stream.update_track(track, play_id).await;
1483 }
1484
1485 pub async fn create_websocket_track(
1486 &self,
1487 audio_receiver: WebsocketBytesReceiver,
1488 ) -> Result<Box<dyn Track>> {
1489 let (ssrc, codec) = {
1490 let call_state = self.call_state.read().await;
1491 (
1492 call_state.ssrc,
1493 call_state
1494 .option
1495 .as_ref()
1496 .map(|o| o.codec.clone())
1497 .unwrap_or_default(),
1498 )
1499 };
1500
1501 let ws_track = WebsocketTrack::new(
1502 self.cancel_token.child_token(),
1503 self.session_id.clone(),
1504 self.track_config.clone(),
1505 self.event_sender.clone(),
1506 audio_receiver,
1507 codec,
1508 ssrc,
1509 );
1510
1511 {
1512 let mut call_state = self.call_state.write().await;
1513 call_state.answer_time = Some(Utc::now());
1514 call_state.answer = Some("".to_string());
1515 call_state.last_status_code = 200;
1516 }
1517
1518 Ok(Box::new(ws_track))
1519 }
1520
1521 pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1522 let (ssrc, option) = {
1523 let call_state = self.call_state.read().await;
1524 (
1525 call_state.ssrc,
1526 call_state.option.clone().unwrap_or_default(),
1527 )
1528 };
1529
1530 let mut rtc_config = RtcTrackConfig::default();
1531 rtc_config.mode = rustrtc::TransportMode::WebRtc; rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1533
1534 if let Some(codecs) = &self.app_state.config.codecs {
1535 let mut codec_types = Vec::new();
1536 for c in codecs {
1537 match c.to_lowercase().as_str() {
1538 "pcmu" => codec_types.push(CodecType::PCMU),
1539 "pcma" => codec_types.push(CodecType::PCMA),
1540 "g722" => codec_types.push(CodecType::G722),
1541 "g729" => codec_types.push(CodecType::G729),
1542 "opus" => codec_types.push(CodecType::Opus),
1543 "dtmf" | "2833" | "telephone_event" => {
1544 codec_types.push(CodecType::TelephoneEvent)
1545 }
1546 _ => {}
1547 }
1548 }
1549 if !codec_types.is_empty() {
1550 rtc_config.preferred_codec = Some(codec_types[0].clone());
1551 rtc_config.codecs = codec_types;
1552 }
1553 }
1554
1555 if let Some(ref external_ip) = self.app_state.config.external_ip {
1556 rtc_config.external_ip = Some(external_ip.clone());
1557 }
1558
1559 let mut webrtc_track = RtcTrack::new(
1560 self.cancel_token.child_token(),
1561 self.session_id.clone(),
1562 self.track_config.clone(),
1563 rtc_config,
1564 )
1565 .with_ssrc(ssrc);
1566
1567 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1568 let offer = match option.enable_ipv6 {
1569 Some(false) | None => {
1570 strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1571 }
1572 _ => option.offer.clone().unwrap_or("".to_string()),
1573 };
1574 let answer: Option<String>;
1575 match webrtc_track.handshake(offer, timeout).await {
1576 Ok(answer_sdp) => {
1577 answer = match option.enable_ipv6 {
1578 Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1579 Some(true) => Some(answer_sdp.to_string()),
1580 };
1581 }
1582 Err(e) => {
1583 warn!(session_id = self.session_id, "failed to setup track: {}", e);
1584 return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1585 }
1586 }
1587
1588 {
1589 let mut call_state = self.call_state.write().await;
1590 call_state.answer_time = Some(Utc::now());
1591 call_state.answer = answer;
1592 call_state.last_status_code = 200;
1593 }
1594 Ok(Box::new(webrtc_track))
1595 }
1596
1597 async fn create_outgoing_sip_track(
1598 &self,
1599 cancel_token: CancellationToken,
1600 call_state_ref: ActiveCallStateRef,
1601 track_id: &String,
1602 mut invite_option: InviteOption,
1603 call_option: &CallOption,
1604 moh: Option<String>,
1605 auto_hangup: bool,
1606 ) -> Result<String, rsipstack::Error> {
1607 let ssrc = call_state_ref.read().await.ssrc;
1608 let rtp_track = self
1609 .create_rtp_track(track_id.clone(), ssrc)
1610 .await
1611 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1612
1613 let offer = Some(
1614 rtp_track
1615 .local_description()
1616 .await
1617 .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1618 );
1619
1620 {
1621 let mut cs = call_state_ref.write().await;
1622 if let Some(o) = cs.option.as_mut() {
1623 o.offer = offer.clone();
1624 }
1625 cs.start_time = Utc::now();
1626 };
1627
1628 invite_option.offer = offer.clone().map(|s| s.into());
1629
1630 let needs_contact = invite_option.contact.scheme.is_none()
1633 || invite_option
1634 .contact
1635 .host_with_port
1636 .to_string()
1637 .starts_with("127.0.0.1");
1638
1639 if needs_contact {
1640 if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1641 invite_option.contact = rsip::Uri {
1642 scheme: Some(rsip::Scheme::Sip),
1643 auth: None,
1644 host_with_port: addr.addr.clone(),
1645 params: vec![],
1646 headers: vec![],
1647 };
1648 }
1649 }
1650
1651 let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1652
1653 if let Some(moh) = moh {
1654 let ssrc_and_moh = {
1655 let mut state = call_state_ref.write().await;
1656 state.moh = Some(moh.clone());
1657 if state.current_play_id.is_none() {
1658 let ssrc = rand::random::<u32>();
1659 Some((ssrc, moh.clone()))
1660 } else {
1661 info!(
1662 session_id = self.session_id,
1663 "Something is playing, MOH will start after it ends"
1664 );
1665 None
1666 }
1667 };
1668
1669 if let Some((ssrc, moh_path)) = ssrc_and_moh {
1670 let file_track = FileTrack::new(self.server_side_track_id.clone())
1671 .with_play_id(Some(moh_path.clone()))
1672 .with_ssrc(ssrc)
1673 .with_path(moh_path.clone())
1674 .with_cancel_token(self.cancel_token.child_token());
1675 self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1676 .await;
1677 }
1678 } else {
1679 let track = rtp_track_to_setup.take().unwrap();
1680 self.setup_track_with_stream(&call_option, track)
1681 .await
1682 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1683 }
1684
1685 info!(
1686 session_id = self.session_id,
1687 track_id,
1688 contact = %invite_option.contact,
1689 "invite {} -> {} offer: \n{}",
1690 invite_option.caller,
1691 invite_option.callee,
1692 offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1693 );
1694
1695 let (dlg_state_sender, dlg_state_receiver) =
1696 self.invitation.dialog_layer.new_dialog_state_channel();
1697
1698 let states = InviteDialogStates {
1699 is_client: true,
1700 session_id: self.session_id.clone(),
1701 track_id: track_id.clone(),
1702 event_sender: self.event_sender.clone(),
1703 media_stream: self.media_stream.clone(),
1704 call_state: call_state_ref.clone(),
1705 cancel_token,
1706 terminated_reason: None,
1707 has_early_media: false,
1708 };
1709
1710 let mut client_dialog_handler =
1711 DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), dlg_state_receiver);
1712
1713 crate::spawn(async move {
1714 client_dialog_handler.process_dialog(states).await;
1715 });
1716
1717 let (dialog_id, answer) = self
1718 .invitation
1719 .invite(invite_option, dlg_state_sender)
1720 .await?;
1721
1722 self.call_state.write().await.moh = None;
1723
1724 if let Some(track) = rtp_track_to_setup {
1725 self.setup_track_with_stream(&call_option, track)
1726 .await
1727 .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1728 }
1729
1730 let answer = match answer {
1731 Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1732 None => {
1733 warn!(session_id = self.session_id, "no answer received");
1734 return Err(rsipstack::Error::DialogError(
1735 "No answer received".to_string(),
1736 dialog_id,
1737 rsip::StatusCode::NotAcceptableHere,
1738 ));
1739 }
1740 };
1741
1742 {
1743 let mut cs = call_state_ref.write().await;
1744 if cs.answer.is_none() {
1745 cs.answer = Some(answer.clone());
1746 }
1747 if auto_hangup {
1748 cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
1749 }
1750 }
1751
1752 self.media_stream
1753 .update_remote_description(&track_id, &answer)
1754 .await
1755 .ok();
1756
1757 Ok(answer)
1758 }
1759
/// Reports whether `sdp` looks like a WebRTC session description.
///
/// The text must contain `a=fingerprint:` and at least one of
/// `a=ice-ufrag:` / `a=ice-pwd:`. Matching is a plain, case-sensitive
/// substring search.
pub fn is_webrtc_sdp(sdp: &str) -> bool {
    if !sdp.contains("a=fingerprint:") {
        return false;
    }
    ["a=ice-ufrag:", "a=ice-pwd:"]
        .iter()
        .any(|attr| sdp.contains(*attr))
}
1765
1766 pub async fn setup_answer_track(
1767 &self,
1768 ssrc: u32,
1769 option: &CallOption,
1770 offer: String,
1771 ) -> Result<(String, Box<dyn Track>)> {
1772 let offer = match option.enable_ipv6 {
1773 Some(false) | None => strip_ipv6_candidates(&offer),
1774 _ => offer.clone(),
1775 };
1776
1777 let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1778
1779 let mut media_track = if Self::is_webrtc_sdp(&offer) {
1780 let mut rtc_config = RtcTrackConfig::default();
1781 rtc_config.mode = rustrtc::TransportMode::WebRtc;
1782 rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1783 if let Some(ref external_ip) = self.app_state.config.external_ip {
1784 rtc_config.external_ip = Some(external_ip.clone());
1785 }
1786
1787 let webrtc_track = RtcTrack::new(
1788 self.cancel_token.child_token(),
1789 self.session_id.clone(),
1790 self.track_config.clone(),
1791 rtc_config,
1792 )
1793 .with_ssrc(ssrc);
1794
1795 Box::new(webrtc_track) as Box<dyn Track>
1796 } else {
1797 let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
1798 Box::new(rtp_track) as Box<dyn Track>
1799 };
1800
1801 let answer = match media_track.handshake(offer.clone(), timeout).await {
1802 Ok(answer) => answer,
1803 Err(e) => {
1804 return Err(anyhow::anyhow!("handshake failed: {e}"));
1805 }
1806 };
1807
1808 return Ok((answer, media_track));
1809 }
1810
1811 pub async fn prepare_incoming_sip_track(
1812 &self,
1813 cancel_token: CancellationToken,
1814 call_state_ref: ActiveCallStateRef,
1815 track_id: &String,
1816 pending_dialog: PendingDialog,
1817 ) -> Result<()> {
1818 let state_receiver = pending_dialog.state_receiver;
1819 let states = InviteDialogStates {
1822 is_client: false,
1823 session_id: self.session_id.clone(),
1824 track_id: track_id.clone(),
1825 event_sender: self.event_sender.clone(),
1826 media_stream: self.media_stream.clone(),
1827 call_state: self.call_state.clone(),
1828 cancel_token,
1829 terminated_reason: None,
1830 has_early_media: false,
1831 };
1832
1833 let initial_request = pending_dialog.dialog.initial_request();
1834 let offer = String::from_utf8_lossy(&initial_request.body).to_string();
1835
1836 let (ssrc, option) = {
1837 let call_state = call_state_ref.read().await;
1838 (
1839 call_state.ssrc,
1840 call_state.option.clone().unwrap_or_default(),
1841 )
1842 };
1843
1844 match self.setup_answer_track(ssrc, &option, offer).await {
1845 Ok((offer, track)) => {
1846 self.setup_track_with_stream(&option, track).await?;
1847 {
1848 let mut state = self.call_state.write().await;
1849 state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
1850 }
1851 }
1852 Err(e) => {
1853 return Err(anyhow::anyhow!("error creating track: {}", e));
1854 }
1855 }
1856
1857 let mut client_dialog_handler =
1858 DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), state_receiver);
1859
1860 crate::spawn(async move {
1861 client_dialog_handler.process_dialog(states).await;
1862 });
1863 Ok(())
1864 }
1865}
1866
1867impl Drop for ActiveCall {
1868 fn drop(&mut self) {
1869 info!(session_id = self.session_id, "dropping active call");
1870 if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
1871 if let Some(record) = self.get_callrecord() {
1872 if let Err(e) = sender.send(record) {
1873 warn!(
1874 session_id = self.session_id,
1875 "failed to send call record: {}", e
1876 );
1877 }
1878 }
1879 }
1880 }
1881}
1882
1883impl ActiveCallState {
1884 pub fn merge_option(&self, mut option: CallOption) -> CallOption {
1885 if let Some(existing) = &self.option {
1886 if option.asr.is_none() {
1887 option.asr = existing.asr.clone();
1888 }
1889 if option.tts.is_none() {
1890 option.tts = existing.tts.clone();
1891 }
1892 if option.vad.is_none() {
1893 option.vad = existing.vad.clone();
1894 }
1895 if option.denoise.is_none() {
1896 option.denoise = existing.denoise;
1897 }
1898 if option.recorder.is_none() {
1899 option.recorder = existing.recorder.clone();
1900 }
1901 if option.eou.is_none() {
1902 option.eou = existing.eou.clone();
1903 }
1904 if option.extra.is_none() {
1905 option.extra = existing.extra.clone();
1906 }
1907 if option.ambiance.is_none() {
1908 option.ambiance = existing.ambiance.clone();
1909 }
1910 }
1911 option
1912 }
1913
1914 pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
1915 if self.hangup_reason.is_none() {
1916 self.hangup_reason = Some(reason);
1917 }
1918 }
1919
1920 pub fn build_hangup_event(
1921 &self,
1922 track_id: TrackId,
1923 initiator: Option<String>,
1924 ) -> crate::event::SessionEvent {
1925 let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
1926 let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
1927 let extra = self.extras.clone();
1928
1929 crate::event::SessionEvent::Hangup {
1930 track_id,
1931 timestamp: crate::media::get_timestamp(),
1932 reason: Some(format!("{:?}", self.hangup_reason)),
1933 initiator,
1934 start_time: self.start_time.to_rfc3339(),
1935 answer_time: self.answer_time.map(|t| t.to_rfc3339()),
1936 ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
1937 hangup_time: Utc::now().to_rfc3339(),
1938 extra,
1939 from: from.map(|f| f.into()),
1940 to: to.map(|f| f.into()),
1941 refer: Some(self.is_refer),
1942 }
1943 }
1944
1945 pub fn build_callrecord(
1946 &self,
1947 app_state: AppState,
1948 session_id: String,
1949 call_type: ActiveCallType,
1950 ) -> CallRecord {
1951 let option = self.option.clone().unwrap_or_default();
1952 let recorder = if option.recorder.is_some() {
1953 let recorder_file = app_state.get_recorder_file(&session_id);
1954 if std::path::Path::new(&recorder_file).exists() {
1955 let file_size = std::fs::metadata(&recorder_file)
1956 .map(|m| m.len())
1957 .unwrap_or(0);
1958 vec![crate::callrecord::CallRecordMedia {
1959 track_id: session_id.clone(),
1960 path: recorder_file,
1961 size: file_size,
1962 extra: None,
1963 }]
1964 } else {
1965 vec![]
1966 }
1967 } else {
1968 vec![]
1969 };
1970
1971 let dump_event_file = app_state.get_dump_events_file(&session_id);
1972 let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
1973 Some(dump_event_file)
1974 } else {
1975 None
1976 };
1977
1978 let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
1979 if let Ok(rc) = rc.try_read() {
1980 Some(Box::new(rc.build_callrecord(
1981 app_state.clone(),
1982 rc.session_id.clone(),
1983 ActiveCallType::B2bua,
1984 )))
1985 } else {
1986 None
1987 }
1988 });
1989
1990 let caller = option.caller.clone().unwrap_or_default();
1991 let callee = option.callee.clone().unwrap_or_default();
1992
1993 CallRecord {
1994 option: Some(option),
1995 call_id: session_id,
1996 call_type,
1997 start_time: self.start_time,
1998 ring_time: self.ring_time.clone(),
1999 answer_time: self.answer_time.clone(),
2000 end_time: Utc::now(),
2001 caller,
2002 callee,
2003 hangup_reason: self.hangup_reason.clone(),
2004 hangup_messages: Vec::new(),
2005 status_code: self.last_status_code,
2006 extras: self.extras.clone(),
2007 dump_event_file,
2008 recorder,
2009 refer_callrecord,
2010 }
2011 }
2012}