active_call/call/
active_call.rs

1use super::Command;
2use crate::{
3    CallOption, ReferOption,
4    event::{EventReceiver, EventSender, SessionEvent},
5    media::{
6        TrackId,
7        ambiance::AmbianceProcessor,
8        engine::StreamEngine,
9        negotiate::strip_ipv6_candidates,
10        recorder::RecorderOption,
11        stream::{MediaStream, MediaStreamBuilder},
12        track::{
13            Track, TrackConfig,
14            file::FileTrack,
15            media_pass::MediaPassTrack,
16            rtc::{RtcTrack, RtcTrackConfig},
17            tts::SynthesisHandle,
18            websocket::{WebsocketBytesReceiver, WebsocketTrack},
19        },
20    },
21    synthesis::{SynthesisCommand, SynthesisOption},
22};
23use crate::{
24    app::AppState,
25    call::{
26        CommandReceiver, CommandSender,
27        sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
28    },
29    callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
30    useragent::invitation::PendingDialog,
31};
32use anyhow::Result;
33use audio_codec::CodecType;
34use chrono::{DateTime, Utc};
35use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
36use serde::{Deserialize, Serialize};
37use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
38use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
39use tokio_util::sync::CancellationToken;
40use tracing::{debug, info, warn};
41
42#[derive(Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub struct CallParams {
45    pub id: Option<String>,
46    #[serde(rename = "dump")]
47    pub dump_events: Option<bool>,
48    #[serde(rename = "ping")]
49    pub ping_interval: Option<u32>,
50    pub server_side_track: Option<String>,
51}
52
53#[derive(Debug, Serialize, Deserialize, Clone, Default)]
54#[serde(rename_all = "camelCase")]
55pub enum ActiveCallType {
56    Webrtc,
57    B2bua,
58    WebSocket,
59    #[default]
60    Sip,
61}
62
63#[derive(Default)]
64pub struct ActiveCallState {
65    pub session_id: String,
66    pub start_time: DateTime<Utc>,
67    pub ring_time: Option<DateTime<Utc>>,
68    pub answer_time: Option<DateTime<Utc>>,
69    pub hangup_reason: Option<CallRecordHangupReason>,
70    pub last_status_code: u16,
71    pub option: Option<CallOption>,
72    pub answer: Option<String>,
73    pub ssrc: u32,
74    pub refer_callstate: Option<ActiveCallStateRef>,
75    pub extras: Option<HashMap<String, serde_json::Value>>,
76    pub is_refer: bool,
77
78    // Runtime state (migrated from ActiveCall to reduce multiple locks)
79    pub tts_handle: Option<SynthesisHandle>,
80    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
81    pub wait_input_timeout: Option<u32>,
82    pub moh: Option<String>,
83    pub current_play_id: Option<String>,
84    pub audio_receiver: Option<WebsocketBytesReceiver>,
85    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
86}
87
88pub type ActiveCallRef = Arc<ActiveCall>;
89pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
90
91pub struct ActiveCall {
92    pub call_state: ActiveCallStateRef,
93    pub cancel_token: CancellationToken,
94    pub call_type: ActiveCallType,
95    pub session_id: String,
96    pub media_stream: Arc<MediaStream>,
97    pub track_config: TrackConfig,
98    pub event_sender: EventSender,
99    pub app_state: AppState,
100    pub invitation: Invitation,
101    pub cmd_sender: CommandSender,
102    pub dump_events: bool,
103    pub server_side_track_id: TrackId,
104}
105
106pub struct ActiveCallGuard {
107    pub call: ActiveCallRef,
108    pub active_calls: usize,
109}
110
111impl ActiveCallGuard {
112    pub fn new(call: ActiveCallRef) -> Self {
113        let active_calls = {
114            call.app_state
115                .total_calls
116                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
117            let mut calls = call.app_state.active_calls.lock().unwrap();
118            calls.insert(call.session_id.clone(), call.clone());
119            calls.len()
120        };
121        Self { call, active_calls }
122    }
123}
124
125impl Drop for ActiveCallGuard {
126    fn drop(&mut self) {
127        self.call
128            .app_state
129            .active_calls
130            .lock()
131            .unwrap()
132            .remove(&self.call.session_id);
133    }
134}
135
136pub struct ActiveCallReceiver {
137    pub cmd_receiver: CommandReceiver,
138    pub dump_cmd_receiver: CommandReceiver,
139    pub dump_event_receiver: EventReceiver,
140}
141
142impl ActiveCall {
143    pub fn new(
144        call_type: ActiveCallType,
145        cancel_token: CancellationToken,
146        session_id: String,
147        invitation: Invitation,
148        app_state: AppState,
149        track_config: TrackConfig,
150        audio_receiver: Option<WebsocketBytesReceiver>,
151        dump_events: bool,
152        server_side_track_id: Option<TrackId>,
153        extras: Option<HashMap<String, serde_json::Value>>,
154    ) -> Self {
155        let event_sender = crate::event::create_event_sender();
156        let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
157        let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
158            .with_id(session_id.clone())
159            .with_cancel_token(cancel_token.child_token());
160        let media_stream = Arc::new(media_stream_builder.build());
161        let call_state = Arc::new(RwLock::new(ActiveCallState {
162            session_id: session_id.clone(),
163            start_time: Utc::now(),
164            ssrc: rand::random::<u32>(),
165            extras,
166            audio_receiver,
167            ..Default::default()
168        }));
169        Self {
170            cancel_token,
171            call_type,
172            session_id,
173            call_state,
174            media_stream,
175            track_config,
176            event_sender,
177            app_state,
178            invitation,
179            cmd_sender,
180            dump_events,
181            server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
182        }
183    }
184
185    pub async fn enqueue_command(&self, command: Command) -> Result<()> {
186        self.cmd_sender
187            .send(command)
188            .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
189        Ok(())
190    }
191
192    /// Create a new ActiveCallReceiver for this ActiveCall
193    /// `tokio::sync::broadcast` not cached messages, so need to early create receiver
194    /// before calling `serve()`
195    pub fn new_receiver(&self) -> ActiveCallReceiver {
196        ActiveCallReceiver {
197            cmd_receiver: self.cmd_sender.subscribe(),
198            dump_cmd_receiver: self.cmd_sender.subscribe(),
199            dump_event_receiver: self.event_sender.subscribe(),
200        }
201    }
202
203    pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
204        let ActiveCallReceiver {
205            mut cmd_receiver,
206            dump_cmd_receiver,
207            dump_event_receiver,
208        } = receiver;
209
210        let process_command_loop = async move {
211            while let Ok(command) = cmd_receiver.recv().await {
212                match self.dispatch(command).await {
213                    Ok(_) => (),
214                    Err(e) => {
215                        warn!(session_id = self.session_id, "{}", e);
216                        self.event_sender
217                            .send(SessionEvent::Error {
218                                track_id: self.session_id.clone(),
219                                timestamp: crate::media::get_timestamp(),
220                                sender: "command".to_string(),
221                                error: e.to_string(),
222                                code: None,
223                            })
224                            .ok();
225                    }
226                }
227            }
228        };
229        self.app_state
230            .total_calls
231            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
232
233        tokio::join!(
234            self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
235            async {
236                select! {
237                    _ = process_command_loop => {
238                        info!(session_id = self.session_id, "command loop done");
239                    }
240                    _ = self.process() => {
241                        info!(session_id = self.session_id, "call serve done");
242                    }
243                    _ = self.cancel_token.cancelled() => {
244                        info!(session_id = self.session_id, "call cancelled - cleaning up resources");
245                    }
246                }
247                self.cancel_token.cancel();
248            }
249        );
250        Ok(())
251    }
252
253    async fn process(&self) -> Result<()> {
254        let mut event_receiver = self.event_sender.subscribe();
255
256        let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
257        let input_timeout_expire_ref = input_timeout_expire.clone();
258        let event_sender = self.event_sender.clone();
259        let wait_input_timeout_loop = async {
260            loop {
261                let (start_time, expire) = { *input_timeout_expire.lock().await };
262                if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
263                    info!(session_id = self.session_id, "wait input timeout reached");
264                    *input_timeout_expire.lock().await = (0, 0);
265                    event_sender
266                        .send(SessionEvent::Silence {
267                            track_id: self.server_side_track_id.clone(),
268                            timestamp: crate::media::get_timestamp(),
269                            start_time,
270                            duration: expire as u64,
271                            samples: None,
272                        })
273                        .ok();
274                }
275                sleep(Duration::from_millis(100)).await;
276            }
277        };
278        let server_side_track_id = self.server_side_track_id.clone();
279        let event_hook_loop = async move {
280            while let Ok(event) = event_receiver.recv().await {
281                match event {
282                    SessionEvent::Speaking { .. }
283                    | SessionEvent::Dtmf { .. }
284                    | SessionEvent::AsrDelta { .. }
285                    | SessionEvent::AsrFinal { .. }
286                    | SessionEvent::TrackStart { .. } => {
287                        *input_timeout_expire_ref.lock().await = (0, 0);
288                    }
289                    SessionEvent::TrackEnd {
290                        track_id,
291                        play_id,
292                        ssrc,
293                        ..
294                    } => {
295                        if track_id != server_side_track_id {
296                            continue;
297                        }
298
299                        let (moh_path, auto_hangup, wait_timeout_val) = {
300                            let mut state = self.call_state.write().await;
301                            if play_id != state.current_play_id {
302                                debug!(
303                                    session_id = self.session_id,
304                                    ?play_id,
305                                    current = ?state.current_play_id,
306                                    "ignoring interrupted track end"
307                                );
308                                continue;
309                            }
310                            state.current_play_id = None;
311                            (
312                                state.moh.clone(),
313                                state.auto_hangup.clone(),
314                                state.wait_input_timeout.take(),
315                            )
316                        };
317
318                        if let Some(path) = moh_path {
319                            info!(session_id = self.session_id, "looping moh: {}", path);
320                            let ssrc = rand::random::<u32>();
321                            let file_track = FileTrack::new(self.server_side_track_id.clone())
322                                .with_play_id(Some(path.clone()))
323                                .with_ssrc(ssrc)
324                                .with_path(path.clone())
325                                .with_cancel_token(self.cancel_token.child_token());
326                            self.update_track_wrapper(Box::new(file_track), Some(path))
327                                .await;
328                            continue;
329                        }
330
331                        if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
332                            if hangup_ssrc == ssrc {
333                                info!(
334                                    session_id = self.session_id,
335                                    ssrc, "auto hangup when track end track_id:{}", track_id
336                                );
337                                self.do_hangup(Some(hangup_reason), None).await.ok();
338                            }
339                        }
340
341                        if let Some(timeout) = wait_timeout_val {
342                            let expire = if timeout > 0 {
343                                (crate::media::get_timestamp(), timeout)
344                            } else {
345                                (0, 0)
346                            };
347                            *input_timeout_expire_ref.lock().await = expire;
348                        }
349                    }
350                    SessionEvent::Interrupt { receiver } => {
351                        let track_id =
352                            receiver.unwrap_or_else(|| self.server_side_track_id.clone());
353                        if track_id == self.server_side_track_id {
354                            debug!(
355                                session_id = self.session_id,
356                                "received interrupt event, stopping playback"
357                            );
358                            self.do_interrupt(true).await.ok();
359                        }
360                    }
361                    SessionEvent::Inactivity { track_id, .. } => {
362                        info!(
363                            session_id = self.session_id,
364                            track_id, "inactivity timeout reached, hanging up"
365                        );
366                        self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
367                            .await
368                            .ok();
369                    }
370                    SessionEvent::Error { track_id, .. } => {
371                        if track_id != server_side_track_id {
372                            continue;
373                        }
374
375                        let moh_info = {
376                            let mut state = self.call_state.write().await;
377                            if let Some(path) = state.moh.clone() {
378                                let fallback = "./config/sounds/refer_moh.wav".to_string();
379                                let next_path = if path != fallback
380                                    && std::path::Path::new(&fallback).exists()
381                                {
382                                    info!(
383                                        session_id = self.session_id,
384                                        "moh error, switching to fallback: {}", fallback
385                                    );
386                                    state.moh = Some(fallback.clone());
387                                    fallback
388                                } else {
389                                    info!(
390                                        session_id = self.session_id,
391                                        "looping moh on error: {}", path
392                                    );
393                                    path
394                                };
395                                Some(next_path)
396                            } else {
397                                None
398                            }
399                        };
400
401                        if let Some(next_path) = moh_info {
402                            let ssrc = rand::random::<u32>();
403                            let file_track = FileTrack::new(self.server_side_track_id.clone())
404                                .with_play_id(Some(next_path.clone()))
405                                .with_ssrc(ssrc)
406                                .with_path(next_path.clone())
407                                .with_cancel_token(self.cancel_token.child_token());
408                            self.update_track_wrapper(Box::new(file_track), Some(next_path))
409                                .await;
410                            continue;
411                        }
412                    }
413                    _ => {}
414                }
415            }
416        };
417
418        select! {
419            _ = wait_input_timeout_loop=>{
420                info!(session_id = self.session_id, "wait input timeout loop done");
421            }
422            _ = self.media_stream.serve() => {
423                info!(session_id = self.session_id, "media stream loop done");
424            }
425            _ = event_hook_loop => {
426                info!(session_id = self.session_id, "event loop done");
427            }
428        }
429        Ok(())
430    }
431
432    async fn dispatch(&self, command: Command) -> Result<()> {
433        match command {
434            Command::Invite { option } => self.do_invite(option).await,
435            Command::Accept { option } => self.do_accept(option).await,
436            Command::Reject { reason, code } => {
437                self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
438                    .await
439            }
440            Command::Ringing {
441                ringtone,
442                recorder,
443                early_media,
444            } => self.do_ringing(ringtone, recorder, early_media).await,
445            Command::Tts {
446                text,
447                speaker,
448                play_id,
449                auto_hangup,
450                streaming,
451                end_of_stream,
452                option,
453                wait_input_timeout,
454                base64,
455            } => {
456                self.do_tts(
457                    text,
458                    speaker,
459                    play_id,
460                    auto_hangup,
461                    streaming.unwrap_or_default(),
462                    end_of_stream.unwrap_or_default(),
463                    option,
464                    wait_input_timeout,
465                    base64.unwrap_or_default(),
466                )
467                .await
468            }
469            Command::Play {
470                url,
471                play_id,
472                auto_hangup,
473                wait_input_timeout,
474            } => {
475                self.do_play(url, play_id, auto_hangup, wait_input_timeout)
476                    .await
477            }
478            Command::Hangup { reason, initiator } => {
479                let reason = reason.map(|r| {
480                    r.parse::<CallRecordHangupReason>()
481                        .unwrap_or(CallRecordHangupReason::BySystem)
482                });
483                self.do_hangup(reason, initiator).await
484            }
485            Command::Refer {
486                caller,
487                callee,
488                options,
489            } => self.do_refer(caller, callee, options).await,
490            Command::Mute { track_id } => self.do_mute(track_id).await,
491            Command::Unmute { track_id } => self.do_unmute(track_id).await,
492            Command::Pause {} => self.do_pause().await,
493            Command::Resume {} => self.do_resume().await,
494            Command::Interrupt {
495                graceful: passage,
496                fade_out_ms: _,
497            } => self.do_interrupt(passage.unwrap_or_default()).await,
498            Command::History { speaker, text } => self.do_history(speaker, text).await,
499        }
500    }
501
502    fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
503        if let Some(recorder_option) = &option.recorder {
504            let mut recorder_file = recorder_option.recorder_file.clone();
505            if recorder_file.contains("{id}") {
506                recorder_file = recorder_file.replace("{id}", &self.session_id);
507            }
508
509            let recorder_file = if recorder_file.is_empty() {
510                self.app_state.get_recorder_file(&self.session_id)
511            } else {
512                let p = Path::new(&recorder_file);
513                p.is_absolute()
514                    .then(|| recorder_file.clone())
515                    .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
516            };
517            info!(
518                session_id = self.session_id,
519                recorder_file, "created recording file"
520            );
521
522            let track_samplerate = self.track_config.samplerate;
523            let recorder_samplerate = if track_samplerate > 0 {
524                track_samplerate
525            } else {
526                recorder_option.samplerate
527            };
528            let recorder_ptime = if recorder_option.ptime == 0 {
529                200
530            } else {
531                recorder_option.ptime
532            };
533            let requested_format = recorder_option
534                .format
535                .unwrap_or(self.app_state.config.recorder_format());
536            let format = requested_format.effective();
537            if requested_format != format {
538                warn!(
539                    session_id = self.session_id,
540                    requested = requested_format.extension(),
541                    "Recorder format fallback to wav due to unsupported feature"
542                );
543            }
544            let mut recorder_config = RecorderOption {
545                recorder_file,
546                samplerate: recorder_samplerate,
547                ptime: recorder_ptime,
548                format: Some(format),
549            };
550            recorder_config.ensure_path_extension(format);
551            Some(recorder_config)
552        } else {
553            None
554        }
555    }
556
557    async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
558        // Merge with existing configuration (e.g., from playbook)
559        {
560            let state = self.call_state.read().await;
561            option = state.merge_option(option);
562        }
563
564        option.check_default();
565        if let Some(opt) = self.build_record_option(&option) {
566            self.media_stream.update_recorder_option(opt).await;
567        }
568
569        if let Some(opt) = &option.media_pass {
570            let track_id = self.server_side_track_id.clone();
571            let cancel_token = self.cancel_token.child_token();
572            let ssrc = rand::random::<u32>();
573            let media_pass_track = MediaPassTrack::new(
574                self.session_id.clone(),
575                ssrc,
576                track_id,
577                cancel_token,
578                opt.clone(),
579            );
580            self.update_track_wrapper(Box::new(media_pass_track), None)
581                .await;
582        }
583
584        info!(
585            session_id = self.session_id,
586            call_type = ?self.call_type,
587            sender,
588            ?option,
589            "caller with option"
590        );
591
592        match self.setup_caller_track(&option).await {
593            Ok(_) => return Ok(option),
594            Err(e) => {
595                self.app_state
596                    .total_failed_calls
597                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
598                let error_event = crate::event::SessionEvent::Error {
599                    track_id: self.session_id.clone(),
600                    timestamp: crate::media::get_timestamp(),
601                    sender,
602                    error: e.to_string(),
603                    code: None,
604                };
605                self.event_sender.send(error_event).ok();
606                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
607                    .await
608                    .ok();
609                return Err(e);
610            }
611        }
612    }
613
614    async fn do_invite(&self, option: CallOption) -> Result<()> {
615        self.invite_or_accept(option, "invite".to_string())
616            .await
617            .map(|_| ())
618    }
619
620    async fn do_accept(&self, mut option: CallOption) -> Result<()> {
621        let has_pending = self.invitation.has_pending_call(&self.session_id).is_some();
622        let ready_to_answer_val = {
623            let state = self.call_state.read().await;
624            state.ready_to_answer.is_none()
625        };
626
627        if ready_to_answer_val {
628            if !has_pending {
629                // emit reject event
630                warn!(session_id = self.session_id, "no pending call to accept");
631                let rejet_event = crate::event::SessionEvent::Reject {
632                    track_id: self.session_id.clone(),
633                    timestamp: crate::media::get_timestamp(),
634                    reason: "no pending call".to_string(),
635                    refer: None,
636                    code: Some(486),
637                };
638                self.event_sender.send(rejet_event).ok();
639                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
640                    .await
641                    .ok();
642                return Err(anyhow::anyhow!("no pending call to accept"));
643            }
644            option = self.invite_or_accept(option, "accept".to_string()).await?;
645        } else {
646            option.check_default();
647            self.call_state.write().await.option = Some(option.clone());
648        }
649        info!(session_id = self.session_id, ?option, "accepting call");
650        let ready = self.call_state.write().await.ready_to_answer.take();
651        if let Some((answer, track, dialog)) = ready {
652            info!(
653                session_id = self.session_id,
654                track_id = track.as_ref().map(|t| t.id()),
655                "ready to answer with track"
656            );
657
658            let headers = vec![rsip::Header::ContentType(
659                "application/sdp".to_string().into(),
660            )];
661
662            match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
663                Ok(_) => {
664                    {
665                        let mut state = self.call_state.write().await;
666                        state.answer = Some(answer);
667                        state.answer_time = Some(Utc::now());
668                    }
669                    self.finish_caller_stack(&option, track).await?;
670                }
671                Err(e) => {
672                    warn!(session_id = self.session_id, "failed to accept call: {}", e);
673                    return Err(anyhow::anyhow!("failed to accept call"));
674                }
675            }
676        }
677        return Ok(());
678    }
679
680    async fn do_reject(
681        &self,
682        code: Option<rsip::StatusCode>,
683        reason: Option<String>,
684    ) -> Result<()> {
685        match self.invitation.has_pending_call(&self.session_id) {
686            Some(id) => {
687                info!(
688                    session_id = self.session_id,
689                    ?reason,
690                    ?code,
691                    "rejecting call"
692                );
693                self.invitation.hangup(id, code, reason).await
694            }
695            None => Ok(()),
696        }
697    }
698
699    async fn do_ringing(
700        &self,
701        ringtone: Option<String>,
702        recorder: Option<RecorderOption>,
703        early_media: Option<bool>,
704    ) -> Result<()> {
705        let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
706        if ready_to_answer_val {
707            let option = CallOption {
708                recorder,
709                ..Default::default()
710            };
711            let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
712        }
713
714        let state = self.call_state.read().await;
715        if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
716            let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
717                let headers = vec![rsip::Header::ContentType(
718                    "application/sdp".to_string().into(),
719                )];
720                (Some(headers), Some(answer.as_bytes().to_vec()))
721            } else {
722                (None, None)
723            };
724
725            dialog.ringing(headers, body).ok();
726            info!(
727                session_id = self.session_id,
728                ringtone, early_media, "playing ringtone"
729            );
730            if let Some(ringtone_url) = ringtone {
731                drop(state);
732                self.do_play(ringtone_url, None, None, None).await.ok();
733            } else {
734                info!(session_id = self.session_id, "no ringtone to play");
735            }
736        }
737        Ok(())
738    }
739
740    async fn do_tts(
741        &self,
742        text: String,
743        speaker: Option<String>,
744        play_id: Option<String>,
745        auto_hangup: Option<bool>,
746        streaming: bool,
747        end_of_stream: bool,
748        option: Option<SynthesisOption>,
749        wait_input_timeout: Option<u32>,
750        base64: bool,
751    ) -> Result<()> {
752        let tts_option = {
753            let call_state = self.call_state.read().await;
754            match call_state.option.clone().unwrap_or_default().tts {
755                Some(opt) => opt.merge_with(option),
756                None => {
757                    if let Some(opt) = option {
758                        opt
759                    } else {
760                        return Err(anyhow::anyhow!("no tts option available"));
761                    }
762                }
763            }
764        };
765        let speaker = match speaker {
766            Some(s) => Some(s),
767            None => tts_option.speaker.clone(),
768        };
769
770        let mut play_command = SynthesisCommand {
771            text,
772            speaker,
773            play_id: play_id.clone(),
774            streaming,
775            end_of_stream,
776            option: tts_option,
777            base64,
778        };
779        info!(
780            session_id = self.session_id,
781            provider = ?play_command.option.provider,
782            text = %play_command.text.chars().take(10).collect::<String>(),
783            speaker = play_command.speaker.as_deref(),
784            auto_hangup = auto_hangup.unwrap_or_default(),
785            play_id = play_command.play_id.as_deref(),
786            streaming = play_command.streaming,
787            end_of_stream = play_command.end_of_stream,
788            wait_input_timeout = wait_input_timeout.unwrap_or_default(),
789            is_base64 = play_command.base64,
790            "new synthesis"
791        );
792
793        let ssrc = rand::random::<u32>();
794        let should_interrupt = {
795            let mut state = self.call_state.write().await;
796            state.auto_hangup = match auto_hangup {
797                Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
798                _ => None,
799            };
800            state.wait_input_timeout = wait_input_timeout;
801
802            let changed = play_id.is_some() && state.current_play_id != play_id;
803            state.current_play_id = play_id.clone();
804            changed
805        };
806
807        if should_interrupt {
808            let _ = self.do_interrupt(false).await;
809        }
810
811        let existing_handle = self.call_state.read().await.tts_handle.clone();
812        if let Some(tts_handle) = existing_handle {
813            match tts_handle.try_send(play_command) {
814                Ok(_) => return Ok(()),
815                Err(e) => {
816                    play_command = e.0;
817                }
818            }
819        }
820
821        let (new_handle, tts_track) = StreamEngine::create_tts_track(
822            self.app_state.stream_engine.clone(),
823            self.cancel_token.child_token(),
824            self.session_id.clone(),
825            self.server_side_track_id.clone(),
826            ssrc,
827            play_id.clone(),
828            streaming,
829            &play_command.option,
830        )
831        .await?;
832
833        new_handle.try_send(play_command)?;
834        self.call_state.write().await.tts_handle = Some(new_handle);
835        self.update_track_wrapper(tts_track, play_id).await;
836        Ok(())
837    }
838
839    async fn do_play(
840        &self,
841        url: String,
842        play_id: Option<String>,
843        auto_hangup: Option<bool>,
844        wait_input_timeout: Option<u32>,
845    ) -> Result<()> {
846        let ssrc = rand::random::<u32>();
847        info!(
848            session_id = self.session_id,
849            ssrc, url, play_id, auto_hangup, "play file track"
850        );
851
852        let play_id = play_id.or(Some(url.clone()));
853
854        let file_track = FileTrack::new(self.server_side_track_id.clone())
855            .with_play_id(play_id.clone())
856            .with_ssrc(ssrc)
857            .with_path(url)
858            .with_cancel_token(self.cancel_token.child_token());
859
860        {
861            let mut state = self.call_state.write().await;
862            state.tts_handle = None;
863            state.auto_hangup = match auto_hangup {
864                Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
865                _ => None,
866            };
867            state.wait_input_timeout = wait_input_timeout;
868        }
869
870        self.update_track_wrapper(Box::new(file_track), play_id)
871            .await;
872        Ok(())
873    }
874
875    async fn do_history(&self, speaker: String, text: String) -> Result<()> {
876        self.event_sender
877            .send(SessionEvent::AddHistory {
878                sender: Some(self.session_id.clone()),
879                timestamp: crate::media::get_timestamp(),
880                speaker,
881                text,
882            })
883            .map(|_| ())
884            .map_err(Into::into)
885    }
886
887    async fn do_interrupt(&self, graceful: bool) -> Result<()> {
888        {
889            let mut state = self.call_state.write().await;
890            state.tts_handle = None;
891            state.moh = None;
892        }
893        self.media_stream
894            .remove_track(&self.server_side_track_id, graceful)
895            .await;
896        Ok(())
897    }
    /// Pause command handler. Currently a no-op that always returns `Ok(())`.
    async fn do_pause(&self) -> Result<()> {
        Ok(())
    }
    /// Resume command handler. Currently a no-op that always returns `Ok(())`.
    async fn do_resume(&self) -> Result<()> {
        Ok(())
    }
