Skip to main content

active_call/call/
active_call.rs

1use super::Command;
2use crate::{
3    CallOption, ReferOption,
4    event::{EventReceiver, EventSender, SessionEvent},
5    media::{
6        TrackId,
7        ambiance::AmbianceProcessor,
8        engine::StreamEngine,
9        negotiate::strip_ipv6_candidates,
10        processor::SubscribeProcessor,
11        recorder::RecorderOption,
12        stream::{MediaStream, MediaStreamBuilder},
13        track::{
14            Track, TrackConfig,
15            file::FileTrack,
16            media_pass::MediaPassTrack,
17            rtc::{RtcTrack, RtcTrackConfig},
18            tts::SynthesisHandle,
19            websocket::{WebsocketBytesReceiver, WebsocketTrack},
20        },
21    },
22    synthesis::{SynthesisCommand, SynthesisOption},
23};
24use crate::{
25    app::AppState,
26    call::{
27        CommandReceiver, CommandSender,
28        sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
29    },
30    callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
31    useragent::invitation::PendingDialog,
32};
33use anyhow::Result;
34use audio_codec::CodecType;
35use chrono::{DateTime, Utc};
36use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
37use serde::{Deserialize, Serialize};
38use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
39use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
40use tokio_util::sync::CancellationToken;
41use tracing::{debug, info, warn};
42
43#[cfg(test)]
44mod tests {
45    use super::*;
46    use crate::app::AppStateBuilder;
47    use crate::callrecord::CallRecordHangupReason;
48    use crate::config::Config;
49    use crate::media::track::tts::SynthesisHandle;
50    use crate::synthesis::SynthesisCommand;
51    use tokio::sync::mpsc;
52
53    #[tokio::test]
54    async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
55        let mut config = Config::default();
56        config.udp_port = 0; // Use random port
57        config.media_cache_path = "/tmp/mediacache".to_string();
58        let stream_engine = Arc::new(StreamEngine::default());
59        let app_state = AppStateBuilder::new()
60            .with_config(config)
61            .with_stream_engine(stream_engine)
62            .build()
63            .await?;
64
65        let cancel_token = CancellationToken::new();
66        let session_id = "test-session".to_string();
67        let track_config = TrackConfig::default();
68
69        let mut option = crate::CallOption::default();
70        option.tts = Some(crate::synthesis::SynthesisOption::default());
71
72        let active_call = Arc::new(ActiveCall::new(
73            ActiveCallType::Sip,
74            cancel_token.clone(),
75            session_id.clone(),
76            app_state.invitation.clone(),
77            app_state.clone(),
78            track_config,
79            None,
80            false,
81            None,
82            None,
83        ));
84
85        {
86            let mut state = active_call.call_state.write().await;
87            state.option = Some(option);
88        }
89
90        let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
91        let initial_ssrc = 12345;
92        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
93
94        // 1. Set initial TTS handle
95        {
96            let mut state = active_call.call_state.write().await;
97            state.tts_handle = Some(handle);
98            state.current_play_id = Some("play_1".to_string());
99        }
100
101        // 2. Call do_tts with auto_hangup=true and same play_id
102        active_call
103            .do_tts(
104                "hangup now".to_string(),
105                None,
106                Some("play_1".to_string()),
107                Some(true),
108                false,
109                true,
110                None,
111                None,
112                false,
113            )
114            .await?;
115
116        // 3. Verify auto_hangup state uses the EXISTING SSRC
117        {
118            let state = active_call.call_state.read().await;
119            assert!(state.auto_hangup.is_some());
120            let (h_ssrc, reason) = state.auto_hangup.clone().unwrap();
121            assert_eq!(
122                h_ssrc, initial_ssrc,
123                "SSRC should be reused from existing handle"
124            );
125            assert_eq!(reason, CallRecordHangupReason::BySystem);
126        }
127
128        // 4. Verify command was sent to existing channel
129        let cmd = rx.try_recv().expect("Should have received tts command");
130        assert_eq!(cmd.text, "hangup now");
131
132        Ok(())
133    }
134
135    #[tokio::test]
136    async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
137        let mut config = Config::default();
138        config.udp_port = 0; // Use random port
139        config.media_cache_path = "/tmp/mediacache".to_string();
140        let stream_engine = Arc::new(StreamEngine::default());
141        let app_state = AppStateBuilder::new()
142            .with_config(config)
143            .with_stream_engine(stream_engine)
144            .build()
145            .await?;
146
147        let active_call = Arc::new(ActiveCall::new(
148            ActiveCallType::Sip,
149            CancellationToken::new(),
150            "test-session".to_string(),
151            app_state.invitation.clone(),
152            app_state.clone(),
153            TrackConfig::default(),
154            None,
155            false,
156            None,
157            None,
158        ));
159
160        let mut tts_opt = crate::synthesis::SynthesisOption::default();
161        tts_opt.provider = Some(crate::synthesis::SynthesisType::Aliyun);
162        let mut option = crate::CallOption::default();
163        option.tts = Some(tts_opt);
164        {
165            let mut state = active_call.call_state.write().await;
166            state.option = Some(option);
167        }
168
169        let (tx, _rx) = mpsc::unbounded_channel();
170        let initial_ssrc = 111;
171        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
172
173        {
174            let mut state = active_call.call_state.write().await;
175            state.tts_handle = Some(handle);
176            state.current_play_id = Some("play_1".to_string());
177        }
178
179        // Call do_tts with DIFFERENT play_id
180        active_call
181            .do_tts(
182                "new play".to_string(),
183                None,
184                Some("play_2".to_string()),
185                Some(true),
186                false,
187                true,
188                None,
189                None,
190                false,
191            )
192            .await?;
193
194        // Verify auto_hangup uses a NEW SSRC (because it should interrupt and start fresh)
195        {
196            let state = active_call.call_state.read().await;
197            let (h_ssrc, _) = state.auto_hangup.clone().unwrap();
198            assert_ne!(
199                h_ssrc, initial_ssrc,
200                "Should use a new SSRC for different play_id"
201            );
202        }
203
204        Ok(())
205    }
206}
207
208#[derive(Deserialize)]
209#[serde(rename_all = "camelCase")]
210pub struct CallParams {
211    pub id: Option<String>,
212    #[serde(rename = "dump")]
213    pub dump_events: Option<bool>,
214    #[serde(rename = "ping")]
215    pub ping_interval: Option<u32>,
216    pub server_side_track: Option<String>,
217}
218
219#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
220#[serde(rename_all = "camelCase")]
221pub enum ActiveCallType {
222    Webrtc,
223    B2bua,
224    WebSocket,
225    #[default]
226    Sip,
227}
228
229#[derive(Default)]
230pub struct ActiveCallState {
231    pub session_id: String,
232    pub start_time: DateTime<Utc>,
233    pub ring_time: Option<DateTime<Utc>>,
234    pub answer_time: Option<DateTime<Utc>>,
235    pub hangup_reason: Option<CallRecordHangupReason>,
236    pub last_status_code: u16,
237    pub option: Option<CallOption>,
238    pub answer: Option<String>,
239    pub ssrc: u32,
240    pub refer_callstate: Option<ActiveCallStateRef>,
241    pub extras: Option<HashMap<String, serde_json::Value>>,
242    pub is_refer: bool,
243
244    // Runtime state (migrated from ActiveCall to reduce multiple locks)
245    pub tts_handle: Option<SynthesisHandle>,
246    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
247    pub wait_input_timeout: Option<u32>,
248    pub moh: Option<String>,
249    pub current_play_id: Option<String>,
250    pub audio_receiver: Option<WebsocketBytesReceiver>,
251    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
252}
253
254pub type ActiveCallRef = Arc<ActiveCall>;
255pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
256
257pub struct ActiveCall {
258    pub call_state: ActiveCallStateRef,
259    pub cancel_token: CancellationToken,
260    pub call_type: ActiveCallType,
261    pub session_id: String,
262    pub media_stream: Arc<MediaStream>,
263    pub track_config: TrackConfig,
264    pub event_sender: EventSender,
265    pub app_state: AppState,
266    pub invitation: Invitation,
267    pub cmd_sender: CommandSender,
268    pub dump_events: bool,
269    pub server_side_track_id: TrackId,
270}
271
272pub struct ActiveCallGuard {
273    pub call: ActiveCallRef,
274    pub active_calls: usize,
275}
276
277impl ActiveCallGuard {
278    pub fn new(call: ActiveCallRef) -> Self {
279        let active_calls = {
280            call.app_state
281                .total_calls
282                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
283            let mut calls = call.app_state.active_calls.lock().unwrap();
284            calls.insert(call.session_id.clone(), call.clone());
285            calls.len()
286        };
287        Self { call, active_calls }
288    }
289}
290
291impl Drop for ActiveCallGuard {
292    fn drop(&mut self) {
293        self.call
294            .app_state
295            .active_calls
296            .lock()
297            .unwrap()
298            .remove(&self.call.session_id);
299    }
300}
301
302pub struct ActiveCallReceiver {
303    pub cmd_receiver: CommandReceiver,
304    pub dump_cmd_receiver: CommandReceiver,
305    pub dump_event_receiver: EventReceiver,
306}
307
308impl ActiveCall {
309    pub fn new(
310        call_type: ActiveCallType,
311        cancel_token: CancellationToken,
312        session_id: String,
313        invitation: Invitation,
314        app_state: AppState,
315        track_config: TrackConfig,
316        audio_receiver: Option<WebsocketBytesReceiver>,
317        dump_events: bool,
318        server_side_track_id: Option<TrackId>,
319        extras: Option<HashMap<String, serde_json::Value>>,
320    ) -> Self {
321        let event_sender = crate::event::create_event_sender();
322        let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
323        let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
324            .with_id(session_id.clone())
325            .with_cancel_token(cancel_token.child_token());
326        let media_stream = Arc::new(media_stream_builder.build());
327        let call_state = Arc::new(RwLock::new(ActiveCallState {
328            session_id: session_id.clone(),
329            start_time: Utc::now(),
330            ssrc: rand::random::<u32>(),
331            extras,
332            audio_receiver,
333            ..Default::default()
334        }));
335        Self {
336            cancel_token,
337            call_type,
338            session_id,
339            call_state,
340            media_stream,
341            track_config,
342            event_sender,
343            app_state,
344            invitation,
345            cmd_sender,
346            dump_events,
347            server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
348        }
349    }
350
351    pub async fn enqueue_command(&self, command: Command) -> Result<()> {
352        self.cmd_sender
353            .send(command)
354            .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
355        Ok(())
356    }
357
358    /// Create a new ActiveCallReceiver for this ActiveCall
359    /// `tokio::sync::broadcast` not cached messages, so need to early create receiver
360    /// before calling `serve()`
361    pub fn new_receiver(&self) -> ActiveCallReceiver {
362        ActiveCallReceiver {
363            cmd_receiver: self.cmd_sender.subscribe(),
364            dump_cmd_receiver: self.cmd_sender.subscribe(),
365            dump_event_receiver: self.event_sender.subscribe(),
366        }
367    }
368
369    pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
370        let ActiveCallReceiver {
371            mut cmd_receiver,
372            dump_cmd_receiver,
373            dump_event_receiver,
374        } = receiver;
375
376        let process_command_loop = async move {
377            while let Ok(command) = cmd_receiver.recv().await {
378                match self.dispatch(command).await {
379                    Ok(_) => (),
380                    Err(e) => {
381                        warn!(session_id = self.session_id, "{}", e);
382                        self.event_sender
383                            .send(SessionEvent::Error {
384                                track_id: self.session_id.clone(),
385                                timestamp: crate::media::get_timestamp(),
386                                sender: "command".to_string(),
387                                error: e.to_string(),
388                                code: None,
389                            })
390                            .ok();
391                    }
392                }
393            }
394        };
395        self.app_state
396            .total_calls
397            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
398
399        tokio::join!(
400            self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
401            async {
402                select! {
403                    _ = process_command_loop => {
404                        info!(session_id = self.session_id, "command loop done");
405                    }
406                    _ = self.process() => {
407                        info!(session_id = self.session_id, "call serve done");
408                    }
409                    _ = self.cancel_token.cancelled() => {
410                        info!(session_id = self.session_id, "call cancelled - cleaning up resources");
411                    }
412                }
413                self.cancel_token.cancel();
414            }
415        );
416        Ok(())
417    }
418
419    async fn process(&self) -> Result<()> {
420        let mut event_receiver = self.event_sender.subscribe();
421
422        let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
423        let input_timeout_expire_ref = input_timeout_expire.clone();
424        let event_sender = self.event_sender.clone();
425        let wait_input_timeout_loop = async {
426            loop {
427                let (start_time, expire) = { *input_timeout_expire.lock().await };
428                if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
429                    info!(session_id = self.session_id, "wait input timeout reached");
430                    *input_timeout_expire.lock().await = (0, 0);
431                    event_sender
432                        .send(SessionEvent::Silence {
433                            track_id: self.server_side_track_id.clone(),
434                            timestamp: crate::media::get_timestamp(),
435                            start_time,
436                            duration: expire as u64,
437                            samples: None,
438                        })
439                        .ok();
440                }
441                sleep(Duration::from_millis(100)).await;
442            }
443        };
444        let server_side_track_id = self.server_side_track_id.clone();
445        let event_hook_loop = async move {
446            while let Ok(event) = event_receiver.recv().await {
447                match event {
448                    SessionEvent::Speaking { .. }
449                    | SessionEvent::Dtmf { .. }
450                    | SessionEvent::AsrDelta { .. }
451                    | SessionEvent::AsrFinal { .. }
452                    | SessionEvent::TrackStart { .. } => {
453                        *input_timeout_expire_ref.lock().await = (0, 0);
454                    }
455                    SessionEvent::TrackEnd {
456                        track_id,
457                        play_id,
458                        ssrc,
459                        ..
460                    } => {
461                        if track_id != server_side_track_id {
462                            continue;
463                        }
464
465                        let (moh_path, auto_hangup, wait_timeout_val) = {
466                            let mut state = self.call_state.write().await;
467                            if play_id != state.current_play_id {
468                                debug!(
469                                    session_id = self.session_id,
470                                    ?play_id,
471                                    current = ?state.current_play_id,
472                                    "ignoring interrupted track end"
473                                );
474                                continue;
475                            }
476                            state.current_play_id = None;
477                            (
478                                state.moh.clone(),
479                                state.auto_hangup.clone(),
480                                state.wait_input_timeout.take(),
481                            )
482                        };
483
484                        if let Some(path) = moh_path {
485                            info!(session_id = self.session_id, "looping moh: {}", path);
486                            let ssrc = rand::random::<u32>();
487                            let file_track = FileTrack::new(self.server_side_track_id.clone())
488                                .with_play_id(Some(path.clone()))
489                                .with_ssrc(ssrc)
490                                .with_path(path.clone())
491                                .with_cancel_token(self.cancel_token.child_token());
492                            self.update_track_wrapper(Box::new(file_track), Some(path))
493                                .await;
494                            continue;
495                        }
496
497                        if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
498                            if hangup_ssrc == ssrc {
499                                info!(
500                                    session_id = self.session_id,
501                                    ssrc, "auto hangup when track end track_id:{}", track_id
502                                );
503                                self.do_hangup(Some(hangup_reason), None).await.ok();
504                            }
505                        }
506
507                        if let Some(timeout) = wait_timeout_val {
508                            let expire = if timeout > 0 {
509                                (crate::media::get_timestamp(), timeout)
510                            } else {
511                                (0, 0)
512                            };
513                            *input_timeout_expire_ref.lock().await = expire;
514                        }
515                    }
516                    SessionEvent::Interrupt { receiver } => {
517                        let track_id =
518                            receiver.unwrap_or_else(|| self.server_side_track_id.clone());
519                        if track_id == self.server_side_track_id {
520                            debug!(
521                                session_id = self.session_id,
522                                "received interrupt event, stopping playback"
523                            );
524                            self.do_interrupt(true).await.ok();
525                        }
526                    }
527                    SessionEvent::Inactivity { track_id, .. } => {
528                        info!(
529                            session_id = self.session_id,
530                            track_id, "inactivity timeout reached, hanging up"
531                        );
532                        self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
533                            .await
534                            .ok();
535                    }
536                    SessionEvent::Error { track_id, .. } => {
537                        if track_id != server_side_track_id {
538                            continue;
539                        }
540
541                        let moh_info = {
542                            let mut state = self.call_state.write().await;
543                            if let Some(path) = state.moh.clone() {
544                                let fallback = "./config/sounds/refer_moh.wav".to_string();
545                                let next_path = if path != fallback
546                                    && std::path::Path::new(&fallback).exists()
547                                {
548                                    info!(
549                                        session_id = self.session_id,
550                                        "moh error, switching to fallback: {}", fallback
551                                    );
552                                    state.moh = Some(fallback.clone());
553                                    fallback
554                                } else {
555                                    info!(
556                                        session_id = self.session_id,
557                                        "looping moh on error: {}", path
558                                    );
559                                    path
560                                };
561                                Some(next_path)
562                            } else {
563                                None
564                            }
565                        };
566
567                        if let Some(next_path) = moh_info {
568                            let ssrc = rand::random::<u32>();
569                            let file_track = FileTrack::new(self.server_side_track_id.clone())
570                                .with_play_id(Some(next_path.clone()))
571                                .with_ssrc(ssrc)
572                                .with_path(next_path.clone())
573                                .with_cancel_token(self.cancel_token.child_token());
574                            self.update_track_wrapper(Box::new(file_track), Some(next_path))
575                                .await;
576                            continue;
577                        }
578                    }
579                    _ => {}
580                }
581            }
582        };
583
584        select! {
585            _ = wait_input_timeout_loop=>{
586                info!(session_id = self.session_id, "wait input timeout loop done");
587            }
588            _ = self.media_stream.serve() => {
589                info!(session_id = self.session_id, "media stream loop done");
590            }
591            _ = event_hook_loop => {
592                info!(session_id = self.session_id, "event loop done");
593            }
594        }
595        Ok(())
596    }
597
598    async fn dispatch(&self, command: Command) -> Result<()> {
599        match command {
600            Command::Invite { option } => self.do_invite(option).await,
601            Command::Accept { option } => self.do_accept(option).await,
602            Command::Reject { reason, code } => {
603                self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
604                    .await
605            }
606            Command::Ringing {
607                ringtone,
608                recorder,
609                early_media,
610            } => self.do_ringing(ringtone, recorder, early_media).await,
611            Command::Tts {
612                text,
613                speaker,
614                play_id,
615                auto_hangup,
616                streaming,
617                end_of_stream,
618                option,
619                wait_input_timeout,
620                base64,
621            } => {
622                self.do_tts(
623                    text,
624                    speaker,
625                    play_id,
626                    auto_hangup,
627                    streaming.unwrap_or_default(),
628                    end_of_stream.unwrap_or_default(),
629                    option,
630                    wait_input_timeout,
631                    base64.unwrap_or_default(),
632                )
633                .await
634            }
635            Command::Play {
636                url,
637                play_id,
638                auto_hangup,
639                wait_input_timeout,
640            } => {
641                self.do_play(url, play_id, auto_hangup, wait_input_timeout)
642                    .await
643            }
644            Command::Hangup { reason, initiator } => {
645                let reason = reason.map(|r| {
646                    r.parse::<CallRecordHangupReason>()
647                        .unwrap_or(CallRecordHangupReason::BySystem)
648                });
649                self.do_hangup(reason, initiator).await
650            }
651            Command::Refer {
652                caller,
653                callee,
654                options,
655            } => self.do_refer(caller, callee, options).await,
656            Command::Mute { track_id } => self.do_mute(track_id).await,
657            Command::Unmute { track_id } => self.do_unmute(track_id).await,
658            Command::Pause {} => self.do_pause().await,
659            Command::Resume {} => self.do_resume().await,
660            Command::Interrupt {
661                graceful: passage,
662                fade_out_ms: _,
663            } => self.do_interrupt(passage.unwrap_or_default()).await,
664            Command::History { speaker, text } => self.do_history(speaker, text).await,
665        }
666    }
667
668    fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
669        if let Some(recorder_option) = &option.recorder {
670            let mut recorder_file = recorder_option.recorder_file.clone();
671            if recorder_file.contains("{id}") {
672                recorder_file = recorder_file.replace("{id}", &self.session_id);
673            }
674
675            let recorder_file = if recorder_file.is_empty() {
676                self.app_state.get_recorder_file(&self.session_id)
677            } else {
678                let p = Path::new(&recorder_file);
679                p.is_absolute()
680                    .then(|| recorder_file.clone())
681                    .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
682            };
683            info!(
684                session_id = self.session_id,
685                recorder_file, "created recording file"
686            );
687
688            let track_samplerate = self.track_config.samplerate;
689            let recorder_samplerate = if track_samplerate > 0 {
690                track_samplerate
691            } else {
692                recorder_option.samplerate
693            };
694            let recorder_ptime = if recorder_option.ptime == 0 {
695                200
696            } else {
697                recorder_option.ptime
698            };
699            let requested_format = recorder_option
700                .format
701                .unwrap_or(self.app_state.config.recorder_format());
702            let format = requested_format.effective();
703            if requested_format != format {
704                warn!(
705                    session_id = self.session_id,
706                    requested = requested_format.extension(),
707                    "Recorder format fallback to wav due to unsupported feature"
708                );
709            }
710            let mut recorder_config = RecorderOption {
711                recorder_file,
712                samplerate: recorder_samplerate,
713                ptime: recorder_ptime,
714                format: Some(format),
715            };
716            recorder_config.ensure_path_extension(format);
717            Some(recorder_config)
718        } else {
719            None
720        }
721    }
722
723    async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
724        // Merge with existing configuration (e.g., from playbook)
725        {
726            let state = self.call_state.read().await;
727            option = state.merge_option(option);
728        }
729
730        option.check_default();
731        if let Some(opt) = self.build_record_option(&option) {
732            self.media_stream.update_recorder_option(opt).await;
733        }
734
735        if let Some(opt) = &option.media_pass {
736            let track_id = self.server_side_track_id.clone();
737            let cancel_token = self.cancel_token.child_token();
738            let ssrc = rand::random::<u32>();
739            let media_pass_track = MediaPassTrack::new(
740                self.session_id.clone(),
741                ssrc,
742                track_id,
743                cancel_token,
744                opt.clone(),
745            );
746            self.update_track_wrapper(Box::new(media_pass_track), None)
747                .await;
748        }
749
750        info!(
751            session_id = self.session_id,
752            call_type = ?self.call_type,
753            sender,
754            ?option,
755            "caller with option"
756        );
757
758        match self.setup_caller_track(&option).await {
759            Ok(_) => return Ok(option),
760            Err(e) => {
761                self.app_state
762                    .total_failed_calls
763                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
764                let error_event = crate::event::SessionEvent::Error {
765                    track_id: self.session_id.clone(),
766                    timestamp: crate::media::get_timestamp(),
767                    sender,
768                    error: e.to_string(),
769                    code: None,
770                };
771                self.event_sender.send(error_event).ok();
772                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
773                    .await
774                    .ok();
775                return Err(e);
776            }
777        }
778    }
779
780    async fn do_invite(&self, option: CallOption) -> Result<()> {
781        self.invite_or_accept(option, "invite".to_string())
782            .await
783            .map(|_| ())
784    }
785
786    async fn do_accept(&self, mut option: CallOption) -> Result<()> {
787        let has_pending = self
788            .invitation
789            .find_dialog_id_by_session_id(&self.session_id)
790            .is_some();
791        let ready_to_answer_val = {
792            let state = self.call_state.read().await;
793            state.ready_to_answer.is_none()
794        };
795
796        if ready_to_answer_val {
797            if !has_pending {
798                // emit reject event
799                warn!(session_id = self.session_id, "no pending call to accept");
800                let rejet_event = crate::event::SessionEvent::Reject {
801                    track_id: self.session_id.clone(),
802                    timestamp: crate::media::get_timestamp(),
803                    reason: "no pending call".to_string(),
804                    refer: None,
805                    code: Some(486),
806                };
807                self.event_sender.send(rejet_event).ok();
808                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
809                    .await
810                    .ok();
811                return Err(anyhow::anyhow!("no pending call to accept"));
812            }
813            option = self.invite_or_accept(option, "accept".to_string()).await?;
814        } else {
815            option.check_default();
816            self.call_state.write().await.option = Some(option.clone());
817        }
818        info!(session_id = self.session_id, ?option, "accepting call");
819        let ready = self.call_state.write().await.ready_to_answer.take();
820        if let Some((answer, track, dialog)) = ready {
821            info!(
822                session_id = self.session_id,
823                track_id = track.as_ref().map(|t| t.id()),
824                "ready to answer with track"
825            );
826
827            let headers = vec![rsip::Header::ContentType(
828                "application/sdp".to_string().into(),
829            )];
830
831            match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
832                Ok(_) => {
833                    {
834                        let mut state = self.call_state.write().await;
835                        state.answer = Some(answer);
836                        state.answer_time = Some(Utc::now());
837                    }
838                    self.finish_caller_stack(&option, track).await?;
839                }
840                Err(e) => {
841                    warn!(session_id = self.session_id, "failed to accept call: {}", e);
842                    return Err(anyhow::anyhow!("failed to accept call"));
843                }
844            }
845        }
846        return Ok(());
847    }
848
849    async fn do_reject(
850        &self,
851        code: Option<rsip::StatusCode>,
852        reason: Option<String>,
853    ) -> Result<()> {
854        match self
855            .invitation
856            .find_dialog_id_by_session_id(&self.session_id)
857        {
858            Some(id) => {
859                info!(
860                    session_id = self.session_id,
861                    ?reason,
862                    ?code,
863                    "rejecting call"
864                );
865                self.invitation.hangup(id, code, reason).await
866            }
867            None => Ok(()),
868        }
869    }
870
871    async fn do_ringing(
872        &self,
873        ringtone: Option<String>,
874        recorder: Option<RecorderOption>,
875        early_media: Option<bool>,
876    ) -> Result<()> {
877        let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
878        if ready_to_answer_val {
879            let option = CallOption {
880                recorder,
881                ..Default::default()
882            };
883            let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
884        }
885
886        let state = self.call_state.read().await;
887        if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
888            let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
889                let headers = vec![rsip::Header::ContentType(
890                    "application/sdp".to_string().into(),
891                )];
892                (Some(headers), Some(answer.as_bytes().to_vec()))
893            } else {
894                (None, None)
895            };
896
897            dialog.ringing(headers, body).ok();
898            info!(
899                session_id = self.session_id,
900                ringtone, early_media, "playing ringtone"
901            );
902            if let Some(ringtone_url) = ringtone {
903                drop(state);
904                self.do_play(ringtone_url, None, None, None).await.ok();
905            } else {
906                info!(session_id = self.session_id, "no ringtone to play");
907            }
908        }
909        Ok(())
910    }
911
912    async fn do_tts(
913        &self,
914        text: String,
915        speaker: Option<String>,
916        play_id: Option<String>,
917        auto_hangup: Option<bool>,
918        streaming: bool,
919        end_of_stream: bool,
920        option: Option<SynthesisOption>,
921        wait_input_timeout: Option<u32>,
922        base64: bool,
923    ) -> Result<()> {
924        let tts_option = {
925            let call_state = self.call_state.read().await;
926            match call_state.option.clone().unwrap_or_default().tts {
927                Some(opt) => opt.merge_with(option),
928                None => {
929                    if let Some(opt) = option {
930                        opt
931                    } else {
932                        return Err(anyhow::anyhow!("no tts option available"));
933                    }
934                }
935            }
936        };
937        let speaker = match speaker {
938            Some(s) => Some(s),
939            None => tts_option.speaker.clone(),
940        };
941
942        let mut play_command = SynthesisCommand {
943            text,
944            speaker,
945            play_id: play_id.clone(),
946            streaming,
947            end_of_stream,
948            option: tts_option,
949            base64,
950        };
951        info!(
952            session_id = self.session_id,
953            provider = ?play_command.option.provider,
954            text = %play_command.text.chars().take(10).collect::<String>(),
955            speaker = play_command.speaker.as_deref(),
956            auto_hangup = auto_hangup.unwrap_or_default(),
957            play_id = play_command.play_id.as_deref(),
958            streaming = play_command.streaming,
959            end_of_stream = play_command.end_of_stream,
960            wait_input_timeout = wait_input_timeout.unwrap_or_default(),
961            is_base64 = play_command.base64,
962            "new synthesis"
963        );
964
965        let ssrc = rand::random::<u32>();
966        let (should_interrupt, picked_ssrc) = {
967            let mut state = self.call_state.write().await;
968
969            let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
970                if play_id.is_some() && state.current_play_id != play_id {
971                    (ssrc, true)
972                } else {
973                    (handle.ssrc, false)
974                }
975            } else {
976                (ssrc, false)
977            };
978
979            state.auto_hangup = match auto_hangup {
980                Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
981                _ => state.auto_hangup.clone(),
982            };
983            state.wait_input_timeout = wait_input_timeout;
984
985            state.current_play_id = play_id.clone();
986            (changed, target_ssrc)
987        };
988
989        if should_interrupt {
990            let _ = self.do_interrupt(false).await;
991        }
992
993        let existing_handle = self.call_state.read().await.tts_handle.clone();
994        if let Some(tts_handle) = existing_handle {
995            match tts_handle.try_send(play_command) {
996                Ok(_) => return Ok(()),
997                Err(e) => {
998                    play_command = e.0;
999                }
1000            }
1001        }
1002
1003        let (new_handle, tts_track) = StreamEngine::create_tts_track(
1004            self.app_state.stream_engine.clone(),
1005            self.cancel_token.child_token(),
1006            self.session_id.clone(),
1007            self.server_side_track_id.clone(),
1008            picked_ssrc,
1009            play_id.clone(),
1010            streaming,
1011            &play_command.option,
1012        )
1013        .await?;
1014
1015        new_handle.try_send(play_command)?;
1016        self.call_state.write().await.tts_handle = Some(new_handle);
1017        self.update_track_wrapper(tts_track, play_id).await;
1018        Ok(())
1019    }
1020
1021    async fn do_play(
1022        &self,
1023        url: String,
1024        play_id: Option<String>,
1025        auto_hangup: Option<bool>,
1026        wait_input_timeout: Option<u32>,
1027    ) -> Result<()> {
1028        let ssrc = rand::random::<u32>();
1029        info!(
1030            session_id = self.session_id,
1031            ssrc, url, play_id, auto_hangup, "play file track"
1032        );
1033
1034        let play_id = play_id.or(Some(url.clone()));
1035
1036        let file_track = FileTrack::new(self.server_side_track_id.clone())
1037            .with_play_id(play_id.clone())
1038            .with_ssrc(ssrc)
1039            .with_path(url)
1040            .with_cancel_token(self.cancel_token.child_token());
1041
1042        {
1043            let mut state = self.call_state.write().await;
1044            state.tts_handle = None;
1045            state.auto_hangup = match auto_hangup {
1046                Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1047                _ => None,
1048            };
1049            state.wait_input_timeout = wait_input_timeout;
1050        }
1051
1052        self.update_track_wrapper(Box::new(file_track), play_id)
1053            .await;
1054        Ok(())
1055    }
1056
1057    async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1058        self.event_sender
1059            .send(SessionEvent::AddHistory {
1060                sender: Some(self.session_id.clone()),
1061                timestamp: crate::media::get_timestamp(),
1062                speaker,
1063                text,
1064            })
1065            .map(|_| ())
1066            .map_err(Into::into)
1067    }
1068
1069    async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1070        {
1071            let mut state = self.call_state.write().await;
1072            state.tts_handle = None;
1073            state.moh = None;
1074        }
1075        self.media_stream
1076            .remove_track(&self.server_side_track_id, graceful)
1077            .await;
1078        Ok(())
1079    }
1080    async fn do_pause(&self) -> Result<()> {
1081        Ok(())
1082    }
1083    async fn do_resume(&self) -> Result<()> {
1084        Ok(())
1085    }
1086    async fn do_hangup(
1087        &self,
1088        reason: Option<CallRecordHangupReason>,
1089        initiator: Option<String>,
1090    ) -> Result<()> {
1091        info!(
1092            session_id = self.session_id,
1093            ?reason,
1094            ?initiator,
1095            "do_hangup"
1096        );
1097
1098        // Set hangup reason based on initiator and reason
1099        let hangup_reason = match initiator.as_deref() {
1100            Some("caller") => CallRecordHangupReason::ByCaller,
1101            Some("callee") => CallRecordHangupReason::ByCallee,
1102            Some("system") => CallRecordHangupReason::Autohangup,
1103            _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1104        };
1105
1106        self.media_stream
1107            .stop(Some(hangup_reason.to_string()), initiator);
1108
1109        self.call_state
1110            .write()
1111            .await
1112            .set_hangup_reason(hangup_reason);
1113        Ok(())
1114    }
1115
1116    async fn do_refer(
1117        &self,
1118        caller: String,
1119        callee: String,
1120        refer_option: Option<ReferOption>,
1121    ) -> Result<()> {
1122        self.do_interrupt(false).await.ok();
1123        let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1124        if let Some(ref path) = moh {
1125            if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1126                let fallback = "./config/sounds/refer_moh.wav";
1127                if std::path::Path::new(fallback).exists() {
1128                    info!(
1129                        session_id = self.session_id,
1130                        "moh {} not found, using fallback {}", path, fallback
1131                    );
1132                    moh = Some(fallback.to_string());
1133                }
1134            }
1135        }
1136        let session_id = self.session_id.clone();
1137        let track_id = self.server_side_track_id.clone();
1138
1139        let recorder = {
1140            let cs = self.call_state.read().await;
1141            cs.option
1142                .as_ref()
1143                .map(|o| o.recorder.clone())
1144                .unwrap_or_default()
1145        };
1146
1147        let call_option = CallOption {
1148            caller: Some(caller),
1149            callee: Some(callee.clone()),
1150            sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1151            asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1152            denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1153            recorder,
1154            ..Default::default()
1155        };
1156
1157        let mut invite_option = call_option.build_invite_option()?;
1158        invite_option.call_id = Some(self.session_id.clone());
1159
1160        let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1161
1162        {
1163            let cs = self.call_state.read().await;
1164            if let Some(opt) = cs.option.as_ref() {
1165                if let Some(callee) = opt.callee.as_ref() {
1166                    headers.push(rsip::Header::Other(
1167                        "X-Referred-To".to_string(),
1168                        callee.clone(),
1169                    ));
1170                }
1171                if let Some(caller) = opt.caller.as_ref() {
1172                    headers.push(rsip::Header::Other(
1173                        "X-Referred-From".to_string(),
1174                        caller.clone(),
1175                    ));
1176                }
1177            }
1178        }
1179
1180        headers.push(rsip::Header::Other(
1181            "X-Referred-Id".to_string(),
1182            self.session_id.clone(),
1183        ));
1184
1185        let ssrc = rand::random::<u32>();
1186        let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1187            start_time: Utc::now(),
1188            ssrc,
1189            option: Some(call_option.clone()),
1190            is_refer: true,
1191            ..Default::default()
1192        }));
1193
1194        {
1195            let mut cs = self.call_state.write().await;
1196            cs.refer_callstate.replace(refer_call_state.clone());
1197        }
1198
1199        let auto_hangup_requested = refer_option
1200            .as_ref()
1201            .and_then(|o| o.auto_hangup)
1202            .unwrap_or(true);
1203
1204        if auto_hangup_requested {
1205            self.call_state.write().await.auto_hangup =
1206                Some((ssrc, CallRecordHangupReason::ByRefer));
1207        } else {
1208            self.call_state.write().await.auto_hangup = None;
1209        }
1210
1211        let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1212
1213        info!(
1214            session_id = self.session_id,
1215            ssrc,
1216            auto_hangup = auto_hangup_requested,
1217            callee,
1218            timeout_secs,
1219            "do_refer"
1220        );
1221
1222        let r = tokio::time::timeout(
1223            Duration::from_secs(timeout_secs as u64),
1224            self.create_outgoing_sip_track(
1225                self.cancel_token.child_token(),
1226                refer_call_state.clone(),
1227                &track_id,
1228                invite_option,
1229                &call_option,
1230                moh,
1231                auto_hangup_requested,
1232            ),
1233        )
1234        .await;
1235
1236        {
1237            self.call_state.write().await.moh = None;
1238        }
1239
1240        let result = match r {
1241            Ok(res) => res,
1242            Err(_) => {
1243                warn!(
1244                    session_id = session_id,
1245                    "refer sip track creation timed out after {} seconds", timeout_secs
1246                );
1247                self.event_sender
1248                    .send(SessionEvent::Reject {
1249                        track_id,
1250                        timestamp: crate::media::get_timestamp(),
1251                        reason: "Timeout when refer".into(),
1252                        code: Some(408),
1253                        refer: Some(true),
1254                    })
1255                    .ok();
1256                return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1257            }
1258        };
1259
1260        match result {
1261            Ok(answer) => {
1262                self.event_sender
1263                    .send(SessionEvent::Answer {
1264                        timestamp: crate::media::get_timestamp(),
1265                        track_id,
1266                        sdp: answer,
1267                        refer: Some(true),
1268                    })
1269                    .ok();
1270            }
1271            Err(e) => {
1272                warn!(
1273                    session_id = session_id,
1274                    "failed to create refer sip track: {}", e
1275                );
1276                match &e {
1277                    rsipstack::Error::DialogError(reason, _, code) => {
1278                        self.event_sender
1279                            .send(SessionEvent::Reject {
1280                                track_id,
1281                                timestamp: crate::media::get_timestamp(),
1282                                reason: reason.clone(),
1283                                code: Some(code.code() as u32),
1284                                refer: Some(true),
1285                            })
1286                            .ok();
1287                    }
1288                    _ => {}
1289                }
1290                return Err(e.into());
1291            }
1292        }
1293        Ok(())
1294    }
1295
1296    async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1297        self.media_stream.mute_track(track_id).await;
1298        Ok(())
1299    }
1300
1301    async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1302        self.media_stream.unmute_track(track_id).await;
1303        Ok(())
1304    }
1305
1306    pub async fn cleanup(&self) -> Result<()> {
1307        self.call_state.write().await.tts_handle = None;
1308        self.media_stream.cleanup().await.ok();
1309        Ok(())
1310    }
1311
1312    pub fn get_callrecord(&self) -> Option<CallRecord> {
1313        self.call_state.try_read().ok().map(|call_state| {
1314            call_state.build_callrecord(
1315                self.app_state.clone(),
1316                self.session_id.clone(),
1317                self.call_type.clone(),
1318            )
1319        })
1320    }
1321
1322    async fn dump_to_file(
1323        &self,
1324        dump_file: &mut File,
1325        cmd_receiver: &mut CommandReceiver,
1326        event_receiver: &mut EventReceiver,
1327    ) {
1328        loop {
1329            select! {
1330                _ = self.cancel_token.cancelled() => {
1331                    break;
1332                }
1333                Ok(cmd) = cmd_receiver.recv() => {
1334                    CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1335                        .await;
1336                }
1337                Ok(event) = event_receiver.recv() => {
1338                    if matches!(event, SessionEvent::Binary{..}) {
1339                        continue;
1340                    }
1341                    CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1342                        .await;
1343                }
1344            };
1345        }
1346    }
1347
1348    async fn dump_loop(
1349        &self,
1350        dump_events: bool,
1351        mut dump_cmd_receiver: CommandReceiver,
1352        mut dump_event_receiver: EventReceiver,
1353    ) {
1354        if !dump_events {
1355            return;
1356        }
1357
1358        let file_name = self.app_state.get_dump_events_file(&self.session_id);
1359        let mut dump_file = match File::options()
1360            .create(true)
1361            .append(true)
1362            .open(&file_name)
1363            .await
1364        {
1365            Ok(file) => file,
1366            Err(e) => {
1367                warn!(
1368                    session_id = self.session_id,
1369                    file_name, "failed to open dump events file: {}", e
1370                );
1371                return;
1372            }
1373        };
1374        self.dump_to_file(
1375            &mut dump_file,
1376            &mut dump_cmd_receiver,
1377            &mut dump_event_receiver,
1378        )
1379        .await;
1380
1381        while let Ok(event) = dump_event_receiver.try_recv() {
1382            if matches!(event, SessionEvent::Binary { .. }) {
1383                continue;
1384            }
1385            CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1386        }
1387    }
1388
1389    pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1390        let mut rtc_config = RtcTrackConfig::default();
1391        rtc_config.mode = rustrtc::TransportMode::Rtp;
1392
1393        if let Some(codecs) = &self.app_state.config.codecs {
1394            let mut codec_types = Vec::new();
1395            for c in codecs {
1396                match c.to_lowercase().as_str() {
1397                    "pcmu" => codec_types.push(CodecType::PCMU),
1398                    "pcma" => codec_types.push(CodecType::PCMA),
1399                    "g722" => codec_types.push(CodecType::G722),
1400                    "g729" => codec_types.push(CodecType::G729),
1401                    "opus" => codec_types.push(CodecType::Opus),
1402                    "dtmf" | "2833" | "telephone_event" => {
1403                        codec_types.push(CodecType::TelephoneEvent)
1404                    }
1405                    _ => {}
1406                }
1407            }
1408            if !codec_types.is_empty() {
1409                rtc_config.preferred_codec = Some(codec_types[0].clone());
1410                rtc_config.codecs = codec_types;
1411            }
1412        }
1413
1414        if rtc_config.preferred_codec.is_none() {
1415            rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1416        }
1417
1418        rtc_config.rtp_port_range = self
1419            .app_state
1420            .config
1421            .rtp_start_port
1422            .zip(self.app_state.config.rtp_end_port);
1423
1424        if let Some(ref external_ip) = self.app_state.config.external_ip {
1425            rtc_config.external_ip = Some(external_ip.clone());
1426        }
1427        if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1428            rtc_config.bind_ip = Some(bind_ip.clone());
1429        }
1430
1431        rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1432
1433        let mut track = RtcTrack::new(
1434            self.cancel_token.child_token(),
1435            track_id,
1436            self.track_config.clone(),
1437            rtc_config,
1438        )
1439        .with_ssrc(ssrc);
1440
1441        track.create().await?;
1442
1443        Ok(track)
1444    }
1445
1446    async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1447        let hangup_headers = option
1448            .sip
1449            .as_ref()
1450            .and_then(|s| s.hangup_headers.as_ref())
1451            .map(|headers_map| {
1452                headers_map
1453                    .iter()
1454                    .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1455                    .collect::<Vec<rsip::Header>>()
1456            });
1457        self.call_state.write().await.option = Some(option.clone());
1458        info!(
1459            session_id = self.session_id,
1460            call_type = ?self.call_type,
1461            "setup caller track"
1462        );
1463
1464        let track = match self.call_type {
1465            ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1466            ActiveCallType::WebSocket => {
1467                let audio_receiver = self.call_state.write().await.audio_receiver.take();
1468                if let Some(receiver) = audio_receiver {
1469                    Some(self.create_websocket_track(receiver).await?)
1470                } else {
1471                    None
1472                }
1473            }
1474            ActiveCallType::Sip => {
1475                if let Some(dialog_id) = self
1476                    .invitation
1477                    .find_dialog_id_by_session_id(&self.session_id)
1478                {
1479                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1480                        return self
1481                            .prepare_incoming_sip_track(
1482                                self.cancel_token.clone(),
1483                                self.call_state.clone(),
1484                                &self.session_id,
1485                                pending_dialog,
1486                                hangup_headers,
1487                            )
1488                            .await;
1489                    }
1490                }
1491
1492                // Auto-inject credentials from registered users if not already provided
1493                let mut option = option.clone();
1494                if option.sip.is_none()
1495                    || option
1496                        .sip
1497                        .as_ref()
1498                        .and_then(|s| s.username.as_ref())
1499                        .is_none()
1500                {
1501                    if let Some(callee) = &option.callee {
1502                        if let Some(cred) = self.app_state.find_credentials_for_callee(callee) {
1503                            if option.sip.is_none() {
1504                                option.sip = Some(crate::SipOption {
1505                                    username: Some(cred.username.clone()),
1506                                    password: Some(cred.password.clone()),
1507                                    realm: cred.realm.clone(),
1508                                    ..Default::default()
1509                                });
1510                            }
1511                        }
1512                    }
1513                }
1514
1515                let mut invite_option = option.build_invite_option()?;
1516                invite_option.call_id = Some(self.session_id.clone());
1517
1518                match self
1519                    .create_outgoing_sip_track(
1520                        self.cancel_token.clone(),
1521                        self.call_state.clone(),
1522                        &self.session_id,
1523                        invite_option,
1524                        &option,
1525                        None,
1526                        false,
1527                    )
1528                    .await
1529                {
1530                    Ok(answer) => {
1531                        self.event_sender
1532                            .send(SessionEvent::Answer {
1533                                timestamp: crate::media::get_timestamp(),
1534                                track_id: self.session_id.clone(),
1535                                sdp: answer,
1536                                refer: Some(false),
1537                            })
1538                            .ok();
1539                        return Ok(());
1540                    }
1541                    Err(e) => {
1542                        warn!(
1543                            session_id = self.session_id,
1544                            "failed to create sip track: {}", e
1545                        );
1546                        match &e {
1547                            rsipstack::Error::DialogError(reason, _, code) => {
1548                                self.event_sender
1549                                    .send(SessionEvent::Reject {
1550                                        track_id: self.session_id.clone(),
1551                                        timestamp: crate::media::get_timestamp(),
1552                                        reason: reason.clone(),
1553                                        code: Some(code.code() as u32),
1554                                        refer: Some(false),
1555                                    })
1556                                    .ok();
1557                            }
1558                            _ => {}
1559                        }
1560                        return Err(e.into());
1561                    }
1562                }
1563            }
1564            ActiveCallType::B2bua => {
1565                if let Some(dialog_id) = self
1566                    .invitation
1567                    .find_dialog_id_by_session_id(&self.session_id)
1568                {
1569                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1570                        return self
1571                            .prepare_incoming_sip_track(
1572                                self.cancel_token.clone(),
1573                                self.call_state.clone(),
1574                                &self.session_id,
1575                                pending_dialog,
1576                                hangup_headers,
1577                            )
1578                            .await;
1579                    }
1580                }
1581
1582                warn!(
1583                    session_id = self.session_id,
1584                    "no pending dialog found for B2BUA call"
1585                );
1586                return Err(anyhow::anyhow!(
1587                    "no pending dialog found for session_id: {}",
1588                    self.session_id
1589                ));
1590            }
1591        };
1592        match track {
1593            Some(track) => {
1594                self.finish_caller_stack(&option, Some(track)).await?;
1595            }
1596            None => {
1597                warn!(session_id = self.session_id, "no track created for caller");
1598                return Err(anyhow::anyhow!("no track created for caller"));
1599            }
1600        }
1601        Ok(())
1602    }
1603
1604    async fn finish_caller_stack(
1605        &self,
1606        option: &CallOption,
1607        track: Option<Box<dyn Track>>,
1608    ) -> Result<()> {
1609        if let Some(track) = track {
1610            self.setup_track_with_stream(&option, track).await?;
1611        }
1612
1613        {
1614            let call_state = self.call_state.read().await;
1615            if let Some(ref answer) = call_state.answer {
1616                info!(
1617                    session_id = self.session_id,
1618                    "sending answer event: {}", answer,
1619                );
1620                self.event_sender
1621                    .send(SessionEvent::Answer {
1622                        timestamp: crate::media::get_timestamp(),
1623                        track_id: self.session_id.clone(),
1624                        sdp: answer.clone(),
1625                        refer: Some(false),
1626                    })
1627                    .ok();
1628            } else {
1629                warn!(
1630                    session_id = self.session_id,
1631                    "no answer in state to send event"
1632                );
1633            }
1634        }
1635        Ok(())
1636    }
1637
1638    pub async fn setup_track_with_stream(
1639        &self,
1640        option: &CallOption,
1641        mut track: Box<dyn Track>,
1642    ) -> Result<()> {
1643        let processors = match StreamEngine::create_processors(
1644            self.app_state.stream_engine.clone(),
1645            track.as_ref(),
1646            self.cancel_token.child_token(),
1647            self.event_sender.clone(),
1648            self.media_stream.packet_sender.clone(),
1649            option,
1650        )
1651        .await
1652        {
1653            Ok(processors) => processors,
1654            Err(e) => {
1655                warn!(
1656                    session_id = self.session_id,
1657                    "failed to prepare stream processors: {}", e
1658                );
1659                vec![]
1660            }
1661        };
1662
1663        // Add all processors from the hook
1664        for processor in processors {
1665            track.append_processor(processor);
1666        }
1667
1668        self.update_track_wrapper(track, None).await;
1669        Ok(())
1670    }
1671
1672    pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1673        let (ambiance_opt, subscribe) = {
1674            let state = self.call_state.read().await;
1675            let mut opt = state
1676                .option
1677                .as_ref()
1678                .and_then(|o| o.ambiance.clone())
1679                .unwrap_or_default();
1680
1681            if let Some(global) = &self.app_state.config.ambiance {
1682                opt.merge(global);
1683            }
1684
1685            let subscribe = state
1686                .option
1687                .as_ref()
1688                .and_then(|o| o.subscribe)
1689                .unwrap_or_default();
1690
1691            (opt, subscribe)
1692        };
1693        if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1694            match AmbianceProcessor::new(ambiance_opt).await {
1695                Ok(ambiance) => {
1696                    info!(session_id = self.session_id, "loaded ambiance processor");
1697                    track.append_processor(Box::new(ambiance));
1698                }
1699                Err(e) => {
1700                    tracing::error!("failed to load ambiance wav {}", e);
1701                }
1702            }
1703        }
1704
1705        if subscribe && self.call_type != ActiveCallType::WebSocket {
1706            let (track_index, sub_track_id) = if track.id() == &self.server_side_track_id {
1707                (0, self.server_side_track_id.clone())
1708            } else {
1709                (1, self.session_id.clone())
1710            };
1711            let sub_processor =
1712                SubscribeProcessor::new(self.event_sender.clone(), sub_track_id, track_index);
1713            track.append_processor(Box::new(sub_processor));
1714        }
1715
1716        self.call_state.write().await.current_play_id = play_id.clone();
1717        self.media_stream.update_track(track, play_id).await;
1718    }
1719
1720    pub async fn create_websocket_track(
1721        &self,
1722        audio_receiver: WebsocketBytesReceiver,
1723    ) -> Result<Box<dyn Track>> {
1724        let (ssrc, codec) = {
1725            let call_state = self.call_state.read().await;
1726            (
1727                call_state.ssrc,
1728                call_state
1729                    .option
1730                    .as_ref()
1731                    .map(|o| o.codec.clone())
1732                    .unwrap_or_default(),
1733            )
1734        };
1735
1736        let ws_track = WebsocketTrack::new(
1737            self.cancel_token.child_token(),
1738            self.session_id.clone(),
1739            self.track_config.clone(),
1740            self.event_sender.clone(),
1741            audio_receiver,
1742            codec,
1743            ssrc,
1744        );
1745
1746        {
1747            let mut call_state = self.call_state.write().await;
1748            call_state.answer_time = Some(Utc::now());
1749            call_state.answer = Some("".to_string());
1750            call_state.last_status_code = 200;
1751        }
1752
1753        Ok(Box::new(ws_track))
1754    }
1755
1756    pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1757        let (ssrc, option) = {
1758            let call_state = self.call_state.read().await;
1759            (
1760                call_state.ssrc,
1761                call_state.option.clone().unwrap_or_default(),
1762            )
1763        };
1764
1765        let mut rtc_config = RtcTrackConfig::default();
1766        rtc_config.mode = rustrtc::TransportMode::WebRtc; // WebRTC
1767        rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1768
1769        if let Some(codecs) = &self.app_state.config.codecs {
1770            let mut codec_types = Vec::new();
1771            for c in codecs {
1772                match c.to_lowercase().as_str() {
1773                    "pcmu" => codec_types.push(CodecType::PCMU),
1774                    "pcma" => codec_types.push(CodecType::PCMA),
1775                    "g722" => codec_types.push(CodecType::G722),
1776                    "g729" => codec_types.push(CodecType::G729),
1777                    "opus" => codec_types.push(CodecType::Opus),
1778                    "dtmf" | "2833" | "telephone_event" => {
1779                        codec_types.push(CodecType::TelephoneEvent)
1780                    }
1781                    _ => {}
1782                }
1783            }
1784            if !codec_types.is_empty() {
1785                rtc_config.preferred_codec = Some(codec_types[0].clone());
1786                rtc_config.codecs = codec_types;
1787            }
1788        }
1789
1790        if let Some(ref external_ip) = self.app_state.config.external_ip {
1791            rtc_config.external_ip = Some(external_ip.clone());
1792        }
1793        if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1794            rtc_config.bind_ip = Some(bind_ip.clone());
1795        }
1796
1797        let mut webrtc_track = RtcTrack::new(
1798            self.cancel_token.child_token(),
1799            self.session_id.clone(),
1800            self.track_config.clone(),
1801            rtc_config,
1802        )
1803        .with_ssrc(ssrc);
1804
1805        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1806        let offer = match option.enable_ipv6 {
1807            Some(false) | None => {
1808                strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1809            }
1810            _ => option.offer.clone().unwrap_or("".to_string()),
1811        };
1812        let answer: Option<String>;
1813        match webrtc_track.handshake(offer, timeout).await {
1814            Ok(answer_sdp) => {
1815                answer = match option.enable_ipv6 {
1816                    Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1817                    Some(true) => Some(answer_sdp.to_string()),
1818                };
1819            }
1820            Err(e) => {
1821                warn!(session_id = self.session_id, "failed to setup track: {}", e);
1822                return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1823            }
1824        }
1825
1826        {
1827            let mut call_state = self.call_state.write().await;
1828            call_state.answer_time = Some(Utc::now());
1829            call_state.answer = answer;
1830            call_state.last_status_code = 200;
1831        }
1832        Ok(Box::new(webrtc_track))
1833    }
1834
1835    async fn create_outgoing_sip_track(
1836        &self,
1837        cancel_token: CancellationToken,
1838        call_state_ref: ActiveCallStateRef,
1839        track_id: &String,
1840        mut invite_option: InviteOption,
1841        call_option: &CallOption,
1842        moh: Option<String>,
1843        auto_hangup: bool,
1844    ) -> Result<String, rsipstack::Error> {
1845        let ssrc = call_state_ref.read().await.ssrc;
1846        let rtp_track = self
1847            .create_rtp_track(track_id.clone(), ssrc)
1848            .await
1849            .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1850
1851        let offer = Some(
1852            rtp_track
1853                .local_description()
1854                .await
1855                .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1856        );
1857
1858        {
1859            let mut cs = call_state_ref.write().await;
1860            if let Some(o) = cs.option.as_mut() {
1861                o.offer = offer.clone();
1862            }
1863            cs.start_time = Utc::now();
1864        };
1865
1866        invite_option.offer = offer.clone().map(|s| s.into());
1867
1868        // Set contact to local SIP endpoint address if not already set explicitly
1869        // Check if contact is still default (no scheme set) or if host is localhost-like
1870        let needs_contact = invite_option.contact.scheme.is_none()
1871            || invite_option
1872                .contact
1873                .host_with_port
1874                .to_string()
1875                .starts_with("127.0.0.1");
1876
1877        if needs_contact {
1878            if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1879                invite_option.contact = rsip::Uri {
1880                    scheme: Some(rsip::Scheme::Sip),
1881                    auth: None,
1882                    host_with_port: addr.addr.clone(),
1883                    params: vec![],
1884                    headers: vec![],
1885                };
1886            }
1887        }
1888
1889        let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1890
1891        if let Some(moh) = moh {
1892            let ssrc_and_moh = {
1893                let mut state = call_state_ref.write().await;
1894                state.moh = Some(moh.clone());
1895                if state.current_play_id.is_none() {
1896                    let ssrc = rand::random::<u32>();
1897                    Some((ssrc, moh.clone()))
1898                } else {
1899                    info!(
1900                        session_id = self.session_id,
1901                        "Something is playing, MOH will start after it ends"
1902                    );
1903                    None
1904                }
1905            };
1906
1907            if let Some((ssrc, moh_path)) = ssrc_and_moh {
1908                let file_track = FileTrack::new(self.server_side_track_id.clone())
1909                    .with_play_id(Some(moh_path.clone()))
1910                    .with_ssrc(ssrc)
1911                    .with_path(moh_path.clone())
1912                    .with_cancel_token(self.cancel_token.child_token());
1913                self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1914                    .await;
1915            }
1916        } else {
1917            let track = rtp_track_to_setup.take().unwrap();
1918            self.setup_track_with_stream(&call_option, track)
1919                .await
1920                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1921        }
1922
1923        info!(
1924            session_id = self.session_id,
1925            track_id,
1926            contact = %invite_option.contact,
1927            "invite {} -> {} offer: \n{}",
1928            invite_option.caller,
1929            invite_option.callee,
1930            offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1931        );
1932
1933        let (dlg_state_sender, dlg_state_receiver) =
1934            self.invitation.dialog_layer.new_dialog_state_channel();
1935
1936        let states = InviteDialogStates {
1937            is_client: true,
1938            session_id: self.session_id.clone(),
1939            track_id: track_id.clone(),
1940            event_sender: self.event_sender.clone(),
1941            media_stream: self.media_stream.clone(),
1942            call_state: call_state_ref.clone(),
1943            cancel_token,
1944            terminated_reason: None,
1945            has_early_media: false,
1946        };
1947
1948        let hangup_headers = call_option
1949            .sip
1950            .as_ref()
1951            .and_then(|s| s.hangup_headers.as_ref())
1952            .map(|headers_map| {
1953                headers_map
1954                    .iter()
1955                    .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1956                    .collect::<Vec<rsip::Header>>()
1957            });
1958
1959        let mut client_dialog_handler = DialogStateReceiverGuard::new(
1960            self.invitation.dialog_layer.clone(),
1961            dlg_state_receiver,
1962            hangup_headers,
1963        );
1964
1965        crate::spawn(async move {
1966            client_dialog_handler.process_dialog(states).await;
1967        });
1968
1969        let (dialog_id, answer) = self
1970            .invitation
1971            .invite(invite_option, dlg_state_sender)
1972            .await?;
1973
1974        self.call_state.write().await.moh = None;
1975
1976        if let Some(track) = rtp_track_to_setup {
1977            self.setup_track_with_stream(&call_option, track)
1978                .await
1979                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1980        }
1981
1982        let answer = match answer {
1983            Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1984            None => {
1985                warn!(session_id = self.session_id, "no answer received");
1986                return Err(rsipstack::Error::DialogError(
1987                    "No answer received".to_string(),
1988                    dialog_id,
1989                    rsip::StatusCode::NotAcceptableHere,
1990                ));
1991            }
1992        };
1993
1994        {
1995            let mut cs = call_state_ref.write().await;
1996            if cs.answer.is_none() {
1997                cs.answer = Some(answer.clone());
1998            }
1999            if auto_hangup {
2000                cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
2001            }
2002        }
2003
2004        self.media_stream
2005            .update_remote_description(&track_id, &answer)
2006            .await
2007            .ok();
2008
2009        Ok(answer)
2010    }
2011
2012    /// Detect if SDP is WebRTC format
2013    pub fn is_webrtc_sdp(sdp: &str) -> bool {
2014        (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
2015            && sdp.contains("a=fingerprint:")
2016    }
2017
2018    pub async fn setup_answer_track(
2019        &self,
2020        ssrc: u32,
2021        option: &CallOption,
2022        offer: String,
2023    ) -> Result<(String, Box<dyn Track>)> {
2024        let offer = match option.enable_ipv6 {
2025            Some(false) | None => strip_ipv6_candidates(&offer),
2026            _ => offer.clone(),
2027        };
2028
2029        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
2030
2031        let mut media_track = if Self::is_webrtc_sdp(&offer) {
2032            let mut rtc_config = RtcTrackConfig::default();
2033            rtc_config.mode = rustrtc::TransportMode::WebRtc;
2034            rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
2035            if let Some(ref external_ip) = self.app_state.config.external_ip {
2036                rtc_config.external_ip = Some(external_ip.clone());
2037            }
2038            if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
2039                rtc_config.bind_ip = Some(bind_ip.clone());
2040            }
2041            rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
2042
2043            let webrtc_track = RtcTrack::new(
2044                self.cancel_token.child_token(),
2045                self.session_id.clone(),
2046                self.track_config.clone(),
2047                rtc_config,
2048            )
2049            .with_ssrc(ssrc);
2050
2051            Box::new(webrtc_track) as Box<dyn Track>
2052        } else {
2053            let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
2054            Box::new(rtp_track) as Box<dyn Track>
2055        };
2056
2057        let answer = match media_track.handshake(offer.clone(), timeout).await {
2058            Ok(answer) => answer,
2059            Err(e) => {
2060                return Err(anyhow::anyhow!("handshake failed: {e}"));
2061            }
2062        };
2063
2064        return Ok((answer, media_track));
2065    }
2066
2067    pub async fn prepare_incoming_sip_track(
2068        &self,
2069        cancel_token: CancellationToken,
2070        call_state_ref: ActiveCallStateRef,
2071        track_id: &String,
2072        pending_dialog: PendingDialog,
2073        hangup_headers: Option<Vec<rsip::Header>>,
2074    ) -> Result<()> {
2075        let state_receiver = pending_dialog.state_receiver;
2076        //let pending_token_clone = pending_dialog.token;
2077
2078        let states = InviteDialogStates {
2079            is_client: false,
2080            session_id: self.session_id.clone(),
2081            track_id: track_id.clone(),
2082            event_sender: self.event_sender.clone(),
2083            media_stream: self.media_stream.clone(),
2084            call_state: self.call_state.clone(),
2085            cancel_token,
2086            terminated_reason: None,
2087            has_early_media: false,
2088        };
2089
2090        let initial_request = pending_dialog.dialog.initial_request();
2091        let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2092
2093        let (ssrc, option) = {
2094            let call_state = call_state_ref.read().await;
2095            (
2096                call_state.ssrc,
2097                call_state.option.clone().unwrap_or_default(),
2098            )
2099        };
2100
2101        match self.setup_answer_track(ssrc, &option, offer).await {
2102            Ok((offer, track)) => {
2103                self.setup_track_with_stream(&option, track).await?;
2104                {
2105                    let mut state = self.call_state.write().await;
2106                    state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2107                }
2108            }
2109            Err(e) => {
2110                return Err(anyhow::anyhow!("error creating track: {}", e));
2111            }
2112        }
2113
2114        let mut client_dialog_handler = DialogStateReceiverGuard::new(
2115            self.invitation.dialog_layer.clone(),
2116            state_receiver,
2117            hangup_headers,
2118        );
2119
2120        crate::spawn(async move {
2121            client_dialog_handler.process_dialog(states).await;
2122        });
2123        Ok(())
2124    }
2125}
2126
2127impl Drop for ActiveCall {
2128    fn drop(&mut self) {
2129        info!(session_id = self.session_id, "dropping active call");
2130        if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2131            if let Some(record) = self.get_callrecord() {
2132                if let Err(e) = sender.send(record) {
2133                    warn!(
2134                        session_id = self.session_id,
2135                        "failed to send call record: {}", e
2136                    );
2137                }
2138            }
2139        }
2140    }
2141}
2142
2143impl ActiveCallState {
2144    pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2145        if let Some(existing) = &self.option {
2146            if option.asr.is_none() {
2147                option.asr = existing.asr.clone();
2148            }
2149            if option.tts.is_none() {
2150                option.tts = existing.tts.clone();
2151            }
2152            if option.vad.is_none() {
2153                option.vad = existing.vad.clone();
2154            }
2155            if option.denoise.is_none() {
2156                option.denoise = existing.denoise;
2157            }
2158            if option.recorder.is_none() {
2159                option.recorder = existing.recorder.clone();
2160            }
2161            if option.eou.is_none() {
2162                option.eou = existing.eou.clone();
2163            }
2164            if option.extra.is_none() {
2165                option.extra = existing.extra.clone();
2166            }
2167            if option.ambiance.is_none() {
2168                option.ambiance = existing.ambiance.clone();
2169            }
2170        }
2171        option
2172    }
2173
2174    pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2175        if self.hangup_reason.is_none() {
2176            self.hangup_reason = Some(reason);
2177        }
2178    }
2179
2180    pub fn build_hangup_event(
2181        &self,
2182        track_id: TrackId,
2183        initiator: Option<String>,
2184    ) -> crate::event::SessionEvent {
2185        let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2186        let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2187        let extra = self.extras.clone();
2188
2189        crate::event::SessionEvent::Hangup {
2190            track_id,
2191            timestamp: crate::media::get_timestamp(),
2192            reason: Some(format!("{:?}", self.hangup_reason)),
2193            initiator,
2194            start_time: self.start_time.to_rfc3339(),
2195            answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2196            ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2197            hangup_time: Utc::now().to_rfc3339(),
2198            extra,
2199            from: from.map(|f| f.into()),
2200            to: to.map(|f| f.into()),
2201            refer: Some(self.is_refer),
2202        }
2203    }
2204
2205    pub fn build_callrecord(
2206        &self,
2207        app_state: AppState,
2208        session_id: String,
2209        call_type: ActiveCallType,
2210    ) -> CallRecord {
2211        let option = self.option.clone().unwrap_or_default();
2212        let recorder = if option.recorder.is_some() {
2213            let recorder_file = app_state.get_recorder_file(&session_id);
2214            if std::path::Path::new(&recorder_file).exists() {
2215                let file_size = std::fs::metadata(&recorder_file)
2216                    .map(|m| m.len())
2217                    .unwrap_or(0);
2218                vec![crate::callrecord::CallRecordMedia {
2219                    track_id: session_id.clone(),
2220                    path: recorder_file,
2221                    size: file_size,
2222                    extra: None,
2223                }]
2224            } else {
2225                vec![]
2226            }
2227        } else {
2228            vec![]
2229        };
2230
2231        let dump_event_file = app_state.get_dump_events_file(&session_id);
2232        let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2233            Some(dump_event_file)
2234        } else {
2235            None
2236        };
2237
2238        let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2239            if let Ok(rc) = rc.try_read() {
2240                Some(Box::new(rc.build_callrecord(
2241                    app_state.clone(),
2242                    rc.session_id.clone(),
2243                    ActiveCallType::B2bua,
2244                )))
2245            } else {
2246                None
2247            }
2248        });
2249
2250        let caller = option.caller.clone().unwrap_or_default();
2251        let callee = option.callee.clone().unwrap_or_default();
2252
2253        CallRecord {
2254            option: Some(option),
2255            call_id: session_id,
2256            call_type,
2257            start_time: self.start_time,
2258            ring_time: self.ring_time.clone(),
2259            answer_time: self.answer_time.clone(),
2260            end_time: Utc::now(),
2261            caller,
2262            callee,
2263            hangup_reason: self.hangup_reason.clone(),
2264            hangup_messages: Vec::new(),
2265            status_code: self.last_status_code,
2266            extras: self.extras.clone(),
2267            dump_event_file,
2268            recorder,
2269            refer_callrecord,
2270        }
2271    }
2272}