active_call/call/
active_call.rs

1use super::Command;
2use crate::{
3    CallOption, ReferOption,
4    event::{EventReceiver, EventSender, SessionEvent},
5    media::{
6        TrackId,
7        ambiance::AmbianceProcessor,
8        engine::StreamEngine,
9        negotiate::strip_ipv6_candidates,
10        recorder::RecorderOption,
11        stream::{MediaStream, MediaStreamBuilder},
12        track::{
13            Track, TrackConfig,
14            file::FileTrack,
15            media_pass::MediaPassTrack,
16            rtc::{RtcTrack, RtcTrackConfig},
17            tts::SynthesisHandle,
18            websocket::{WebsocketBytesReceiver, WebsocketTrack},
19        },
20    },
21    synthesis::{SynthesisCommand, SynthesisOption},
22};
23use crate::{
24    app::AppState,
25    call::{
26        CommandReceiver, CommandSender,
27        sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
28    },
29    callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
30    useragent::invitation::PendingDialog,
31};
32use anyhow::Result;
33use audio_codec::CodecType;
34use chrono::{DateTime, Utc};
35use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
36use serde::{Deserialize, Serialize};
37use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
38use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
39use tokio_util::sync::CancellationToken;
40use tracing::{debug, info, warn};
41
42#[derive(Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub struct CallParams {
45    pub id: Option<String>,
46    #[serde(rename = "dump")]
47    pub dump_events: Option<bool>,
48    #[serde(rename = "ping")]
49    pub ping_interval: Option<u32>,
50    pub server_side_track: Option<String>,
51}
52
53#[derive(Debug, Serialize, Deserialize, Clone, Default)]
54#[serde(rename_all = "camelCase")]
55pub enum ActiveCallType {
56    Webrtc,
57    B2bua,
58    WebSocket,
59    #[default]
60    Sip,
61}
62
63#[derive(Default)]
64pub struct ActiveCallState {
65    pub session_id: String,
66    pub start_time: DateTime<Utc>,
67    pub ring_time: Option<DateTime<Utc>>,
68    pub answer_time: Option<DateTime<Utc>>,
69    pub hangup_reason: Option<CallRecordHangupReason>,
70    pub last_status_code: u16,
71    pub option: Option<CallOption>,
72    pub answer: Option<String>,
73    pub ssrc: u32,
74    pub refer_callstate: Option<ActiveCallStateRef>,
75    pub extras: Option<HashMap<String, serde_json::Value>>,
76    pub is_refer: bool,
77
78    // Runtime state (migrated from ActiveCall to reduce multiple locks)
79    pub tts_handle: Option<SynthesisHandle>,
80    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
81    pub wait_input_timeout: Option<u32>,
82    pub moh: Option<String>,
83    pub current_play_id: Option<String>,
84    pub audio_receiver: Option<WebsocketBytesReceiver>,
85    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
86}
87
88pub type ActiveCallRef = Arc<ActiveCall>;
89pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
90
91pub struct ActiveCall {
92    pub call_state: ActiveCallStateRef,
93    pub cancel_token: CancellationToken,
94    pub call_type: ActiveCallType,
95    pub session_id: String,
96    pub media_stream: Arc<MediaStream>,
97    pub track_config: TrackConfig,
98    pub event_sender: EventSender,
99    pub app_state: AppState,
100    pub invitation: Invitation,
101    pub cmd_sender: CommandSender,
102    pub dump_events: bool,
103    pub server_side_track_id: TrackId,
104}
105
106pub struct ActiveCallGuard {
107    pub call: ActiveCallRef,
108    pub active_calls: usize,
109}
110
111impl ActiveCallGuard {
112    pub fn new(call: ActiveCallRef) -> Self {
113        let active_calls = {
114            call.app_state
115                .total_calls
116                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
117            let mut calls = call.app_state.active_calls.lock().unwrap();
118            calls.insert(call.session_id.clone(), call.clone());
119            calls.len()
120        };
121        Self { call, active_calls }
122    }
123}
124
125impl Drop for ActiveCallGuard {
126    fn drop(&mut self) {
127        self.call
128            .app_state
129            .active_calls
130            .lock()
131            .unwrap()
132            .remove(&self.call.session_id);
133    }
134}
135
136pub struct ActiveCallReceiver {
137    pub cmd_receiver: CommandReceiver,
138    pub dump_cmd_receiver: CommandReceiver,
139    pub dump_event_receiver: EventReceiver,
140}
141
142impl ActiveCall {
143    pub fn new(
144        call_type: ActiveCallType,
145        cancel_token: CancellationToken,
146        session_id: String,
147        invitation: Invitation,
148        app_state: AppState,
149        track_config: TrackConfig,
150        audio_receiver: Option<WebsocketBytesReceiver>,
151        dump_events: bool,
152        server_side_track_id: Option<TrackId>,
153        extras: Option<HashMap<String, serde_json::Value>>,
154    ) -> Self {
155        let event_sender = crate::event::create_event_sender();
156        let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
157        let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
158            .with_id(session_id.clone())
159            .with_cancel_token(cancel_token.child_token());
160        let media_stream = Arc::new(media_stream_builder.build());
161        let call_state = Arc::new(RwLock::new(ActiveCallState {
162            session_id: session_id.clone(),
163            start_time: Utc::now(),
164            ssrc: rand::random::<u32>(),
165            extras,
166            audio_receiver,
167            ..Default::default()
168        }));
169        Self {
170            cancel_token,
171            call_type,
172            session_id,
173            call_state,
174            media_stream,
175            track_config,
176            event_sender,
177            app_state,
178            invitation,
179            cmd_sender,
180            dump_events,
181            server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
182        }
183    }
184
185    pub async fn enqueue_command(&self, command: Command) -> Result<()> {
186        self.cmd_sender
187            .send(command)
188            .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
189        Ok(())
190    }
191
192    /// Create a new ActiveCallReceiver for this ActiveCall
193    /// `tokio::sync::broadcast` not cached messages, so need to early create receiver
194    /// before calling `serve()`
195    pub fn new_receiver(&self) -> ActiveCallReceiver {
196        ActiveCallReceiver {
197            cmd_receiver: self.cmd_sender.subscribe(),
198            dump_cmd_receiver: self.cmd_sender.subscribe(),
199            dump_event_receiver: self.event_sender.subscribe(),
200        }
201    }
202
203    pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
204        let ActiveCallReceiver {
205            mut cmd_receiver,
206            dump_cmd_receiver,
207            dump_event_receiver,
208        } = receiver;
209
210        let process_command_loop = async move {
211            while let Ok(command) = cmd_receiver.recv().await {
212                match self.dispatch(command).await {
213                    Ok(_) => (),
214                    Err(e) => {
215                        warn!(session_id = self.session_id, "{}", e);
216                        self.event_sender
217                            .send(SessionEvent::Error {
218                                track_id: self.session_id.clone(),
219                                timestamp: crate::media::get_timestamp(),
220                                sender: "command".to_string(),
221                                error: e.to_string(),
222                                code: None,
223                            })
224                            .ok();
225                    }
226                }
227            }
228        };
229        self.app_state
230            .total_calls
231            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
232
233        tokio::join!(
234            self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
235            async {
236                select! {
237                    _ = process_command_loop => {
238                        info!(session_id = self.session_id, "command loop done");
239                    }
240                    _ = self.process() => {
241                        info!(session_id = self.session_id, "call serve done");
242                    }
243                    _ = self.cancel_token.cancelled() => {
244                        info!(session_id = self.session_id, "call cancelled - cleaning up resources");
245                    }
246                }
247                self.cancel_token.cancel();
248            }
249        );
250        Ok(())
251    }
252
253    async fn process(&self) -> Result<()> {
254        let mut event_receiver = self.event_sender.subscribe();
255
256        let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
257        let input_timeout_expire_ref = input_timeout_expire.clone();
258        let event_sender = self.event_sender.clone();
259        let wait_input_timeout_loop = async {
260            loop {
261                let (start_time, expire) = { *input_timeout_expire.lock().await };
262                if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
263                    info!(session_id = self.session_id, "wait input timeout reached");
264                    *input_timeout_expire.lock().await = (0, 0);
265                    event_sender
266                        .send(SessionEvent::Silence {
267                            track_id: self.server_side_track_id.clone(),
268                            timestamp: crate::media::get_timestamp(),
269                            start_time,
270                            duration: expire as u64,
271                            samples: None,
272                        })
273                        .ok();
274                }
275                sleep(Duration::from_millis(100)).await;
276            }
277        };
278        let server_side_track_id = self.server_side_track_id.clone();
279        let event_hook_loop = async move {
280            while let Ok(event) = event_receiver.recv().await {
281                match event {
282                    SessionEvent::Speaking { .. }
283                    | SessionEvent::Dtmf { .. }
284                    | SessionEvent::AsrDelta { .. }
285                    | SessionEvent::AsrFinal { .. }
286                    | SessionEvent::TrackStart { .. } => {
287                        *input_timeout_expire_ref.lock().await = (0, 0);
288                    }
289                    SessionEvent::TrackEnd {
290                        track_id,
291                        play_id,
292                        ssrc,
293                        ..
294                    } => {
295                        if track_id != server_side_track_id {
296                            continue;
297                        }
298
299                        let (moh_path, auto_hangup, wait_timeout_val) = {
300                            let mut state = self.call_state.write().await;
301                            if play_id != state.current_play_id {
302                                debug!(
303                                    session_id = self.session_id,
304                                    ?play_id,
305                                    current = ?state.current_play_id,
306                                    "ignoring interrupted track end"
307                                );
308                                continue;
309                            }
310                            state.current_play_id = None;
311                            (
312                                state.moh.clone(),
313                                state.auto_hangup.clone(),
314                                state.wait_input_timeout.take(),
315                            )
316                        };
317
318                        if let Some(path) = moh_path {
319                            info!(session_id = self.session_id, "looping moh: {}", path);
320                            let ssrc = rand::random::<u32>();
321                            let file_track = FileTrack::new(self.server_side_track_id.clone())
322                                .with_play_id(Some(path.clone()))
323                                .with_ssrc(ssrc)
324                                .with_path(path.clone())
325                                .with_cancel_token(self.cancel_token.child_token());
326                            self.update_track_wrapper(Box::new(file_track), Some(path))
327                                .await;
328                            continue;
329                        }
330
331                        if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
332                            if hangup_ssrc == ssrc {
333                                info!(
334                                    session_id = self.session_id,
335                                    ssrc, "auto hangup when track end track_id:{}", track_id
336                                );
337                                self.do_hangup(Some(hangup_reason), None).await.ok();
338                            }
339                        }
340
341                        if let Some(timeout) = wait_timeout_val {
342                            let expire = if timeout > 0 {
343                                (crate::media::get_timestamp(), timeout)
344                            } else {
345                                (0, 0)
346                            };
347                            *input_timeout_expire_ref.lock().await = expire;
348                        }
349                    }
350                    SessionEvent::Interrupt { receiver } => {
351                        let track_id =
352                            receiver.unwrap_or_else(|| self.server_side_track_id.clone());
353                        if track_id == self.server_side_track_id {
354                            debug!(
355                                session_id = self.session_id,
356                                "received interrupt event, stopping playback"
357                            );
358                            self.do_interrupt(true).await.ok();
359                        }
360                    }
361                    SessionEvent::Inactivity { track_id, .. } => {
362                        info!(
363                            session_id = self.session_id,
364                            track_id, "inactivity timeout reached, hanging up"
365                        );
366                        self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
367                            .await
368                            .ok();
369                    }
370                    SessionEvent::Error { track_id, .. } => {
371                        if track_id != server_side_track_id {
372                            continue;
373                        }
374
375                        let moh_info = {
376                            let mut state = self.call_state.write().await;
377                            if let Some(path) = state.moh.clone() {
378                                let fallback = "./config/sounds/refer_moh.wav".to_string();
379                                let next_path = if path != fallback
380                                    && std::path::Path::new(&fallback).exists()
381                                {
382                                    info!(
383                                        session_id = self.session_id,
384                                        "moh error, switching to fallback: {}", fallback
385                                    );
386                                    state.moh = Some(fallback.clone());
387                                    fallback
388                                } else {
389                                    info!(
390                                        session_id = self.session_id,
391                                        "looping moh on error: {}", path
392                                    );
393                                    path
394                                };
395                                Some(next_path)
396                            } else {
397                                None
398                            }
399                        };
400
401                        if let Some(next_path) = moh_info {
402                            let ssrc = rand::random::<u32>();
403                            let file_track = FileTrack::new(self.server_side_track_id.clone())
404                                .with_play_id(Some(next_path.clone()))
405                                .with_ssrc(ssrc)
406                                .with_path(next_path.clone())
407                                .with_cancel_token(self.cancel_token.child_token());
408                            self.update_track_wrapper(Box::new(file_track), Some(next_path))
409                                .await;
410                            continue;
411                        }
412                    }
413                    _ => {}
414                }
415            }
416        };
417
418        select! {
419            _ = wait_input_timeout_loop=>{
420                info!(session_id = self.session_id, "wait input timeout loop done");
421            }
422            _ = self.media_stream.serve() => {
423                info!(session_id = self.session_id, "media stream loop done");
424            }
425            _ = event_hook_loop => {
426                info!(session_id = self.session_id, "event loop done");
427            }
428        }
429        Ok(())
430    }
431
432    async fn dispatch(&self, command: Command) -> Result<()> {
433        match command {
434            Command::Invite { option } => self.do_invite(option).await,
435            Command::Accept { option } => self.do_accept(option).await,
436            Command::Reject { reason, code } => {
437                self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
438                    .await
439            }
440            Command::Ringing {
441                ringtone,
442                recorder,
443                early_media,
444            } => self.do_ringing(ringtone, recorder, early_media).await,
445            Command::Tts {
446                text,
447                speaker,
448                play_id,
449                auto_hangup,
450                streaming,
451                end_of_stream,
452                option,
453                wait_input_timeout,
454                base64,
455            } => {
456                self.do_tts(
457                    text,
458                    speaker,
459                    play_id,
460                    auto_hangup,
461                    streaming.unwrap_or_default(),
462                    end_of_stream.unwrap_or_default(),
463                    option,
464                    wait_input_timeout,
465                    base64.unwrap_or_default(),
466                )
467                .await
468            }
469            Command::Play {
470                url,
471                play_id,
472                auto_hangup,
473                wait_input_timeout,
474            } => {
475                self.do_play(url, play_id, auto_hangup, wait_input_timeout)
476                    .await
477            }
478            Command::Hangup { reason, initiator } => {
479                let reason = reason.map(|r| {
480                    r.parse::<CallRecordHangupReason>()
481                        .unwrap_or(CallRecordHangupReason::BySystem)
482                });
483                self.do_hangup(reason, initiator).await
484            }
485            Command::Refer {
486                caller,
487                callee,
488                options,
489            } => self.do_refer(caller, callee, options).await,
490            Command::Mute { track_id } => self.do_mute(track_id).await,
491            Command::Unmute { track_id } => self.do_unmute(track_id).await,
492            Command::Pause {} => self.do_pause().await,
493            Command::Resume {} => self.do_resume().await,
494            Command::Interrupt {
495                graceful: passage,
496                fade_out_ms: _,
497            } => self.do_interrupt(passage.unwrap_or_default()).await,
498            Command::History { speaker, text } => self.do_history(speaker, text).await,
499        }
500    }
501
502    fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
503        if let Some(recorder_option) = &option.recorder {
504            let mut recorder_file = recorder_option.recorder_file.clone();
505            if recorder_file.contains("{id}") {
506                recorder_file = recorder_file.replace("{id}", &self.session_id);
507            }
508
509            let recorder_file = if recorder_file.is_empty() {
510                self.app_state.get_recorder_file(&self.session_id)
511            } else {
512                let p = Path::new(&recorder_file);
513                p.is_absolute()
514                    .then(|| recorder_file.clone())
515                    .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
516            };
517            info!(
518                session_id = self.session_id,
519                recorder_file, "created recording file"
520            );
521
522            let track_samplerate = self.track_config.samplerate;
523            let recorder_samplerate = if track_samplerate > 0 {
524                track_samplerate
525            } else {
526                recorder_option.samplerate
527            };
528            let recorder_ptime = if recorder_option.ptime == 0 {
529                200
530            } else {
531                recorder_option.ptime
532            };
533            let requested_format = recorder_option
534                .format
535                .unwrap_or(self.app_state.config.recorder_format());
536            let format = requested_format.effective();
537            if requested_format != format {
538                warn!(
539                    session_id = self.session_id,
540                    requested = requested_format.extension(),
541                    "Recorder format fallback to wav due to unsupported feature"
542                );
543            }
544            let mut recorder_config = RecorderOption {
545                recorder_file,
546                samplerate: recorder_samplerate,
547                ptime: recorder_ptime,
548                format: Some(format),
549            };
550            recorder_config.ensure_path_extension(format);
551            Some(recorder_config)
552        } else {
553            None
554        }
555    }
556
557    async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
558        // Merge with existing configuration (e.g., from playbook)
559        {
560            let state = self.call_state.read().await;
561            option = state.merge_option(option);
562        }
563
564        option.check_default();
565        if let Some(opt) = self.build_record_option(&option) {
566            self.media_stream.update_recorder_option(opt).await;
567        }
568
569        if let Some(opt) = &option.media_pass {
570            let track_id = self.server_side_track_id.clone();
571            let cancel_token = self.cancel_token.child_token();
572            let ssrc = rand::random::<u32>();
573            let media_pass_track = MediaPassTrack::new(
574                self.session_id.clone(),
575                ssrc,
576                track_id,
577                cancel_token,
578                opt.clone(),
579            );
580            self.update_track_wrapper(Box::new(media_pass_track), None)
581                .await;
582        }
583
584        info!(
585            session_id = self.session_id,
586            call_type = ?self.call_type,
587            sender,
588            ?option,
589            "caller with option"
590        );
591
592        match self.setup_caller_track(&option).await {
593            Ok(_) => return Ok(option),
594            Err(e) => {
595                self.app_state
596                    .total_failed_calls
597                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
598                let error_event = crate::event::SessionEvent::Error {
599                    track_id: self.session_id.clone(),
600                    timestamp: crate::media::get_timestamp(),
601                    sender,
602                    error: e.to_string(),
603                    code: None,
604                };
605                self.event_sender.send(error_event).ok();
606                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
607                    .await
608                    .ok();
609                return Err(e);
610            }
611        }
612    }
613
614    async fn do_invite(&self, option: CallOption) -> Result<()> {
615        self.invite_or_accept(option, "invite".to_string())
616            .await
617            .map(|_| ())
618    }
619
620    async fn do_accept(&self, mut option: CallOption) -> Result<()> {
621        let has_pending = self
622            .invitation
623            .find_dialog_id_by_session_id(&self.session_id)
624            .is_some();
625        let ready_to_answer_val = {
626            let state = self.call_state.read().await;
627            state.ready_to_answer.is_none()
628        };
629
630        if ready_to_answer_val {
631            if !has_pending {
632                // emit reject event
633                warn!(session_id = self.session_id, "no pending call to accept");
634                let rejet_event = crate::event::SessionEvent::Reject {
635                    track_id: self.session_id.clone(),
636                    timestamp: crate::media::get_timestamp(),
637                    reason: "no pending call".to_string(),
638                    refer: None,
639                    code: Some(486),
640                };
641                self.event_sender.send(rejet_event).ok();
642                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
643                    .await
644                    .ok();
645                return Err(anyhow::anyhow!("no pending call to accept"));
646            }
647            option = self.invite_or_accept(option, "accept".to_string()).await?;
648        } else {
649            option.check_default();
650            self.call_state.write().await.option = Some(option.clone());
651        }
652        info!(session_id = self.session_id, ?option, "accepting call");
653        let ready = self.call_state.write().await.ready_to_answer.take();
654        if let Some((answer, track, dialog)) = ready {
655            info!(
656                session_id = self.session_id,
657                track_id = track.as_ref().map(|t| t.id()),
658                "ready to answer with track"
659            );
660
661            let headers = vec![rsip::Header::ContentType(
662                "application/sdp".to_string().into(),
663            )];
664
665            match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
666                Ok(_) => {
667                    {
668                        let mut state = self.call_state.write().await;
669                        state.answer = Some(answer);
670                        state.answer_time = Some(Utc::now());
671                    }
672                    self.finish_caller_stack(&option, track).await?;
673                }
674                Err(e) => {
675                    warn!(session_id = self.session_id, "failed to accept call: {}", e);
676                    return Err(anyhow::anyhow!("failed to accept call"));
677                }
678            }
679        }
680        return Ok(());
681    }
682
683    async fn do_reject(
684        &self,
685        code: Option<rsip::StatusCode>,
686        reason: Option<String>,
687    ) -> Result<()> {
688        match self
689            .invitation
690            .find_dialog_id_by_session_id(&self.session_id)
691        {
692            Some(id) => {
693                info!(
694                    session_id = self.session_id,
695                    ?reason,
696                    ?code,
697                    "rejecting call"
698                );
699                self.invitation.hangup(id, code, reason).await
700            }
701            None => Ok(()),
702        }
703    }
704
705    async fn do_ringing(
706        &self,
707        ringtone: Option<String>,
708        recorder: Option<RecorderOption>,
709        early_media: Option<bool>,
710    ) -> Result<()> {
711        let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
712        if ready_to_answer_val {
713            let option = CallOption {
714                recorder,
715                ..Default::default()
716            };
717            let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
718        }
719
720        let state = self.call_state.read().await;
721        if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
722            let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
723                let headers = vec![rsip::Header::ContentType(
724                    "application/sdp".to_string().into(),
725                )];
726                (Some(headers), Some(answer.as_bytes().to_vec()))
727            } else {
728                (None, None)
729            };
730
731            dialog.ringing(headers, body).ok();
732            info!(
733                session_id = self.session_id,
734                ringtone, early_media, "playing ringtone"
735            );
736            if let Some(ringtone_url) = ringtone {
737                drop(state);
738                self.do_play(ringtone_url, None, None, None).await.ok();
739            } else {
740                info!(session_id = self.session_id, "no ringtone to play");
741            }
742        }
743        Ok(())
744    }
745
746    async fn do_tts(
747        &self,
748        text: String,
749        speaker: Option<String>,
750        play_id: Option<String>,
751        auto_hangup: Option<bool>,
752        streaming: bool,
753        end_of_stream: bool,
754        option: Option<SynthesisOption>,
755        wait_input_timeout: Option<u32>,
756        base64: bool,
757    ) -> Result<()> {
758        let tts_option = {
759            let call_state = self.call_state.read().await;
760            match call_state.option.clone().unwrap_or_default().tts {
761                Some(opt) => opt.merge_with(option),
762                None => {
763                    if let Some(opt) = option {
764                        opt
765                    } else {
766                        return Err(anyhow::anyhow!("no tts option available"));
767                    }
768                }
769            }
770        };
771        let speaker = match speaker {
772            Some(s) => Some(s),
773            None => tts_option.speaker.clone(),
774        };
775
776        let mut play_command = SynthesisCommand {
777            text,
778            speaker,
779            play_id: play_id.clone(),
780            streaming,
781            end_of_stream,
782            option: tts_option,
783            base64,
784        };
785        info!(
786            session_id = self.session_id,
787            provider = ?play_command.option.provider,
788            text = %play_command.text.chars().take(10).collect::<String>(),
789            speaker = play_command.speaker.as_deref(),
790            auto_hangup = auto_hangup.unwrap_or_default(),
791            play_id = play_command.play_id.as_deref(),
792            streaming = play_command.streaming,
793            end_of_stream = play_command.end_of_stream,
794            wait_input_timeout = wait_input_timeout.unwrap_or_default(),
795            is_base64 = play_command.base64,
796            "new synthesis"
797        );
798
799        let ssrc = rand::random::<u32>();
800        let should_interrupt = {
801            let mut state = self.call_state.write().await;
802            state.auto_hangup = match auto_hangup {
803                Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
804                _ => None,
805            };
806            state.wait_input_timeout = wait_input_timeout;
807
808            let changed = play_id.is_some() && state.current_play_id != play_id;
809            state.current_play_id = play_id.clone();
810            changed
811        };
812
813        if should_interrupt {
814            let _ = self.do_interrupt(false).await;
815        }
816
817        let existing_handle = self.call_state.read().await.tts_handle.clone();
818        if let Some(tts_handle) = existing_handle {
819            match tts_handle.try_send(play_command) {
820                Ok(_) => return Ok(()),
821                Err(e) => {
822                    play_command = e.0;
823                }
824            }
825        }
826
827        let (new_handle, tts_track) = StreamEngine::create_tts_track(
828            self.app_state.stream_engine.clone(),
829            self.cancel_token.child_token(),
830            self.session_id.clone(),
831            self.server_side_track_id.clone(),
832            ssrc,
833            play_id.clone(),
834            streaming,
835            &play_command.option,
836        )
837        .await?;
838
839        new_handle.try_send(play_command)?;
840        self.call_state.write().await.tts_handle = Some(new_handle);
841        self.update_track_wrapper(tts_track, play_id).await;
842        Ok(())
843    }
844
845    async fn do_play(
846        &self,
847        url: String,
848        play_id: Option<String>,
849        auto_hangup: Option<bool>,
850        wait_input_timeout: Option<u32>,
851    ) -> Result<()> {
852        let ssrc = rand::random::<u32>();
853        info!(
854            session_id = self.session_id,
855            ssrc, url, play_id, auto_hangup, "play file track"
856        );
857
858        let play_id = play_id.or(Some(url.clone()));
859
860        let file_track = FileTrack::new(self.server_side_track_id.clone())
861            .with_play_id(play_id.clone())
862            .with_ssrc(ssrc)
863            .with_path(url)
864            .with_cancel_token(self.cancel_token.child_token());
865
866        {
867            let mut state = self.call_state.write().await;
868            state.tts_handle = None;
869            state.auto_hangup = match auto_hangup {
870                Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
871                _ => None,
872            };
873            state.wait_input_timeout = wait_input_timeout;
874        }
875
876        self.update_track_wrapper(Box::new(file_track), play_id)
877            .await;
878        Ok(())
879    }
880
881    async fn do_history(&self, speaker: String, text: String) -> Result<()> {
882        self.event_sender
883            .send(SessionEvent::AddHistory {
884                sender: Some(self.session_id.clone()),
885                timestamp: crate::media::get_timestamp(),
886                speaker,
887                text,
888            })
889            .map(|_| ())
890            .map_err(Into::into)
891    }
892
893    async fn do_interrupt(&self, graceful: bool) -> Result<()> {
894        {
895            let mut state = self.call_state.write().await;
896            state.tts_handle = None;
897            state.moh = None;
898        }
899        self.media_stream
900            .remove_track(&self.server_side_track_id, graceful)
901            .await;
902        Ok(())
903    }
904    async fn do_pause(&self) -> Result<()> {
905        Ok(())
906    }
907    async fn do_resume(&self) -> Result<()> {
908        Ok(())
909    }
910    async fn do_hangup(
911        &self,
912        reason: Option<CallRecordHangupReason>,
913        initiator: Option<String>,
914    ) -> Result<()> {
915        info!(
916            session_id = self.session_id,
917            ?reason,
918            ?initiator,
919            "do_hangup"
920        );
921
922        // Set hangup reason based on initiator and reason
923        let hangup_reason = match initiator.as_deref() {
924            Some("caller") => CallRecordHangupReason::ByCaller,
925            Some("callee") => CallRecordHangupReason::ByCallee,
926            Some("system") => CallRecordHangupReason::Autohangup,
927            _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
928        };
929
930        self.media_stream
931            .stop(Some(hangup_reason.to_string()), initiator);
932
933        self.call_state
934            .write()
935            .await
936            .set_hangup_reason(hangup_reason);
937        Ok(())
938    }
939
940    async fn do_refer(
941        &self,
942        caller: String,
943        callee: String,
944        refer_option: Option<ReferOption>,
945    ) -> Result<()> {
946        self.do_interrupt(false).await.ok();
947        let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
948        if let Some(ref path) = moh {
949            if !path.starts_with("http") && !std::path::Path::new(path).exists() {
950                let fallback = "./config/sounds/refer_moh.wav";
951                if std::path::Path::new(fallback).exists() {
952                    info!(
953                        session_id = self.session_id,
954                        "moh {} not found, using fallback {}", path, fallback
955                    );
956                    moh = Some(fallback.to_string());
957                }
958            }
959        }
960        let session_id = self.session_id.clone();
961        let track_id = self.server_side_track_id.clone();
962
963        let recorder = {
964            let cs = self.call_state.read().await;
965            cs.option
966                .as_ref()
967                .map(|o| o.recorder.clone())
968                .unwrap_or_default()
969        };
970
971        let call_option = CallOption {
972            caller: Some(caller),
973            callee: Some(callee.clone()),
974            sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
975            asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
976            denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
977            recorder,
978            ..Default::default()
979        };
980
981        let mut invite_option = call_option.build_invite_option()?;
982        invite_option.call_id = Some(self.session_id.clone());
983
984        let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
985
986        {
987            let cs = self.call_state.read().await;
988            if let Some(opt) = cs.option.as_ref() {
989                if let Some(callee) = opt.callee.as_ref() {
990                    headers.push(rsip::Header::Other(
991                        "X-Referred-To".to_string(),
992                        callee.clone(),
993                    ));
994                }
995                if let Some(caller) = opt.caller.as_ref() {
996                    headers.push(rsip::Header::Other(
997                        "X-Referred-From".to_string(),
998                        caller.clone(),
999                    ));
1000                }
1001            }
1002        }
1003
1004        headers.push(rsip::Header::Other(
1005            "X-Referred-Id".to_string(),
1006            self.session_id.clone(),
1007        ));
1008
1009        let ssrc = rand::random::<u32>();
1010        let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1011            start_time: Utc::now(),
1012            ssrc,
1013            option: Some(call_option.clone()),
1014            is_refer: true,
1015            ..Default::default()
1016        }));
1017
1018        {
1019            let mut cs = self.call_state.write().await;
1020            cs.refer_callstate.replace(refer_call_state.clone());
1021        }
1022
1023        let auto_hangup_requested = refer_option
1024            .as_ref()
1025            .and_then(|o| o.auto_hangup)
1026            .unwrap_or(true);
1027
1028        if auto_hangup_requested {
1029            self.call_state.write().await.auto_hangup =
1030                Some((ssrc, CallRecordHangupReason::ByRefer));
1031        } else {
1032            self.call_state.write().await.auto_hangup = None;
1033        }
1034
1035        let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1036
1037        info!(
1038            session_id = self.session_id,
1039            ssrc,
1040            auto_hangup = auto_hangup_requested,
1041            callee,
1042            timeout_secs,
1043            "do_refer"
1044        );
1045
1046        let r = tokio::time::timeout(
1047            Duration::from_secs(timeout_secs as u64),
1048            self.create_outgoing_sip_track(
1049                self.cancel_token.child_token(),
1050                refer_call_state.clone(),
1051                &track_id,
1052                invite_option,
1053                &call_option,
1054                moh,
1055                auto_hangup_requested,
1056            ),
1057        )
1058        .await;
1059
1060        {
1061            self.call_state.write().await.moh = None;
1062        }
1063
1064        let result = match r {
1065            Ok(res) => res,
1066            Err(_) => {
1067                warn!(
1068                    session_id = session_id,
1069                    "refer sip track creation timed out after {} seconds", timeout_secs
1070                );
1071                self.event_sender
1072                    .send(SessionEvent::Reject {
1073                        track_id,
1074                        timestamp: crate::media::get_timestamp(),
1075                        reason: "Timeout when refer".into(),
1076                        code: Some(408),
1077                        refer: Some(true),
1078                    })
1079                    .ok();
1080                return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1081            }
1082        };
1083
1084        match result {
1085            Ok(answer) => {
1086                self.event_sender
1087                    .send(SessionEvent::Answer {
1088                        timestamp: crate::media::get_timestamp(),
1089                        track_id,
1090                        sdp: answer,
1091                        refer: Some(true),
1092                    })
1093                    .ok();
1094            }
1095            Err(e) => {
1096                warn!(
1097                    session_id = session_id,
1098                    "failed to create refer sip track: {}", e
1099                );
1100                match &e {
1101                    rsipstack::Error::DialogError(reason, _, code) => {
1102                        self.event_sender
1103                            .send(SessionEvent::Reject {
1104                                track_id,
1105                                timestamp: crate::media::get_timestamp(),
1106                                reason: reason.clone(),
1107                                code: Some(code.code() as u32),
1108                                refer: Some(true),
1109                            })
1110                            .ok();
1111                    }
1112                    _ => {}
1113                }
1114                return Err(e.into());
1115            }
1116        }
1117        Ok(())
1118    }
1119
1120    async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1121        self.media_stream.mute_track(track_id).await;
1122        Ok(())
1123    }
1124
1125    async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1126        self.media_stream.unmute_track(track_id).await;
1127        Ok(())
1128    }
1129
1130    pub async fn cleanup(&self) -> Result<()> {
1131        self.call_state.write().await.tts_handle = None;
1132        self.media_stream.cleanup().await.ok();
1133        Ok(())
1134    }
1135
1136    pub fn get_callrecord(&self) -> Option<CallRecord> {
1137        self.call_state.try_read().ok().map(|call_state| {
1138            call_state.build_callrecord(
1139                self.app_state.clone(),
1140                self.session_id.clone(),
1141                self.call_type.clone(),
1142            )
1143        })
1144    }
1145
1146    async fn dump_to_file(
1147        &self,
1148        dump_file: &mut File,
1149        cmd_receiver: &mut CommandReceiver,
1150        event_receiver: &mut EventReceiver,
1151    ) {
1152        loop {
1153            select! {
1154                _ = self.cancel_token.cancelled() => {
1155                    break;
1156                }
1157                Ok(cmd) = cmd_receiver.recv() => {
1158                    CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1159                        .await;
1160                }
1161                Ok(event) = event_receiver.recv() => {
1162                    if matches!(event, SessionEvent::Binary{..}) {
1163                        continue;
1164                    }
1165                    CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1166                        .await;
1167                }
1168            };
1169        }
1170    }
1171
1172    async fn dump_loop(
1173        &self,
1174        dump_events: bool,
1175        mut dump_cmd_receiver: CommandReceiver,
1176        mut dump_event_receiver: EventReceiver,
1177    ) {
1178        if !dump_events {
1179            return;
1180        }
1181
1182        let file_name = self.app_state.get_dump_events_file(&self.session_id);
1183        let mut dump_file = match File::options()
1184            .create(true)
1185            .append(true)
1186            .open(&file_name)
1187            .await
1188        {
1189            Ok(file) => file,
1190            Err(e) => {
1191                warn!(
1192                    session_id = self.session_id,
1193                    file_name, "failed to open dump events file: {}", e
1194                );
1195                return;
1196            }
1197        };
1198        self.dump_to_file(
1199            &mut dump_file,
1200            &mut dump_cmd_receiver,
1201            &mut dump_event_receiver,
1202        )
1203        .await;
1204
1205        while let Ok(event) = dump_event_receiver.try_recv() {
1206            if matches!(event, SessionEvent::Binary { .. }) {
1207                continue;
1208            }
1209            CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1210        }
1211    }
1212
1213    pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1214        let mut rtc_config = RtcTrackConfig::default();
1215        rtc_config.mode = rustrtc::TransportMode::Rtp;
1216
1217        if let Some(codecs) = &self.app_state.config.codecs {
1218            let mut codec_types = Vec::new();
1219            for c in codecs {
1220                match c.to_lowercase().as_str() {
1221                    "pcmu" => codec_types.push(CodecType::PCMU),
1222                    "pcma" => codec_types.push(CodecType::PCMA),
1223                    "g722" => codec_types.push(CodecType::G722),
1224                    "g729" => codec_types.push(CodecType::G729),
1225                    "opus" => codec_types.push(CodecType::Opus),
1226                    "dtmf" | "2833" | "telephone_event" => {
1227                        codec_types.push(CodecType::TelephoneEvent)
1228                    }
1229                    _ => {}
1230                }
1231            }
1232            if !codec_types.is_empty() {
1233                rtc_config.preferred_codec = Some(codec_types[0].clone());
1234                rtc_config.codecs = codec_types;
1235            }
1236        }
1237
1238        if rtc_config.preferred_codec.is_none() {
1239            rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1240        }
1241
1242        rtc_config.rtp_port_range = self
1243            .app_state
1244            .config
1245            .rtp_start_port
1246            .zip(self.app_state.config.rtp_end_port);
1247
1248        if let Some(ref external_ip) = self.app_state.config.external_ip {
1249            rtc_config.external_ip = Some(external_ip.clone());
1250        }
1251
1252        let mut track = RtcTrack::new(
1253            self.cancel_token.child_token(),
1254            track_id,
1255            self.track_config.clone(),
1256            rtc_config,
1257        )
1258        .with_ssrc(ssrc);
1259
1260        track.create().await?;
1261
1262        Ok(track)
1263    }
1264
1265    async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1266        self.call_state.write().await.option = Some(option.clone());
1267        info!(
1268            session_id = self.session_id,
1269            call_type = ?self.call_type,
1270            "setup caller track"
1271        );
1272
1273        let track = match self.call_type {
1274            ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1275            ActiveCallType::WebSocket => {
1276                let audio_receiver = self.call_state.write().await.audio_receiver.take();
1277                if let Some(receiver) = audio_receiver {
1278                    Some(self.create_websocket_track(receiver).await?)
1279                } else {
1280                    None
1281                }
1282            }
1283            ActiveCallType::Sip => {
1284                if let Some(dialog_id) = self
1285                    .invitation
1286                    .find_dialog_id_by_session_id(&self.session_id)
1287                {
1288                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1289                        return self
1290                            .prepare_incoming_sip_track(
1291                                self.cancel_token.clone(),
1292                                self.call_state.clone(),
1293                                &self.session_id,
1294                                pending_dialog,
1295                            )
1296                            .await;
1297                    }
1298                }
1299
1300                let mut invite_option = option.build_invite_option()?;
1301                invite_option.call_id = Some(self.session_id.clone());
1302
1303                match self
1304                    .create_outgoing_sip_track(
1305                        self.cancel_token.clone(),
1306                        self.call_state.clone(),
1307                        &self.session_id,
1308                        invite_option,
1309                        &option,
1310                        None,
1311                        false,
1312                    )
1313                    .await
1314                {
1315                    Ok(answer) => {
1316                        self.event_sender
1317                            .send(SessionEvent::Answer {
1318                                timestamp: crate::media::get_timestamp(),
1319                                track_id: self.session_id.clone(),
1320                                sdp: answer,
1321                                refer: Some(false),
1322                            })
1323                            .ok();
1324                        return Ok(());
1325                    }
1326                    Err(e) => {
1327                        warn!(
1328                            session_id = self.session_id,
1329                            "failed to create sip track: {}", e
1330                        );
1331                        match &e {
1332                            rsipstack::Error::DialogError(reason, _, code) => {
1333                                self.event_sender
1334                                    .send(SessionEvent::Reject {
1335                                        track_id: self.session_id.clone(),
1336                                        timestamp: crate::media::get_timestamp(),
1337                                        reason: reason.clone(),
1338                                        code: Some(code.code() as u32),
1339                                        refer: Some(false),
1340                                    })
1341                                    .ok();
1342                            }
1343                            _ => {}
1344                        }
1345                        return Err(e.into());
1346                    }
1347                }
1348            }
1349            ActiveCallType::B2bua => {
1350                if let Some(dialog_id) = self
1351                    .invitation
1352                    .find_dialog_id_by_session_id(&self.session_id)
1353                {
1354                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1355                        return self
1356                            .prepare_incoming_sip_track(
1357                                self.cancel_token.clone(),
1358                                self.call_state.clone(),
1359                                &self.session_id,
1360                                pending_dialog,
1361                            )
1362                            .await;
1363                    }
1364                }
1365
1366                warn!(
1367                    session_id = self.session_id,
1368                    "no pending dialog found for B2BUA call"
1369                );
1370                return Err(anyhow::anyhow!(
1371                    "no pending dialog found for session_id: {}",
1372                    self.session_id
1373                ));
1374            }
1375        };
1376        match track {
1377            Some(track) => {
1378                self.finish_caller_stack(&option, Some(track)).await?;
1379            }
1380            None => {
1381                warn!(session_id = self.session_id, "no track created for caller");
1382                return Err(anyhow::anyhow!("no track created for caller"));
1383            }
1384        }
1385        Ok(())
1386    }
1387
1388    async fn finish_caller_stack(
1389        &self,
1390        option: &CallOption,
1391        track: Option<Box<dyn Track>>,
1392    ) -> Result<()> {
1393        if let Some(track) = track {
1394            self.setup_track_with_stream(&option, track).await?;
1395        }
1396
1397        {
1398            let call_state = self.call_state.read().await;
1399            if let Some(ref answer) = call_state.answer {
1400                info!(
1401                    session_id = self.session_id,
1402                    "sending answer event: {}", answer,
1403                );
1404                self.event_sender
1405                    .send(SessionEvent::Answer {
1406                        timestamp: crate::media::get_timestamp(),
1407                        track_id: self.session_id.clone(),
1408                        sdp: answer.clone(),
1409                        refer: Some(false),
1410                    })
1411                    .ok();
1412            } else {
1413                warn!(
1414                    session_id = self.session_id,
1415                    "no answer in state to send event"
1416                );
1417            }
1418        }
1419        Ok(())
1420    }
1421
1422    pub async fn setup_track_with_stream(
1423        &self,
1424        option: &CallOption,
1425        mut track: Box<dyn Track>,
1426    ) -> Result<()> {
1427        let processors = match StreamEngine::create_processors(
1428            self.app_state.stream_engine.clone(),
1429            track.as_ref(),
1430            self.cancel_token.child_token(),
1431            self.event_sender.clone(),
1432            self.media_stream.packet_sender.clone(),
1433            option,
1434        )
1435        .await
1436        {
1437            Ok(processors) => processors,
1438            Err(e) => {
1439                warn!(
1440                    session_id = self.session_id,
1441                    "failed to prepare stream processors: {}", e
1442                );
1443                vec![]
1444            }
1445        };
1446
1447        // Add all processors from the hook
1448        for processor in processors {
1449            track.append_processor(processor);
1450        }
1451
1452        self.update_track_wrapper(track, None).await;
1453        Ok(())
1454    }
1455
1456    pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1457        let ambiance_opt = {
1458            let state = self.call_state.read().await;
1459            let mut opt = state
1460                .option
1461                .as_ref()
1462                .and_then(|o| o.ambiance.clone())
1463                .unwrap_or_default();
1464
1465            if let Some(global) = &self.app_state.config.ambiance {
1466                opt.merge(global);
1467            }
1468            opt
1469        };
1470        if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1471            match AmbianceProcessor::new(ambiance_opt).await {
1472                Ok(ambiance) => {
1473                    info!(session_id = self.session_id, "loaded ambiance processor");
1474                    track.append_processor(Box::new(ambiance));
1475                }
1476                Err(e) => {
1477                    tracing::error!("failed to load ambiance wav {}", e);
1478                }
1479            }
1480        }
1481        self.call_state.write().await.current_play_id = play_id.clone();
1482        self.media_stream.update_track(track, play_id).await;
1483    }
1484
1485    pub async fn create_websocket_track(
1486        &self,
1487        audio_receiver: WebsocketBytesReceiver,
1488    ) -> Result<Box<dyn Track>> {
1489        let (ssrc, codec) = {
1490            let call_state = self.call_state.read().await;
1491            (
1492                call_state.ssrc,
1493                call_state
1494                    .option
1495                    .as_ref()
1496                    .map(|o| o.codec.clone())
1497                    .unwrap_or_default(),
1498            )
1499        };
1500
1501        let ws_track = WebsocketTrack::new(
1502            self.cancel_token.child_token(),
1503            self.session_id.clone(),
1504            self.track_config.clone(),
1505            self.event_sender.clone(),
1506            audio_receiver,
1507            codec,
1508            ssrc,
1509        );
1510
1511        {
1512            let mut call_state = self.call_state.write().await;
1513            call_state.answer_time = Some(Utc::now());
1514            call_state.answer = Some("".to_string());
1515            call_state.last_status_code = 200;
1516        }
1517
1518        Ok(Box::new(ws_track))
1519    }
1520
1521    pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1522        let (ssrc, option) = {
1523            let call_state = self.call_state.read().await;
1524            (
1525                call_state.ssrc,
1526                call_state.option.clone().unwrap_or_default(),
1527            )
1528        };
1529
1530        let mut rtc_config = RtcTrackConfig::default();
1531        rtc_config.mode = rustrtc::TransportMode::WebRtc; // WebRTC
1532        rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1533
1534        if let Some(codecs) = &self.app_state.config.codecs {
1535            let mut codec_types = Vec::new();
1536            for c in codecs {
1537                match c.to_lowercase().as_str() {
1538                    "pcmu" => codec_types.push(CodecType::PCMU),
1539                    "pcma" => codec_types.push(CodecType::PCMA),
1540                    "g722" => codec_types.push(CodecType::G722),
1541                    "g729" => codec_types.push(CodecType::G729),
1542                    "opus" => codec_types.push(CodecType::Opus),
1543                    "dtmf" | "2833" | "telephone_event" => {
1544                        codec_types.push(CodecType::TelephoneEvent)
1545                    }
1546                    _ => {}
1547                }
1548            }
1549            if !codec_types.is_empty() {
1550                rtc_config.preferred_codec = Some(codec_types[0].clone());
1551                rtc_config.codecs = codec_types;
1552            }
1553        }
1554
1555        if let Some(ref external_ip) = self.app_state.config.external_ip {
1556            rtc_config.external_ip = Some(external_ip.clone());
1557        }
1558
1559        let mut webrtc_track = RtcTrack::new(
1560            self.cancel_token.child_token(),
1561            self.session_id.clone(),
1562            self.track_config.clone(),
1563            rtc_config,
1564        )
1565        .with_ssrc(ssrc);
1566
1567        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1568        let offer = match option.enable_ipv6 {
1569            Some(false) | None => {
1570                strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1571            }
1572            _ => option.offer.clone().unwrap_or("".to_string()),
1573        };
1574        let answer: Option<String>;
1575        match webrtc_track.handshake(offer, timeout).await {
1576            Ok(answer_sdp) => {
1577                answer = match option.enable_ipv6 {
1578                    Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1579                    Some(true) => Some(answer_sdp.to_string()),
1580                };
1581            }
1582            Err(e) => {
1583                warn!(session_id = self.session_id, "failed to setup track: {}", e);
1584                return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1585            }
1586        }
1587
1588        {
1589            let mut call_state = self.call_state.write().await;
1590            call_state.answer_time = Some(Utc::now());
1591            call_state.answer = answer;
1592            call_state.last_status_code = 200;
1593        }
1594        Ok(Box::new(webrtc_track))
1595    }
1596
1597    async fn create_outgoing_sip_track(
1598        &self,
1599        cancel_token: CancellationToken,
1600        call_state_ref: ActiveCallStateRef,
1601        track_id: &String,
1602        mut invite_option: InviteOption,
1603        call_option: &CallOption,
1604        moh: Option<String>,
1605        auto_hangup: bool,
1606    ) -> Result<String, rsipstack::Error> {
1607        let ssrc = call_state_ref.read().await.ssrc;
1608        let rtp_track = self
1609            .create_rtp_track(track_id.clone(), ssrc)
1610            .await
1611            .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1612
1613        let offer = Some(
1614            rtp_track
1615                .local_description()
1616                .await
1617                .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1618        );
1619
1620        {
1621            let mut cs = call_state_ref.write().await;
1622            if let Some(o) = cs.option.as_mut() {
1623                o.offer = offer.clone();
1624            }
1625            cs.start_time = Utc::now();
1626        };
1627
1628        invite_option.offer = offer.clone().map(|s| s.into());
1629
1630        // Set contact to local SIP endpoint address if not already set explicitly
1631        // Check if contact is still default (no scheme set) or if host is localhost-like
1632        let needs_contact = invite_option.contact.scheme.is_none()
1633            || invite_option
1634                .contact
1635                .host_with_port
1636                .to_string()
1637                .starts_with("127.0.0.1");
1638
1639        if needs_contact {
1640            if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1641                invite_option.contact = rsip::Uri {
1642                    scheme: Some(rsip::Scheme::Sip),
1643                    auth: None,
1644                    host_with_port: addr.addr.clone(),
1645                    params: vec![],
1646                    headers: vec![],
1647                };
1648            }
1649        }
1650
1651        let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1652
1653        if let Some(moh) = moh {
1654            let ssrc_and_moh = {
1655                let mut state = call_state_ref.write().await;
1656                state.moh = Some(moh.clone());
1657                if state.current_play_id.is_none() {
1658                    let ssrc = rand::random::<u32>();
1659                    Some((ssrc, moh.clone()))
1660                } else {
1661                    info!(
1662                        session_id = self.session_id,
1663                        "Something is playing, MOH will start after it ends"
1664                    );
1665                    None
1666                }
1667            };
1668
1669            if let Some((ssrc, moh_path)) = ssrc_and_moh {
1670                let file_track = FileTrack::new(self.server_side_track_id.clone())
1671                    .with_play_id(Some(moh_path.clone()))
1672                    .with_ssrc(ssrc)
1673                    .with_path(moh_path.clone())
1674                    .with_cancel_token(self.cancel_token.child_token());
1675                self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1676                    .await;
1677            }
1678        } else {
1679            let track = rtp_track_to_setup.take().unwrap();
1680            self.setup_track_with_stream(&call_option, track)
1681                .await
1682                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1683        }
1684
1685        info!(
1686            session_id = self.session_id,
1687            track_id,
1688            contact = %invite_option.contact,
1689            "invite {} -> {} offer: \n{}",
1690            invite_option.caller,
1691            invite_option.callee,
1692            offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1693        );
1694
1695        let (dlg_state_sender, dlg_state_receiver) =
1696            self.invitation.dialog_layer.new_dialog_state_channel();
1697
1698        let states = InviteDialogStates {
1699            is_client: true,
1700            session_id: self.session_id.clone(),
1701            track_id: track_id.clone(),
1702            event_sender: self.event_sender.clone(),
1703            media_stream: self.media_stream.clone(),
1704            call_state: call_state_ref.clone(),
1705            cancel_token,
1706            terminated_reason: None,
1707            has_early_media: false,
1708        };
1709
1710        let mut client_dialog_handler =
1711            DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), dlg_state_receiver);
1712
1713        crate::spawn(async move {
1714            client_dialog_handler.process_dialog(states).await;
1715        });
1716
1717        let (dialog_id, answer) = self
1718            .invitation
1719            .invite(invite_option, dlg_state_sender)
1720            .await?;
1721
1722        self.call_state.write().await.moh = None;
1723
1724        if let Some(track) = rtp_track_to_setup {
1725            self.setup_track_with_stream(&call_option, track)
1726                .await
1727                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1728        }
1729
1730        let answer = match answer {
1731            Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1732            None => {
1733                warn!(session_id = self.session_id, "no answer received");
1734                return Err(rsipstack::Error::DialogError(
1735                    "No answer received".to_string(),
1736                    dialog_id,
1737                    rsip::StatusCode::NotAcceptableHere,
1738                ));
1739            }
1740        };
1741
1742        {
1743            let mut cs = call_state_ref.write().await;
1744            if cs.answer.is_none() {
1745                cs.answer = Some(answer.clone());
1746            }
1747            if auto_hangup {
1748                cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
1749            }
1750        }
1751
1752        self.media_stream
1753            .update_remote_description(&track_id, &answer)
1754            .await
1755            .ok();
1756
1757        Ok(answer)
1758    }
1759
1760    /// Detect if SDP is WebRTC format
1761    pub fn is_webrtc_sdp(sdp: &str) -> bool {
1762        (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
1763            && sdp.contains("a=fingerprint:")
1764    }
1765
1766    pub async fn setup_answer_track(
1767        &self,
1768        ssrc: u32,
1769        option: &CallOption,
1770        offer: String,
1771    ) -> Result<(String, Box<dyn Track>)> {
1772        let offer = match option.enable_ipv6 {
1773            Some(false) | None => strip_ipv6_candidates(&offer),
1774            _ => offer.clone(),
1775        };
1776
1777        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1778
1779        let mut media_track = if Self::is_webrtc_sdp(&offer) {
1780            let mut rtc_config = RtcTrackConfig::default();
1781            rtc_config.mode = rustrtc::TransportMode::WebRtc;
1782            rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1783            if let Some(ref external_ip) = self.app_state.config.external_ip {
1784                rtc_config.external_ip = Some(external_ip.clone());
1785            }
1786
1787            let webrtc_track = RtcTrack::new(
1788                self.cancel_token.child_token(),
1789                self.session_id.clone(),
1790                self.track_config.clone(),
1791                rtc_config,
1792            )
1793            .with_ssrc(ssrc);
1794
1795            Box::new(webrtc_track) as Box<dyn Track>
1796        } else {
1797            let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
1798            Box::new(rtp_track) as Box<dyn Track>
1799        };
1800
1801        let answer = match media_track.handshake(offer.clone(), timeout).await {
1802            Ok(answer) => answer,
1803            Err(e) => {
1804                return Err(anyhow::anyhow!("handshake failed: {e}"));
1805            }
1806        };
1807
1808        return Ok((answer, media_track));
1809    }
1810
1811    pub async fn prepare_incoming_sip_track(
1812        &self,
1813        cancel_token: CancellationToken,
1814        call_state_ref: ActiveCallStateRef,
1815        track_id: &String,
1816        pending_dialog: PendingDialog,
1817    ) -> Result<()> {
1818        let state_receiver = pending_dialog.state_receiver;
1819        //let pending_token_clone = pending_dialog.token;
1820
1821        let states = InviteDialogStates {
1822            is_client: false,
1823            session_id: self.session_id.clone(),
1824            track_id: track_id.clone(),
1825            event_sender: self.event_sender.clone(),
1826            media_stream: self.media_stream.clone(),
1827            call_state: self.call_state.clone(),
1828            cancel_token,
1829            terminated_reason: None,
1830            has_early_media: false,
1831        };
1832
1833        let initial_request = pending_dialog.dialog.initial_request();
1834        let offer = String::from_utf8_lossy(&initial_request.body).to_string();
1835
1836        let (ssrc, option) = {
1837            let call_state = call_state_ref.read().await;
1838            (
1839                call_state.ssrc,
1840                call_state.option.clone().unwrap_or_default(),
1841            )
1842        };
1843
1844        match self.setup_answer_track(ssrc, &option, offer).await {
1845            Ok((offer, track)) => {
1846                self.setup_track_with_stream(&option, track).await?;
1847                {
1848                    let mut state = self.call_state.write().await;
1849                    state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
1850                }
1851            }
1852            Err(e) => {
1853                return Err(anyhow::anyhow!("error creating track: {}", e));
1854            }
1855        }
1856
1857        let mut client_dialog_handler =
1858            DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), state_receiver);
1859
1860        crate::spawn(async move {
1861            client_dialog_handler.process_dialog(states).await;
1862        });
1863        Ok(())
1864    }
1865}
1866
1867impl Drop for ActiveCall {
1868    fn drop(&mut self) {
1869        info!(session_id = self.session_id, "dropping active call");
1870        if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
1871            if let Some(record) = self.get_callrecord() {
1872                if let Err(e) = sender.send(record) {
1873                    warn!(
1874                        session_id = self.session_id,
1875                        "failed to send call record: {}", e
1876                    );
1877                }
1878            }
1879        }
1880    }
1881}
1882
1883impl ActiveCallState {
1884    pub fn merge_option(&self, mut option: CallOption) -> CallOption {
1885        if let Some(existing) = &self.option {
1886            if option.asr.is_none() {
1887                option.asr = existing.asr.clone();
1888            }
1889            if option.tts.is_none() {
1890                option.tts = existing.tts.clone();
1891            }
1892            if option.vad.is_none() {
1893                option.vad = existing.vad.clone();
1894            }
1895            if option.denoise.is_none() {
1896                option.denoise = existing.denoise;
1897            }
1898            if option.recorder.is_none() {
1899                option.recorder = existing.recorder.clone();
1900            }
1901            if option.eou.is_none() {
1902                option.eou = existing.eou.clone();
1903            }
1904            if option.extra.is_none() {
1905                option.extra = existing.extra.clone();
1906            }
1907            if option.ambiance.is_none() {
1908                option.ambiance = existing.ambiance.clone();
1909            }
1910        }
1911        option
1912    }
1913
1914    pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
1915        if self.hangup_reason.is_none() {
1916            self.hangup_reason = Some(reason);
1917        }
1918    }
1919
1920    pub fn build_hangup_event(
1921        &self,
1922        track_id: TrackId,
1923        initiator: Option<String>,
1924    ) -> crate::event::SessionEvent {
1925        let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
1926        let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
1927        let extra = self.extras.clone();
1928
1929        crate::event::SessionEvent::Hangup {
1930            track_id,
1931            timestamp: crate::media::get_timestamp(),
1932            reason: Some(format!("{:?}", self.hangup_reason)),
1933            initiator,
1934            start_time: self.start_time.to_rfc3339(),
1935            answer_time: self.answer_time.map(|t| t.to_rfc3339()),
1936            ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
1937            hangup_time: Utc::now().to_rfc3339(),
1938            extra,
1939            from: from.map(|f| f.into()),
1940            to: to.map(|f| f.into()),
1941            refer: Some(self.is_refer),
1942        }
1943    }
1944
1945    pub fn build_callrecord(
1946        &self,
1947        app_state: AppState,
1948        session_id: String,
1949        call_type: ActiveCallType,
1950    ) -> CallRecord {
1951        let option = self.option.clone().unwrap_or_default();
1952        let recorder = if option.recorder.is_some() {
1953            let recorder_file = app_state.get_recorder_file(&session_id);
1954            if std::path::Path::new(&recorder_file).exists() {
1955                let file_size = std::fs::metadata(&recorder_file)
1956                    .map(|m| m.len())
1957                    .unwrap_or(0);
1958                vec![crate::callrecord::CallRecordMedia {
1959                    track_id: session_id.clone(),
1960                    path: recorder_file,
1961                    size: file_size,
1962                    extra: None,
1963                }]
1964            } else {
1965                vec![]
1966            }
1967        } else {
1968            vec![]
1969        };
1970
1971        let dump_event_file = app_state.get_dump_events_file(&session_id);
1972        let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
1973            Some(dump_event_file)
1974        } else {
1975            None
1976        };
1977
1978        let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
1979            if let Ok(rc) = rc.try_read() {
1980                Some(Box::new(rc.build_callrecord(
1981                    app_state.clone(),
1982                    rc.session_id.clone(),
1983                    ActiveCallType::B2bua,
1984                )))
1985            } else {
1986                None
1987            }
1988        });
1989
1990        let caller = option.caller.clone().unwrap_or_default();
1991        let callee = option.callee.clone().unwrap_or_default();
1992
1993        CallRecord {
1994            option: Some(option),
1995            call_id: session_id,
1996            call_type,
1997            start_time: self.start_time,
1998            ring_time: self.ring_time.clone(),
1999            answer_time: self.answer_time.clone(),
2000            end_time: Utc::now(),
2001            caller,
2002            callee,
2003            hangup_reason: self.hangup_reason.clone(),
2004            hangup_messages: Vec::new(),
2005            status_code: self.last_status_code,
2006            extras: self.extras.clone(),
2007            dump_event_file,
2008            recorder,
2009            refer_callrecord,
2010        }
2011    }
2012}