904    async fn do_hangup(
905        &self,
906        reason: Option<CallRecordHangupReason>,
907        initiator: Option<String>,
908    ) -> Result<()> {
909        info!(
910            session_id = self.session_id,
911            ?reason,
912            ?initiator,
913            "do_hangup"
914        );
915
916        // Set hangup reason based on initiator and reason
917        let hangup_reason = match initiator.as_deref() {
918            Some("caller") => CallRecordHangupReason::ByCaller,
919            Some("callee") => CallRecordHangupReason::ByCallee,
920            Some("system") => CallRecordHangupReason::Autohangup,
921            _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
922        };
923
924        self.media_stream
925            .stop(Some(hangup_reason.to_string()), initiator);
926
927        self.call_state
928            .write()
929            .await
930            .set_hangup_reason(hangup_reason);
931        Ok(())
932    }
933
934    async fn do_refer(
935        &self,
936        caller: String,
937        callee: String,
938        refer_option: Option<ReferOption>,
939    ) -> Result<()> {
940        self.do_interrupt(false).await.ok();
941        let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
942        if let Some(ref path) = moh {
943            if !path.starts_with("http") && !std::path::Path::new(path).exists() {
944                let fallback = "./config/sounds/refer_moh.wav";
945                if std::path::Path::new(fallback).exists() {
946                    info!(
947                        session_id = self.session_id,
948                        "moh {} not found, using fallback {}", path, fallback
949                    );
950                    moh = Some(fallback.to_string());
951                }
952            }
953        }
954        let session_id = self.session_id.clone();
955        let track_id = self.server_side_track_id.clone();
956
957        let recorder = {
958            let cs = self.call_state.read().await;
959            cs.option
960                .as_ref()
961                .map(|o| o.recorder.clone())
962                .unwrap_or_default()
963        };
964
965        let call_option = CallOption {
966            caller: Some(caller),
967            callee: Some(callee.clone()),
968            sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
969            asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
970            denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
971            recorder,
972            ..Default::default()
973        };
974
975        let mut invite_option = call_option.build_invite_option()?;
976        invite_option.call_id = Some(self.session_id.clone());
977
978        let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
979
980        {
981            let cs = self.call_state.read().await;
982            if let Some(opt) = cs.option.as_ref() {
983                if let Some(callee) = opt.callee.as_ref() {
984                    headers.push(rsip::Header::Other(
985                        "X-Referred-To".to_string(),
986                        callee.clone(),
987                    ));
988                }
989                if let Some(caller) = opt.caller.as_ref() {
990                    headers.push(rsip::Header::Other(
991                        "X-Referred-From".to_string(),
992                        caller.clone(),
993                    ));
994                }
995            }
996        }
997
998        headers.push(rsip::Header::Other(
999            "X-Referred-Id".to_string(),
1000            self.session_id.clone(),
1001        ));
1002
1003        let ssrc = rand::random::<u32>();
1004        let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1005            start_time: Utc::now(),
1006            ssrc,
1007            option: Some(call_option.clone()),
1008            is_refer: true,
1009            ..Default::default()
1010        }));
1011
1012        {
1013            let mut cs = self.call_state.write().await;
1014            cs.refer_callstate.replace(refer_call_state.clone());
1015        }
1016
1017        let auto_hangup_requested = refer_option
1018            .as_ref()
1019            .and_then(|o| o.auto_hangup)
1020            .unwrap_or(true);
1021
1022        if auto_hangup_requested {
1023            self.call_state.write().await.auto_hangup =
1024                Some((ssrc, CallRecordHangupReason::ByRefer));
1025        } else {
1026            self.call_state.write().await.auto_hangup = None;
1027        }
1028
1029        let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1030
1031        info!(
1032            session_id = self.session_id,
1033            ssrc,
1034            auto_hangup = auto_hangup_requested,
1035            callee,
1036            timeout_secs,
1037            "do_refer"
1038        );
1039
1040        let r = tokio::time::timeout(
1041            Duration::from_secs(timeout_secs as u64),
1042            self.create_outgoing_sip_track(
1043                self.cancel_token.child_token(),
1044                refer_call_state.clone(),
1045                &track_id,
1046                invite_option,
1047                &call_option,
1048                moh,
1049                auto_hangup_requested,
1050            ),
1051        )
1052        .await;
1053
1054        {
1055            self.call_state.write().await.moh = None;
1056        }
1057
1058        let result = match r {
1059            Ok(res) => res,
1060            Err(_) => {
1061                warn!(
1062                    session_id = session_id,
1063                    "refer sip track creation timed out after {} seconds", timeout_secs
1064                );
1065                self.event_sender
1066                    .send(SessionEvent::Reject {
1067                        track_id,
1068                        timestamp: crate::media::get_timestamp(),
1069                        reason: "Timeout when refer".into(),
1070                        code: Some(408),
1071                        refer: Some(true),
1072                    })
1073                    .ok();
1074                return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1075            }
1076        };
1077
1078        match result {
1079            Ok(answer) => {
1080                self.event_sender
1081                    .send(SessionEvent::Answer {
1082                        timestamp: crate::media::get_timestamp(),
1083                        track_id,
1084                        sdp: answer,
1085                        refer: Some(true),
1086                    })
1087                    .ok();
1088            }
1089            Err(e) => {
1090                warn!(
1091                    session_id = session_id,
1092                    "failed to create refer sip track: {}", e
1093                );
1094                match &e {
1095                    rsipstack::Error::DialogError(reason, _, code) => {
1096                        self.event_sender
1097                            .send(SessionEvent::Reject {
1098                                track_id,
1099                                timestamp: crate::media::get_timestamp(),
1100                                reason: reason.clone(),
1101                                code: Some(code.code() as u32),
1102                                refer: Some(true),
1103                            })
1104                            .ok();
1105                    }
1106                    _ => {}
1107                }
1108                return Err(e.into());
1109            }
1110        }
1111        Ok(())
1112    }
1113
1114    async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1115        self.media_stream.mute_track(track_id).await;
1116        Ok(())
1117    }
1118
1119    async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1120        self.media_stream.unmute_track(track_id).await;
1121        Ok(())
1122    }
1123
1124    pub async fn cleanup(&self) -> Result<()> {
1125        self.call_state.write().await.tts_handle = None;
1126        self.media_stream.cleanup().await.ok();
1127        Ok(())
1128    }
1129
1130    pub fn get_callrecord(&self) -> Option<CallRecord> {
1131        self.call_state.try_read().ok().map(|call_state| {
1132            call_state.build_callrecord(
1133                self.app_state.clone(),
1134                self.session_id.clone(),
1135                self.call_type.clone(),
1136            )
1137        })
1138    }
1139
1140    async fn dump_to_file(
1141        &self,
1142        dump_file: &mut File,
1143        cmd_receiver: &mut CommandReceiver,
1144        event_receiver: &mut EventReceiver,
1145    ) {
1146        loop {
1147            select! {
1148                _ = self.cancel_token.cancelled() => {
1149                    break;
1150                }
1151                Ok(cmd) = cmd_receiver.recv() => {
1152                    CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1153                        .await;
1154                }
1155                Ok(event) = event_receiver.recv() => {
1156                    if matches!(event, SessionEvent::Binary{..}) {
1157                        continue;
1158                    }
1159                    CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1160                        .await;
1161                }
1162            };
1163        }
1164    }
1165
1166    async fn dump_loop(
1167        &self,
1168        dump_events: bool,
1169        mut dump_cmd_receiver: CommandReceiver,
1170        mut dump_event_receiver: EventReceiver,
1171    ) {
1172        if !dump_events {
1173            return;
1174        }
1175
1176        let file_name = self.app_state.get_dump_events_file(&self.session_id);
1177        let mut dump_file = match File::options()
1178            .create(true)
1179            .append(true)
1180            .open(&file_name)
1181            .await
1182        {
1183            Ok(file) => file,
1184            Err(e) => {
1185                warn!(
1186                    session_id = self.session_id,
1187                    file_name, "failed to open dump events file: {}", e
1188                );
1189                return;
1190            }
1191        };
1192        self.dump_to_file(
1193            &mut dump_file,
1194            &mut dump_cmd_receiver,
1195            &mut dump_event_receiver,
1196        )
1197        .await;
1198
1199        while let Ok(event) = dump_event_receiver.try_recv() {
1200            if matches!(event, SessionEvent::Binary { .. }) {
1201                continue;
1202            }
1203            CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1204        }
1205    }
1206
1207    pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1208        let mut rtc_config = RtcTrackConfig::default();
1209        rtc_config.mode = rustrtc::TransportMode::Rtp;
1210
1211        if let Some(codecs) = &self.app_state.config.codecs {
1212            let mut codec_types = Vec::new();
1213            for c in codecs {
1214                match c.to_lowercase().as_str() {
1215                    "pcmu" => codec_types.push(CodecType::PCMU),
1216                    "pcma" => codec_types.push(CodecType::PCMA),
1217                    "g722" => codec_types.push(CodecType::G722),
1218                    "g729" => codec_types.push(CodecType::G729),
1219                    "opus" => codec_types.push(CodecType::Opus),
1220                    "dtmf" | "2833" | "telephone_event" => {
1221                        codec_types.push(CodecType::TelephoneEvent)
1222                    }
1223                    _ => {}
1224                }
1225            }
1226            if !codec_types.is_empty() {
1227                rtc_config.preferred_codec = Some(codec_types[0].clone());
1228                rtc_config.codecs = codec_types;
1229            }
1230        }
1231
1232        if rtc_config.preferred_codec.is_none() {
1233            rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1234        }
1235
1236        rtc_config.rtp_port_range = self
1237            .app_state
1238            .config
1239            .rtp_start_port
1240            .zip(self.app_state.config.rtp_end_port);
1241
1242        if let Some(ref external_ip) = self.app_state.config.external_ip {
1243            rtc_config.external_ip = Some(external_ip.clone());
1244        }
1245
1246        let mut track = RtcTrack::new(
1247            self.cancel_token.child_token(),
1248            track_id,
1249            self.track_config.clone(),
1250            rtc_config,
1251        )
1252        .with_ssrc(ssrc);
1253
1254        track.create().await?;
1255
1256        Ok(track)
1257    }
1258
1259    async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1260        self.call_state.write().await.option = Some(option.clone());
1261        info!(
1262            session_id = self.session_id,
1263            call_type = ?self.call_type,
1264            "setup caller track"
1265        );
1266
1267        let track = match self.call_type {
1268            ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1269            ActiveCallType::WebSocket => {
1270                let audio_receiver = self.call_state.write().await.audio_receiver.take();
1271                if let Some(receiver) = audio_receiver {
1272                    Some(self.create_websocket_track(receiver).await?)
1273                } else {
1274                    None
1275                }
1276            }
1277            ActiveCallType::Sip => {
1278                if let Some(pending_dialog) = self.invitation.get_pending_call(&self.session_id) {
1279                    return self
1280                        .prepare_incoming_sip_track(
1281                            self.cancel_token.clone(),
1282                            self.call_state.clone(),
1283                            &self.session_id,
1284                            pending_dialog,
1285                        )
1286                        .await;
1287                }
1288
1289                let invite_option = option.build_invite_option()?;
1290                match self
1291                    .create_outgoing_sip_track(
1292                        self.cancel_token.clone(),
1293                        self.call_state.clone(),
1294                        &self.session_id,
1295                        invite_option,
1296                        &option,
1297                        None,
1298                        false,
1299                    )
1300                    .await
1301                {
1302                    Ok(answer) => {
1303                        self.event_sender
1304                            .send(SessionEvent::Answer {
1305                                timestamp: crate::media::get_timestamp(),
1306                                track_id: self.session_id.clone(),
1307                                sdp: answer,
1308                                refer: Some(false),
1309                            })
1310                            .ok();
1311                        return Ok(());
1312                    }
1313                    Err(e) => {
1314                        warn!(
1315                            session_id = self.session_id,
1316                            "failed to create sip track: {}", e
1317                        );
1318                        match &e {
1319                            rsipstack::Error::DialogError(reason, _, code) => {
1320                                self.event_sender
1321                                    .send(SessionEvent::Reject {
1322                                        track_id: self.session_id.clone(),
1323                                        timestamp: crate::media::get_timestamp(),
1324                                        reason: reason.clone(),
1325                                        code: Some(code.code() as u32),
1326                                        refer: Some(false),
1327                                    })
1328                                    .ok();
1329                            }
1330                            _ => {}
1331                        }
1332                        return Err(e.into());
1333                    }
1334                }
1335            }
1336            ActiveCallType::B2bua => match self.invitation.get_pending_call(&self.session_id) {
1337                Some(pending_dialog) => {
1338                    return self
1339                        .prepare_incoming_sip_track(
1340                            self.cancel_token.clone(),
1341                            self.call_state.clone(),
1342                            &self.session_id,
1343                            pending_dialog,
1344                        )
1345                        .await;
1346                }
1347                None => {
1348                    warn!(
1349                        session_id = self.session_id,
1350                        "no pending dialog found for B2BUA call"
1351                    );
1352                    return Err(anyhow::anyhow!(
1353                        "no pending dialog found for session_id: {}",
1354                        self.session_id
1355                    ));
1356                }
1357            },
1358        };
1359        match track {
1360            Some(track) => {
1361                self.finish_caller_stack(&option, Some(track)).await?;
1362            }
1363            None => {
1364                warn!(session_id = self.session_id, "no track created for caller");
1365                return Err(anyhow::anyhow!("no track created for caller"));
1366            }
1367        }
1368        Ok(())
1369    }
1370
1371    async fn finish_caller_stack(
1372        &self,
1373        option: &CallOption,
1374        track: Option<Box<dyn Track>>,
1375    ) -> Result<()> {
1376        if let Some(track) = track {
1377            self.setup_track_with_stream(&option, track).await?;
1378        }
1379
1380        {
1381            let call_state = self.call_state.read().await;
1382            if let Some(ref answer) = call_state.answer {
1383                info!(
1384                    session_id = self.session_id,
1385                    "sending answer event: {}", answer,
1386                );
1387                self.event_sender
1388                    .send(SessionEvent::Answer {
1389                        timestamp: crate::media::get_timestamp(),
1390                        track_id: self.session_id.clone(),
1391                        sdp: answer.clone(),
1392                        refer: Some(false),
1393                    })
1394                    .ok();
1395            } else {
1396                warn!(
1397                    session_id = self.session_id,
1398                    "no answer in state to send event"
1399                );
1400            }
1401        }
1402        Ok(())
1403    }
1404
1405    pub async fn setup_track_with_stream(
1406        &self,
1407        option: &CallOption,
1408        mut track: Box<dyn Track>,
1409    ) -> Result<()> {
1410        let processors = match StreamEngine::create_processors(
1411            self.app_state.stream_engine.clone(),
1412            track.as_ref(),
1413            self.cancel_token.child_token(),
1414            self.event_sender.clone(),
1415            self.media_stream.packet_sender.clone(),
1416            option,
1417        )
1418        .await
1419        {
1420            Ok(processors) => processors,
1421            Err(e) => {
1422                warn!(
1423                    session_id = self.session_id,
1424                    "failed to prepare stream processors: {}", e
1425                );
1426                vec![]
1427            }
1428        };
1429
1430        // Add all processors from the hook
1431        for processor in processors {
1432            track.append_processor(processor);
1433        }
1434
1435        self.update_track_wrapper(track, None).await;
1436        Ok(())
1437    }
1438
1439    pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1440        let ambiance_opt = {
1441            let state = self.call_state.read().await;
1442            let mut opt = state
1443                .option
1444                .as_ref()
1445                .and_then(|o| o.ambiance.clone())
1446                .unwrap_or_default();
1447
1448            if let Some(global) = &self.app_state.config.ambiance {
1449                opt.merge(global);
1450            }
1451            opt
1452        };
1453        if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1454            match AmbianceProcessor::new(ambiance_opt).await {
1455                Ok(ambiance) => {
1456                    info!(session_id = self.session_id, "loaded ambiance processor");
1457                    track.append_processor(Box::new(ambiance));
1458                }
1459                Err(e) => {
1460                    tracing::error!("failed to load ambiance wav {}", e);
1461                }
1462            }
1463        }
1464        self.call_state.write().await.current_play_id = play_id.clone();
1465        self.media_stream.update_track(track, play_id).await;
1466    }
1467
1468    pub async fn create_websocket_track(
1469        &self,
1470        audio_receiver: WebsocketBytesReceiver,
1471    ) -> Result<Box<dyn Track>> {
1472        let (ssrc, codec) = {
1473            let call_state = self.call_state.read().await;
1474            (
1475                call_state.ssrc,
1476                call_state
1477                    .option
1478                    .as_ref()
1479                    .map(|o| o.codec.clone())
1480                    .unwrap_or_default(),
1481            )
1482        };
1483
1484        let ws_track = WebsocketTrack::new(
1485            self.cancel_token.child_token(),
1486            self.session_id.clone(),
1487            self.track_config.clone(),
1488            self.event_sender.clone(),
1489            audio_receiver,
1490            codec,
1491            ssrc,
1492        );
1493
1494        {
1495            let mut call_state = self.call_state.write().await;
1496            call_state.answer_time = Some(Utc::now());
1497            call_state.answer = Some("".to_string());
1498            call_state.last_status_code = 200;
1499        }
1500
1501        Ok(Box::new(ws_track))
1502    }
1503
1504    pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1505        let (ssrc, option) = {
1506            let call_state = self.call_state.read().await;
1507            (
1508                call_state.ssrc,
1509                call_state.option.clone().unwrap_or_default(),
1510            )
1511        };
1512
1513        let mut rtc_config = RtcTrackConfig::default();
1514        rtc_config.mode = rustrtc::TransportMode::WebRtc; // WebRTC
1515        rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1516
1517        if let Some(codecs) = &self.app_state.config.codecs {
1518            let mut codec_types = Vec::new();
1519            for c in codecs {
1520                match c.to_lowercase().as_str() {
1521                    "pcmu" => codec_types.push(CodecType::PCMU),
1522                    "pcma" => codec_types.push(CodecType::PCMA),
1523                    "g722" => codec_types.push(CodecType::G722),
1524                    "g729" => codec_types.push(CodecType::G729),
1525                    "opus" => codec_types.push(CodecType::Opus),
1526                    "dtmf" | "2833" | "telephone_event" => {
1527                        codec_types.push(CodecType::TelephoneEvent)
1528                    }
1529                    _ => {}
1530                }
1531            }
1532            if !codec_types.is_empty() {
1533                rtc_config.preferred_codec = Some(codec_types[0].clone());
1534                rtc_config.codecs = codec_types;
1535            }
1536        }
1537
1538        if let Some(ref external_ip) = self.app_state.config.external_ip {
1539            rtc_config.external_ip = Some(external_ip.clone());
1540        }
1541
1542        let mut webrtc_track = RtcTrack::new(
1543            self.cancel_token.child_token(),
1544            self.session_id.clone(),
1545            self.track_config.clone(),
1546            rtc_config,
1547        )
1548        .with_ssrc(ssrc);
1549
1550        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1551        let offer = match option.enable_ipv6 {
1552            Some(false) | None => {
1553                strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1554            }
1555            _ => option.offer.clone().unwrap_or("".to_string()),
1556        };
1557        let answer: Option<String>;
1558        match webrtc_track.handshake(offer, timeout).await {
1559            Ok(answer_sdp) => {
1560                answer = match option.enable_ipv6 {
1561                    Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1562                    Some(true) => Some(answer_sdp.to_string()),
1563                };
1564            }
1565            Err(e) => {
1566                warn!(session_id = self.session_id, "failed to setup track: {}", e);
1567                return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1568            }
1569        }
1570
1571        {
1572            let mut call_state = self.call_state.write().await;
1573            call_state.answer_time = Some(Utc::now());
1574            call_state.answer = answer;
1575            call_state.last_status_code = 200;
1576        }
1577        Ok(Box::new(webrtc_track))
1578    }
1579
1580    async fn create_outgoing_sip_track(
1581        &self,
1582        cancel_token: CancellationToken,
1583        call_state_ref: ActiveCallStateRef,
1584        track_id: &String,
1585        mut invite_option: InviteOption,
1586        call_option: &CallOption,
1587        moh: Option<String>,
1588        auto_hangup: bool,
1589    ) -> Result<String, rsipstack::Error> {
1590        let ssrc = call_state_ref.read().await.ssrc;
1591        let rtp_track = self
1592            .create_rtp_track(track_id.clone(), ssrc)
1593            .await
1594            .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1595
1596        let offer = Some(
1597            rtp_track
1598                .local_description()
1599                .await
1600                .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1601        );
1602
1603        {
1604            let mut cs = call_state_ref.write().await;
1605            if let Some(o) = cs.option.as_mut() {
1606                o.offer = offer.clone();
1607            }
1608            cs.start_time = Utc::now();
1609        };
1610
1611        invite_option.offer = offer.clone().map(|s| s.into());
1612
1613        // Set contact to local SIP endpoint address if not already set explicitly
1614        // Check if contact is still default (no scheme set) or if host is localhost-like
1615        let needs_contact = invite_option.contact.scheme.is_none()
1616            || invite_option
1617                .contact
1618                .host_with_port
1619                .to_string()
1620                .starts_with("127.0.0.1");
1621
1622        if needs_contact {
1623            if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1624                invite_option.contact = rsip::Uri {
1625                    scheme: Some(rsip::Scheme::Sip),
1626                    auth: None,
1627                    host_with_port: addr.addr.clone(),
1628                    params: vec![],
1629                    headers: vec![],
1630                };
1631            }
1632        }
1633
1634        let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1635
1636        if let Some(moh) = moh {
1637            let ssrc_and_moh = {
1638                let mut state = call_state_ref.write().await;
1639                state.moh = Some(moh.clone());
1640                if state.current_play_id.is_none() {
1641                    let ssrc = rand::random::<u32>();
1642                    Some((ssrc, moh.clone()))
1643                } else {
1644                    info!(
1645                        session_id = self.session_id,
1646                        "Something is playing, MOH will start after it ends"
1647                    );
1648                    None
1649                }
1650            };
1651
1652            if let Some((ssrc, moh_path)) = ssrc_and_moh {
1653                let file_track = FileTrack::new(self.server_side_track_id.clone())
1654                    .with_play_id(Some(moh_path.clone()))
1655                    .with_ssrc(ssrc)
1656                    .with_path(moh_path.clone())
1657                    .with_cancel_token(self.cancel_token.child_token());
1658                self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1659                    .await;
1660            }
1661        } else {
1662            let track = rtp_track_to_setup.take().unwrap();
1663            self.setup_track_with_stream(&call_option, track)
1664                .await
1665                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1666        }
1667
1668        info!(
1669            session_id = self.session_id,
1670            track_id,
1671            contact = %invite_option.contact,
1672            "invite {} -> {} offer: \n{}",
1673            invite_option.caller,
1674            invite_option.callee,
1675            offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1676        );
1677
1678        let (dlg_state_sender, dlg_state_receiver) =
1679            self.invitation.dialog_layer.new_dialog_state_channel();
1680
1681        let states = InviteDialogStates {
1682            is_client: true,
1683            session_id: self.session_id.clone(),
1684            track_id: track_id.clone(),
1685            event_sender: self.event_sender.clone(),
1686            media_stream: self.media_stream.clone(),
1687            call_state: call_state_ref.clone(),
1688            cancel_token,
1689            terminated_reason: None,
1690            has_early_media: false,
1691        };
1692
1693        let mut client_dialog_handler =
1694            DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), dlg_state_receiver);
1695
1696        crate::spawn(async move {
1697            client_dialog_handler.process_dialog(states).await;
1698        });
1699
1700        let (dialog_id, answer) = self
1701            .invitation
1702            .invite(invite_option, dlg_state_sender)
1703            .await?;
1704
1705        self.call_state.write().await.moh = None;
1706
1707        if let Some(track) = rtp_track_to_setup {
1708            self.setup_track_with_stream(&call_option, track)
1709                .await
1710                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1711        }
1712
1713        let answer = match answer {
1714            Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1715            None => {
1716                warn!(session_id = self.session_id, "no answer received");
1717                return Err(rsipstack::Error::DialogError(
1718                    "No answer received".to_string(),
1719                    dialog_id,
1720                    rsip::StatusCode::NotAcceptableHere,
1721                ));
1722            }
1723        };
1724
1725        {
1726            let mut cs = call_state_ref.write().await;
1727            if cs.answer.is_none() {
1728                cs.answer = Some(answer.clone());
1729            }
1730            if auto_hangup {
1731                cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
1732            }
1733        }
1734
1735        self.media_stream
1736            .update_remote_description(&track_id, &answer)
1737            .await
1738            .ok();
1739
1740        Ok(answer)
1741    }
1742
1743    /// Detect if SDP is WebRTC format
1744    pub fn is_webrtc_sdp(sdp: &str) -> bool {
1745        (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
1746            && sdp.contains("a=fingerprint:")
1747    }
1748
1749    pub async fn setup_answer_track(
1750        &self,
1751        ssrc: u32,
1752        option: &CallOption,
1753        offer: String,
1754    ) -> Result<(String, Box<dyn Track>)> {
1755        let offer = match option.enable_ipv6 {
1756            Some(false) | None => strip_ipv6_candidates(&offer),
1757            _ => offer.clone(),
1758        };
1759
1760        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1761
1762        let mut media_track = if Self::is_webrtc_sdp(&offer) {
1763            let mut rtc_config = RtcTrackConfig::default();
1764            rtc_config.mode = rustrtc::TransportMode::WebRtc;
1765            rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1766            if let Some(ref external_ip) = self.app_state.config.external_ip {
1767                rtc_config.external_ip = Some(external_ip.clone());
1768            }
1769
1770            let webrtc_track = RtcTrack::new(
1771                self.cancel_token.child_token(),
1772                self.session_id.clone(),
1773                self.track_config.clone(),
1774                rtc_config,
1775            )
1776            .with_ssrc(ssrc);
1777
1778            Box::new(webrtc_track) as Box<dyn Track>
1779        } else {
1780            let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
1781            Box::new(rtp_track) as Box<dyn Track>
1782        };
1783
1784        let answer = match media_track.handshake(offer.clone(), timeout).await {
1785            Ok(answer) => answer,
1786            Err(e) => {
1787                return Err(anyhow::anyhow!("handshake failed: {e}"));
1788            }
1789        };
1790
1791        return Ok((answer, media_track));
1792    }
1793
1794    pub async fn prepare_incoming_sip_track(
1795        &self,
1796        cancel_token: CancellationToken,
1797        call_state_ref: ActiveCallStateRef,
1798        track_id: &String,
1799        pending_dialog: PendingDialog,
1800    ) -> Result<()> {
1801        let state_receiver = pending_dialog.state_receiver;
1802        //let pending_token_clone = pending_dialog.token;
1803
1804        let states = InviteDialogStates {
1805            is_client: false,
1806            session_id: self.session_id.clone(),
1807            track_id: track_id.clone(),
1808            event_sender: self.event_sender.clone(),
1809            media_stream: self.media_stream.clone(),
1810            call_state: self.call_state.clone(),
1811            cancel_token,
1812            terminated_reason: None,
1813            has_early_media: false,
1814        };
1815
1816        let initial_request = pending_dialog.dialog.initial_request();
1817        let offer = String::from_utf8_lossy(&initial_request.body).to_string();
1818
1819        let (ssrc, option) = {
1820            let call_state = call_state_ref.read().await;
1821            (
1822                call_state.ssrc,
1823                call_state.option.clone().unwrap_or_default(),
1824            )
1825        };
1826
1827        match self.setup_answer_track(ssrc, &option, offer).await {
1828            Ok((offer, track)) => {
1829                self.setup_track_with_stream(&option, track).await?;
1830                {
1831                    let mut state = self.call_state.write().await;
1832                    state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
1833                }
1834            }
1835            Err(e) => {
1836                return Err(anyhow::anyhow!("error creating track: {}", e));
1837            }
1838        }
1839
1840        let mut client_dialog_handler =
1841            DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), state_receiver);
1842
1843        crate::spawn(async move {
1844            client_dialog_handler.process_dialog(states).await;
1845        });
1846        Ok(())
1847    }
1848}
1849
1850impl Drop for ActiveCall {
1851    fn drop(&mut self) {
1852        info!(session_id = self.session_id, "dropping active call");
1853        if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
1854            if let Some(record) = self.get_callrecord() {
1855                if let Err(e) = sender.send(record) {
1856                    warn!(
1857                        session_id = self.session_id,
1858                        "failed to send call record: {}", e
1859                    );
1860                }
1861            }
1862        }
1863    }
1864}
1865
1866impl ActiveCallState {
1867    pub fn merge_option(&self, mut option: CallOption) -> CallOption {
1868        if let Some(existing) = &self.option {
1869            if option.asr.is_none() {
1870                option.asr = existing.asr.clone();
1871            }
1872            if option.tts.is_none() {
1873                option.tts = existing.tts.clone();
1874            }
1875            if option.vad.is_none() {
1876                option.vad = existing.vad.clone();
1877            }
1878            if option.denoise.is_none() {
1879                option.denoise = existing.denoise;
1880            }
1881            if option.recorder.is_none() {
1882                option.recorder = existing.recorder.clone();
1883            }
1884            if option.eou.is_none() {
1885                option.eou = existing.eou.clone();
1886            }
1887            if option.extra.is_none() {
1888                option.extra = existing.extra.clone();
1889            }
1890            if option.ambiance.is_none() {
1891                option.ambiance = existing.ambiance.clone();
1892            }
1893        }
1894        option
1895    }
1896
1897    pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
1898        if self.hangup_reason.is_none() {
1899            self.hangup_reason = Some(reason);
1900        }
1901    }
1902
1903    pub fn build_hangup_event(
1904        &self,
1905        track_id: TrackId,
1906        initiator: Option<String>,
1907    ) -> crate::event::SessionEvent {
1908        let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
1909        let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
1910        let extra = self.extras.clone();
1911
1912        crate::event::SessionEvent::Hangup {
1913            track_id,
1914            timestamp: crate::media::get_timestamp(),
1915            reason: Some(format!("{:?}", self.hangup_reason)),
1916            initiator,
1917            start_time: self.start_time.to_rfc3339(),
1918            answer_time: self.answer_time.map(|t| t.to_rfc3339()),
1919            ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
1920            hangup_time: Utc::now().to_rfc3339(),
1921            extra,
1922            from: from.map(|f| f.into()),
1923            to: to.map(|f| f.into()),
1924            refer: Some(self.is_refer),
1925        }
1926    }
1927
1928    pub fn build_callrecord(
1929        &self,
1930        app_state: AppState,
1931        session_id: String,
1932        call_type: ActiveCallType,
1933    ) -> CallRecord {
1934        let option = self.option.clone().unwrap_or_default();
1935        let recorder = if option.recorder.is_some() {
1936            let recorder_file = app_state.get_recorder_file(&session_id);
1937            if std::path::Path::new(&recorder_file).exists() {
1938                let file_size = std::fs::metadata(&recorder_file)
1939                    .map(|m| m.len())
1940                    .unwrap_or(0);
1941                vec![crate::callrecord::CallRecordMedia {
1942                    track_id: session_id.clone(),
1943                    path: recorder_file,
1944                    size: file_size,
1945                    extra: None,
1946                }]
1947            } else {
1948                vec![]
1949            }
1950        } else {
1951            vec![]
1952        };
1953
1954        let dump_event_file = app_state.get_dump_events_file(&session_id);
1955        let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
1956            Some(dump_event_file)
1957        } else {
1958            None
1959        };
1960
1961        let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
1962            if let Ok(rc) = rc.try_read() {
1963                Some(Box::new(rc.build_callrecord(
1964                    app_state.clone(),
1965                    rc.session_id.clone(),
1966                    ActiveCallType::B2bua,
1967                )))
1968            } else {
1969                None
1970            }
1971        });
1972
1973        let caller = option.caller.clone().unwrap_or_default();
1974        let callee = option.callee.clone().unwrap_or_default();
1975
1976        CallRecord {
1977            option: Some(option),
1978            call_id: session_id,
1979            call_type,
1980            start_time: self.start_time,
1981            ring_time: self.ring_time.clone(),
1982            answer_time: self.answer_time.clone(),
1983            end_time: Utc::now(),
1984            caller,
1985            callee,
1986            hangup_reason: self.hangup_reason.clone(),
1987            hangup_messages: Vec::new(),
1988            status_code: self.last_status_code,
1989            extras: self.extras.clone(),
1990            dump_event_file,
1991            recorder,
1992            refer_callrecord,
1993        }
1994    }
1995}